public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs
@ 2023-08-08 12:13 Hannes Laimer
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional Hannes Laimer
                   ` (6 more replies)
  0 siblings, 7 replies; 13+ messages in thread
From: Hannes Laimer @ 2023-08-08 12:13 UTC (permalink / raw)
  To: pbs-devel

Add support for local sync. SyncJobs without a remote are considered local, and
use a different logic for pulling. In the course of adding the extra pull logic,
the pull code was rewritten to be largely source-independent. CLI completion and
the UI were also updated to allow the Remote in SyncJobs to be optional.
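
For illustration, a minimal sketch of the central change (field names as in
patch 1/6; the `is_local` helper is hypothetical and only added here for
clarity):

    // A sync job now carries an optional remote; `None` implies local sync.
    pub struct SyncJobConfig {
        pub id: String,
        pub store: String,
        /// None implies local sync.
        pub remote: Option<String>,
        pub remote_store: String,
        // ... remaining fields elided ...
    }

    // Hypothetical helper: callers branch on the presence of the remote.
    fn is_local(job: &SyncJobConfig) -> bool {
        job.remote.is_none()
    }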

v2: thanks @Fabian Grünbichler <f.gruenbichler@proxmox.com> for the feedback and
suggestions on v1
 - make pull logic more source-independent

v3: thanks @Fabian and @Wolfgang for the feedback on v2
 - remove enums for Local/Remote
 - use traits, the pull logic now expects a `dyn PullSource` (or `dyn PullReader`)
 - add lock to dir for local sync
 - split refactoring of pull logic and implementation of
    local pulling into two commits

Hannes Laimer (6):
  api2: make Remote for SyncJob optional
  ui: add support for optional Remote in SyncJob
  manager: add completion for opt. Remote in SyncJob
  accept a ref to a HttpClient
  pull: refactor pulling from a datastore
  pull: add support for pulling from local datastore

 Cargo.toml                           |    2 +
 examples/download-speed.rs           |    2 +-
 pbs-api-types/src/jobs.rs            |    5 +-
 pbs-client/src/backup_reader.rs      |    2 +-
 pbs-datastore/src/read_chunk.rs      |    2 +-
 proxmox-backup-client/src/catalog.rs |    4 +-
 proxmox-backup-client/src/main.rs    |    2 +-
 proxmox-backup-client/src/mount.rs   |    2 +-
 proxmox-file-restore/src/main.rs     |    4 +-
 src/api2/config/remote.rs            |   16 +-
 src/api2/config/sync.rs              |   41 +-
 src/api2/node/tasks.rs               |    4 +-
 src/api2/pull.rs                     |   45 +-
 src/bin/proxmox-backup-manager.rs    |   67 +-
 src/bin/proxmox_backup_debug/diff.rs |    2 +-
 src/server/email_notifications.rs    |   18 +-
 src/server/pull.rs                   | 1080 ++++++++++++++++----------
 www/form/RemoteTargetSelector.js     |   29 +-
 www/window/SyncJobEdit.js            |    8 +-
 19 files changed, 846 insertions(+), 489 deletions(-)

-- 
2.39.2


* [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional
  2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
@ 2023-08-08 12:13 ` Hannes Laimer
  2023-08-23 11:37   ` Wolfgang Bumiller
  2023-09-21 11:06   ` Lukas Wagner
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] ui: add support for optional Remote in SyncJob Hannes Laimer
                   ` (5 subsequent siblings)
  6 siblings, 2 replies; 13+ messages in thread
From: Hannes Laimer @ 2023-08-08 12:13 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 pbs-api-types/src/jobs.rs         |  5 ++-
 src/api2/config/remote.rs         |  2 +-
 src/api2/config/sync.rs           | 41 +++++++++++++-------
 src/api2/node/tasks.rs            |  4 +-
 src/api2/pull.rs                  | 62 ++++++++++++++++++++++---------
 src/server/email_notifications.rs | 18 +++++----
 6 files changed, 90 insertions(+), 42 deletions(-)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 23e19b7b..85fdbe41 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -467,6 +467,7 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
         },
         remote: {
             schema: REMOTE_ID_SCHEMA,
+            optional: true,
         },
         "remote-store": {
             schema: DATASTORE_SCHEMA,
@@ -515,7 +516,9 @@ pub struct SyncJobConfig {
     pub ns: Option<BackupNamespace>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub owner: Option<Authid>,
-    pub remote: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// None implies local sync.
+    pub remote: Option<String>,
     pub remote_store: String,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub remote_ns: Option<BackupNamespace>,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 76dd3b89..307cf3cd 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
 
     let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
     for job in job_list {
-        if job.remote == name {
+        if job.remote.map_or(false, |id| id == name) {
             param_bail!(
                 "name",
                 "remote '{}' is used by sync job '{}' (datastore '{}')",
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 01e5f2ce..21634bd5 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
 
 use pbs_api_types::{
     Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
-    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
-    PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
+    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
+    PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
 };
 use pbs_config::sync;
 
@@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
         return false;
     }
 
-    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
-    remote_privs & PRIV_REMOTE_AUDIT != 0
+    if let Some(remote) = &job.remote {
+        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
+        remote_privs & PRIV_REMOTE_AUDIT != 0
+    } else {
+        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+        source_ds_privs & PRIV_DATASTORE_AUDIT != 0
+    }
 }
 
 /// checks whether user can run the corresponding pull job
@@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
         return false;
     }
 
-    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
-    remote_privs & PRIV_REMOTE_READ != 0
+    if let Some(remote) = &job.remote {
+        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
+        remote_privs & PRIV_REMOTE_READ != 0
+    } else {
+        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+        source_ds_privs & PRIV_DATASTORE_READ != 0
+    }
 }
 
 #[api(
@@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
 #[serde(rename_all = "kebab-case")]
 /// Deletable property name
 pub enum DeletableProperty {
+    /// Delete the remote property (-> meaning local sync).
+    Remote,
     /// Delete the owner property.
     Owner,
     /// Delete the comment property.
@@ -275,6 +287,9 @@ pub fn update_sync_job(
     if let Some(delete) = delete {
         for delete_prop in delete {
             match delete_prop {
+                DeletableProperty::Remote => {
+                    data.remote = None;
+                }
                 DeletableProperty::Owner => {
                     data.owner = None;
                 }
@@ -334,7 +349,7 @@ pub fn update_sync_job(
         data.ns = Some(ns);
     }
     if let Some(remote) = update.remote {
-        data.remote = remote;
+        data.remote = Some(remote);
     }
     if let Some(remote_store) = update.remote_store {
         data.remote_store = remote_store;
@@ -503,7 +518,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
 
     let mut job = SyncJobConfig {
         id: "regular".to_string(),
-        remote: "remote0".to_string(),
+        remote: Some("remote0".to_string()),
         remote_store: "remotestore1".to_string(),
         remote_ns: None,
         store: "localstore0".to_string(),
@@ -538,11 +553,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
     // reading without proper read permissions on local end must fail
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
     // reading without proper read permissions on remote end must fail
-    job.remote = "remote0".to_string();
+    job.remote = Some("remote0".to_string());
     job.store = "localstore1".to_string();
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
@@ -555,10 +570,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     ));
 
     // writing without proper write permissions on local end must fail
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
 
     // writing without proper write permissions on remote end must fail
-    job.remote = "remote0".to_string();
+    job.remote = Some("remote0".to_string());
     job.store = "localstore1".to_string();
     assert!(!check_sync_job_modify_access(
         &user_info,
@@ -567,7 +582,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     ));
 
     // reset remote to one where users have access
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
 
     // user with read permission can only read, but not modify/run
     assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 866361c6..12ce70f6 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -75,14 +75,14 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
                 let local_store = captures.get(3);
                 let local_ns = captures.get(4).map(|m| m.as_str());
 
-                if let (Some(remote), Some(remote_store), Some(local_store)) =
+                if let (remote, Some(remote_store), Some(local_store)) =
                     (remote, remote_store, local_store)
                 {
                     return check_pull_privs(
                         auth_id,
                         local_store.as_str(),
                         local_ns,
-                        remote.as_str(),
+                        remote.map(|remote| remote.as_str()),
                         remote_store.as_str(),
                         false,
                     );
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index daeba7cf..664ecce5 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -8,7 +8,7 @@ use proxmox_sys::task_log;
 
 use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
-    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+    GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
     PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     TRANSFER_LAST_SCHEMA,
 };
@@ -22,7 +22,7 @@ pub fn check_pull_privs(
     auth_id: &Authid,
     store: &str,
     ns: Option<&str>,
-    remote: &str,
+    remote: Option<&str>,
     remote_store: &str,
     delete: bool,
 ) -> Result<(), Error> {
@@ -39,12 +39,22 @@ pub fn check_pull_privs(
         PRIV_DATASTORE_BACKUP,
         false,
     )?;
-    user_info.check_privs(
-        auth_id,
-        &["remote", remote, remote_store],
-        PRIV_REMOTE_READ,
-        false,
-    )?;
+
+    if let Some(remote) = remote {
+        user_info.check_privs(
+            auth_id,
+            &["remote", remote, remote_store],
+            PRIV_REMOTE_READ,
+            false,
+        )?;
+    } else {
+        user_info.check_privs(
+            auth_id,
+            &["datastore", remote_store],
+            PRIV_DATASTORE_BACKUP,
+            false,
+        )?;
+    }
 
     if delete {
         user_info.check_privs(
@@ -65,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
         PullParameters::new(
             &sync_job.store,
             sync_job.ns.clone().unwrap_or_default(),
-            &sync_job.remote,
+            sync_job.remote.as_deref().unwrap_or("local"),
             &sync_job.remote_store,
             sync_job.remote_ns.clone().unwrap_or_default(),
             sync_job
@@ -91,7 +101,7 @@ pub fn do_sync_job(
 ) -> Result<String, Error> {
     let job_id = format!(
         "{}:{}:{}:{}:{}",
-        sync_job.remote,
+        sync_job.remote.clone().unwrap_or("localhost".to_string()),
         sync_job.remote_store,
         sync_job.store,
         sync_job.ns.clone().unwrap_or_default(),
@@ -124,11 +134,28 @@ pub fn do_sync_job(
                     worker,
                     "sync datastore '{}' from '{}/{}'",
                     sync_job.store,
-                    sync_job.remote,
+                    sync_job.remote.clone().unwrap_or("local".to_string()),
                     sync_job.remote_store,
                 );
 
-                pull_store(&worker, &client, pull_params).await?;
+                if sync_job.remote.is_some() {
+                    pull_store(&worker, &client, pull_params).await?;
+                } else {
+                    if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
+                        if target_ns.path().starts_with(source_ns.path())
+                            && sync_job.store == sync_job.remote_store
+                            && sync_job.max_depth.map_or(true, |sync_depth| {
+                            target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
+                        }) {
+                            task_log!(
+                                worker,
+                                "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
+                            );
+                        }
+                    } else {
+                        pull_store(&worker, &client, pull_params).await?;
+                    }
+                }
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -180,6 +207,7 @@ pub fn do_sync_job(
             },
             remote: {
                 schema: REMOTE_ID_SCHEMA,
+                optional: true,
             },
             "remote-store": {
                 schema: DATASTORE_SCHEMA,
@@ -224,7 +252,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
 async fn pull(
     store: String,
     ns: Option<BackupNamespace>,
-    remote: String,
+    remote: Option<String>,
     remote_store: String,
     remote_ns: Option<BackupNamespace>,
     remove_vanished: Option<bool>,
@@ -248,7 +276,7 @@ async fn pull(
         &auth_id,
         &store,
         ns_str.as_deref(),
-        &remote,
+        remote.as_deref(),
         &remote_store,
         delete,
     )?;
@@ -256,7 +284,7 @@ async fn pull(
     let pull_params = PullParameters::new(
         &store,
         ns,
-        &remote,
+        remote.as_deref().unwrap_or("local"),
         &remote_store,
         remote_ns.unwrap_or_default(),
         auth_id.clone(),
@@ -280,7 +308,7 @@ async fn pull(
                 worker,
                 "pull datastore '{}' from '{}/{}'",
                 store,
-                remote,
+                remote.as_deref().unwrap_or("localhost"),
                 remote_store,
             );
 
@@ -299,4 +327,4 @@ async fn pull(
     Ok(upid_str)
 }
 
-pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
\ No newline at end of file
diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
index ea1476d7..18881782 100644
--- a/src/server/email_notifications.rs
+++ b/src/server/email_notifications.rs
@@ -484,15 +484,17 @@ pub fn send_sync_status(
         }
     };
 
+    let tmp_src_string;
+    let source_str = if let Some(remote) = &job.remote {
+        tmp_src_string = format!("Sync remote '{}'", remote);
+        &tmp_src_string
+    } else {
+        "Sync local"
+    };
+
     let subject = match result {
-        Ok(()) => format!(
-            "Sync remote '{}' datastore '{}' successful",
-            job.remote, job.remote_store,
-        ),
-        Err(_) => format!(
-            "Sync remote '{}' datastore '{}' failed",
-            job.remote, job.remote_store,
-        ),
+        Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
+        Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
     };
 
     send_job_status_mail(email, &subject, &text)?;
-- 
2.39.2


* [pbs-devel] [PATCH proxmox-backup v3 2/6] ui: add support for optional Remote in SyncJob
  2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional Hannes Laimer
@ 2023-08-08 12:13 ` Hannes Laimer
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] manager: add completion for opt. " Hannes Laimer
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 13+ messages in thread
From: Hannes Laimer @ 2023-08-08 12:13 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 www/form/RemoteTargetSelector.js | 29 +++++++++++++++++++----------
 www/window/SyncJobEdit.js        |  8 ++++++--
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/www/form/RemoteTargetSelector.js b/www/form/RemoteTargetSelector.js
index 2a94c4d7..e7b822d7 100644
--- a/www/form/RemoteTargetSelector.js
+++ b/www/form/RemoteTargetSelector.js
@@ -44,20 +44,18 @@ Ext.define('PBS.form.RemoteStoreSelector', {
 
 	me.store.removeAll();
 
+	me.setDisabled(false);
+	if (!me.firstLoad) {
+	    me.clearValue();
+	}
 	if (me.remote) {
-	    me.setDisabled(false);
-	    if (!me.firstLoad) {
-		me.clearValue();
-	    }
-
 	    me.store.proxy.url = `/api2/json/config/remote/${encodeURIComponent(me.remote)}/scan`;
-	    me.store.load();
-
-	    me.firstLoad = false;
 	} else {
-	    me.setDisabled(true);
-	    me.clearValue();
+	    me.store.proxy.url = '/api2/json/admin/datastore';
 	}
+	me.store.load();
+
+	me.firstLoad = false;
     },
 
     initComponent: function() {
@@ -175,6 +173,17 @@ Ext.define('PBS.form.RemoteNamespaceSelector', {
 	    me.store.proxy.url = `/api2/json/config/remote/${encodedRemote}/scan/${encodedStore}/namespaces`;
 	    me.store.load();
 
+	    me.firstLoad = false;
+	} else if (me.remoteStore) {
+	    me.setDisabled(false);
+	    if (!me.firstLoad) {
+		me.clearValue();
+	    }
+	    let encodedStore = encodeURIComponent(me.remoteStore);
+
+	    me.store.proxy.url = `/api2/json/admin/datastore/${encodedStore}/namespace`;
+	    me.store.load();
+
 	    me.firstLoad = false;
 	} else if (previousStore) {
 	    me.setDisabled(true);
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index 48a0c7a9..c9398740 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -137,8 +137,13 @@ Ext.define('PBS.window.SyncJobEdit', {
 		    {
 			fieldLabel: gettext('Source Remote'),
 			xtype: 'pbsRemoteSelector',
-			allowBlank: false,
+			allowBlank: true,
+			emptyText: gettext('Local'),
 			name: 'remote',
+			cbind: {
+			    deleteEmpty: '{!isCreate}',
+			},
+			skipEmptyText: true,
 			listeners: {
 			    change: function(f, value) {
 				let me = this;
@@ -155,7 +160,6 @@ Ext.define('PBS.window.SyncJobEdit', {
 			allowBlank: false,
 			autoSelect: false,
 			name: 'remote-store',
-			disabled: true,
 			listeners: {
 			    change: function(field, value) {
 				let me = this;
-- 
2.39.2


* [pbs-devel] [PATCH proxmox-backup v3 3/6] manager: add completion for opt. Remote in SyncJob
  2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional Hannes Laimer
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] ui: add support for optional Remote in SyncJob Hannes Laimer
@ 2023-08-08 12:13 ` Hannes Laimer
  2023-08-24  9:24   ` Wolfgang Bumiller
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] accept a ref to a HttpClient Hannes Laimer
                   ` (3 subsequent siblings)
  6 siblings, 1 reply; 13+ messages in thread
From: Hannes Laimer @ 2023-08-08 12:13 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 src/bin/proxmox-backup-manager.rs | 67 +++++++++++++++++++------------
 1 file changed, 41 insertions(+), 26 deletions(-)

diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index b4cb6cb3..eadfe547 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -535,35 +535,33 @@ fn get_remote(param: &HashMap<String, String>) -> Option<String> {
     param.get("remote").map(|r| r.to_owned()).or_else(|| {
         if let Some(id) = param.get("id") {
             if let Ok(job) = get_sync_job(id) {
-                return Some(job.remote);
+                return job.remote;
             }
         }
         None
     })
 }
 
-fn get_remote_store(param: &HashMap<String, String>) -> Option<(String, String)> {
+fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>, String)> {
     let mut job: Option<SyncJobConfig> = None;
 
     let remote = param.get("remote").map(|r| r.to_owned()).or_else(|| {
         if let Some(id) = param.get("id") {
             job = get_sync_job(id).ok();
             if let Some(ref job) = job {
-                return Some(job.remote.clone());
+                return job.remote.clone();
             }
         }
         None
     });
 
-    if let Some(remote) = remote {
-        let store = param
-            .get("remote-store")
-            .map(|r| r.to_owned())
-            .or_else(|| job.map(|job| job.remote_store));
+    let store = param
+        .get("remote-store")
+        .map(|r| r.to_owned())
+        .or_else(|| job.map(|job| job.remote_store));
 
-        if let Some(store) = store {
-            return Some((remote, store));
-        }
+    if let Some(store) = store {
+        return Some((remote, store));
     }
 
     None
@@ -584,7 +582,7 @@ fn get_remote_ns(param: &HashMap<String, String>) -> Option<BackupNamespace> {
 }
 
 // shell completion helper
-pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
+pub fn complete_remote_datastore_name(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
     let mut list = Vec::new();
 
     if let Some(remote) = get_remote(param) {
@@ -595,7 +593,9 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
                 list.push(item.store);
             }
         }
-    }
+    } else {
+        list = pbs_config::datastore::complete_datastore_name(arg, param);
+    };
 
     list
 }
@@ -607,17 +607,25 @@ pub fn complete_remote_datastore_namespace(
 ) -> Vec<String> {
     let mut list = Vec::new();
 
-    if let Some((remote, remote_store)) = get_remote_store(param) {
-        if let Ok(data) = proxmox_async::runtime::block_on(async move {
+    if let Some(data) = match get_remote_store(param) {
+        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_namespaces(
                 remote.clone(),
                 remote_store.clone(),
             )
             .await
-        }) {
-            for item in data {
-                list.push(item.ns.name());
-            }
+            .ok()
+        }),
+        Some((None, source_store)) => {
+            let mut rpcenv = CliEnvironment::new();
+            rpcenv.set_auth_id(Some(String::from("root@pam")));
+            crate::api2::admin::namespace::list_namespaces(source_store, None, None, &mut rpcenv)
+                .ok()
+        }
+        _ => None,
+    } {
+        for item in data {
+            list.push(item.ns.name());
         }
     }
 
@@ -662,19 +670,26 @@ pub fn complete_sync_local_datastore_namespace(
 pub fn complete_remote_datastore_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
     let mut list = Vec::new();
 
-    if let Some((remote, remote_store)) = get_remote_store(param) {
-        let ns = get_remote_ns(param);
-        if let Ok(data) = proxmox_async::runtime::block_on(async move {
+    let ns = get_remote_ns(param);
+    if let Some(data) = match get_remote_store(param) {
+        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_groups(
                 remote.clone(),
                 remote_store.clone(),
                 ns,
             )
             .await
-        }) {
-            for item in data {
-                list.push(format!("{}/{}", item.backup.ty, item.backup.id));
-            }
+            .ok()
+        }),
+        Some((None, source_store)) => {
+            let mut rpcenv = CliEnvironment::new();
+            rpcenv.set_auth_id(Some(String::from("root@pam")));
+            crate::api2::admin::datastore::list_groups(source_store, ns, &mut rpcenv).ok()
+        }
+        _ => None,
+    } {
+        for item in data {
+            list.push(format!("{}/{}", item.backup.ty, item.backup.id));
         }
     }
 
-- 
2.39.2


* [pbs-devel] [PATCH proxmox-backup v3 4/6] accept a ref to a HttpClient
  2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
                   ` (2 preceding siblings ...)
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] manager: add completion for opt. " Hannes Laimer
@ 2023-08-08 12:13 ` Hannes Laimer
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore Hannes Laimer
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 13+ messages in thread
From: Hannes Laimer @ 2023-08-08 12:13 UTC (permalink / raw)
  To: pbs-devel

... since the functions don't actually need to own the value.
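
A small hypothetical usage sketch of what this enables (not part of the
patch; assumes `repo`, `ns` and `snapshot` are already set up):

    // With `BackupReader::start(&client, ...)` taking a reference, the
    // caller keeps ownership and can reuse the client afterwards.
    let client = connect(&repo)?;
    let reader = BackupReader::start(&client, None, repo.store(), &ns, &snapshot, true).await?;
    // `client` was only borrowed, so further requests are still possible:
    let _version = client.get("api2/json/version", None).await?;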

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 examples/download-speed.rs           | 2 +-
 pbs-client/src/backup_reader.rs      | 2 +-
 proxmox-backup-client/src/catalog.rs | 4 ++--
 proxmox-backup-client/src/main.rs    | 2 +-
 proxmox-backup-client/src/mount.rs   | 2 +-
 proxmox-file-restore/src/main.rs     | 4 ++--
 src/bin/proxmox_backup_debug/diff.rs | 2 +-
 src/server/pull.rs                   | 2 +-
 8 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/examples/download-speed.rs b/examples/download-speed.rs
index c487d704..fe700982 100644
--- a/examples/download-speed.rs
+++ b/examples/download-speed.rs
@@ -34,7 +34,7 @@ async fn run() -> Result<(), Error> {
     let backup_time = proxmox_time::parse_rfc3339("2019-06-28T10:49:48Z")?;
 
     let client = BackupReader::start(
-        client,
+        &client,
         None,
         "store2",
         &BackupNamespace::root(),
diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
index 2cd4dc27..36d8ebcf 100644
--- a/pbs-client/src/backup_reader.rs
+++ b/pbs-client/src/backup_reader.rs
@@ -44,7 +44,7 @@ impl BackupReader {
 
     /// Create a new instance by upgrading the connection at '/api2/json/reader'
     pub async fn start(
-        client: HttpClient,
+        client: &HttpClient,
         crypt_config: Option<Arc<CryptConfig>>,
         datastore: &str,
         ns: &BackupNamespace,
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index 8c8c1458..72b22e67 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -75,7 +75,7 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
     let client = connect(&repo)?;
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
@@ -187,7 +187,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index d9e7b899..d67b1373 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -1295,7 +1295,7 @@ async fn restore(
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &ns,
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 242556d0..4a2f8335 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -234,7 +234,7 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index 9c74a476..50875a63 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -107,7 +107,7 @@ async fn list_files(
 ) -> Result<Vec<ArchiveEntry>, Error> {
     let client = connect(&repo)?;
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &namespace,
@@ -430,7 +430,7 @@ async fn extract(
 
     let client = connect(&repo)?;
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &namespace,
diff --git a/src/bin/proxmox_backup_debug/diff.rs b/src/bin/proxmox_backup_debug/diff.rs
index 9924fb7b..1c64b27a 100644
--- a/src/bin/proxmox_backup_debug/diff.rs
+++ b/src/bin/proxmox_backup_debug/diff.rs
@@ -294,7 +294,7 @@ async fn create_backup_reader(
     };
     let client = connect(&params.repo)?;
     let backup_reader = BackupReader::start(
-        client,
+        &client,
         params.crypt_config.clone(),
         params.repo.store(),
         &params.namespace,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index a973a10e..e55452d1 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -738,7 +738,7 @@ async fn pull_group(
         )?;
 
         let reader = BackupReader::start(
-            new_client,
+            &new_client,
             None,
             params.source.store(),
             &remote_ns,
-- 
2.39.2


* [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore
  2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
                   ` (3 preceding siblings ...)
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] accept a ref to a HttpClient Hannes Laimer
@ 2023-08-08 12:13 ` Hannes Laimer
  2023-08-24 13:09   ` Wolfgang Bumiller
  2023-09-21 11:10   ` Lukas Wagner
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] pull: add support for pulling from local datastore Hannes Laimer
  2023-09-21 10:01 ` [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Lukas Wagner
  6 siblings, 2 replies; 13+ messages in thread
From: Hannes Laimer @ 2023-08-08 12:13 UTC (permalink / raw)
  To: pbs-devel

... making the pull logic independent of the actual source by
using two traits.
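
The split roughly looks like this (a simplified sketch of the consuming
side; the trait definitions are in the diff below, while `sync_namespace()`
is a hypothetical illustration, not a function added by this patch):

    // The pull logic only sees trait objects, so the same code path can
    // serve any source implementation (remote now, local in patch 6/6).
    async fn sync_namespace(
        source: &dyn PullSource,
        ns: &BackupNamespace,
        owner: &Authid,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        for group in source.list_groups(ns, owner).await? {
            for snapshot in source.list_backup_dirs(ns, &group, worker).await? {
                // `PullReader` hides whether data comes over HTTP or from
                // a local chunk store.
                let _reader = source.reader(ns, &snapshot).await?;
                // ... pull manifest and archives via the reader ...
            }
        }
        Ok(())
    }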

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 Cargo.toml                      |   2 +
 pbs-datastore/src/read_chunk.rs |   2 +-
 src/api2/config/remote.rs       |  14 +-
 src/api2/pull.rs                |  31 +-
 src/server/pull.rs              | 943 +++++++++++++++++++-------------
 5 files changed, 570 insertions(+), 422 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4d34f8a1..74cb68e0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
 
 # regular crates
 anyhow = "1.0"
+async-trait = "0.1.56"
 apt-pkg-native = "0.3.2"
 base64 = "0.13"
 bitflags = "1.2.1"
@@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
 
 [dependencies]
 anyhow.workspace = true
+async-trait.workspace = true
 apt-pkg-native.workspace = true
 base64.workspace = true
 bitflags.workspace = true
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
     fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
 }
 
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
     /// Returns the encoded chunk data
     fn read_raw_chunk<'a>(
         &'a self,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 307cf3cd..2511c5d5 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
     Ok(())
 }
 
-/// Helper to get client for remote.cfg entry
-pub async fn remote_client(
+/// Helper to get client for remote.cfg entry without login, just config
+pub fn remote_client_config(
     remote: &Remote,
     limit: Option<RateLimitConfig>,
 ) -> Result<HttpClient, Error> {
@@ -320,6 +320,16 @@ pub async fn remote_client(
         &remote.config.auth_id,
         options,
     )?;
+
+    Ok(client)
+}
+
+/// Helper to get client for remote.cfg entry
+pub async fn remote_client(
+    remote: &Remote,
+    limit: Option<RateLimitConfig>,
+) -> Result<HttpClient, Error> {
+    let client = remote_client_config(remote, limit)?;
     let _auth_info = client
         .login() // make sure we can auth
         .await
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 664ecce5..e36a5b14 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -8,7 +8,7 @@ use proxmox_sys::task_log;
 
 use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
-    GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
     PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     TRANSFER_LAST_SCHEMA,
 };
@@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
         PullParameters::new(
             &sync_job.store,
             sync_job.ns.clone().unwrap_or_default(),
-            sync_job.remote.as_deref().unwrap_or("local"),
+            sync_job.remote.as_deref(),
             &sync_job.remote_store,
             sync_job.remote_ns.clone().unwrap_or_default(),
             sync_job
@@ -124,7 +124,6 @@ pub fn do_sync_job(
 
             let worker_future = async move {
                 let pull_params = PullParameters::try_from(&sync_job)?;
-                let client = pull_params.client().await?;
 
                 task_log!(worker, "Starting datastore sync job '{}'", job_id);
                 if let Some(event_str) = schedule {
@@ -138,24 +137,7 @@ pub fn do_sync_job(
                     sync_job.remote_store,
                 );
 
-                if sync_job.remote.is_some() {
-                    pull_store(&worker, &client, pull_params).await?;
-                } else {
-                    if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
-                        if target_ns.path().starts_with(source_ns.path())
-                            && sync_job.store == sync_job.remote_store
-                            && sync_job.max_depth.map_or(true, |sync_depth| {
-                            target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
-                        }) {
-                            task_log!(
-                                worker,
-                                "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
-                            );
-                        }
-                    } else {
-                        pull_store(&worker, &client, pull_params).await?;
-                    }
-                }
+                pull_store(&worker, pull_params).await?;
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -284,7 +266,7 @@ async fn pull(
     let pull_params = PullParameters::new(
         &store,
         ns,
-        remote.as_deref().unwrap_or("local"),
+        remote.as_deref(),
         &remote_store,
         remote_ns.unwrap_or_default(),
         auth_id.clone(),
@@ -294,7 +276,6 @@ async fn pull(
         limit,
         transfer_last,
     )?;
-    let client = pull_params.client().await?;
 
     // fixme: set to_stdout to false?
     // FIXME: add namespace to worker id?
@@ -312,7 +293,7 @@ async fn pull(
                 remote_store,
             );
 
-            let pull_future = pull_store(&worker, &client, pull_params);
+            let pull_future = pull_store(&worker, pull_params);
             (select! {
                 success = pull_future.fuse() => success,
                 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
@@ -327,4 +308,4 @@ async fn pull(
     Ok(upid_str)
 }
 
-pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
\ No newline at end of file
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e55452d1..e1a27a8c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::Seek;
+use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
 use proxmox_router::HttpError;
-use proxmox_sys::task_log;
+use proxmox_sys::{task_log, task_warn};
+use serde_json::json;
 
 use pbs_api_types::{
-    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
-    Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
+    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
 };
-
-use pbs_client::{
-    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
+use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
 use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
 
 use crate::backup::{check_ns_modification_privs, check_ns_privs};
 use crate::tools::parallel_handler::ParallelHandler;
 
-/// Parameters for a pull operation.
-pub(crate) struct PullParameters {
-    /// Remote that is pulled from
-    remote: Remote,
-    /// Full specification of remote datastore
-    source: BackupRepository,
-    /// Local store that is pulled into
+struct RemoteReader {
+    backup_reader: Arc<BackupReader>,
+    dir: BackupDir,
+}
+
+pub(crate) struct PullTarget {
     store: Arc<DataStore>,
-    /// Remote namespace
-    remote_ns: BackupNamespace,
-    /// Local namespace (anchor)
     ns: BackupNamespace,
+}
+
+pub(crate) struct RemoteSource {
+    repo: BackupRepository,
+    ns: BackupNamespace,
+    client: HttpClient,
+}
+
+#[async_trait::async_trait]
+/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source.
+trait PullSource: Send + Sync {
+    /// Lists namespaces from the source.
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error>;
+
+    /// Lists groups within a specific namespace from the source.
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error>;
+
+    /// Lists backup directories for a specific group within a specific namespace from the source.
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error>;
+    fn get_ns(&self) -> BackupNamespace;
+    fn print_store_and_ns(&self) -> String;
+
+    /// Returns a reader for reading data from a specific backup directory.
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error>;
+}
+
+#[async_trait::async_trait]
+impl PullSource for RemoteSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+            return Ok(vec![self.ns.clone()]);
+        }
+
+        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+        let mut data = json!({});
+        if let Some(max_depth) = max_depth {
+            data["max-depth"] = json!(max_depth);
+        }
+
+        if !self.ns.is_root() {
+            data["parent"] = json!(self.ns);
+        }
+        self.client.login().await?;
+
+        let mut result = match self.client.get(&path, Some(data)).await {
+            Ok(res) => res,
+            Err(err) => match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match code {
+                    &StatusCode::NOT_FOUND => {
+                        if self.ns.is_root() && max_depth.is_none() {
+                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                            max_depth.replace(0);
+                        } else {
+                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                        }
+
+                        return Ok(vec![self.ns.clone()]);
+                    }
+                    _ => {
+                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    bail!("Querying namespaces failed - {err}");
+                }
+            },
+        };
+
+        let list: Vec<BackupNamespace> =
+            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+                .iter()
+                .map(|list_item| list_item.ns.clone())
+                .collect();
+
+        Ok(list)
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        _owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+        let args = if !namespace.is_root() {
+            Some(json!({ "ns": namespace.clone() }))
+        } else {
+            None
+        };
+
+        self.client.login().await?;
+        let mut result =
+            self.client.get(&path, args).await.map_err(|err| {
+                format_err!("Failed to retrieve backup groups from remote - {}", err)
+            })?;
+
+        Ok(
+            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+                .map_err(Error::from)?
+                .into_iter()
+                .map(|item| item.backup)
+                .collect::<Vec<BackupGroup>>(),
+        )
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        _namespace: &BackupNamespace,
+        group: &BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error> {
+        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+        let mut args = json!({
+            "backup-type": group.ty,
+            "backup-id": group.id,
+        });
+
+        if !self.ns.is_root() {
+            args["ns"] = serde_json::to_value(&self.ns)?;
+        }
+
+        self.client.login().await?;
+
+        let mut result = self.client.get(&path, Some(args)).await?;
+        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+        Ok(snapshot_list
+            .into_iter()
+            .filter_map(|item: SnapshotListItem| {
+                let snapshot = item.backup;
+                // in-progress backups can't be synced
+                if item.size.is_none() {
+                    task_log!(
+                        worker,
+                        "skipping snapshot {} - in-progress backup",
+                        snapshot
+                    );
+                    return None;
+                }
+
+                Some(snapshot)
+            })
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        print_store_and_ns(self.repo.store(), &self.ns)
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error> {
+        let backup_reader =
+            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+        Ok(Arc::new(RemoteReader {
+            backup_reader,
+            dir: dir.clone(),
+        }))
+    }
+}
+
+#[async_trait::async_trait]
+/// `PullReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading the client log, and checking whether the chunk sync should be skipped.
+trait PullReader: Send + Sync {
+    /// Returns a chunk reader with the specified encryption mode.
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+    /// Asynchronously loads a file from the source into a local file.
+    /// `filename` is the name of the file to load from the source.
+    /// `into` is the path of the local file to load the source file into.
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error>;
+
+    /// Tries to download the client log from the source and save it into a local file.
+    async fn try_download_client_log(
+        &self,
+        to_path: &Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error>;
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+#[async_trait::async_trait]
+impl PullReader for RemoteReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(RemoteChunkReader::new(
+            self.backup_reader.clone(),
+            None,
+            crypt_mode,
+            HashMap::new(),
+        ))
+    }
+
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+        if let Err(err) = download_result {
+            match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match *code {
+                    StatusCode::NOT_FOUND => {
+                        task_log!(
+                            worker,
+                            "skipping snapshot {} - vanished since start of sync",
+                            &self.dir,
+                        );
+                        return Ok(None);
+                    }
+                    _ => {
+                        bail!("HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    return Err(err);
+                }
+            };
+        };
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(
+        &self,
+        to_path: &Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        let mut tmp_path = to_path.to_owned();
+        tmp_path.set_extension("tmp");
+
+        let tmpfile = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .read(true)
+            .open(&tmp_path)?;
+
+        // Note: be silent if there is no log - only log successful download
+        if let Ok(()) = self
+            .backup_reader
+            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+            .await
+        {
+            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+                bail!("Atomic rename file {:?} failed - {}", to_path, err);
+            }
+            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+        }
+
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+        false
+    }
+}
+
+/// Parameters for a pull operation.
+pub(crate) struct PullParameters {
+    /// Where data is pulled from
+    source: Arc<dyn PullSource>,
+    /// Where data should be pulled into
+    target: PullTarget,
     /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
     /// Whether to remove groups which exist locally, but not on the remote end
@@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
     max_depth: Option<usize>,
     /// Filters for reducing the pull scope
     group_filter: Option<Vec<GroupFilter>>,
-    /// Rate limits for all transfers from `remote`
-    limit: RateLimitConfig,
     /// How many snapshots should be transferred at most (taking the newest N snapshots)
     transfer_last: Option<usize>,
 }
 
 impl PullParameters {
     /// Creates a new instance of `PullParameters`.
-    ///
-    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
-    /// [BackupRepository] with `remote_store`.
-    #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         store: &str,
         ns: BackupNamespace,
-        remote: &str,
+        remote: Option<&str>,
         remote_store: &str,
         remote_ns: BackupNamespace,
         owner: Authid,
@@ -82,49 +376,56 @@ impl PullParameters {
         limit: RateLimitConfig,
         transfer_last: Option<usize>,
     ) -> Result<Self, Error> {
-        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
             remote_ns.check_max_depth(max_depth)?;
-        }
-
-        let (remote_config, _digest) = pbs_config::remote::config()?;
-        let remote: Remote = remote_config.lookup("remote", remote)?;
-
+        };
         let remove_vanished = remove_vanished.unwrap_or(false);
 
-        let source = BackupRepository::new(
-            Some(remote.config.auth_id.clone()),
-            Some(remote.config.host.clone()),
-            remote.config.port,
-            remote_store.to_string(),
-        );
+        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+            let (remote_config, _digest) = pbs_config::remote::config()?;
+            let remote: Remote = remote_config.lookup("remote", remote)?;
 
-        Ok(Self {
-            remote,
-            remote_ns,
+            let repo = BackupRepository::new(
+                Some(remote.config.auth_id.clone()),
+                Some(remote.config.host.clone()),
+                remote.config.port,
+                remote_store.to_string(),
+            );
+            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
+            Arc::new(RemoteSource {
+                repo,
+                ns: remote_ns,
+                client,
+            })
+        } else {
+            bail!("local sync not implemented yet")
+        };
+        let target = PullTarget {
+            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
             ns,
+        };
+
+        Ok(Self {
             source,
-            store,
+            target,
             owner,
             remove_vanished,
             max_depth,
             group_filter,
-            limit,
             transfer_last,
         })
     }
 
-    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
-    pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
+        let source_ns = self.source.get_ns();
+        source_ns.map_prefix(&source_ns, &self.target.ns)
     }
 }
 
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
-    chunk_reader: RemoteChunkReader,
+    chunk_reader: Arc<dyn AsyncReadChunk>,
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
     Ok(())
 }
 
-async fn download_manifest(
-    reader: &BackupReader,
-    filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
-    let mut tmp_manifest_file = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .truncate(true)
-        .read(true)
-        .open(filename)?;
-
-    reader
-        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
-        .await?;
-
-    tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
-    Ok(tmp_manifest_file)
-}
-
 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
     if size != info.size {
         bail!(
@@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// Pulls a single file referenced by a manifest.
 ///
 /// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
-/// - Verify tmp file checksum
+/// - Load archive file into tmp file
+///   - Load file into tmp file
+///   - Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
-async fn pull_single_archive(
-    worker: &WorkerTask,
-    reader: &BackupReader,
-    chunk_reader: &mut RemoteChunkReader,
-    snapshot: &pbs_datastore::BackupDir,
-    archive_info: &FileInfo,
+async fn pull_single_archive<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
+    archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let archive_name = &archive_info.filename;
@@ -277,13 +557,11 @@ async fn pull_single_archive(
 
     task_log!(worker, "sync archive {}", archive_name);
 
-    let mut tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
+    reader
+        .load_file_into(archive_name, &tmp_path, worker)
+        .await?;
 
-    reader.download(archive_name, &mut tmpfile).await?;
+    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
@@ -293,14 +571,18 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+                task_log!(worker, "skipping chunk sync for same datatsore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    reader.chunk_reader(archive_info.crypt_mode),
+                    snapshot.datastore().clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -309,17 +591,21 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+                task_log!(worker, "skipping chunk sync for same datatsore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    reader.chunk_reader(archive_info.crypt_mode),
+                    snapshot.datastore().clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::Blob => {
-            tmpfile.seek(SeekFrom::Start(0))?;
+            tmpfile.rewind()?;
             let (csum, size) = sha256(&mut tmpfile)?;
             verify_archive(archive_info, &csum, size)?;
         }
@@ -330,33 +616,6 @@ async fn pull_single_archive(
     Ok(())
 }
 
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    path: &std::path::Path,
-) -> Result<(), Error> {
-    let mut tmp_path = path.to_owned();
-    tmp_path.set_extension("tmp");
-
-    let tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
-
-    // Note: be silent if there is no log - only log successful download
-    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
-        if let Err(err) = std::fs::rename(&tmp_path, path) {
-            bail!("Atomic rename file {:?} failed - {}", path, err);
-        }
-        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
-    }
-
-    Ok(())
-}
-
 /// Actual implementation of pulling a snapshot.
 ///
 /// Pulling a snapshot consists of the following steps:
@@ -366,10 +625,10 @@ async fn try_client_log_download(
 /// -- if file already exists, verify contents
 /// -- if not, pull it from the remote
 /// - Download log if not already existing
-async fn pull_snapshot(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+async fn pull_snapshot<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let mut manifest_name = snapshot.full_path();
@@ -380,32 +639,15 @@ async fn pull_snapshot(
 
     let mut tmp_manifest_name = manifest_name.clone();
     tmp_manifest_name.set_extension("tmp");
-
-    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
-    let mut tmp_manifest_file = match download_res {
-        Ok(manifest_file) => manifest_file,
-        Err(err) => {
-            match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match *code {
-                    StatusCode::NOT_FOUND => {
-                        task_log!(
-                            worker,
-                            "skipping snapshot {} - vanished since start of sync",
-                            snapshot.dir(),
-                        );
-                        return Ok(());
-                    }
-                    _ => {
-                        bail!("HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    return Err(err);
-                }
-            };
-        }
-    };
-    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+    let tmp_manifest_blob;
+    if let Some(data) = reader
+        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
+        .await?
+    {
+        tmp_manifest_blob = data;
+    } else {
+        return Ok(());
+    }
 
     if manifest_name.exists() {
         let manifest_blob = proxmox_lang::try_block!({
@@ -422,8 +664,10 @@ async fn pull_snapshot(
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
-                try_client_log_download(worker, reader, &client_log_name).await?;
-            }
+                reader
+                    .try_download_client_log(&client_log_name, worker)
+                    .await?;
+            };
             task_log!(worker, "no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(()); // nothing changed
@@ -471,17 +715,9 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(
-            reader.clone(),
-            None,
-            item.chunk_crypt_mode(),
-            HashMap::new(),
-        );
-
         pull_single_archive(
             worker,
-            &reader,
-            &mut chunk_reader,
+            reader.clone(),
             snapshot,
             item,
             downloaded_chunks.clone(),
@@ -494,9 +730,10 @@ async fn pull_snapshot(
     }
 
     if !client_log_name.exists() {
-        try_client_log_download(worker, reader, &client_log_name).await?;
-    }
-
+        reader
+            .try_download_client_log(&client_log_name, worker)
+            .await?;
+    };
     snapshot
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,12 +743,12 @@ async fn pull_snapshot(
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
 ///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
-async fn pull_snapshot_from(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+/// The `reader` is configured to read from the source backup directory, while the
+/// `snapshot` is pointing to the local datastore and target namespace.
+async fn pull_snapshot_from<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let (_path, is_new, _snap_lock) = snapshot
@@ -626,11 +863,10 @@ impl std::fmt::Display for SkipInfo {
 /// - Sort by snapshot time
 /// - Get last snapshot timestamp on local datastore
 /// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
 /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
 /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
 ///
-/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
+/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
 /// remote when querying snapshots. This allows us to interact with old remotes that don't have
 /// namespace support yet.
 ///
@@ -639,117 +875,79 @@ impl std::fmt::Display for SkipInfo {
 /// - local group owner is already checked by pull_store
 async fn pull_group(
     worker: &WorkerTask,
-    client: &HttpClient,
     params: &PullParameters,
-    group: &pbs_api_types::BackupGroup,
-    remote_ns: BackupNamespace,
+    source_namespace: &BackupNamespace,
+    group: &BackupGroup,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    task_log!(worker, "sync group {}", group);
-
-    let path = format!(
-        "api2/json/admin/datastore/{}/snapshots",
-        params.source.store()
-    );
-
-    let mut args = json!({
-        "backup-type": group.ty,
-        "backup-id": group.id,
-    });
-
-    if !remote_ns.is_root() {
-        args["ns"] = serde_json::to_value(&remote_ns)?;
-    }
-
-    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
-
-    let mut result = client.get(&path, Some(args)).await?;
-    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
-    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
-    client.login().await?; // make sure auth is complete
-
-    let fingerprint = client.fingerprint();
-
-    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
-    let last_sync_time = last_sync.unwrap_or(i64::MIN);
-
-    let mut remote_snapshots = std::collections::HashSet::new();
-
-    // start with 65536 chunks (up to 256 GiB)
-    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
-    progress.group_snapshots = list.len() as u64;
-
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
-    let total_amount = list.len();
+    let mut raw_list: Vec<BackupDir> = params
+        .source
+        .list_backup_dirs(source_namespace, group, worker)
+        .await?;
+    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
+
+    let total_amount = raw_list.len();
 
     let cutoff = params
         .transfer_last
         .map(|count| total_amount.saturating_sub(count))
         .unwrap_or_default();
 
-    for (pos, item) in list.into_iter().enumerate() {
-        let snapshot = item.backup;
-
-        // in-progress backups can't be synced
-        if item.size.is_none() {
-            task_log!(
-                worker,
-                "skipping snapshot {} - in-progress backup",
-                snapshot
-            );
-            continue;
-        }
-
-        remote_snapshots.insert(snapshot.time);
+    let target_ns = params.get_target_ns()?;
 
-        if last_sync_time > snapshot.time {
-            already_synced_skip_info.update(snapshot.time);
-            continue;
-        } else if already_synced_skip_info.count > 0 {
-            task_log!(worker, "{}", already_synced_skip_info);
-            already_synced_skip_info.reset();
-        }
-
-        if pos < cutoff && last_sync_time != snapshot.time {
-            transfer_last_skip_info.update(snapshot.time);
-            continue;
-        } else if transfer_last_skip_info.count > 0 {
-            task_log!(worker, "{}", transfer_last_skip_info);
-            transfer_last_skip_info.reset();
-        }
-
-        // get updated auth_info (new tickets)
-        let auth_info = client.login().await?;
+    let mut source_snapshots = HashSet::new();
+    let last_sync_time = params
+        .target
+        .store
+        .last_successful_backup(&target_ns, group)?
+        .unwrap_or(i64::MIN);
+
+    let list: Vec<BackupDir> = raw_list
+        .into_iter()
+        .enumerate()
+        .filter(|&(pos, ref dir)| {
+            source_snapshots.insert(dir.time);
+            if last_sync_time > dir.time {
+                already_synced_skip_info.update(dir.time);
+                return false;
+            } else if already_synced_skip_info.count > 0 {
+                task_log!(worker, "{}", already_synced_skip_info);
+                already_synced_skip_info.reset();
+                return true;
+            }
 
-        let options =
-            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
-                .rate_limit(params.limit.clone());
+            if pos < cutoff && last_sync_time != dir.time {
+                transfer_last_skip_info.update(dir.time);
+                return false;
+            } else if transfer_last_skip_info.count > 0 {
+                task_log!(worker, "{}", transfer_last_skip_info);
+                transfer_last_skip_info.reset();
+            }
+            true
+        })
+        .map(|(_, dir)| dir)
+        .collect();
 
-        let new_client = HttpClient::new(
-            params.source.host(),
-            params.source.port(),
-            params.source.auth_id(),
-            options,
-        )?;
+    // start with 65536 chunks (up to 256 GiB)
+    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-        let reader = BackupReader::start(
-            &new_client,
-            None,
-            params.source.store(),
-            &remote_ns,
-            &snapshot,
-            true,
-        )
-        .await?;
+    progress.group_snapshots = list.len() as u64;
 
-        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+    for (pos, from_snapshot) in list.into_iter().enumerate() {
+        let to_snapshot = params
+            .target
+            .store
+            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
 
-        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+        let reader = params
+            .source
+            .reader(source_namespace, &from_snapshot)
+            .await?;
+        let result =
+            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
 
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
@@ -758,11 +956,14 @@ async fn pull_group(
     }
 
     if params.remove_vanished {
-        let group = params.store.backup_group(target_ns.clone(), group.clone());
+        let group = params
+            .target
+            .store
+            .backup_group(params.get_target_ns()?, group.clone());
         let local_list = group.list_backups()?;
         for info in local_list {
             let snapshot = info.backup_dir;
-            if remote_snapshots.contains(&snapshot.backup_time()) {
+            if source_snapshots.contains(&snapshot.backup_time()) {
                 continue;
             }
             if snapshot.is_protected() {
@@ -774,73 +975,23 @@ async fn pull_group(
                 continue;
             }
             task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
-            params
-                .store
-                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
+            params.target.store.remove_backup_dir(
+                &params.get_target_ns()?,
+                snapshot.as_ref(),
+                false,
+            )?;
         }
     }
 
     Ok(())
 }
 
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
-    worker: &WorkerTask,
-    client: &HttpClient,
-    params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/namespace",
-        params.source.store()
-    );
-    let mut data = json!({});
-    if let Some(max_depth) = params.max_depth {
-        data["max-depth"] = json!(max_depth);
-    }
-
-    if !params.remote_ns.is_root() {
-        data["parent"] = json!(params.remote_ns);
-    }
-
-    let mut result = match client.get(&path, Some(data)).await {
-        Ok(res) => res,
-        Err(err) => match err.downcast_ref::<HttpError>() {
-            Some(HttpError { code, message }) => match *code {
-                StatusCode::NOT_FOUND => {
-                    if params.remote_ns.is_root() && params.max_depth.is_none() {
-                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
-                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
-                        params.max_depth = Some(0);
-                    } else {
-                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
-                    }
-
-                    return Ok(vec![params.remote_ns.clone()]);
-                }
-                _ => {
-                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
-                }
-            },
-            None => {
-                bail!("Querying namespaces failed - {err}");
-            }
-        },
-    };
-
-    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
-    // parents first
-    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
-    Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
     let mut created = false;
-    let store_ns_str = print_store_and_ns(params.store.name(), ns);
+    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
 
-    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
-        check_ns_modification_privs(params.store.name(), ns, &params.owner)
+    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
             .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
 
         let name = match ns.components().last() {
@@ -850,14 +1001,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
             }
         };
 
-        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
             bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
         }
         created = true;
     }
 
     check_ns_privs(
-        params.store.name(),
+        params.target.store.name(),
         ns,
         &params.owner,
         PRIV_DATASTORE_BACKUP,
@@ -868,10 +1019,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
 }
 
 fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
-    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
+    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
         .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
 
-    params.store.remove_namespace_recursive(local_ns, true)
+    params
+        .target
+        .store
+        .remove_namespace_recursive(local_ns, true)
 }
 
 fn check_and_remove_vanished_ns(
@@ -885,14 +1039,15 @@ fn check_and_remove_vanished_ns(
     // clamp like remote does so that we don't list more than we can ever have synced.
     let max_depth = params
         .max_depth
-        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
 
     let mut local_ns_list: Vec<BackupNamespace> = params
+        .target
         .store
-        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
         .filter(|ns| {
             let user_privs =
-                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
+                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
             user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
         })
         .collect();
@@ -901,7 +1056,7 @@ fn check_and_remove_vanished_ns(
     local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
 
     for local_ns in local_ns_list {
-        if local_ns == params.ns {
+        if local_ns == params.target.ns {
             continue;
         }
 
@@ -948,29 +1103,49 @@ fn check_and_remove_vanished_ns(
 /// - access to sub-NS checked here
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
-    client: &HttpClient,
     mut params: PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
-    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
 
     let old_max_depth = params.max_depth;
-    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
-        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
+    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
+        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
     } else {
-        query_namespaces(worker, client, &mut params).await?
+        params
+            .source
+            .list_namespaces(&mut params.max_depth, worker)
+            .await?
     };
+
+    let ns_layers_to_be_pulled = namespaces
+        .iter()
+        .map(BackupNamespace::depth)
+        .max()
+        .map_or(0, |v| v - params.source.get_ns().depth());
+    let target_depth = params.target.ns.depth();
+
+    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
+        bail!(
+            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
+            ns_layers_to_be_pulled,
+            target_depth,
+            MAX_NAMESPACE_DEPTH
+        );
+    }
+
     errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+    namespaces.sort_unstable_by_key(|a| a.name_len());
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
 
     for namespace in namespaces {
-        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+        let source_store_ns_str = params.source.print_store_and_ns();
 
-        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
-        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
 
         task_log!(worker, "----");
         task_log!(
@@ -998,7 +1173,7 @@ pub(crate) async fn pull_store(
             }
         }
 
-        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
+        match pull_ns(worker, &namespace, &mut params).await {
             Ok((ns_progress, ns_errors)) => {
                 errors |= ns_errors;
 
@@ -1019,7 +1194,7 @@ pub(crate) async fn pull_store(
                 task_log!(
                     worker,
                     "Encountered errors while syncing namespace {} - {}",
-                    namespace,
+                    &namespace,
                     err,
                 );
             }
@@ -1051,48 +1226,28 @@ pub(crate) async fn pull_store(
 /// - owner check for vanished groups done here
 pub(crate) async fn pull_ns(
     worker: &WorkerTask,
-    client: &HttpClient,
-    params: &PullParameters,
-    source_ns: BackupNamespace,
-    target_ns: BackupNamespace,
+    namespace: &BackupNamespace,
+    params: &mut PullParameters,
 ) -> Result<(StoreProgress, bool), Error> {
-    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
-    let args = if !source_ns.is_root() {
-        Some(json!({
-            "ns": source_ns,
-        }))
-    } else {
-        None
-    };
-
-    let mut result = client
-        .get(&path, args)
-        .await
-        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
-    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-        let type_order = a.backup.ty.cmp(&b.backup.ty);
+        let type_order = a.ty.cmp(&b.ty);
         if type_order == std::cmp::Ordering::Equal {
-            a.backup.id.cmp(&b.backup.id)
+            a.id.cmp(&b.id)
         } else {
             type_order
         }
     });
 
-    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
         filters.iter().any(|filter| group.matches(filter))
     };
 
-    // Get groups with target NS set
-    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
     let list = if let Some(ref group_filter) = &params.group_filter {
         let unfiltered_count = list.len();
-        let list: Vec<pbs_api_types::BackupGroup> = list
+        let list: Vec<BackupGroup> = list
             .into_iter()
             .filter(|group| apply_filters(group, group_filter))
             .collect();
@@ -1110,13 +1265,14 @@ pub(crate) async fn pull_ns(
 
     let mut errors = false;
 
-    let mut new_groups = std::collections::HashSet::new();
+    let mut new_groups = HashSet::new();
     for group in list.iter() {
         new_groups.insert(group.clone());
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
 
+    let target_ns = params.get_target_ns()?;
     for (done, group) in list.into_iter().enumerate() {
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
@@ -1124,6 +1280,7 @@ pub(crate) async fn pull_ns(
 
         let (owner, _lock_guard) =
             match params
+                .target
                 .store
                 .create_locked_backup_group(&target_ns, &group, &params.owner)
             {
@@ -1135,7 +1292,9 @@ pub(crate) async fn pull_ns(
                         &group,
                         err
                     );
-                    errors = true; // do not stop here, instead continue
+                    errors = true;
+                    // do not stop here, instead continue
+                    task_log!(worker, "create_locked_backup_group failed");
                     continue;
                 }
             };
@@ -1151,15 +1310,7 @@ pub(crate) async fn pull_ns(
                 owner
             );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(
-            worker,
-            client,
-            params,
-            &group,
-            source_ns.clone(),
-            &mut progress,
-        )
-        .await
+        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
         {
             task_log!(worker, "sync group {} failed - {}", &group, err,);
             errors = true; // do not stop here, instead continue
@@ -1168,13 +1319,13 @@ pub(crate) async fn pull_ns(
 
     if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                 let local_group = local_group?;
                 let local_group = local_group.group();
                 if new_groups.contains(local_group) {
                     continue;
                 }
-                let owner = params.store.get_owner(&target_ns, local_group)?;
+                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                 if check_backup_owner(&owner, &params.owner).is_err() {
                     continue;
                 }
@@ -1184,7 +1335,11 @@ pub(crate) async fn pull_ns(
                     }
                 }
                 task_log!(worker, "delete vanished group '{local_group}'",);
-                match params.store.remove_backup_group(&target_ns, local_group) {
+                match params
+                    .target
+                    .store
+                    .remove_backup_group(&target_ns, local_group)
+                {
                     Ok(true) => {}
                     Ok(false) => {
                         task_log!(
-- 
2.39.2





^ permalink raw reply	[flat|nested] 13+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v3 6/6] pull: add support for pulling from local datastore
  2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
                   ` (4 preceding siblings ...)
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore Hannes Laimer
@ 2023-08-08 12:13 ` Hannes Laimer
  2023-09-21 10:01 ` [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Lukas Wagner
  6 siblings, 0 replies; 13+ messages in thread
From: Hannes Laimer @ 2023-08-08 12:13 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 src/server/pull.rs | 143 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 138 insertions(+), 5 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index e1a27a8c..a685bccb 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,8 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::Seek;
-use std::path::Path;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
@@ -29,10 +29,12 @@ use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{
+    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
 use pbs_tools::sha::sha256;
 
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
 struct RemoteReader {
@@ -40,6 +42,12 @@ struct RemoteReader {
     dir: BackupDir,
 }
 
+struct LocalReader {
+    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+    path: PathBuf,
+    datastore: Arc<DataStore>,
+}
+
 pub(crate) struct PullTarget {
     store: Arc<DataStore>,
     ns: BackupNamespace,
@@ -51,6 +59,11 @@ pub(crate) struct RemoteSource {
     client: HttpClient,
 }
 
+pub(crate) struct LocalSource {
+    store: Arc<DataStore>,
+    ns: BackupNamespace,
+}
+
 #[async_trait::async_trait]
 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
 /// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -234,6 +247,81 @@ impl PullSource for RemoteSource {
     }
 }
 
+#[async_trait::async_trait]
+impl PullSource for LocalSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        _worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        ListNamespacesRecursive::new_max_depth(
+            self.store.clone(),
+            self.ns.clone(),
+            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+        )?
+        .collect()
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        Ok(ListAccessibleBackupGroups::new_with_privs(
+            &self.store,
+            namespace.clone(),
+            MAX_NAMESPACE_DEPTH,
+            None,
+            None,
+            Some(owner),
+        )?
+        .filter_map(Result::ok)
+        .map(|backup_group| backup_group.group().clone())
+        .collect::<Vec<pbs_api_types::BackupGroup>>())
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+        _worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error> {
+        Ok(self
+            .store
+            .backup_group(namespace.clone(), group.clone())
+            .iter_snapshots()?
+            .filter_map(Result::ok)
+            .map(|snapshot| snapshot.dir().to_owned())
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        print_store_and_ns(self.store.name(), &self.ns)
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error> {
+        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+            &dir.full_path(),
+            "snapshot",
+            "locked by another operation",
+        )?;
+        Ok(Arc::new(LocalReader {
+            _dir_lock: Arc::new(Mutex::new(dir_lock)),
+            path: dir.full_path(),
+            datastore: dir.datastore().clone(),
+        }))
+    }
+}
+
 #[async_trait::async_trait]
 /// `PullReader` is a trait that provides an interface for reading data from a source.
 /// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
@@ -343,6 +431,48 @@ impl PullReader for RemoteReader {
     }
 }
 
+#[async_trait::async_trait]
+impl PullReader for LocalReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(LocalChunkReader::new(
+            self.datastore.clone(),
+            None,
+            crypt_mode,
+        ))
+    }
+
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        _worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let mut from_path = self.path.clone();
+        from_path.push(filename);
+        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(
+        &self,
+        _to_path: &Path,
+        _worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+        self.datastore.name() == target_store_name
+    }
+}
+
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
     /// Where data is pulled from
@@ -399,7 +529,10 @@ impl PullParameters {
                 client,
             })
         } else {
-            bail!("local sync not implemented yet")
+            Arc::new(LocalSource {
+                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+                ns: remote_ns,
+            })
         };
         let target = PullTarget {
             store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
-- 
2.39.2





^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional Hannes Laimer
@ 2023-08-23 11:37   ` Wolfgang Bumiller
  2023-09-21 11:06   ` Lukas Wagner
  1 sibling, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2023-08-23 11:37 UTC (permalink / raw)
  To: Hannes Laimer; +Cc: pbs-devel

On Tue, Aug 08, 2023 at 02:13:39PM +0200, Hannes Laimer wrote:
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  pbs-api-types/src/jobs.rs         |  5 ++-
>  src/api2/config/remote.rs         |  2 +-
>  src/api2/config/sync.rs           | 41 +++++++++++++-------
>  src/api2/node/tasks.rs            |  4 +-
>  src/api2/pull.rs                  | 62 ++++++++++++++++++++++---------
>  src/server/email_notifications.rs | 18 +++++----
>  6 files changed, 90 insertions(+), 42 deletions(-)
> 
> diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
> index 23e19b7b..85fdbe41 100644
> --- a/pbs-api-types/src/jobs.rs
> +++ b/pbs-api-types/src/jobs.rs
> @@ -467,6 +467,7 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
>          },
>          remote: {
>              schema: REMOTE_ID_SCHEMA,
> +            optional: true,
>          },
>          "remote-store": {
>              schema: DATASTORE_SCHEMA,
> @@ -515,7 +516,9 @@ pub struct SyncJobConfig {
>      pub ns: Option<BackupNamespace>,
>      #[serde(skip_serializing_if = "Option::is_none")]
>      pub owner: Option<Authid>,
> -    pub remote: String,
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    /// None implies local sync.
> +    pub remote: Option<String>,
>      pub remote_store: String,
>      #[serde(skip_serializing_if = "Option::is_none")]
>      pub remote_ns: Option<BackupNamespace>,
> diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
> index 76dd3b89..307cf3cd 100644
> --- a/src/api2/config/remote.rs
> +++ b/src/api2/config/remote.rs
> @@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
>  
>      let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
>      for job in job_list {
> -        if job.remote == name {
> +        if job.remote.map_or(false, |id| id == name) {
>              param_bail!(
>                  "name",
>                  "remote '{}' is used by sync job '{}' (datastore '{}')",
> diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
> index 01e5f2ce..21634bd5 100644
> --- a/src/api2/config/sync.rs
> +++ b/src/api2/config/sync.rs
> @@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
>  
>  use pbs_api_types::{
>      Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
> -    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
> -    PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
> +    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
> +    PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
>  };
>  use pbs_config::sync;
>  
> @@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
>          return false;
>      }
>  
> -    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
> -    remote_privs & PRIV_REMOTE_AUDIT != 0
> +    if let Some(remote) = &job.remote {
> +        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
> +        remote_privs & PRIV_REMOTE_AUDIT != 0
> +    } else {
> +        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
> +        source_ds_privs & PRIV_DATASTORE_AUDIT != 0
> +    }
>  }
>  
>  /// checks whether user can run the corresponding pull job
> @@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
>          return false;
>      }
>  
> -    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
> -    remote_privs & PRIV_REMOTE_READ != 0
> +    if let Some(remote) = &job.remote {
> +        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
> +        remote_privs & PRIV_REMOTE_READ != 0
> +    } else {
> +        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
> +        source_ds_privs & PRIV_DATASTORE_READ != 0
> +    }
>  }
>  
>  #[api(
> @@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
>  #[serde(rename_all = "kebab-case")]
>  /// Deletable property name
>  pub enum DeletableProperty {
> +    /// Delete the remote property (turning the job into a local sync).
> +    Remote,
>      /// Delete the owner property.
>      Owner,
>      /// Delete the comment property.
> @@ -275,6 +287,9 @@ pub fn update_sync_job(
>      if let Some(delete) = delete {
>          for delete_prop in delete {
>              match delete_prop {
> +                DeletableProperty::Remote => {
> +                    data.remote = None;
> +                }
>                  DeletableProperty::Owner => {
>                      data.owner = None;
>                  }
> @@ -334,7 +349,7 @@ pub fn update_sync_job(
>          data.ns = Some(ns);
>      }
>      if let Some(remote) = update.remote {
> -        data.remote = remote;
> +        data.remote = Some(remote);
>      }
>      if let Some(remote_store) = update.remote_store {
>          data.remote_store = remote_store;
> @@ -503,7 +518,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>  
>      let mut job = SyncJobConfig {
>          id: "regular".to_string(),
> -        remote: "remote0".to_string(),
> +        remote: Some("remote0".to_string()),
>          remote_store: "remotestore1".to_string(),
>          remote_ns: None,
>          store: "localstore0".to_string(),
> @@ -538,11 +553,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
>      // reading without proper read permissions on local end must fail
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
>      // reading without proper read permissions on remote end must fail
> -    job.remote = "remote0".to_string();
> +    job.remote = Some("remote0".to_string());
>      job.store = "localstore1".to_string();
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
> @@ -555,10 +570,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      ));
>  
>      // writing without proper write permissions on local end must fail
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>  
>      // writing without proper write permissions on remote end must fail
> -    job.remote = "remote0".to_string();
> +    job.remote = Some("remote0".to_string());
>      job.store = "localstore1".to_string();
>      assert!(!check_sync_job_modify_access(
>          &user_info,
> @@ -567,7 +582,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      ));
>  
>      // reset remote to one where users have access
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>  
>      // user with read permission can only read, but not modify/run
>      assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index 866361c6..12ce70f6 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -75,14 +75,14 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
>                  let local_store = captures.get(3);
>                  let local_ns = captures.get(4).map(|m| m.as_str());
>  
> -                if let (Some(remote), Some(remote_store), Some(local_store)) =
> +                if let (remote, Some(remote_store), Some(local_store)) =

^ `remote` here comes from a capture group in `SYNC_JOB_WORKER_ID_REGEX`,
where the remote is not actually optional.
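
A minimal sketch of what I mean - as long as the regex keeps the remote
group mandatory, the original pattern can simply stay, only wrapping for
the new signature (untested, and it ignores that the worker id would
then need a separate form for local jobs, see further down):

    if let (Some(remote), Some(remote_store), Some(local_store)) =
        (remote, remote_store, local_store)
    {
        return check_pull_privs(
            auth_id,
            local_store.as_str(),
            local_ns,
            // the capture is guaranteed by the regex
            Some(remote.as_str()),
            remote_store.as_str(),
            false,
        );
    }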

>                      (remote, remote_store, local_store)
>                  {
>                      return check_pull_privs(
>                          auth_id,
>                          local_store.as_str(),
>                          local_ns,
> -                        remote.as_str(),
> +                        remote.map(|remote| remote.as_str()),
>                          remote_store.as_str(),
>                          false,
>                      );
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index daeba7cf..664ecce5 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -8,7 +8,7 @@ use proxmox_sys::task_log;
>  
>  use pbs_api_types::{
>      Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
> -    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> +    GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
>      PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
>      TRANSFER_LAST_SCHEMA,
>  };
> @@ -22,7 +22,7 @@ pub fn check_pull_privs(
>      auth_id: &Authid,
>      store: &str,
>      ns: Option<&str>,
> -    remote: &str,
> +    remote: Option<&str>,
>      remote_store: &str,
>      delete: bool,
>  ) -> Result<(), Error> {
> @@ -39,12 +39,22 @@ pub fn check_pull_privs(
>          PRIV_DATASTORE_BACKUP,
>          false,
>      )?;
> -    user_info.check_privs(
> -        auth_id,
> -        &["remote", remote, remote_store],
> -        PRIV_REMOTE_READ,
> -        false,
> -    )?;
> +
> +    if let Some(remote) = remote {
> +        user_info.check_privs(
> +            auth_id,
> +            &["remote", remote, remote_store],
> +            PRIV_REMOTE_READ,
> +            false,
> +        )?;
> +    } else {
> +        user_info.check_privs(
> +            auth_id,
> +            &["datastore", remote_store],
> +            PRIV_DATASTORE_BACKUP,
> +            false,
> +        )?;
> +    }
>  
>      if delete {
>          user_info.check_privs(
> @@ -65,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>          PullParameters::new(
>              &sync_job.store,
>              sync_job.ns.clone().unwrap_or_default(),
> -            &sync_job.remote,
> +            sync_job.remote.as_deref().unwrap_or("local"),
>              &sync_job.remote_store,
>              sync_job.remote_ns.clone().unwrap_or_default(),
>              sync_job
> @@ -91,7 +101,7 @@ pub fn do_sync_job(
>  ) -> Result<String, Error> {
>      let job_id = format!(
>          "{}:{}:{}:{}:{}",
> -        sync_job.remote,
> +        sync_job.remote.clone().unwrap_or("localhost".to_string()),

This was still left unanswered in the last revision:
Can I create a remote and name it `localhost`?
This and the corresponding SYNC_JOB_WORKER_ID_REGEX need to be adapted
accordingly.

Also, this patch still uses both "localhost" and "local"?

I'm starting to wonder whether we should even expose the regexes this
way, or rather have a structural type implementing FromStr and Display,
so it's harder to use wrongly and both directions live in the same
place.
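
Rough sketch of that direction (names, the separator and the reduced
field set are made up here, the real worker id carries more parts):

    use std::fmt;
    use std::str::FromStr;

    use anyhow::{bail, Error};

    /// One type owning both directions, so callers cannot disagree on
    /// the format and the regex becomes an implementation detail.
    pub struct SyncWorkerId {
        /// `None` means a local sync job.
        pub remote: Option<String>,
        pub remote_store: String,
        pub store: String,
    }

    impl FromStr for SyncWorkerId {
        type Err = Error;

        fn from_str(s: &str) -> Result<Self, Error> {
            let mut parts = s.splitn(3, ':');
            match (parts.next(), parts.next(), parts.next()) {
                (Some(remote), Some(remote_store), Some(store)) => Ok(Self {
                    // reserved marker instead of "localhost"/"local",
                    // which a user could register as real remote names
                    remote: (remote != "-").then(|| remote.to_string()),
                    remote_store: remote_store.to_string(),
                    store: store.to_string(),
                }),
                _ => bail!("invalid sync job worker id '{s}'"),
            }
        }
    }

    impl fmt::Display for SyncWorkerId {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let remote = self.remote.as_deref().unwrap_or("-");
            write!(f, "{}:{}:{}", remote, self.remote_store, self.store)
        }
    }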

>          sync_job.remote_store,
>          sync_job.store,
>          sync_job.ns.clone().unwrap_or_default(),
> @@ -124,11 +134,28 @@ pub fn do_sync_job(
>                      worker,
>                      "sync datastore '{}' from '{}/{}'",
>                      sync_job.store,
> -                    sync_job.remote,
> +                    sync_job.remote.clone().unwrap_or("local".to_string()),
>                      sync_job.remote_store,
>                  );
>  
> -                pull_store(&worker, &client, pull_params).await?;
> +                if sync_job.remote.is_some() {
> +                    pull_store(&worker, &client, pull_params).await?;
> +                } else {
> +                    if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
> +                        if target_ns.path().starts_with(source_ns.path())
> +                            && sync_job.store == sync_job.remote_store
> +                            && sync_job.max_depth.map_or(true, |sync_depth| {
> +                            target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
> +                        }) {
> +                            task_log!(
> +                                worker,
> +                                "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
> +                            );
> +                        }
> +                    } else {
> +                        pull_store(&worker, &client, pull_params).await?;
> +                    }
> +                }
>  
>                  task_log!(worker, "sync job '{}' end", &job_id);
>  
> @@ -180,6 +207,7 @@ pub fn do_sync_job(
>              },
>              remote: {
>                  schema: REMOTE_ID_SCHEMA,
> +                optional: true,
>              },
>              "remote-store": {
>                  schema: DATASTORE_SCHEMA,
> @@ -224,7 +252,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
>  async fn pull(
>      store: String,
>      ns: Option<BackupNamespace>,
> -    remote: String,
> +    remote: Option<String>,
>      remote_store: String,
>      remote_ns: Option<BackupNamespace>,
>      remove_vanished: Option<bool>,
> @@ -248,7 +276,7 @@ async fn pull(
>          &auth_id,
>          &store,
>          ns_str.as_deref(),
> -        &remote,
> +        remote.as_deref(),
>          &remote_store,
>          delete,
>      )?;
> @@ -256,7 +284,7 @@ async fn pull(
>      let pull_params = PullParameters::new(
>          &store,
>          ns,
> -        &remote,
> +        remote.as_deref().unwrap_or("local"),
>          &remote_store,
>          remote_ns.unwrap_or_default(),
>          auth_id.clone(),
> @@ -280,7 +308,7 @@ async fn pull(
>                  worker,
>                  "pull datastore '{}' from '{}/{}'",
>                  store,
> -                remote,
> +                remote.as_deref().unwrap_or("localhost"),

And I don't think messages should be formatted with slashes if we
normally use this for remote + datastore.
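
Something along these lines maybe, reusing the locals of this function
(hypothetical wording):

    // give the local case its own shape instead of pretending it has
    // a "<remote>/<store>" origin
    let source = match remote.as_deref() {
        Some(remote) => format!("'{remote}/{remote_store}'"),
        None => format!("local datastore '{remote_store}'"),
    };
    task_log!(worker, "pull datastore '{}' from {}", store, source);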

>                  remote_store,
>              );
>  
> @@ -299,4 +327,4 @@ async fn pull(
>      Ok(upid_str)
>  }
>  
> -pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> +pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> \ No newline at end of file
> diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
> index ea1476d7..18881782 100644
> --- a/src/server/email_notifications.rs
> +++ b/src/server/email_notifications.rs
> @@ -484,15 +484,17 @@ pub fn send_sync_status(
>          }
>      };
>  
> +    let tmp_src_string;
> +    let source_str = if let Some(remote) = &job.remote {
> +        tmp_src_string = format!("Sync remote '{}'", remote);
> +        &tmp_src_string
> +    } else {
> +        "Sync local"
> +    };
> +
>      let subject = match result {
> -        Ok(()) => format!(
> -            "Sync remote '{}' datastore '{}' successful",
> -            job.remote, job.remote_store,
> -        ),
> -        Err(_) => format!(
> -            "Sync remote '{}' datastore '{}' failed",
> -            job.remote, job.remote_store,
> -        ),
> +        Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
> +        Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
>      };
>  
>      send_job_status_mail(email, &subject, &text)?;
> -- 
> 2.39.2




^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 3/6] manager: add completion for opt. Remote in SyncJob
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] manager: add completion for opt. " Hannes Laimer
@ 2023-08-24  9:24   ` Wolfgang Bumiller
  0 siblings, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2023-08-24  9:24 UTC (permalink / raw)
  To: Hannes Laimer; +Cc: pbs-devel

On Tue, Aug 08, 2023 at 02:13:41PM +0200, Hannes Laimer wrote:
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  src/bin/proxmox-backup-manager.rs | 67 +++++++++++++++++++------------
>  1 file changed, 41 insertions(+), 26 deletions(-)
> 
> diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
> index b4cb6cb3..eadfe547 100644
> --- a/src/bin/proxmox-backup-manager.rs
> +++ b/src/bin/proxmox-backup-manager.rs
> @@ -535,35 +535,33 @@ fn get_remote(param: &HashMap<String, String>) -> Option<String> {
>      param.get("remote").map(|r| r.to_owned()).or_else(|| {
>          if let Some(id) = param.get("id") {
>              if let Ok(job) = get_sync_job(id) {
> -                return Some(job.remote);
> +                return job.remote;

^ belongs in patch 1

>              }
>          }
>          None
>      })
>  }
>  
> -fn get_remote_store(param: &HashMap<String, String>) -> Option<(String, String)> {
> +fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>, String)> {
>      let mut job: Option<SyncJobConfig> = None;
>  
>      let remote = param.get("remote").map(|r| r.to_owned()).or_else(|| {
>          if let Some(id) = param.get("id") {
>              job = get_sync_job(id).ok();
>              if let Some(ref job) = job {
> -                return Some(job.remote.clone());
> +                return job.remote.clone();

^ belongs in patch 1

>              }
>          }
>          None
>      });
>  
> -    if let Some(remote) = remote {
> -        let store = param
> -            .get("remote-store")
> -            .map(|r| r.to_owned())
> -            .or_else(|| job.map(|job| job.remote_store));
> +    let store = param
> +        .get("remote-store")
> +        .map(|r| r.to_owned())
> +        .or_else(|| job.map(|job| job.remote_store));
>  
> -        if let Some(store) = store {
> -            return Some((remote, store));
> -        }
> +    if let Some(store) = store {
> +        return Some((remote, store));
>      }
>  
>      None
> @@ -584,7 +582,7 @@ fn get_remote_ns(param: &HashMap<String, String>) -> Option<BackupNamespace> {
>  }
>  
>  // shell completion helper
> -pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
> +pub fn complete_remote_datastore_name(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
>      let mut list = Vec::new();
>  
>      if let Some(remote) = get_remote(param) {
> @@ -595,7 +593,9 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
>                  list.push(item.store);
>              }
>          }
> -    }
> +    } else {
> +        list = pbs_config::datastore::complete_datastore_name(arg, param);
> +    };
>  
>      list
>  }
> @@ -607,17 +607,25 @@ pub fn complete_remote_datastore_namespace(
>  ) -> Vec<String> {
>      let mut list = Vec::new();
>  
> -    if let Some((remote, remote_store)) = get_remote_store(param) {
> -        if let Ok(data) = proxmox_async::runtime::block_on(async move {
> +    if let Some(data) = match get_remote_store(param) {
> +        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
>              crate::api2::config::remote::scan_remote_namespaces(
>                  remote.clone(),
>                  remote_store.clone(),
>              )
>              .await
> -        }) {
> -            for item in data {
> -                list.push(item.ns.name());
> -            }
> +            .ok()
> +        }),
> +        Some((None, source_store)) => {

Something about "remote store" vs "source store" just bugs me here...
Pull jobs should altogether just have a 'source' instead of a 'remote'
:-/

> +            let mut rpcenv = CliEnvironment::new();
> +            rpcenv.set_auth_id(Some(String::from("root@pam")));
> +            crate::api2::admin::namespace::list_namespaces(source_store, None, None, &mut rpcenv)
> +                .ok()
> +        }
> +        _ => None,
> +    } {
> +        for item in data {
> +            list.push(item.ns.name());
>          }
>      }
>  
> @@ -662,19 +670,26 @@ pub fn complete_sync_local_datastore_namespace(
>  pub fn complete_remote_datastore_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
>      let mut list = Vec::new();
>  
> -    if let Some((remote, remote_store)) = get_remote_store(param) {
> -        let ns = get_remote_ns(param);
> -        if let Ok(data) = proxmox_async::runtime::block_on(async move {
> +    let ns = get_remote_ns(param);
> +    if let Some(data) = match get_remote_store(param) {
> +        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
>              crate::api2::config::remote::scan_remote_groups(
>                  remote.clone(),
>                  remote_store.clone(),
>                  ns,
>              )
>              .await
> -        }) {
> -            for item in data {
> -                list.push(format!("{}/{}", item.backup.ty, item.backup.id));
> -            }
> +            .ok()
> +        }),
> +        Some((None, source_store)) => {
> +            let mut rpcenv = CliEnvironment::new();
> +            rpcenv.set_auth_id(Some(String::from("root@pam")));
> +            crate::api2::admin::datastore::list_groups(source_store, ns, &mut rpcenv).ok()
> +        }
> +        _ => None,
> +    } {
> +        for item in data {
> +            list.push(format!("{}/{}", item.backup.ty, item.backup.id));
>          }
>      }
>  
> -- 
> 2.39.2




^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore Hannes Laimer
@ 2023-08-24 13:09   ` Wolfgang Bumiller
  2023-09-21 11:10   ` Lukas Wagner
  1 sibling, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2023-08-24 13:09 UTC (permalink / raw)
  To: Hannes Laimer; +Cc: pbs-devel

On Tue, Aug 08, 2023 at 02:13:43PM +0200, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  Cargo.toml                      |   2 +
>  pbs-datastore/src/read_chunk.rs |   2 +-
>  src/api2/config/remote.rs       |  14 +-
>  src/api2/pull.rs                |  31 +-
>  src/server/pull.rs              | 943 +++++++++++++++++++-------------
>  5 files changed, 570 insertions(+), 422 deletions(-)
> 
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index e55452d1..e1a27a8c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::Seek;
> +use std::path::Path;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>  
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
> -use proxmox_sys::task_log;
> +use proxmox_sys::{task_log, task_warn};
> +use serde_json::json;
>  
>  use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> -    Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> +    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
> +    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>      PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>  };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>  use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
>  use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>  
>  use crate::backup::{check_ns_modification_privs, check_ns_privs};
>  use crate::tools::parallel_handler::ParallelHandler;
>  
> -/// Parameters for a pull operation.
> -pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> +struct RemoteReader {
> +    backup_reader: Arc<BackupReader>,
> +    dir: BackupDir,
> +}
> +
> +pub(crate) struct PullTarget {
>      store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
>      ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: HttpClient,
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
> +/// The trait includes methods for listing namespaces, groups, and backup directories,
> +/// as well as retrieving a reader for reading data from the source
> +trait PullSource: Send + Sync {
> +    /// Lists namespaces from the source.
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error>;
> +
> +    /// Lists groups within a specific namespace from the source.
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error>;
> +
> +    /// Lists backup directories for a specific group within a specific namespace from the source.
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error>;
> +    fn get_ns(&self) -> BackupNamespace;
> +    fn print_store_and_ns(&self) -> String;
> +
> +    /// Returns a reader for reading data from a specific backup directory.
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error>;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullSource for RemoteSource {
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {
> +        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +            vec![self.ns.clone()];

This (still) does nothing, as mentioned in v2 ;-)
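
Presumably this was meant as an early return, like the 404 fallback further
down, i.e.:

    if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
        return Ok(vec![self.ns.clone()]);
    }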

> +        }
> +
> +        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
> +        let mut data = json!({});
> +        if let Some(max_depth) = max_depth {
> +            data["max-depth"] = json!(max_depth);
> +        }
> +
> +        if !self.ns.is_root() {
> +            data["parent"] = json!(self.ns);
> +        }
> +        self.client.login().await?;
> +
> +        let mut result = match self.client.get(&path, Some(data)).await {
> +            Ok(res) => res,
> +            Err(err) => match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match code {
> +                    &StatusCode::NOT_FOUND => {
> +                        if self.ns.is_root() && max_depth.is_none() {
> +                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> +                            max_depth.replace(0);
> +                        } else {
> +                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                        }
> +
> +                        return Ok(vec![self.ns.clone()]);
> +                    }
> +                    _ => {
> +                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    bail!("Querying namespaces failed - {err}");
> +                }
> +            },
> +        };
> +
> +        let list: Vec<BackupNamespace> =
> +            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +                .iter()
> +                .map(|list_item| list_item.ns.clone())

If you're already modifying this, use

    .into_iter()
    .map(|list_item| list_item.ns)

since we don't really need to clone() here
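
The whole expression would then read:

    let list: Vec<BackupNamespace> =
        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
            .into_iter()
            .map(|list_item| list_item.ns)
            .collect();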
  

> +                .collect();
> +
> +        Ok(list)
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        _owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
> +
> +        let args = if !namespace.is_root() {
> +            Some(json!({ "ns": namespace.clone() }))
> +        } else {
> +            None
> +        };
> +
> +        self.client.login().await?;
> +        let mut result =
> +            self.client.get(&path, args).await.map_err(|err| {
> +                format_err!("Failed to retrieve backup groups from remote - {}", err)
> +            })?;
> +
> +        Ok(
> +            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                .map_err(Error::from)?
> +                .into_iter()
> +                .map(|item| item.backup)
> +                .collect::<Vec<BackupGroup>>(),
> +        )
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        _namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
> +
> +        let mut args = json!({
> +            "backup-type": group.ty,
> +            "backup-id": group.id,
> +        });
> +
> +        if !self.ns.is_root() {
> +            args["ns"] = serde_json::to_value(&self.ns)?;
> +        }
> +
> +        self.client.login().await?;
> +
> +        let mut result = self.client.get(&path, Some(args)).await?;
> +        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> +        Ok(snapshot_list
> +            .into_iter()
> +            .filter_map(|item: SnapshotListItem| {
> +                let snapshot = item.backup;
> +                // in-progress backups can't be synced
> +                if item.size.is_none() {
> +                    task_log!(
> +                        worker,
> +                        "skipping snapshot {} - in-progress backup",
> +                        snapshot
> +                    );
> +                    return None;
> +                }
> +
> +                Some(snapshot)
> +            })
> +            .collect::<Vec<BackupDir>>())
> +    }
> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        self.ns.clone()
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        print_store_and_ns(self.repo.store(), &self.ns)
> +    }
> +
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error> {
> +        let backup_reader =
> +            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
> +        Ok(Arc::new(RemoteReader {
> +            backup_reader,
> +            dir: dir.clone(),
> +        }))
> +    }
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullReader` is a trait that provides an interface for reading data from a source.
> +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
> +trait PullReader: Send + Sync {
> +    /// Returns a chunk reader with the specified encryption mode.
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
> +
> +    /// Asynchronously loads a file from the source into a local file.
> +    /// `filename` is the name of the file to load from the source.
> +    /// `into` is the path of the local file to load the source file into.
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error>;
> +
> +    /// Tries to download the client log from the source and save it into a local file.
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error>;
> +
> +    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullReader for RemoteReader {
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
> +        Arc::new(RemoteChunkReader::new(
> +            self.backup_reader.clone(),
> +            None,
> +            crypt_mode,
> +            HashMap::new(),
> +        ))
> +    }
> +
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
> +        if let Err(err) = download_result {
> +            match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match *code {
> +                    StatusCode::NOT_FOUND => {
> +                        task_log!(
> +                            worker,
> +                            "skipping snapshot {} - vanished since start of sync",
> +                            &self.dir,
> +                        );
> +                        return Ok(None);
> +                    }
> +                    _ => {
> +                        bail!("HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    return Err(err);
> +                }
> +            };
> +        };
> +        tmp_file.rewind()?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
> +    }
> +
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error> {
> +        let mut tmp_path = to_path.to_owned();
> +        tmp_path.set_extension("tmp");
> +
> +        let tmpfile = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .read(true)
> +            .open(&tmp_path)?;
> +
> +        // Note: be silent if there is no log - only log successful download
> +        if let Ok(()) = self
> +            .backup_reader
> +            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
> +            .await
> +        {
> +            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +            }
> +            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
> +        false
> +    }
> +}
> +
> +/// Parameters for a pull operation.
> +pub(crate) struct PullParameters {
> +    /// Where data is pulled from
> +    source: Arc<dyn PullSource>,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>      /// Owner of synced groups (needs to match local owner of pre-existing groups)
>      owner: Authid,
>      /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
>      max_depth: Option<usize>,
>      /// Filters for reducing the pull scope
>      group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
>      /// How many snapshots should be transferred at most (taking the newest N snapshots)
>      transfer_last: Option<usize>,
>  }
>  
>  impl PullParameters {
>      /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,
>          owner: Authid,
> @@ -82,49 +376,56 @@ impl PullParameters {
>          limit: RateLimitConfig,
>          transfer_last: Option<usize>,
>      ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>          if let Some(max_depth) = max_depth {
>              ns.check_max_depth(max_depth)?;
>              remote_ns.check_max_depth(max_depth)?;
> -        }
> -
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> -
> +        };
>          let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>  
> -        Ok(Self {
> -            remote,
> -            remote_ns,
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
> +            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
> +            Arc::new(RemoteSource {
> +                repo,
> +                ns: remote_ns,
> +                client,
> +            })
> +        } else {
> +            bail!("local sync not implemented yet")
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
>              ns,
> +        };
> +
> +        Ok(Self {
>              source,
> -            store,
> +            target,
>              owner,
>              remove_vanished,
>              max_depth,
>              group_filter,
> -            limit,
>              transfer_last,
>          })
>      }
>  
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();
> +        source_ns.map_prefix(&source_ns, &self.target.ns)

^ This code is still weird, as already mentioned in v2
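
Mapping source_ns onto itself as prefix should always just yield the target
namespace, so unless I'm misreading map_prefix this could simply be (untested):

    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
        Ok(self.target.ns.clone())
    }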

>      }
>  }
>  
>  async fn pull_index_chunks<I: IndexFile>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
>      Ok(())
>  }
>  
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>  fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>      if size != info.size {
>          bail!(
> @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// Pulls a single file referenced by a manifest.
>  ///
>  /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> -/// - Verify tmp file checksum
> +/// - Load archive file into tmp file
> +/// -- Load file into tmp file
> +/// -- Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
> -async fn pull_single_archive(
> -    worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> -    archive_info: &FileInfo,
> +async fn pull_single_archive<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader + 'a>,
> +    snapshot: &'a pbs_datastore::BackupDir,
> +    archive_info: &'a FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
>      let archive_name = &archive_info.filename;
> @@ -277,13 +557,11 @@ async fn pull_single_archive(
>  
>      task_log!(worker, "sync archive {}", archive_name);
>  
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    reader
> +        .load_file_into(archive_name, &tmp_path, worker)
> +        .await?;
>  
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -293,14 +571,18 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

The t<->s typo is still there, as mentioned in v2.

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -309,17 +591,21 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

The t<->s typo is still there, as mentioned in v2.

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::Blob => {
> -            tmpfile.seek(SeekFrom::Start(0))?;
> +            tmpfile.rewind()?;
>              let (csum, size) = sha256(&mut tmpfile)?;
>              verify_archive(archive_info, &csum, size)?;
>          }





^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs
  2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
                   ` (5 preceding siblings ...)
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] pull: add support for pulling from local datastore Hannes Laimer
@ 2023-09-21 10:01 ` Lukas Wagner
  6 siblings, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2023-09-21 10:01 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, h.laimer

On Tue Aug 8, 2023 at 2:13 PM CEST, Hannes Laimer wrote:
> Add support for local sync. SyncJobs without a remote are considered local, and
> use a different logic for pulling. In the course of adding the extra pull logic,
> the pull code was rewritten to basically be source independent. Also cli
> completion and the UI was updated to allow Remotes in SyncJobs to be optional.
>

Gave these patches a quick test on the latest master; here is what I found:
  - Rate limiting does not seem to be implemented for local sync jobs. 
    Would be a nice addition in the future (e.g. to reduce IO load during sync
    jobs). For the time being, it would be good to structure the sync job UI
    in such a way that it is clear that a rate limit has no effect on local
    sync jobs.
  - It seems to be possible to create a local sync job that syncs a store to
    itself (same namespace). The job will ultimately fail since we read/write
    from the same (locked) snapshot. I guess it would be good to catch this
    earlier on and provide a better error in that case. Also, maybe there
    should be checks in place when creating the sync job. Might be tricky to
    figure out which variants to allow, since syncing a namespace to another
    namespace within the same store might make sense in *some* use cases.
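    A cheap guard when creating the job (and in the pull task itself) could
    look something like this (just a sketch, untested - comparing the
    effective namespaces may not cover all variants):

        if sync_job.remote.is_none()
            && sync_job.store == sync_job.remote_store
            && sync_job.ns.clone().unwrap_or_default()
                == sync_job.remote_ns.clone().unwrap_or_default()
        {
            bail!("source and target of a local sync job must not be identical");
        }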

Apart from that, everything seems to work as expected; consider this:

Tested-by: Lukas Wagner <l.wagner@proxmox.com>




^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional Hannes Laimer
  2023-08-23 11:37   ` Wolfgang Bumiller
@ 2023-09-21 11:06   ` Lukas Wagner
  1 sibling, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2023-09-21 11:06 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Hannes Laimer

Comments inline:

On 8/8/23 14:13, Hannes Laimer wrote:
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index daeba7cf..664ecce5 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -8,7 +8,7 @@ use proxmox_sys::task_log;
>   
>   use pbs_api_types::{
>       Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
> -    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> +    GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
>       PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
>       TRANSFER_LAST_SCHEMA,
>   };
> @@ -22,7 +22,7 @@ pub fn check_pull_privs(
>       auth_id: &Authid,
>       store: &str,
>       ns: Option<&str>,
> -    remote: &str,
> +    remote: Option<&str>,
>       remote_store: &str,
>       delete: bool,
>   ) -> Result<(), Error> {
> @@ -39,12 +39,22 @@ pub fn check_pull_privs(
>           PRIV_DATASTORE_BACKUP,
>           false,
>       )?;
> -    user_info.check_privs(
> -        auth_id,
> -        &["remote", remote, remote_store],
> -        PRIV_REMOTE_READ,
> -        false,
> -    )?;
> +
> +    if let Some(remote) = remote {
> +        user_info.check_privs(
> +            auth_id,
> +            &["remote", remote, remote_store],
> +            PRIV_REMOTE_READ,
> +            false,
> +        )?;
> +    } else {
> +        user_info.check_privs(
> +            auth_id,
> +            &["datastore", remote_store],
> +            PRIV_DATASTORE_BACKUP,
> +            false,
> +        )?;
> +    }
>   
>       if delete {
>           user_info.check_privs(
> @@ -65,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>           PullParameters::new(
>               &sync_job.store,
>               sync_job.ns.clone().unwrap_or_default(),
> -            &sync_job.remote,
> +            sync_job.remote.as_deref().unwrap_or("local"),
>               &sync_job.remote_store,
>               sync_job.remote_ns.clone().unwrap_or_default(),
>               sync_job
> @@ -91,7 +101,7 @@ pub fn do_sync_job(
>   ) -> Result<String, Error> {
>       let job_id = format!(
>           "{}:{}:{}:{}:{}",
> -        sync_job.remote,
> +        sync_job.remote.clone().unwrap_or("localhost".to_string()),
>           sync_job.remote_store,
>           sync_job.store,
>           sync_job.ns.clone().unwrap_or_default(),
> @@ -124,11 +134,28 @@ pub fn do_sync_job(
>                       worker,
>                       "sync datastore '{}' from '{}/{}'",
>                       sync_job.store,
> -                    sync_job.remote,
> +                    sync_job.remote.clone().unwrap_or("local".to_string()),
>                       sync_job.remote_store,
>                   );
>   
> -                pull_store(&worker, &client, pull_params).await?;
> +                if sync_job.remote.is_some() {
> +                    pull_store(&worker, &client, pull_params).await?;
> +                } else {
> +                    if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
> +                        if target_ns.path().starts_with(source_ns.path())
> +                            && sync_job.store == sync_job.remote_store
> +                            && sync_job.max_depth.map_or(true, |sync_depth| {
> +                            target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
> +                        }) {
> +                            task_log!(
> +                                worker,
> +                                "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
> +                            );
> +                        }
> +                    } else {
> +                        pull_store(&worker, &client, pull_params).await?;
> +                    }
> +                }
>   
>                   task_log!(worker, "sync job '{}' end", &job_id);
>   
> @@ -180,6 +207,7 @@ pub fn do_sync_job(
>               },
>               remote: {
>                   schema: REMOTE_ID_SCHEMA,
> +                optional: true,
>               },
>               "remote-store": {
>                   schema: DATASTORE_SCHEMA,
> @@ -224,7 +252,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
>   async fn pull(
>       store: String,
>       ns: Option<BackupNamespace>,
> -    remote: String,
> +    remote: Option<String>,
>       remote_store: String,
>       remote_ns: Option<BackupNamespace>,
>       remove_vanished: Option<bool>,
> @@ -248,7 +276,7 @@ async fn pull(
>           &auth_id,
>           &store,
>           ns_str.as_deref(),
> -        &remote,
> +        remote.as_deref(),
>           &remote_store,
>           delete,
>       )?;
> @@ -256,7 +284,7 @@ async fn pull(
>       let pull_params = PullParameters::new(
>           &store,
>           ns,
> -        &remote,
> +        remote.as_deref().unwrap_or("local"),
>           &remote_store,
>           remote_ns.unwrap_or_default(),
>           auth_id.clone(),
> @@ -280,7 +308,7 @@ async fn pull(
>                   worker,
>                   "pull datastore '{}' from '{}/{}'",
>                   store,
> -                remote,
> +                remote.as_deref().unwrap_or("localhost"),
>                   remote_store,
>               );
>   
> @@ -299,4 +327,4 @@ async fn pull(
>       Ok(upid_str)
>   }
>   
> -pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> +pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> \ No newline at end of file

You stripped the newline at the end of the file (and added it back a few
patches later).

> diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
> index ea1476d7..18881782 100644
> --- a/src/server/email_notifications.rs
> +++ b/src/server/email_notifications.rs
> @@ -484,15 +484,17 @@ pub fn send_sync_status(
>           }
>       };
>   
> +    let tmp_src_string;
> +    let source_str = if let Some(remote) = &job.remote {
> +        tmp_src_string = format!("Sync remote '{}'", remote);
> +        &tmp_src_string
> +    } else {
> +        "Sync local"
> +    };
> +

nit: considering that this is not in a hot loop or anything, I'd
just do a

let source_str = if let Some(remote) = &job.remote {
    format!("Sync remote '{}'", remote)
} else {
    "Sync local".into()
};

instead of using the temporary variable. Looks far nicer, and we can afford
the couple of extra µs for the allocation when sending a notification
email. ;) No hard feelings tho.

-- 
- Lukas




^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore
  2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore Hannes Laimer
  2023-08-24 13:09   ` Wolfgang Bumiller
@ 2023-09-21 11:10   ` Lukas Wagner
  1 sibling, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2023-09-21 11:10 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Hannes Laimer

Some of the changed lines seem to be overly long (>100 chars); I've
noted some of the places, but probably did not catch everything.

On 8/8/23 14:13, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>   Cargo.toml                      |   2 +
>   pbs-datastore/src/read_chunk.rs |   2 +-
>   src/api2/config/remote.rs       |  14 +-
>   src/api2/pull.rs                |  31 +-
>   src/server/pull.rs              | 943 +++++++++++++++++++-------------
>   5 files changed, 570 insertions(+), 422 deletions(-)
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index 4d34f8a1..74cb68e0 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
>   
>   # regular crates
>   anyhow = "1.0"
> +async-trait = "0.1.56"
>   apt-pkg-native = "0.3.2"
>   base64 = "0.13"
>   bitflags = "1.2.1"
> @@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
>   
>   [dependencies]
>   anyhow.workspace = true
> +async-trait.workspace = true
>   apt-pkg-native.workspace = true
>   base64.workspace = true
>   bitflags.workspace = true
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
>       fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
>   }
>   
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {
>       /// Returns the encoded chunk data
>       fn read_raw_chunk<'a>(
>           &'a self,
> diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
> index 307cf3cd..2511c5d5 100644
> --- a/src/api2/config/remote.rs
> +++ b/src/api2/config/remote.rs
> @@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
>       Ok(())
>   }
>   
> -/// Helper to get client for remote.cfg entry
> -pub async fn remote_client(
> +/// Helper to get client for remote.cfg entry without login, just config
> +pub fn remote_client_config(
>       remote: &Remote,
>       limit: Option<RateLimitConfig>,
>   ) -> Result<HttpClient, Error> {
> @@ -320,6 +320,16 @@ pub async fn remote_client(
>           &remote.config.auth_id,
>           options,
>       )?;
> +
> +    Ok(client)
> +}
> +
> +/// Helper to get client for remote.cfg entry
> +pub async fn remote_client(
> +    remote: &Remote,
> +    limit: Option<RateLimitConfig>,
> +) -> Result<HttpClient, Error> {
> +    let client = remote_client_config(remote, limit)?;
>       let _auth_info = client
>           .login() // make sure we can auth
>           .await
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index 664ecce5..e36a5b14 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -8,7 +8,7 @@ use proxmox_sys::task_log;
>   
>   use pbs_api_types::{
>       Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
> -    GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> +    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
>       PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
>       TRANSFER_LAST_SCHEMA,
>   };
> @@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>           PullParameters::new(
>               &sync_job.store,
>               sync_job.ns.clone().unwrap_or_default(),
> -            sync_job.remote.as_deref().unwrap_or("local"),
> +            sync_job.remote.as_deref(),
>               &sync_job.remote_store,
>               sync_job.remote_ns.clone().unwrap_or_default(),
>               sync_job
> @@ -124,7 +124,6 @@ pub fn do_sync_job(
>   
>               let worker_future = async move {
>                   let pull_params = PullParameters::try_from(&sync_job)?;
> -                let client = pull_params.client().await?;
>   
>                   task_log!(worker, "Starting datastore sync job '{}'", job_id);
>                   if let Some(event_str) = schedule {
> @@ -138,24 +137,7 @@ pub fn do_sync_job(
>                       sync_job.remote_store,
>                   );
>   
> -                if sync_job.remote.is_some() {
> -                    pull_store(&worker, &client, pull_params).await?;
> -                } else {
> -                    if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
> -                        if target_ns.path().starts_with(source_ns.path())
> -                            && sync_job.store == sync_job.remote_store
> -                            && sync_job.max_depth.map_or(true, |sync_depth| {
> -                            target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
> -                        }) {
> -                            task_log!(
> -                                worker,
> -                                "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
> -                            );
> -                        }
> -                    } else {
> -                        pull_store(&worker, &client, pull_params).await?;
> -                    }
> -                }
> +                pull_store(&worker, pull_params).await?;
>   
>                   task_log!(worker, "sync job '{}' end", &job_id);
>   
> @@ -284,7 +266,7 @@ async fn pull(
>       let pull_params = PullParameters::new(
>           &store,
>           ns,
> -        remote.as_deref().unwrap_or("local"),
> +        remote.as_deref(),
>           &remote_store,
>           remote_ns.unwrap_or_default(),
>           auth_id.clone(),
> @@ -294,7 +276,6 @@ async fn pull(
>           limit,
>           transfer_last,
>       )?;
> -    let client = pull_params.client().await?;
>   
>       // fixme: set to_stdout to false?
>       // FIXME: add namespace to worker id?
> @@ -312,7 +293,7 @@ async fn pull(
>                   remote_store,
>               );
>   
> -            let pull_future = pull_store(&worker, &client, pull_params);
> +            let pull_future = pull_store(&worker, pull_params);
>               (select! {
>                   success = pull_future.fuse() => success,
>                   abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> @@ -327,4 +308,4 @@ async fn pull(
>       Ok(upid_str)
>   }
>   
> -pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> \ No newline at end of file
> +pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index e55452d1..e1a27a8c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>   //! Sync datastore from remote server
>   
>   use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::Seek;
> +use std::path::Path;
>   use std::sync::atomic::{AtomicUsize, Ordering};
>   use std::sync::{Arc, Mutex};
>   use std::time::SystemTime;
>   
>   use anyhow::{bail, format_err, Error};
>   use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>   use proxmox_router::HttpError;
> -use proxmox_sys::task_log;
> +use proxmox_sys::{task_log, task_warn};
> +use serde_json::json;
>   
>   use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> -    Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> +    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
> +    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>       PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>   };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>   use pbs_datastore::data_blob::DataBlob;
>   use pbs_datastore::dynamic_index::DynamicIndexReader;
>   use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
>   use pbs_datastore::manifest::{
>       archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>   };
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>   use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
>   use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>   
>   use crate::backup::{check_ns_modification_privs, check_ns_privs};
>   use crate::tools::parallel_handler::ParallelHandler;
>   
> -/// Parameters for a pull operation.
> -pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> +struct RemoteReader {
> +    backup_reader: Arc<BackupReader>,
> +    dir: BackupDir,
> +}
> +
> +pub(crate) struct PullTarget {
>       store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
>       ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: HttpClient,
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
> +/// The trait includes methods for listing namespaces, groups, and backup directories,
> +/// as well as retrieving a reader for reading data from the source
> +trait PullSource: Send + Sync {
> +    /// Lists namespaces from the source.
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error>;
> +
> +    /// Lists groups within a specific namespace from the source.
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error>;
> +
> +    /// Lists backup directories for a specific group within a specific namespace from the source.
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error>;
> +    fn get_ns(&self) -> BackupNamespace;
> +    fn print_store_and_ns(&self) -> String;
> +
> +    /// Returns a reader for reading data from a specific backup directory.
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error>;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullSource for RemoteSource {
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {
> +        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +            vec![self.ns.clone()];
> +        }
> +
> +        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
> +        let mut data = json!({});
> +        if let Some(max_depth) = max_depth {
> +            data["max-depth"] = json!(max_depth);
> +        }
> +
> +        if !self.ns.is_root() {
> +            data["parent"] = json!(self.ns);
> +        }
> +        self.client.login().await?;
> +
> +        let mut result = match self.client.get(&path, Some(data)).await {
> +            Ok(res) => res,
> +            Err(err) => match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match code {
> +                    &StatusCode::NOT_FOUND => {
> +                        if self.ns.is_root() && max_depth.is_none() {
> +                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");

These lines exceed our 100 character limit.

> +                            max_depth.replace(0);
> +                        } else {
> +                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                        }
> +
> +                        return Ok(vec![self.ns.clone()]);
> +                    }
> +                    _ => {
> +                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    bail!("Querying namespaces failed - {err}");
> +                }
> +            },
> +        };
> +
> +        let list: Vec<BackupNamespace> =
> +            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +                .iter()
> +                .map(|list_item| list_item.ns.clone())
> +                .collect();
> +
> +        Ok(list)
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        _owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
> +
> +        let args = if !namespace.is_root() {
> +            Some(json!({ "ns": namespace.clone() }))
> +        } else {
> +            None
> +        };
> +
> +        self.client.login().await?;
> +        let mut result =
> +            self.client.get(&path, args).await.map_err(|err| {
> +                format_err!("Failed to retrieve backup groups from remote - {}", err)
> +            })?;
> +
> +        Ok(
> +            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                .map_err(Error::from)?
> +                .into_iter()
> +                .map(|item| item.backup)
> +                .collect::<Vec<BackupGroup>>(),
> +        )
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        _namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
> +
> +        let mut args = json!({
> +            "backup-type": group.ty,
> +            "backup-id": group.id,
> +        });
> +
> +        if !self.ns.is_root() {
> +            args["ns"] = serde_json::to_value(&self.ns)?;
> +        }
> +
> +        self.client.login().await?;
> +
> +        let mut result = self.client.get(&path, Some(args)).await?;
> +        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> +        Ok(snapshot_list
> +            .into_iter()
> +            .filter_map(|item: SnapshotListItem| {
> +                let snapshot = item.backup;
> +                // in-progress backups can't be synced
> +                if item.size.is_none() {
> +                    task_log!(
> +                        worker,
> +                        "skipping snapshot {} - in-progress backup",
> +                        snapshot
> +                    );
> +                    return None;
> +                }
> +
> +                Some(snapshot)
> +            })
> +            .collect::<Vec<BackupDir>>())
> +    }
> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        self.ns.clone()
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        print_store_and_ns(self.repo.store(), &self.ns)
> +    }
> +
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error> {
> +        let backup_reader =
> +            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
> +        Ok(Arc::new(RemoteReader {
> +            backup_reader,
> +            dir: dir.clone(),
> +        }))
> +    }
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullReader` is a trait that provides an interface for reading data from a source.
> +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.

Long line again

> +trait PullReader: Send + Sync {
> +    /// Returns a chunk reader with the specified encryption mode.
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
> +
> +    /// Asynchronously loads a file from the source into a local file.
> +    /// `filename` is the name of the file to load from the source.
> +    /// `into` is the path of the local file to load the source file into.
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error>;
> +
> +    /// Tries to download the client log from the source and save it into a local file.
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error>;
> +
> +    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullReader for RemoteReader {
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
> +        Arc::new(RemoteChunkReader::new(
> +            self.backup_reader.clone(),
> +            None,
> +            crypt_mode,
> +            HashMap::new(),
> +        ))
> +    }
> +
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
> +        if let Err(err) = download_result {
> +            match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match *code {
> +                    StatusCode::NOT_FOUND => {
> +                        task_log!(
> +                            worker,
> +                            "skipping snapshot {} - vanished since start of sync",
> +                            &self.dir,
> +                        );
> +                        return Ok(None);
> +                    }
> +                    _ => {
> +                        bail!("HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    return Err(err);
> +                }
> +            };
> +        };
> +        tmp_file.rewind()?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
> +    }
> +
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error> {
> +        let mut tmp_path = to_path.to_owned();
> +        tmp_path.set_extension("tmp");
> +
> +        let tmpfile = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .read(true)
> +            .open(&tmp_path)?;
> +
> +        // Note: be silent if there is no log - only log successful download
> +        if let Ok(()) = self
> +            .backup_reader
> +            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
> +            .await
> +        {
> +            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +            }
> +            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
> +        false
> +    }
> +}
> +
> +/// Parameters for a pull operation.
> +pub(crate) struct PullParameters {
> +    /// Where data is pulled from
> +    source: Arc<dyn PullSource>,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>       /// Owner of synced groups (needs to match local owner of pre-existing groups)
>       owner: Authid,
>       /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
>       max_depth: Option<usize>,
>       /// Filters for reducing the pull scope
>       group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
>       /// How many snapshots should be transferred at most (taking the newest N snapshots)
>       transfer_last: Option<usize>,
>   }
>   
>   impl PullParameters {
>       /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>       pub(crate) fn new(
>           store: &str,
>           ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>           remote_store: &str,
>           remote_ns: BackupNamespace,
>           owner: Authid,
> @@ -82,49 +376,56 @@ impl PullParameters {
>           limit: RateLimitConfig,
>           transfer_last: Option<usize>,
>       ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>           if let Some(max_depth) = max_depth {
>               ns.check_max_depth(max_depth)?;
>               remote_ns.check_max_depth(max_depth)?;
> -        }
> -
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> -
> +        };
>           let remove_vanished = remove_vanished.unwrap_or(false);
>   
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>   
> -        Ok(Self {
> -            remote,
> -            remote_ns,
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
> +            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
> +            Arc::new(RemoteSource {
> +                repo,
> +                ns: remote_ns,
> +                client,
> +            })
> +        } else {
> +            bail!("local sync not implemented yet")
> +        };
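
Fine as a stopgap given that patch 6/6 follows directly; I'd expect it
to replace the bail with something along these lines (sketch - the
LocalSource name and fields are assumptions based on the cover letter,
not checked against that patch):

    Arc::new(LocalSource {
        store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
        ns: remote_ns,
    })
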
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
>               ns,
> +        };
> +
> +        Ok(Self {
>               source,
> -            store,
> +            target,
>               owner,
>               remove_vanished,
>               max_depth,
>               group_filter,
> -            limit,
>               transfer_last,
>           })
>       }
>   
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();
> +        source_ns.map_prefix(&source_ns, &self.target.ns)
>       }
>   }
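
This helper reads odd to me: map_prefix substitutes the source prefix
with the target prefix, so mapping the source root onto itself always
yields the target namespace unchanged, i.e. the whole function boils
down to Ok(self.target.ns.clone()). Worked example of the semantics as
I understand them:

    // "a/b".map_prefix("a/b", "x/y") == "x/y"     <- what this computes
    // "a/b/c".map_prefix("a/b", "x/y") == "x/y/c" <- what a sub-namespace
    //                                                sync would need

Since pull_group runs once per source namespace, the helper probably
needs the current namespace as a parameter, like the old
`remote_ns.map_prefix(&params.remote_ns, &params.ns)` call had.
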
>   
>   async fn pull_index_chunks<I: IndexFile>(
>       worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>       target: Arc<DataStore>,
>       index: I,
>       downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
>       Ok(())
>   }
>   
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>   fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>       if size != info.size {
>           bail!(
> @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>   /// Pulls a single file referenced by a manifest.
>   ///
>   /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> -/// - Verify tmp file checksum
> +/// - Load archive file into tmp file
> +/// -- Fetch the file from the source (remote download or local copy)
> +/// -- Verify tmp file checksum
>   /// - if archive is an index, pull referenced chunks
>   /// - Rename tmp file into real path
> -async fn pull_single_archive(
> -    worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> -    archive_info: &FileInfo,
> +async fn pull_single_archive<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader + 'a>,
> +    snapshot: &'a pbs_datastore::BackupDir,
> +    archive_info: &'a FileInfo,
>       downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>   ) -> Result<(), Error> {
>       let archive_name = &archive_info.filename;
> @@ -277,13 +557,11 @@ async fn pull_single_archive(
>   
>       task_log!(worker, "sync archive {}", archive_name);
>   
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    reader
> +        .load_file_into(archive_name, &tmp_path, worker)
> +        .await?;
>   
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>   
>       match archive_type(archive_name)? {
>           ArchiveType::DynamicIndex => {
> @@ -293,14 +571,18 @@ async fn pull_single_archive(
>               let (csum, size) = index.compute_csum();
>               verify_archive(archive_info, &csum, size)?;
>   
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datastore");
> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>           }
>           ArchiveType::FixedIndex => {
>               let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -309,17 +591,21 @@ async fn pull_single_archive(
>               let (csum, size) = index.compute_csum();
>               verify_archive(archive_info, &csum, size)?;
>   
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datastore");
> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>           }
>           ArchiveType::Blob => {
> -            tmpfile.seek(SeekFrom::Start(0))?;
> +            tmpfile.rewind()?;
>               let (csum, size) = sha256(&mut tmpfile)?;
>               verify_archive(archive_info, &csum, size)?;
>           }
> @@ -330,33 +616,6 @@ async fn pull_single_archive(
>       Ok(())
>   }
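
The skip/pull block is now duplicated between the Dynamic and Fixed
arms; a small helper would keep the two in sync - sketch only, the
signature just follows the surrounding code:

    async fn pull_index_chunks_unless_same_store<I: IndexFile>(
        worker: &WorkerTask,
        reader: &Arc<dyn PullReader + '_>,
        snapshot: &pbs_datastore::BackupDir,
        crypt_mode: CryptMode,
        index: I,
        downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<(), Error> {
        if reader.skip_chunk_sync(snapshot.datastore().name()) {
            task_log!(worker, "skipping chunk sync for same datastore");
            return Ok(());
        }
        pull_index_chunks(
            worker,
            reader.chunk_reader(crypt_mode),
            snapshot.datastore().clone(),
            index,
            downloaded_chunks,
        )
        .await
    }
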
>   
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    path: &std::path::Path,
> -) -> Result<(), Error> {
> -    let mut tmp_path = path.to_owned();
> -    tmp_path.set_extension("tmp");
> -
> -    let tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> -
> -    // Note: be silent if there is no log - only log successful download
> -    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> -        if let Err(err) = std::fs::rename(&tmp_path, path) {
> -            bail!("Atomic rename file {:?} failed - {}", path, err);
> -        }
> -        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> -    }
> -
> -    Ok(())
> -}
> -
>   /// Actual implementation of pulling a snapshot.
>   ///
>   /// Pulling a snapshot consists of the following steps:
> @@ -366,10 +625,10 @@ async fn try_client_log_download(
>   /// -- if file already exists, verify contents
>   /// -- if not, pull it from the remote
>   /// - Download log if not already existing
> -async fn pull_snapshot(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +async fn pull_snapshot<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader + 'a>,
> +    snapshot: &'a pbs_datastore::BackupDir,
>       downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>   ) -> Result<(), Error> {
>       let mut manifest_name = snapshot.full_path();
> @@ -380,32 +639,15 @@ async fn pull_snapshot(
>   
>       let mut tmp_manifest_name = manifest_name.clone();
>       tmp_manifest_name.set_extension("tmp");
> -
> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> -                }
> -            };
> -        }
> -    };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> +    let tmp_manifest_blob;
> +    if let Some(data) = reader
> +        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
> +        .await?
> +    {
> +        tmp_manifest_blob = data;
> +    } else {
> +        return Ok(());
> +    }
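
The deferred `let` plus if/else works, but a let-else (stable since
Rust 1.65) reads more directly - sketch:

    let Some(tmp_manifest_blob) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    else {
        return Ok(());
    };

nit: the stray `;` after the if-blocks around the
try_download_client_log calls below can be dropped.
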
>   
>       if manifest_name.exists() {
>           let manifest_blob = proxmox_lang::try_block!({
> @@ -422,8 +664,10 @@ async fn pull_snapshot(
>   
>           if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>               if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> -            }
> +                reader
> +                    .try_download_client_log(&client_log_name, worker)
> +                    .await?;
> +            };
>               task_log!(worker, "no data changes");
>               let _ = std::fs::remove_file(&tmp_manifest_name);
>               return Ok(()); // nothing changed
> @@ -471,17 +715,9 @@ async fn pull_snapshot(
>               }
>           }
>   
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
>           pull_single_archive(
>               worker,
> -            &reader,
> -            &mut chunk_reader,
> +            reader.clone(),
>               snapshot,
>               item,
>               downloaded_chunks.clone(),
> @@ -494,9 +730,10 @@ async fn pull_snapshot(
>       }
>   
>       if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> -    }
> -
> +        reader
> +            .try_download_client_log(&client_log_name, worker)
> +            .await?;
> +    };
>       snapshot
>           .cleanup_unreferenced_files(&manifest)
>           .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
> @@ -506,12 +743,12 @@ async fn pull_snapshot(
>   
>   /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
>   ///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.
> -async fn pull_snapshot_from(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +/// The `reader` is configured to read from the source backup directory, while the
> +/// `snapshot` is pointing to the local datastore and target namespace.
> +async fn pull_snapshot_from<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader + 'a>,
> +    snapshot: &'a pbs_datastore::BackupDir,
>       downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>   ) -> Result<(), Error> {
>       let (_path, is_new, _snap_lock) = snapshot
> @@ -626,11 +863,10 @@ impl std::fmt::Display for SkipInfo {
>   /// - Sort by snapshot time
>   /// - Get last snapshot timestamp on local datastore
>   /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader
>   /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
>   /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
>   ///
> -/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
> +/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
>   /// remote when querying snapshots. This allows us to interact with old remotes that don't have
>   /// namespace support yet.
>   ///
> @@ -639,117 +875,79 @@ impl std::fmt::Display for SkipInfo {
>   /// - local group owner is already checked by pull_store
>   async fn pull_group(
>       worker: &WorkerTask,
> -    client: &HttpClient,
>       params: &PullParameters,
> -    group: &pbs_api_types::BackupGroup,
> -    remote_ns: BackupNamespace,
> +    source_namespace: &BackupNamespace,
> +    group: &BackupGroup,
>       progress: &mut StoreProgress,
>   ) -> Result<(), Error> {
> -    task_log!(worker, "sync group {}", group);
> -
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/snapshots",
> -        params.source.store()
> -    );
> -
> -    let mut args = json!({
> -        "backup-type": group.ty,
> -        "backup-id": group.id,
> -    });
> -
> -    if !remote_ns.is_root() {
> -        args["ns"] = serde_json::to_value(&remote_ns)?;
> -    }
> -
> -    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
> -
> -    let mut result = client.get(&path, Some(args)).await?;
> -    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> -    client.login().await?; // make sure auth is complete
> -
> -    let fingerprint = client.fingerprint();
> -
> -    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> -    let last_sync_time = last_sync.unwrap_or(i64::MIN);
> -
> -    let mut remote_snapshots = std::collections::HashSet::new();
> -
> -    // start with 65536 chunks (up to 256 GiB)
> -    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> -    progress.group_snapshots = list.len() as u64;
> -
>       let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
>       let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>   
> -    let total_amount = list.len();
> +    let mut raw_list: Vec<BackupDir> = params
> +        .source
> +        .list_backup_dirs(source_namespace, group, worker)
> +        .await?;
> +    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
> +
> +    let total_amount = raw_list.len();
>   
>       let cutoff = params
>           .transfer_last
>           .map(|count| total_amount.saturating_sub(count))
>           .unwrap_or_default();
>   
> -    for (pos, item) in list.into_iter().enumerate() {
> -        let snapshot = item.backup;
> -
> -        // in-progress backups can't be synced
> -        if item.size.is_none() {
> -            task_log!(
> -                worker,
> -                "skipping snapshot {} - in-progress backup",
> -                snapshot
> -            );
> -            continue;
> -        }
> -
> -        remote_snapshots.insert(snapshot.time);
> +    let target_ns = params.get_target_ns()?;
>   
> -        if last_sync_time > snapshot.time {
> -            already_synced_skip_info.update(snapshot.time);
> -            continue;
> -        } else if already_synced_skip_info.count > 0 {
> -            task_log!(worker, "{}", already_synced_skip_info);
> -            already_synced_skip_info.reset();
> -        }
> -
> -        if pos < cutoff && last_sync_time != snapshot.time {
> -            transfer_last_skip_info.update(snapshot.time);
> -            continue;
> -        } else if transfer_last_skip_info.count > 0 {
> -            task_log!(worker, "{}", transfer_last_skip_info);
> -            transfer_last_skip_info.reset();
> -        }
> -
> -        // get updated auth_info (new tickets)
> -        let auth_info = client.login().await?;
> +    let mut source_snapshots = HashSet::new();
> +    let last_sync_time = params
> +        .target
> +        .store
> +        .last_successful_backup(&target_ns, group)?
> +        .unwrap_or(i64::MIN);
> +
> +    let list: Vec<BackupDir> = raw_list
> +        .into_iter()
> +        .enumerate()
> +        .filter(|&(pos, ref dir)| {
> +            source_snapshots.insert(dir.time);
> +            if last_sync_time > dir.time {
> +                already_synced_skip_info.update(dir.time);
> +                return false;
> +            } else if already_synced_skip_info.count > 0 {
> +                task_log!(worker, "{}", already_synced_skip_info);
> +                already_synced_skip_info.reset();
> +                return true;
> +            }
>   
> -        let options =
> -            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> -                .rate_limit(params.limit.clone());
> +            if pos < cutoff && last_sync_time != dir.time {
> +                transfer_last_skip_info.update(dir.time);
> +                return false;
> +            } else if transfer_last_skip_info.count > 0 {
> +                task_log!(worker, "{}", transfer_last_skip_info);
> +                transfer_last_skip_info.reset();
> +            }
> +            true
> +        })
> +        .map(|(_, dir)| dir)
> +        .collect();
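
One behavioral difference to the old loop: after logging and resetting
already_synced_skip_info the closure returns true right away, so the
transfer-last cutoff below is never checked for that particular
snapshot. If that is not intended, a sketch that keeps the old
semantics (same names as above):

    .filter(|&(pos, ref dir)| {
        source_snapshots.insert(dir.time);
        if last_sync_time > dir.time {
            already_synced_skip_info.update(dir.time);
            return false;
        }
        if already_synced_skip_info.count > 0 {
            task_log!(worker, "{}", already_synced_skip_info);
            already_synced_skip_info.reset();
        }
        if pos < cutoff && last_sync_time != dir.time {
            transfer_last_skip_info.update(dir.time);
            return false;
        }
        if transfer_last_skip_info.count > 0 {
            task_log!(worker, "{}", transfer_last_skip_info);
            transfer_last_skip_info.reset();
        }
        true
    })
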
>   
> -        let new_client = HttpClient::new(
> -            params.source.host(),
> -            params.source.port(),
> -            params.source.auth_id(),
> -            options,
> -        )?;
> +    // start with 65536 chunks (up to 256 GiB)
> +    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>   
> -        let reader = BackupReader::start(
> -            &new_client,
> -            None,
> -            params.source.store(),
> -            &remote_ns,
> -            &snapshot,
> -            true,
> -        )
> -        .await?;
> +    progress.group_snapshots = list.len() as u64;
>   
> -        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
> +    for (pos, from_snapshot) in list.into_iter().enumerate() {
> +        let to_snapshot = params
> +            .target
> +            .store
> +            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
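
Here the snapshot is created under params.target.ns instead of the
target_ns computed a few lines up. With the current get_target_ns() the
two are the same value, but once sub-namespace mapping works (see the
comment on get_target_ns above) this call site needs the mapped
namespace - sketch:

    let to_snapshot = params
        .target
        .store
        .backup_dir(target_ns.clone(), from_snapshot.clone())?;
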
>   
> -        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> +        let reader = params
> +            .source
> +            .reader(source_namespace, &from_snapshot)
> +            .await?;
> +        let result =
> +            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
>   
>           progress.done_snapshots = pos as u64 + 1;
>           task_log!(worker, "percentage done: {}", progress);
> @@ -758,11 +956,14 @@ async fn pull_group(
>       }
>   
>       if params.remove_vanished {
> -        let group = params.store.backup_group(target_ns.clone(), group.clone());
> +        let group = params
> +            .target
> +            .store
> +            .backup_group(params.get_target_ns()?, group.clone());
>           let local_list = group.list_backups()?;
>           for info in local_list {
>               let snapshot = info.backup_dir;
> -            if remote_snapshots.contains(&snapshot.backup_time()) {
> +            if source_snapshots.contains(&snapshot.backup_time()) {
>                   continue;
>               }
>               if snapshot.is_protected() {
> @@ -774,73 +975,23 @@ async fn pull_group(
>                   continue;
>               }
>               task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
> -            params
> -                .store
> -                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
> +            params.target.store.remove_backup_dir(
> +                &params.get_target_ns()?,
> +                snapshot.as_ref(),
> +                false,
> +            )?;
>           }
>       }
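
nit: get_target_ns() gets re-evaluated twice in this block; reusing the
target_ns binding from the top of the function avoids that and keeps
the vanished-snapshot removal in the same namespace the pull above
used - sketch:

    let group = params
        .target
        .store
        .backup_group(target_ns.clone(), group.clone());
    // ... and further down:
    params
        .target
        .store
        .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
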
>   
>       Ok(())
>   }
>   
> -// will modify params if switching to backwards mode for lack of NS support on remote end
> -async fn query_namespaces(
> -    worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &mut PullParameters,
> -) -> Result<Vec<BackupNamespace>, Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/namespace",
> -        params.source.store()
> -    );
> -    let mut data = json!({});
> -    if let Some(max_depth) = params.max_depth {
> -        data["max-depth"] = json!(max_depth);
> -    }
> -
> -    if !params.remote_ns.is_root() {
> -        data["parent"] = json!(params.remote_ns);
> -    }
> -
> -    let mut result = match client.get(&path, Some(data)).await {
> -        Ok(res) => res,
> -        Err(err) => match err.downcast_ref::<HttpError>() {
> -            Some(HttpError { code, message }) => match *code {
> -                StatusCode::NOT_FOUND => {
> -                    if params.remote_ns.is_root() && params.max_depth.is_none() {
> -                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> -                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> -                        params.max_depth = Some(0);
> -                    } else {
> -                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> -                    }
> -
> -                    return Ok(vec![params.remote_ns.clone()]);
> -                }
> -                _ => {
> -                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> -                }
> -            },
> -            None => {
> -                bail!("Querying namespaces failed - {err}");
> -            }
> -        },
> -    };
> -
> -    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    // parents first
> -    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
> -
> -    Ok(list.iter().map(|item| item.ns.clone()).collect())
> -}
> -
>   fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
>       let mut created = false;
> -    let store_ns_str = print_store_and_ns(params.store.name(), ns);
> +    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
>   
> -    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
> -        check_ns_modification_privs(params.store.name(), ns, &params.owner)
> +    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
> +        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
>               .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
>   
>           let name = match ns.components().last() {
> @@ -850,14 +1001,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>               }
>           };
>   
> -        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
> +        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
>               bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
>           }
>           created = true;
>       }
>   
>       check_ns_privs(
> -        params.store.name(),
> +        params.target.store.name(),
>           ns,
>           &params.owner,
>           PRIV_DATASTORE_BACKUP,
> @@ -868,10 +1019,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>   }
>   
>   fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
> -    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
> +    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
>           .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
>   
> -    params.store.remove_namespace_recursive(local_ns, true)
> +    params
> +        .target
> +        .store
> +        .remove_namespace_recursive(local_ns, true)
>   }
>   
>   fn check_and_remove_vanished_ns(
> @@ -885,14 +1039,15 @@ fn check_and_remove_vanished_ns(
>       // clamp like remote does so that we don't list more than we can ever have synced.
>       let max_depth = params
>           .max_depth
> -        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
> +        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
>   
>       let mut local_ns_list: Vec<BackupNamespace> = params
> +        .target
>           .store
> -        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
> +        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
>           .filter(|ns| {
>               let user_privs =
> -                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
> +                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
>               user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
>           })
>           .collect();
> @@ -901,7 +1056,7 @@ fn check_and_remove_vanished_ns(
>       local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
>   
>       for local_ns in local_ns_list {
> -        if local_ns == params.ns {
> +        if local_ns == params.target.ns {
>               continue;
>           }
>   
> @@ -948,29 +1103,49 @@ fn check_and_remove_vanished_ns(
>   /// - access to sub-NS checked here
>   pub(crate) async fn pull_store(
>       worker: &WorkerTask,
> -    client: &HttpClient,
>       mut params: PullParameters,
>   ) -> Result<(), Error> {
>       // explicit create shared lock to prevent GC on newly created chunks
> -    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
> +    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
>       let mut errors = false;
>   
>       let old_max_depth = params.max_depth;
> -    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
> -        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
> +    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
> +        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
>       } else {
> -        query_namespaces(worker, client, &mut params).await?
> +        params
> +            .source
> +            .list_namespaces(&mut params.max_depth, worker)
> +            .await?
>       };
> +
> +    let ns_layers_to_be_pulled = namespaces
> +        .iter()
> +        .map(BackupNamespace::depth)
> +        .max()
> +        .map_or(0, |v| v - params.source.get_ns().depth());
> +    let target_depth = params.target.ns.depth();
> +
> +    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
> +        bail!(
> +            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
> +            ns_layers_to_be_pulled,
> +            target_depth,
> +            MAX_NAMESPACE_DEPTH
> +        );
> +    }
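
Took me a moment to parse the arithmetic here, so spelling it out
(worked example, the depths are illustrative):

    // source ns "a" (depth 1), deepest namespace on the source "a/b/c" (depth 3)
    //   => ns_layers_to_be_pulled = 3 - 1 = 2
    // target ns "x/y" (depth 2)
    //   => deepest namespace created locally: "x/y/b/c" (depth 2 + 2 = 4)
    // the sync bails iff ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH
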
> +
>       errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
> +    namespaces.sort_unstable_by_key(|a| a.name_len());
>   
>       let (mut groups, mut snapshots) = (0, 0);
>       let mut synced_ns = HashSet::with_capacity(namespaces.len());
>   
>       for namespace in namespaces {
> -        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
> +        let source_store_ns_str = params.source.print_store_and_ns();
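
This now logs the source root namespace on every iteration; the old
code passed the current `namespace`, which makes for a more useful log
line. Maybe take the namespace as a parameter again - sketch, the
store-name accessor is hypothetical and would need to be added to the
trait:

    let source_store_ns_str = print_store_and_ns(params.source.store_name(), &namespace);
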
>   
> -        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
> -        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
> +        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
> +        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
>   
>           task_log!(worker, "----");
>           task_log!(
> @@ -998,7 +1173,7 @@ pub(crate) async fn pull_store(
>               }
>           }
>   
> -        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
> +        match pull_ns(worker, &namespace, &mut params).await {
>               Ok((ns_progress, ns_errors)) => {
>                   errors |= ns_errors;
>   
> @@ -1019,7 +1194,7 @@ pub(crate) async fn pull_store(
>                   task_log!(
>                       worker,
>                       "Encountered errors while syncing namespace {} - {}",
> -                    namespace,
> +                    &namespace,
>                       err,
>                   );
>               }
> @@ -1051,48 +1226,28 @@ pub(crate) async fn pull_store(
>   /// - owner check for vanished groups done here
>   pub(crate) async fn pull_ns(
>       worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> -    source_ns: BackupNamespace,
> -    target_ns: BackupNamespace,
> +    namespace: &BackupNamespace,
> +    params: &mut PullParameters,
>   ) -> Result<(StoreProgress, bool), Error> {
> -    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> -
> -    let args = if !source_ns.is_root() {
> -        Some(json!({
> -            "ns": source_ns,
> -        }))
> -    } else {
> -        None
> -    };
> -
> -    let mut result = client
> -        .get(&path, args)
> -        .await
> -        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> -    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
>   
>       let total_count = list.len();
>       list.sort_unstable_by(|a, b| {
> -        let type_order = a.backup.ty.cmp(&b.backup.ty);
> +        let type_order = a.ty.cmp(&b.ty);
>           if type_order == std::cmp::Ordering::Equal {
> -            a.backup.id.cmp(&b.backup.id)
> +            a.id.cmp(&b.id)
>           } else {
>               type_order
>           }
>       });
>   
> -    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
> +    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
>           filters.iter().any(|filter| group.matches(filter))
>       };
>   
> -    // Get groups with target NS set
> -    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
>       let list = if let Some(ref group_filter) = &params.group_filter {
>           let unfiltered_count = list.len();
> -        let list: Vec<pbs_api_types::BackupGroup> = list
> +        let list: Vec<BackupGroup> = list
>               .into_iter()
>               .filter(|group| apply_filters(group, group_filter))
>               .collect();
> @@ -1110,13 +1265,14 @@ pub(crate) async fn pull_ns(
>   
>       let mut errors = false;
>   
> -    let mut new_groups = std::collections::HashSet::new();
> +    let mut new_groups = HashSet::new();
>       for group in list.iter() {
>           new_groups.insert(group.clone());
>       }
>   
>       let mut progress = StoreProgress::new(list.len() as u64);
>   
> +    let target_ns = params.get_target_ns()?;
>       for (done, group) in list.into_iter().enumerate() {
>           progress.done_groups = done as u64;
>           progress.done_snapshots = 0;
> @@ -1124,6 +1280,7 @@ pub(crate) async fn pull_ns(
>   
>           let (owner, _lock_guard) =
>               match params
> +                .target
>                   .store
>                   .create_locked_backup_group(&target_ns, &group, &params.owner)
>               {
> @@ -1135,7 +1292,9 @@ pub(crate) async fn pull_ns(
>                           &group,
>                           err
>                       );
> -                    errors = true; // do not stop here, instead continue
> +                    errors = true;
> +                    // do not stop here, instead continue
> +                    task_log!(worker, "create_locked_backup_group failed");
>                       continue;
>                   }
>               };
> @@ -1151,15 +1310,7 @@ pub(crate) async fn pull_ns(
>                   owner
>               );
>               errors = true; // do not stop here, instead continue
> -        } else if let Err(err) = pull_group(
> -            worker,
> -            client,
> -            params,
> -            &group,
> -            source_ns.clone(),
> -            &mut progress,
> -        )
> -        .await
> +        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
>           {
>               task_log!(worker, "sync group {} failed - {}", &group, err,);
>               errors = true; // do not stop here, instead continue
> @@ -1168,13 +1319,13 @@ pub(crate) async fn pull_ns(
>   
>       if params.remove_vanished {
>           let result: Result<(), Error> = proxmox_lang::try_block!({
> -            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
> +            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
>                   let local_group = local_group?;
>                   let local_group = local_group.group();
>                   if new_groups.contains(local_group) {
>                       continue;
>                   }
> -                let owner = params.store.get_owner(&target_ns, local_group)?;
> +                let owner = params.target.store.get_owner(&target_ns, local_group)?;
>                   if check_backup_owner(&owner, &params.owner).is_err() {
>                       continue;
>                   }
> @@ -1184,7 +1335,11 @@ pub(crate) async fn pull_ns(
>                       }
>                   }
>                   task_log!(worker, "delete vanished group '{local_group}'",);
> -                match params.store.remove_backup_group(&target_ns, local_group) {
> +                match params
> +                    .target
> +                    .store
> +                    .remove_backup_group(&target_ns, local_group)
> +                {
>                       Ok(true) => {}
>                       Ok(false) => {
>                           task_log!(

-- 
- Lukas




^ permalink raw reply	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2023-09-21 11:11 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional Hannes Laimer
2023-08-23 11:37   ` Wolfgang Bumiller
2023-09-21 11:06   ` Lukas Wagner
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] ui: add support for optional Remote in SyncJob Hannes Laimer
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] manager: add completion for opt. " Hannes Laimer
2023-08-24  9:24   ` Wolfgang Bumiller
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] accept a ref to a HttpClient Hannes Laimer
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore Hannes Laimer
2023-08-24 13:09   ` Wolfgang Bumiller
2023-09-21 11:10   ` Lukas Wagner
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] pull: add support for pulling from local datastore Hannes Laimer
2023-09-21 10:01 ` [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Lukas Wagner
