public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox-backup 0/4] native local sync-jobs
@ 2023-02-13 15:45 Hannes Laimer
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional Hannes Laimer
                   ` (4 more replies)
  0 siblings, 5 replies; 10+ messages in thread
From: Hannes Laimer @ 2023-02-13 15:45 UTC (permalink / raw)
  To: pbs-devel

Add support for local sync. Sync-jobs without a remote are considered
local and use a different code path for pulling. The main reason for not
reusing the existing pull logic is that it relies on having an HttpClient,
which cannot be created without a remote. Having separate logic for local
pulling should also speed up local syncs a bit, and for syncs within the
same datastore, chunk transmission can be skipped altogether. The
autocompletion helpers and the UI are also updated to support local
sync-jobs.
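
As an illustration (not part of this series), a local job then simply
omits the remote. Assuming the existing sync-job CLI option names stay
unchanged, creating one could look roughly like this:

  # hypothetical example, option names assumed from the current CLI
  proxmox-backup-manager sync-job create local-copy \
    --store target-store --ns target-ns \
    --remote-store source-store --remote-ns source-ns \
    --schedule daily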

The new logic mostly adds a local alternative wherever the HttpClient is
used. Since the pulling process involves many layers of nested function
calls, it was not really possible to split the changes up further.

The first 3 patches do not build on their own; they are only split up to
make review easier.

Hannes Laimer (4):
  api2: make remote for sync-jobs optional
  pull: add logic for local pull
  manager: add completion for local sync-jobs
  ui: add ui support for local sync-jobs

 pbs-api-types/src/jobs.rs         |   4 +-
 pbs-client/src/backup_reader.rs   |   5 +
 src/api2/admin/datastore.rs       |  10 +
 src/api2/config/remote.rs         |   2 +-
 src/api2/config/sync.rs           |  41 ++-
 src/api2/node/tasks.rs            |   4 +-
 src/api2/pull.rs                  |  78 +++--
 src/bin/proxmox-backup-manager.rs |  67 ++--
 src/server/email_notifications.rs |  16 +-
 src/server/pull.rs                | 499 +++++++++++++++++++-----------
 www/form/RemoteTargetSelector.js  |  29 +-
 www/window/SyncJobEdit.js         |   8 +-
 12 files changed, 502 insertions(+), 261 deletions(-)

-- 
2.30.2





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional
  2023-02-13 15:45 [pbs-devel] [PATCH proxmox-backup 0/4] native local sync-jobs Hannes Laimer
@ 2023-02-13 15:45 ` Hannes Laimer
  2023-02-14 14:33   ` Fabian Grünbichler
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull Hannes Laimer
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 10+ messages in thread
From: Hannes Laimer @ 2023-02-13 15:45 UTC (permalink / raw)
  To: pbs-devel

... and update the places where it is used.
A SyncJob without a remote means it pulls from a local datastore.
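
For permissions this means that, without a remote, access is checked on
the source datastore instead of under /remote: Datastore.Audit to see the
job and Datastore.Read to run it (see the check_sync_job_* and
check_pull_privs hunks below). A hypothetical setup for a dedicated sync
user could then look like this (role name assumed, not prescribed by this
patch):

  # DatastoreReader contains Datastore.Audit and Datastore.Read
  proxmox-backup-manager acl update /datastore/source-store DatastoreReader \
    --auth-id sync@pbs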

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 pbs-api-types/src/jobs.rs         |  4 +-
 src/api2/config/remote.rs         |  2 +-
 src/api2/config/sync.rs           | 41 ++++++++++------
 src/api2/node/tasks.rs            |  4 +-
 src/api2/pull.rs                  | 78 +++++++++++++++++++++++--------
 src/server/email_notifications.rs | 16 +++----
 6 files changed, 101 insertions(+), 44 deletions(-)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index cf7618c4..68db6cb8 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -462,6 +462,7 @@ pub const GROUP_FILTER_LIST_SCHEMA: Schema =
         },
         remote: {
             schema: REMOTE_ID_SCHEMA,
+            optional: true,
         },
         "remote-store": {
             schema: DATASTORE_SCHEMA,
@@ -506,7 +507,8 @@ pub struct SyncJobConfig {
     pub ns: Option<BackupNamespace>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub owner: Option<Authid>,
-    pub remote: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub remote: Option<String>,
     pub remote_store: String,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub remote_ns: Option<BackupNamespace>,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 2f02d121..aa74bdc0 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
 
     let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
     for job in job_list {
-        if job.remote == name {
+        if job.remote.map_or(false, |id| id == name) {
             param_bail!(
                 "name",
                 "remote '{}' is used by sync job '{}' (datastore '{}')",
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index bd7373df..4c5d06e2 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
 
 use pbs_api_types::{
     Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
-    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
-    PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
+    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
+    PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
 };
 use pbs_config::sync;
 
@@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
         return false;
     }
 
-    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
-    remote_privs & PRIV_REMOTE_AUDIT != 0
+    if let Some(remote) = &job.remote {
+        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
+        remote_privs & PRIV_REMOTE_AUDIT != 0
+    } else {
+        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+        source_ds_privs & PRIV_DATASTORE_AUDIT != 0
+    }
 }
 
 /// checks whether user can run the corresponding pull job
@@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
         return false;
     }
 
-    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
-    remote_privs & PRIV_REMOTE_READ != 0
+    if let Some(remote) = &job.remote {
+        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
+        remote_privs & PRIV_REMOTE_READ != 0
+    } else {
+        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+        source_ds_privs & PRIV_DATASTORE_READ != 0
+    }
 }
 
 #[api(
@@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
 #[serde(rename_all = "kebab-case")]
 /// Deletable property name
 pub enum DeletableProperty {
+    /// Delete the remote property(-> meaning local).
+    Remote,
     /// Delete the owner property.
     Owner,
     /// Delete the comment property.
@@ -273,6 +285,9 @@ pub fn update_sync_job(
     if let Some(delete) = delete {
         for delete_prop in delete {
             match delete_prop {
+                DeletableProperty::Remote => {
+                    data.remote = None;
+                }
                 DeletableProperty::Owner => {
                     data.owner = None;
                 }
@@ -329,7 +344,7 @@ pub fn update_sync_job(
         data.ns = Some(ns);
     }
     if let Some(remote) = update.remote {
-        data.remote = remote;
+        data.remote = Some(remote);
     }
     if let Some(remote_store) = update.remote_store {
         data.remote_store = remote_store;
@@ -495,7 +510,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
 
     let mut job = SyncJobConfig {
         id: "regular".to_string(),
-        remote: "remote0".to_string(),
+        remote: Some("remote0".to_string()),
         remote_store: "remotestore1".to_string(),
         remote_ns: None,
         store: "localstore0".to_string(),
@@ -529,11 +544,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
     // reading without proper read permissions on local end must fail
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
     // reading without proper read permissions on remote end must fail
-    job.remote = "remote0".to_string();
+    job.remote = Some("remote0".to_string());
     job.store = "localstore1".to_string();
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
@@ -546,10 +561,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     ));
 
     // writing without proper write permissions on local end must fail
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
 
     // writing without proper write permissions on remote end must fail
-    job.remote = "remote0".to_string();
+    job.remote = Some("remote0".to_string());
     job.store = "localstore1".to_string();
     assert!(!check_sync_job_modify_access(
         &user_info,
@@ -558,7 +573,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     ));
 
     // reset remote to one where users have access
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
 
     // user with read permission can only read, but not modify/run
     assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index d386f805..780cb6d1 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -75,14 +75,14 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
                 let local_store = captures.get(3);
                 let local_ns = captures.get(4).map(|m| m.as_str());
 
-                if let (Some(remote), Some(remote_store), Some(local_store)) =
+                if let (remote, Some(remote_store), Some(local_store)) =
                     (remote, remote_store, local_store)
                 {
                     return check_pull_privs(
                         auth_id,
                         local_store.as_str(),
                         local_ns,
-                        remote.as_str(),
+                        remote.map(|remote| remote.as_str()),
                         remote_store.as_str(),
                         false,
                     );
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index b2473ec8..c4255254 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -9,7 +9,8 @@ use proxmox_sys::task_log;
 use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
     GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
-    PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
+    PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA,
+    REMOVE_VANISHED_BACKUPS_SCHEMA,
 };
 use pbs_config::CachedUserInfo;
 use proxmox_rest_server::WorkerTask;
@@ -21,7 +22,7 @@ pub fn check_pull_privs(
     auth_id: &Authid,
     store: &str,
     ns: Option<&str>,
-    remote: &str,
+    remote: Option<&str>,
     remote_store: &str,
     delete: bool,
 ) -> Result<(), Error> {
@@ -38,12 +39,22 @@ pub fn check_pull_privs(
         PRIV_DATASTORE_BACKUP,
         false,
     )?;
-    user_info.check_privs(
-        auth_id,
-        &["remote", remote, remote_store],
-        PRIV_REMOTE_READ,
-        false,
-    )?;
+
+    if let Some(remote) = remote {
+        user_info.check_privs(
+            auth_id,
+            &["remote", remote, remote_store],
+            PRIV_REMOTE_READ,
+            false,
+        )?;
+    } else {
+        user_info.check_privs(
+            auth_id,
+            &["datastore", remote_store],
+            PRIV_DATASTORE_READ,
+            false,
+        )?;
+    }
 
     if delete {
         user_info.check_privs(
@@ -64,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
         PullParameters::new(
             &sync_job.store,
             sync_job.ns.clone().unwrap_or_default(),
-            &sync_job.remote,
+            sync_job.remote.clone().as_deref(),
             &sync_job.remote_store,
             sync_job.remote_ns.clone().unwrap_or_default(),
             sync_job
@@ -89,7 +100,7 @@ pub fn do_sync_job(
 ) -> Result<String, Error> {
     let job_id = format!(
         "{}:{}:{}:{}:{}",
-        sync_job.remote,
+        sync_job.remote.clone().unwrap_or("localhost".to_string()),
         sync_job.remote_store,
         sync_job.store,
         sync_job.ns.clone().unwrap_or_default(),
@@ -112,7 +123,6 @@ pub fn do_sync_job(
 
             let worker_future = async move {
                 let pull_params = PullParameters::try_from(&sync_job)?;
-                let client = pull_params.client().await?;
 
                 task_log!(worker, "Starting datastore sync job '{}'", job_id);
                 if let Some(event_str) = schedule {
@@ -122,11 +132,35 @@ pub fn do_sync_job(
                     worker,
                     "sync datastore '{}' from '{}/{}'",
                     sync_job.store,
-                    sync_job.remote,
+                    sync_job.remote.clone().unwrap_or("local".to_string()),
                     sync_job.remote_store,
                 );
 
-                pull_store(&worker, &client, pull_params).await?;
+                if sync_job.remote.is_some() {
+                    let client = pull_params.client().await?;
+                    pull_store(&worker, Some(&client), pull_params).await?;
+                } else {
+                    match (sync_job.ns, sync_job.remote_ns) {
+                        (Some(target_ns), Some(source_ns))
+                            if target_ns.path().starts_with(source_ns.path())
+                                && sync_job.store == sync_job.remote_store =>
+                        {
+                            task_log!(
+                                worker,
+                                "Can't sync namespace into one of its sub-namespaces, skipping"
+                            );
+                        }
+                        (_, None) if sync_job.store == sync_job.remote_store => {
+                            task_log!(
+                                worker,
+                                "Can't sync root namespace into same datastore, skipping"
+                            );
+                        }
+                        _ => {
+                            pull_store(&worker, None, pull_params).await?;
+                        }
+                    }
+                }
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -178,6 +212,7 @@ pub fn do_sync_job(
             },
             remote: {
                 schema: REMOTE_ID_SCHEMA,
+                optional: true,
             },
             "remote-store": {
                 schema: DATASTORE_SCHEMA,
@@ -218,7 +253,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
 async fn pull(
     store: String,
     ns: Option<BackupNamespace>,
-    remote: String,
+    remote: Option<String>,
     remote_store: String,
     remote_ns: Option<BackupNamespace>,
     remove_vanished: Option<bool>,
@@ -241,7 +276,7 @@ async fn pull(
         &auth_id,
         &store,
         ns_str.as_deref(),
-        &remote,
+        remote.as_deref(),
         &remote_store,
         delete,
     )?;
@@ -249,7 +284,7 @@ async fn pull(
     let pull_params = PullParameters::new(
         &store,
         ns,
-        &remote,
+        remote.as_deref(),
         &remote_store,
         remote_ns.unwrap_or_default(),
         auth_id.clone(),
@@ -258,7 +293,12 @@ async fn pull(
         group_filter,
         limit,
     )?;
-    let client = pull_params.client().await?;
+
+    let client = if remote.is_some() {
+        Some(pull_params.client().await?)
+    } else {
+        None
+    };
 
     // fixme: set to_stdout to false?
     // FIXME: add namespace to worker id?
@@ -272,11 +312,11 @@ async fn pull(
                 worker,
                 "pull datastore '{}' from '{}/{}'",
                 store,
-                remote,
+                remote.as_deref().unwrap_or("localhost"),
                 remote_store,
             );
 
-            let pull_future = pull_store(&worker, &client, pull_params);
+            let pull_future = pull_store(&worker, client.as_ref(), pull_params);
             (select! {
                 success = pull_future.fuse() => success,
                 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
index b3298cf9..31a46b0f 100644
--- a/src/server/email_notifications.rs
+++ b/src/server/email_notifications.rs
@@ -486,15 +486,15 @@ pub fn send_sync_status(
         }
     };
 
+    let source_str = if let Some(remote) = job.remote.clone() {
+        format!("Sync remote '{}'", remote)
+    } else {
+        format!("Sync local")
+    };
+
     let subject = match result {
-        Ok(()) => format!(
-            "Sync remote '{}' datastore '{}' successful",
-            job.remote, job.remote_store,
-        ),
-        Err(_) => format!(
-            "Sync remote '{}' datastore '{}' failed",
-            job.remote, job.remote_store,
-        ),
+        Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
+        Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
     };
 
     send_job_status_mail(email, &subject, &text)?;
-- 
2.30.2





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull
  2023-02-13 15:45 [pbs-devel] [PATCH proxmox-backup 0/4] native local sync-jobs Hannes Laimer
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional Hannes Laimer
@ 2023-02-13 15:45 ` Hannes Laimer
  2023-02-14 14:33   ` Fabian Grünbichler
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 3/4] manager: add completion for local sync-jobs Hannes Laimer
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 10+ messages in thread
From: Hannes Laimer @ 2023-02-13 15:45 UTC (permalink / raw)
  To: pbs-devel

... since creating an HttpClient (which would be needed to reuse the
existing pull logic) is not possible without a remote. This also improves
the speed of local sync-jobs.
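
Condensed sketch of what the new local path does (simplified, the actual
hunks follow below): archives are read straight from the source snapshot
directory, and chunk transfer is skipped entirely when source and target
are the same datastore.

  // simplified from the pull_single_archive() changes in this patch
  match &source {
      BackupSource::Remote(reader) => reader.download(archive_name, &mut tmpfile).await?,
      BackupSource::Local(dir) => {
          let mut path = dir.full_path();
          path.push(archive_name);
          tmpfile.write_all(&std::fs::read(path)?)?;
      }
  }
  // chunks only need to be pulled when source and target datastore differ
  let same_store = matches!(&source, BackupSource::Local(dir)
      if dir.datastore().name() == snapshot.datastore().name());
  if !same_store {
      pull_index_chunks(worker, chunk_reader, snapshot.datastore().clone(), index, downloaded_chunks)
          .await?;
  }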

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 pbs-client/src/backup_reader.rs |   5 +
 src/api2/admin/datastore.rs     |  10 +
 src/server/pull.rs              | 499 ++++++++++++++++++++------------
 3 files changed, 335 insertions(+), 179 deletions(-)

diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
index 2cd4dc27..9dacef74 100644
--- a/pbs-client/src/backup_reader.rs
+++ b/pbs-client/src/backup_reader.rs
@@ -20,6 +20,11 @@ use pbs_tools::sha::sha256;
 
 use super::{H2Client, HttpClient};
 
+pub enum BackupSource {
+    Remote(Arc<BackupReader>),
+    Local(pbs_datastore::BackupDir),
+}
+
 /// Backup Reader
 pub struct BackupReader {
     h2: H2Client,
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 8d3a6146..8ad78f29 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -445,6 +445,16 @@ pub async fn list_snapshots(
     _param: Value,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<SnapshotListItem>, Error> {
+    do_list_snapshots(store, ns, backup_type, backup_id, rpcenv).await
+}
+
+pub async fn do_list_snapshots(
+    store: String,
+    ns: Option<BackupNamespace>,
+    backup_type: Option<BackupType>,
+    backup_id: Option<String>,
+    rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<SnapshotListItem>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 65eedf2c..81df00c3 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,7 +1,7 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::{Seek, SeekFrom, Write};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
@@ -9,6 +9,7 @@ use std::time::SystemTime;
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
 use pbs_config::CachedUserInfo;
+use pbs_datastore::read_chunk::AsyncReadChunk;
 use serde_json::json;
 
 use proxmox_router::HttpError;
@@ -21,7 +22,7 @@ use pbs_api_types::{
 };
 
 use pbs_client::{
-    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
+    BackupReader, BackupRepository, BackupSource, HttpClient, HttpClientOptions, RemoteChunkReader,
 };
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -30,7 +31,7 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, LocalChunkReader, StoreProgress};
 use pbs_tools::sha::sha256;
 use proxmox_rest_server::WorkerTask;
 
@@ -40,7 +41,7 @@ use crate::tools::parallel_handler::ParallelHandler;
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
     /// Remote that is pulled from
-    remote: Remote,
+    remote: Option<Remote>,
     /// Full specification of remote datastore
     source: BackupRepository,
     /// Local store that is pulled into
@@ -70,7 +71,7 @@ impl PullParameters {
     pub(crate) fn new(
         store: &str,
         ns: BackupNamespace,
-        remote: &str,
+        remote: Option<&str>,
         remote_store: &str,
         remote_ns: BackupNamespace,
         owner: Authid,
@@ -86,18 +87,24 @@ impl PullParameters {
             remote_ns.check_max_depth(max_depth)?;
         }
 
-        let (remote_config, _digest) = pbs_config::remote::config()?;
-        let remote: Remote = remote_config.lookup("remote", remote)?;
+        let (remote, source): (Option<Remote>, BackupRepository) = if let Some(remote_str) = remote
+        {
+            let (remote_config, _digest) = pbs_config::remote::config()?;
+            let remote = remote_config.lookup::<Remote>("remote", remote_str)?;
+            let source = BackupRepository::new(
+                Some(remote.config.auth_id.clone()),
+                Some(remote.config.host.clone()),
+                remote.config.port,
+                remote_store.to_string(),
+            );
+            (Some(remote), source)
+        } else {
+            let source = BackupRepository::new(None, None, None, remote_store.to_string());
+            (None, source)
+        };
 
         let remove_vanished = remove_vanished.unwrap_or(false);
 
-        let source = BackupRepository::new(
-            Some(remote.config.auth_id.clone()),
-            Some(remote.config.host.clone()),
-            remote.config.port,
-            remote_store.to_string(),
-        );
-
         Ok(Self {
             remote,
             remote_ns,
@@ -114,13 +121,17 @@ impl PullParameters {
 
     /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
     pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+        if let Some(remote) = &self.remote {
+            crate::api2::config::remote::remote_client(remote, Some(self.limit.clone())).await
+        } else {
+            bail!("No remote specified. Do not use a HttpClient for a local sync.")
+        }
     }
 }
 
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile, C: AsyncReadChunk + Clone>(
     worker: &WorkerTask,
-    chunk_reader: RemoteChunkReader,
+    chunk_reader: C,
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -256,10 +267,10 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// - Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
-async fn pull_single_archive(
+async fn pull_single_archive<C: AsyncReadChunk + Clone>(
     worker: &WorkerTask,
-    reader: &BackupReader,
-    chunk_reader: &mut RemoteChunkReader,
+    source: &BackupSource,
+    chunk_reader: C,
     snapshot: &pbs_datastore::BackupDir,
     archive_info: &FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -279,7 +290,15 @@ async fn pull_single_archive(
         .read(true)
         .open(&tmp_path)?;
 
-    reader.download(archive_name, &mut tmpfile).await?;
+    match source {
+        BackupSource::Remote(reader) => reader.download(archive_name, &mut tmpfile).await?,
+        BackupSource::Local(dir) => {
+            let mut source_path = dir.full_path();
+            source_path.push(archive_name);
+            let data = std::fs::read(source_path)?;
+            tmpfile.write_all(&data)?;
+        }
+    };
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
@@ -289,14 +308,23 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            match source {
+                BackupSource::Local(ref dir)
+                    if dir.datastore().name() == snapshot.datastore().name() =>
+                {
+                    task_log!(worker, "skipping chunk sync for same datastore");
+                }
+                _ => {
+                    pull_index_chunks(
+                        worker,
+                        chunk_reader,
+                        snapshot.datastore().clone(),
+                        index,
+                        downloaded_chunks,
+                    )
+                    .await?;
+                }
+            };
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -304,15 +332,23 @@ async fn pull_single_archive(
             })?;
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
-
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            match source {
+                BackupSource::Local(ref dir)
+                    if dir.datastore().name() == snapshot.datastore().name() =>
+                {
+                    task_log!(worker, "skipping chunk sync for same datastore");
+                }
+                _ => {
+                    pull_index_chunks(
+                        worker,
+                        chunk_reader,
+                        snapshot.datastore().clone(),
+                        index,
+                        downloaded_chunks,
+                    )
+                    .await?;
+                }
+            };
         }
         ArchiveType::Blob => {
             tmpfile.seek(SeekFrom::Start(0))?;
@@ -321,6 +357,9 @@ async fn pull_single_archive(
         }
     }
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
+        task_log!(worker, "sync archive {}", archive_name);
+        task_log!(worker, "tmpfile path {:?}", tmp_path.as_os_str());
+        task_log!(worker, "path path {:?}", path.as_os_str());
         bail!("Atomic rename file {:?} failed - {}", path, err);
     }
     Ok(())
@@ -364,7 +403,7 @@ async fn try_client_log_download(
 /// - Download log if not already existing
 async fn pull_snapshot(
     worker: &WorkerTask,
-    reader: Arc<BackupReader>,
+    source: BackupSource,
     snapshot: &pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
@@ -377,31 +416,45 @@ async fn pull_snapshot(
     let mut tmp_manifest_name = manifest_name.clone();
     tmp_manifest_name.set_extension("tmp");
 
-    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
-    let mut tmp_manifest_file = match download_res {
-        Ok(manifest_file) => manifest_file,
-        Err(err) => {
-            match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match *code {
-                    StatusCode::NOT_FOUND => {
-                        task_log!(
-                            worker,
-                            "skipping snapshot {} - vanished since start of sync",
-                            snapshot.dir(),
-                        );
-                        return Ok(());
-                    }
-                    _ => {
-                        bail!("HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    return Err(err);
+    let tmp_manifest_blob = match source {
+        BackupSource::Remote(ref reader) => {
+            let mut tmp_manifest_file = match download_manifest(reader, &tmp_manifest_name).await {
+                Ok(manifest_file) => manifest_file,
+                Err(err) => {
+                    match err.downcast_ref::<HttpError>() {
+                        Some(HttpError { code, message }) => match *code {
+                            StatusCode::NOT_FOUND => {
+                                task_log!(
+                                    worker,
+                                    "skipping snapshot {} - vanished since start of sync",
+                                    snapshot.dir(),
+                                );
+                                return Ok(());
+                            }
+                            _ => {
+                                bail!("HTTP error {code} - {message}");
+                            }
+                        },
+                        None => {
+                            return Err(err);
+                        }
+                    };
                 }
             };
+            DataBlob::load_from_reader(&mut tmp_manifest_file)?
+        }
+        BackupSource::Local(ref dir) => {
+            let data = dir.load_blob(MANIFEST_BLOB_NAME)?;
+            let mut tmp_manifest_file = std::fs::OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .read(true)
+                .open(&tmp_manifest_name)?;
+            tmp_manifest_file.write_all(data.raw_data())?;
+            data
         }
     };
-    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
 
     if manifest_name.exists() {
         let manifest_blob = proxmox_lang::try_block!({
@@ -417,13 +470,17 @@ async fn pull_snapshot(
         })?;
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
-            if !client_log_name.exists() {
-                try_client_log_download(worker, reader, &client_log_name).await?;
+            if let BackupSource::Remote(ref reader) = source {
+                if !client_log_name.exists() {
+                    try_client_log_download(worker, reader.clone(), &client_log_name).await?;
+                };
+            }
+            if tmp_manifest_name.exists() {
+                let _ = std::fs::remove_file(&tmp_manifest_name);
             }
             task_log!(worker, "no data changes");
-            let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(()); // nothing changed
-        }
+        };
     }
 
     let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
@@ -467,32 +524,49 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(
-            reader.clone(),
-            None,
-            item.chunk_crypt_mode(),
-            HashMap::new(),
-        );
-
-        pull_single_archive(
-            worker,
-            &reader,
-            &mut chunk_reader,
-            snapshot,
-            item,
-            downloaded_chunks.clone(),
-        )
-        .await?;
+        match source {
+            BackupSource::Remote(ref reader) => {
+                let chunk_reader = RemoteChunkReader::new(
+                    reader.clone(),
+                    None,
+                    item.chunk_crypt_mode(),
+                    HashMap::new(),
+                );
+                pull_single_archive(
+                    worker,
+                    &source,
+                    chunk_reader,
+                    snapshot,
+                    item,
+                    downloaded_chunks.clone(),
+                )
+                .await?
+            }
+            BackupSource::Local(ref dir) => {
+                let chunk_reader =
+                    LocalChunkReader::new(dir.datastore().clone(), None, item.chunk_crypt_mode());
+                pull_single_archive(
+                    worker,
+                    &source,
+                    chunk_reader,
+                    snapshot,
+                    item,
+                    downloaded_chunks.clone(),
+                )
+                .await?
+            }
+        }
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
         bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
     }
 
-    if !client_log_name.exists() {
-        try_client_log_download(worker, reader, &client_log_name).await?;
+    if let BackupSource::Remote(reader) = source {
+        if !client_log_name.exists() {
+            try_client_log_download(worker, reader, &client_log_name).await?;
+        };
     }
-
     snapshot
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,7 +580,7 @@ async fn pull_snapshot(
 /// pointing to the local datastore and target namespace.
 async fn pull_snapshot_from(
     worker: &WorkerTask,
-    reader: Arc<BackupReader>,
+    source: BackupSource,
     snapshot: &pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
@@ -517,7 +591,7 @@ async fn pull_snapshot_from(
     if is_new {
         task_log!(worker, "sync snapshot {}", snapshot.dir());
 
-        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+        if let Err(err) = pull_snapshot(worker, source, snapshot, downloaded_chunks).await {
             if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                 snapshot.backup_ns(),
                 snapshot.as_ref(),
@@ -530,7 +604,7 @@ async fn pull_snapshot_from(
         task_log!(worker, "sync snapshot {} done", snapshot.dir());
     } else {
         task_log!(worker, "re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
+        pull_snapshot(worker, source, snapshot, downloaded_chunks).await?;
         task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
     }
 
@@ -600,36 +674,52 @@ impl std::fmt::Display for SkipInfo {
 /// - local group owner is already checked by pull_store
 async fn pull_group(
     worker: &WorkerTask,
-    client: &HttpClient,
+    client: Option<&HttpClient>,
     params: &PullParameters,
     group: &pbs_api_types::BackupGroup,
     remote_ns: BackupNamespace,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/snapshots",
-        params.source.store()
-    );
-
-    let mut args = json!({
-        "backup-type": group.ty,
-        "backup-id": group.id,
-    });
-
-    if !remote_ns.is_root() {
-        args["ns"] = serde_json::to_value(&remote_ns)?;
-    }
-
     let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
+    let mut list: Vec<SnapshotListItem> = if let Some(client) = client {
+        let path = format!(
+            "api2/json/admin/datastore/{}/snapshots",
+            params.source.store()
+        );
 
-    let mut result = client.get(&path, Some(args)).await?;
-    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+        let mut args = json!({
+            "backup-type": group.ty,
+            "backup-id": group.id,
+        });
 
-    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
+        if !remote_ns.is_root() {
+            args["ns"] = serde_json::to_value(&remote_ns)?;
+        }
 
-    client.login().await?; // make sure auth is complete
+        let mut result = client.get(&path, Some(args)).await?;
+        serde_json::from_value(result["data"].take())?
+    } else {
+        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam")));
+        let source_ns = if remote_ns.is_root() {
+            None
+        } else {
+            Some(remote_ns.clone())
+        };
+        crate::api2::admin::datastore::do_list_snapshots(
+            params.source.store().to_string(),
+            source_ns,
+            Some(group.ty),
+            Some(group.id.clone()),
+            &mut rpcenv,
+        )
+        .await?
+    };
+    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
 
-    let fingerprint = client.fingerprint();
+    if let Some(client) = client {
+        client.login().await?; // make sure auth is complete
+    }
 
     let last_sync = params.store.last_successful_backup(&target_ns, group)?;
 
@@ -646,6 +736,13 @@ async fn pull_group(
         count: 0,
     };
 
+    let datastore: Option<Arc<DataStore>> = match client {
+        None => Some(DataStore::lookup_datastore(
+            params.source.store(),
+            Some(Operation::Read),
+        )?),
+        _ => None,
+    };
     for (pos, item) in list.into_iter().enumerate() {
         let snapshot = item.backup;
 
@@ -668,33 +765,47 @@ async fn pull_group(
             }
         }
 
-        // get updated auth_info (new tickets)
-        let auth_info = client.login().await?;
-
-        let options =
-            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
-                .rate_limit(params.limit.clone());
+        let backup_source = if let Some(client) = client {
+            // get updated auth_info (new tickets)
+            let auth_info = client.login().await?;
+            let fingerprint = client.fingerprint();
 
-        let new_client = HttpClient::new(
-            params.source.host(),
-            params.source.port(),
-            params.source.auth_id(),
-            options,
-        )?;
-
-        let reader = BackupReader::start(
-            new_client,
-            None,
-            params.source.store(),
-            &remote_ns,
-            &snapshot,
-            true,
-        )
-        .await?;
+            let options = HttpClientOptions::new_non_interactive(
+                auth_info.ticket.clone(),
+                fingerprint.clone(),
+            )
+            .rate_limit(params.limit.clone());
+
+            let new_client = HttpClient::new(
+                params.source.host(),
+                params.source.port(),
+                params.source.auth_id(),
+                options,
+            )?;
+
+            BackupSource::Remote(
+                BackupReader::start(
+                    new_client,
+                    None,
+                    params.source.store(),
+                    &remote_ns,
+                    &snapshot,
+                    true,
+                )
+                .await?,
+            )
+        } else {
+            if let Some(datastore) = datastore.clone() {
+                BackupSource::Local(datastore.backup_dir(remote_ns.clone(), snapshot.clone())?)
+            } else {
+                unreachable!("if there is no client and no datastore, then the ds lookup would have failed earlier")
+            }
+        };
 
         let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
 
-        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+        let result =
+            pull_snapshot_from(worker, backup_source, &snapshot, downloaded_chunks.clone()).await;
 
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
@@ -735,49 +846,64 @@ async fn pull_group(
 // will modify params if switching to backwards mode for lack of NS support on remote end
 async fn query_namespaces(
     worker: &WorkerTask,
-    client: &HttpClient,
+    client: Option<&HttpClient>,
     params: &mut PullParameters,
 ) -> Result<Vec<BackupNamespace>, Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/namespace",
-        params.source.store()
-    );
-    let mut data = json!({});
-    if let Some(max_depth) = params.max_depth {
-        data["max-depth"] = json!(max_depth);
-    }
+    let mut list: Vec<NamespaceListItem> = if let Some(client) = client {
+        let path = format!(
+            "api2/json/admin/datastore/{}/namespace",
+            params.source.store()
+        );
+        let mut data = json!({});
+        if let Some(max_depth) = params.max_depth {
+            data["max-depth"] = json!(max_depth);
+        }
 
-    if !params.remote_ns.is_root() {
-        data["parent"] = json!(params.remote_ns);
-    }
+        if !params.remote_ns.is_root() {
+            data["parent"] = json!(params.remote_ns);
+        }
 
-    let mut result = match client.get(&path, Some(data)).await {
-        Ok(res) => res,
-        Err(err) => match err.downcast_ref::<HttpError>() {
-            Some(HttpError { code, message }) => match *code {
-                StatusCode::NOT_FOUND => {
-                    if params.remote_ns.is_root() && params.max_depth.is_none() {
-                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
-                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
-                        params.max_depth = Some(0);
-                    } else {
-                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
-                    }
+        let mut result = match client.get(&path, Some(data)).await {
+            Ok(res) => res,
+            Err(err) => match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match *code {
+                    StatusCode::NOT_FOUND => {
+                        if params.remote_ns.is_root() && params.max_depth.is_none() {
+                            task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                            task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                            params.max_depth = Some(0);
+                        } else {
+                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                        }
 
-                    return Ok(vec![params.remote_ns.clone()]);
-                }
-                _ => {
-                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                        return Ok(vec![params.remote_ns.clone()]);
+                    }
+                    _ => {
+                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    bail!("Querying namespaces failed - {err}");
                 }
             },
-            None => {
-                bail!("Querying namespaces failed - {err}");
-            }
-        },
-    };
-
-    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
+        };
 
+        serde_json::from_value(result["data"].take())?
+    } else {
+        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam")));
+        let parent_ns = if params.remote_ns.is_root() {
+            None
+        } else {
+            Some(params.remote_ns.clone())
+        };
+        crate::api2::admin::namespace::list_namespaces(
+            params.source.store().to_string(),
+            parent_ns,
+            params.max_depth,
+            &mut rpcenv,
+        )?
+    };
     // parents first
     list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
 
@@ -897,7 +1023,7 @@ fn check_and_remove_vanished_ns(
 /// - access to sub-NS checked here
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
-    client: &HttpClient,
+    client: Option<&HttpClient>,
     mut params: PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
@@ -1000,27 +1126,42 @@ pub(crate) async fn pull_store(
 /// - owner check for vanished groups done here
 pub(crate) async fn pull_ns(
     worker: &WorkerTask,
-    client: &HttpClient,
+    client: Option<&HttpClient>,
     params: &PullParameters,
     source_ns: BackupNamespace,
     target_ns: BackupNamespace,
 ) -> Result<(StoreProgress, bool), Error> {
-    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
+    let mut list: Vec<GroupListItem> = if let Some(client) = client {
+        let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
 
-    let args = if !source_ns.is_root() {
-        Some(json!({
-            "ns": source_ns,
-        }))
-    } else {
-        None
-    };
+        let args = if !source_ns.is_root() {
+            Some(json!({
+                "ns": source_ns,
+            }))
+        } else {
+            None
+        };
 
-    let mut result = client
-        .get(&path, args)
-        .await
-        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
+        let mut result = client
+            .get(&path, args)
+            .await
+            .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
 
-    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+        serde_json::from_value(result["data"].take())?
+    } else {
+        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam")));
+        let source_ns = if source_ns.is_root() {
+            None
+        } else {
+            Some(source_ns.clone())
+        };
+        crate::api2::admin::datastore::list_groups(
+            params.source.store().to_string(),
+            source_ns,
+            &mut rpcenv,
+        )?
+    };
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-- 
2.30.2





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 3/4] manager: add completion for local sync-jobs
  2023-02-13 15:45 [pbs-devel] [PATCH proxmox-backup 0/4] native local sync-jobs Hannes Laimer
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional Hannes Laimer
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull Hannes Laimer
@ 2023-02-13 15:45 ` Hannes Laimer
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: add ui support " Hannes Laimer
  2023-02-14  8:02 ` [pbs-devel] [PATCH proxmox-backup 0/4] native " Lukas Wagner
  4 siblings, 0 replies; 10+ messages in thread
From: Hannes Laimer @ 2023-02-13 15:45 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 src/bin/proxmox-backup-manager.rs | 67 +++++++++++++++++++------------
 1 file changed, 41 insertions(+), 26 deletions(-)

diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index 740fdc49..1944c468 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -526,35 +526,33 @@ fn get_remote(param: &HashMap<String, String>) -> Option<String> {
     param.get("remote").map(|r| r.to_owned()).or_else(|| {
         if let Some(id) = param.get("id") {
             if let Ok(job) = get_sync_job(id) {
-                return Some(job.remote);
+                return job.remote;
             }
         }
         None
     })
 }
 
-fn get_remote_store(param: &HashMap<String, String>) -> Option<(String, String)> {
+fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>, String)> {
     let mut job: Option<SyncJobConfig> = None;
 
     let remote = param.get("remote").map(|r| r.to_owned()).or_else(|| {
         if let Some(id) = param.get("id") {
             job = get_sync_job(id).ok();
             if let Some(ref job) = job {
-                return Some(job.remote.clone());
+                return job.remote.clone();
             }
         }
         None
     });
 
-    if let Some(remote) = remote {
-        let store = param
-            .get("remote-store")
-            .map(|r| r.to_owned())
-            .or_else(|| job.map(|job| job.remote_store));
+    let store = param
+        .get("remote-store")
+        .map(|r| r.to_owned())
+        .or_else(|| job.map(|job| job.remote_store));
 
-        if let Some(store) = store {
-            return Some((remote, store));
-        }
+    if let Some(store) = store {
+        return Some((remote, store));
     }
 
     None
@@ -575,7 +573,7 @@ fn get_remote_ns(param: &HashMap<String, String>) -> Option<BackupNamespace> {
 }
 
 // shell completion helper
-pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
+pub fn complete_remote_datastore_name(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
     let mut list = Vec::new();
 
     if let Some(remote) = get_remote(param) {
@@ -586,7 +584,9 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
                 list.push(item.store);
             }
         }
-    }
+    } else {
+        list = pbs_config::datastore::complete_datastore_name(arg, param);
+    };
 
     list
 }
@@ -598,17 +598,25 @@ pub fn complete_remote_datastore_namespace(
 ) -> Vec<String> {
     let mut list = Vec::new();
 
-    if let Some((remote, remote_store)) = get_remote_store(param) {
-        if let Ok(data) = proxmox_async::runtime::block_on(async move {
+    if let Some(data) = match get_remote_store(param) {
+        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_namespaces(
                 remote.clone(),
                 remote_store.clone(),
             )
             .await
-        }) {
-            for item in data {
-                list.push(item.ns.name());
-            }
+            .ok()
+        }),
+        Some((None, source_store)) => {
+            let mut rpcenv = CliEnvironment::new();
+            rpcenv.set_auth_id(Some(String::from("root@pam")));
+            crate::api2::admin::namespace::list_namespaces(source_store, None, None, &mut rpcenv)
+                .ok()
+        }
+        _ => None,
+    } {
+        for item in data {
+            list.push(item.ns.name());
         }
     }
 
@@ -653,19 +661,26 @@ pub fn complete_sync_local_datastore_namespace(
 pub fn complete_remote_datastore_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
     let mut list = Vec::new();
 
-    if let Some((remote, remote_store)) = get_remote_store(param) {
-        let ns = get_remote_ns(param);
-        if let Ok(data) = proxmox_async::runtime::block_on(async move {
+    let ns = get_remote_ns(param);
+    if let Some(data) = match get_remote_store(param) {
+        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_groups(
                 remote.clone(),
                 remote_store.clone(),
                 ns,
             )
             .await
-        }) {
-            for item in data {
-                list.push(format!("{}/{}", item.backup.ty, item.backup.id));
-            }
+            .ok()
+        }),
+        Some((None, source_store)) => {
+            let mut rpcenv = CliEnvironment::new();
+            rpcenv.set_auth_id(Some(String::from("root@pam")));
+            crate::api2::admin::datastore::list_groups(source_store, ns, &mut rpcenv).ok()
+        }
+        _ => None,
+    } {
+        for item in data {
+            list.push(format!("{}/{}", item.backup.ty, item.backup.id));
         }
     }
 
-- 
2.30.2





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 4/4] ui: add ui support for local sync-jobs
  2023-02-13 15:45 [pbs-devel] [PATCH proxmox-backup 0/4] native local sync-jobs Hannes Laimer
                   ` (2 preceding siblings ...)
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 3/4] manager: add completion for local sync-jobs Hannes Laimer
@ 2023-02-13 15:45 ` Hannes Laimer
  2023-02-14  8:02 ` [pbs-devel] [PATCH proxmox-backup 0/4] native " Lukas Wagner
  4 siblings, 0 replies; 10+ messages in thread
From: Hannes Laimer @ 2023-02-13 15:45 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 www/form/RemoteTargetSelector.js | 29 +++++++++++++++++++----------
 www/window/SyncJobEdit.js        |  8 ++++++--
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/www/form/RemoteTargetSelector.js b/www/form/RemoteTargetSelector.js
index 2a94c4d7..e7b822d7 100644
--- a/www/form/RemoteTargetSelector.js
+++ b/www/form/RemoteTargetSelector.js
@@ -44,20 +44,18 @@ Ext.define('PBS.form.RemoteStoreSelector', {
 
 	me.store.removeAll();
 
+	me.setDisabled(false);
+	if (!me.firstLoad) {
+	    me.clearValue();
+	}
 	if (me.remote) {
-	    me.setDisabled(false);
-	    if (!me.firstLoad) {
-		me.clearValue();
-	    }
-
 	    me.store.proxy.url = `/api2/json/config/remote/${encodeURIComponent(me.remote)}/scan`;
-	    me.store.load();
-
-	    me.firstLoad = false;
 	} else {
-	    me.setDisabled(true);
-	    me.clearValue();
+	    me.store.proxy.url = '/api2/json/admin/datastore';
 	}
+	me.store.load();
+
+	me.firstLoad = false;
     },
 
     initComponent: function() {
@@ -175,6 +173,17 @@ Ext.define('PBS.form.RemoteNamespaceSelector', {
 	    me.store.proxy.url = `/api2/json/config/remote/${encodedRemote}/scan/${encodedStore}/namespaces`;
 	    me.store.load();
 
+	    me.firstLoad = false;
+	} else if (me.remoteStore) {
+	    me.setDisabled(false);
+	    if (!me.firstLoad) {
+		me.clearValue();
+	    }
+	    let encodedStore = encodeURIComponent(me.remoteStore);
+
+	    me.store.proxy.url = `/api2/json/admin/datastore/${encodedStore}/namespace`;
+	    me.store.load();
+
 	    me.firstLoad = false;
 	} else if (previousStore) {
 	    me.setDisabled(true);
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index 948ad5da..401a03b7 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -137,8 +137,13 @@ Ext.define('PBS.window.SyncJobEdit', {
 		    {
 			fieldLabel: gettext('Source Remote'),
 			xtype: 'pbsRemoteSelector',
-			allowBlank: false,
+			allowBlank: true,
+			emptyText: gettext('Local'),
 			name: 'remote',
+			cbind: {
+			    deleteEmpty: '{!isCreate}',
+			},
+			skipEmptyText: true,
 			listeners: {
 			    change: function(f, value) {
 				let me = this;
@@ -155,7 +160,6 @@ Ext.define('PBS.window.SyncJobEdit', {
 			allowBlank: false,
 			autoSelect: false,
 			name: 'remote-store',
-			disabled: true,
 			listeners: {
 			    change: function(field, value) {
 				let me = this;
-- 
2.30.2





^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 0/4] native local sync-jobs
  2023-02-13 15:45 [pbs-devel] [PATCH proxmox-backup 0/4] native local sync-jobs Hannes Laimer
                   ` (3 preceding siblings ...)
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: add ui support " Hannes Laimer
@ 2023-02-14  8:02 ` Lukas Wagner
  4 siblings, 0 replies; 10+ messages in thread
From: Lukas Wagner @ 2023-02-14  8:02 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Hannes Laimer

On 2/13/23 16:45, Hannes Laimer wrote:
> Add support for local sync. Sync-jobs without
> a remote are considered local, and use a
> different logic for pulling. The main reason
> for not reusing the existing pull logic is
> that the current logic relies on having a
> HttpClient and without a remote we can't create
> one. Having separate logic for local pulling
> however should speed up local syncs a bit,
> and for syncs in the same datastore, chunck
> transmission can be skipped all together.
> Also the autocompletion and UI is updated
> to support local sync-jobs.
> 
> The new logic is mostly adding a local
> alternative whenever the HttpClient is used.
> Since the pulling process involves a lot of
> functions calling functions to do smaller stuff
> it was not really possible to split the changes
> up more.
> 

Gave this a quick spin on the latest master, here's what I tested:
   * Local syncing within the same datastore (between different namespaces)
   * Local syncing between different datastores
   * Remote syncing to a different PBS instance, to make sure that nothing broke

Everything seems to work as expected. Consider these patches:

Tested-by: Lukas Wagner <l.wagner@proxmox.com>




^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional Hannes Laimer
@ 2023-02-14 14:33   ` Fabian Grünbichler
  2023-02-15 11:40     ` Thomas Lamprecht
  0 siblings, 1 reply; 10+ messages in thread
From: Fabian Grünbichler @ 2023-02-14 14:33 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On February 13, 2023 4:45 pm, Hannes Laimer wrote:
> ... and update places where it is used.
> A SyncJob not having a remote means it is pulling
> from a local datastore.

high level: I wonder whether we really need this for sync jobs, or whether just
having it for pull (or as a new API/CLI endpoint copy/move?) would be enough as
a start? is there a use case for scheduled local syncing?
 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  pbs-api-types/src/jobs.rs         |  4 +-
>  src/api2/config/remote.rs         |  2 +-
>  src/api2/config/sync.rs           | 41 ++++++++++------
>  src/api2/node/tasks.rs            |  4 +-
>  src/api2/pull.rs                  | 78 +++++++++++++++++++++++--------
>  src/server/email_notifications.rs | 16 +++----
>  6 files changed, 101 insertions(+), 44 deletions(-)
> 
> diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
> index cf7618c4..68db6cb8 100644
> --- a/pbs-api-types/src/jobs.rs
> +++ b/pbs-api-types/src/jobs.rs
> @@ -462,6 +462,7 @@ pub const GROUP_FILTER_LIST_SCHEMA: Schema =
>          },
>          remote: {
>              schema: REMOTE_ID_SCHEMA,
> +            optional: true,
>          },
>          "remote-store": {
>              schema: DATASTORE_SCHEMA,
> @@ -506,7 +507,8 @@ pub struct SyncJobConfig {
>      pub ns: Option<BackupNamespace>,
>      #[serde(skip_serializing_if = "Option::is_none")]
>      pub owner: Option<Authid>,
> -    pub remote: String,
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    pub remote: Option<String>,
>      pub remote_store: String,
>      #[serde(skip_serializing_if = "Option::is_none")]
>      pub remote_ns: Option<BackupNamespace>,
> diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
> index 2f02d121..aa74bdc0 100644
> --- a/src/api2/config/remote.rs
> +++ b/src/api2/config/remote.rs
> @@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
>  
>      let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
>      for job in job_list {
> -        if job.remote == name {
> +        if job.remote.map_or(false, |id| id == name) {
>              param_bail!(
>                  "name",
>                  "remote '{}' is used by sync job '{}' (datastore '{}')",
> diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
> index bd7373df..4c5d06e2 100644
> --- a/src/api2/config/sync.rs
> +++ b/src/api2/config/sync.rs
> @@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
>  
>  use pbs_api_types::{
>      Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
> -    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
> -    PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
> +    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
> +    PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
>  };
>  use pbs_config::sync;
>  
> @@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
>          return false;
>      }
>  
> -    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
> -    remote_privs & PRIV_REMOTE_AUDIT != 0
> +    if let Some(remote) = &job.remote {
> +        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
> +        remote_privs & PRIV_REMOTE_AUDIT != 0
> +    } else {
> +        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
> +        source_ds_privs & PRIV_DATASTORE_AUDIT != 0
> +    }
>  }
>  
>  /// checks whether user can run the corresponding pull job
> @@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
>          return false;
>      }
>  
> -    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
> -    remote_privs & PRIV_REMOTE_READ != 0
> +    if let Some(remote) = &job.remote {
> +        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
> +        remote_privs & PRIV_REMOTE_READ != 0
> +    } else {
> +        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
> +        source_ds_privs & PRIV_DATASTORE_READ != 0
> +    }
>  }
>  
>  #[api(
> @@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
>  #[serde(rename_all = "kebab-case")]
>  /// Deletable property name
>  pub enum DeletableProperty {
> +    /// Delete the remote property(-> meaning local).
> +    Remote,
>      /// Delete the owner property.
>      Owner,
>      /// Delete the comment property.
> @@ -273,6 +285,9 @@ pub fn update_sync_job(
>      if let Some(delete) = delete {
>          for delete_prop in delete {
>              match delete_prop {
> +                DeletableProperty::Remote => {
> +                    data.remote = None;
> +                }
>                  DeletableProperty::Owner => {
>                      data.owner = None;
>                  }
> @@ -329,7 +344,7 @@ pub fn update_sync_job(
>          data.ns = Some(ns);
>      }
>      if let Some(remote) = update.remote {
> -        data.remote = remote;
> +        data.remote = Some(remote);
>      }
>      if let Some(remote_store) = update.remote_store {
>          data.remote_store = remote_store;
> @@ -495,7 +510,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>  
>      let mut job = SyncJobConfig {
>          id: "regular".to_string(),
> -        remote: "remote0".to_string(),
> +        remote: Some("remote0".to_string()),
>          remote_store: "remotestore1".to_string(),
>          remote_ns: None,
>          store: "localstore0".to_string(),
> @@ -529,11 +544,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
>      // reading without proper read permissions on local end must fail
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
>      // reading without proper read permissions on remote end must fail
> -    job.remote = "remote0".to_string();
> +    job.remote = Some("remote0".to_string());
>      job.store = "localstore1".to_string();
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
> @@ -546,10 +561,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      ));
>  
>      // writing without proper write permissions on local end must fail
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>  
>      // writing without proper write permissions on remote end must fail
> -    job.remote = "remote0".to_string();
> +    job.remote = Some("remote0".to_string());
>      job.store = "localstore1".to_string();
>      assert!(!check_sync_job_modify_access(
>          &user_info,
> @@ -558,7 +573,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      ));
>  
>      // reset remote to one where users have access
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>  
>      // user with read permission can only read, but not modify/run
>      assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index d386f805..780cb6d1 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -75,14 +75,14 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
>                  let local_store = captures.get(3);
>                  let local_ns = captures.get(4).map(|m| m.as_str());
>  
> -                if let (Some(remote), Some(remote_store), Some(local_store)) =
> +                if let (remote, Some(remote_store), Some(local_store)) =
>                      (remote, remote_store, local_store)
>                  {
>                      return check_pull_privs(
>                          auth_id,
>                          local_store.as_str(),
>                          local_ns,
> -                        remote.as_str(),
> +                        remote.map(|remote| remote.as_str()),
>                          remote_store.as_str(),
>                          false,
>                      );
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index b2473ec8..c4255254 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -9,7 +9,8 @@ use proxmox_sys::task_log;
>  use pbs_api_types::{
>      Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
>      GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> -    PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
> +    PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA,
> +    REMOVE_VANISHED_BACKUPS_SCHEMA,
>  };
>  use pbs_config::CachedUserInfo;
>  use proxmox_rest_server::WorkerTask;
> @@ -21,7 +22,7 @@ pub fn check_pull_privs(
>      auth_id: &Authid,
>      store: &str,
>      ns: Option<&str>,
> -    remote: &str,
> +    remote: Option<&str>,
>      remote_store: &str,
>      delete: bool,
>  ) -> Result<(), Error> {
> @@ -38,12 +39,22 @@ pub fn check_pull_privs(
>          PRIV_DATASTORE_BACKUP,
>          false,
>      )?;
> -    user_info.check_privs(
> -        auth_id,
> -        &["remote", remote, remote_store],
> -        PRIV_REMOTE_READ,
> -        false,
> -    )?;
> +
> +    if let Some(remote) = remote {
> +        user_info.check_privs(
> +            auth_id,
> +            &["remote", remote, remote_store],
> +            PRIV_REMOTE_READ,
> +            false,
> +        )?;
> +    } else {
> +        user_info.check_privs(
> +            auth_id,
> +            &["datastore", remote_store],
> +            PRIV_DATASTORE_READ,
> +            false,
> +        )?;
> +    }
>  
>      if delete {
>          user_info.check_privs(
> @@ -64,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>          PullParameters::new(
>              &sync_job.store,
>              sync_job.ns.clone().unwrap_or_default(),
> -            &sync_job.remote,
> +            sync_job.remote.clone().as_deref(),
>              &sync_job.remote_store,
>              sync_job.remote_ns.clone().unwrap_or_default(),
>              sync_job
> @@ -89,7 +100,7 @@ pub fn do_sync_job(
>  ) -> Result<String, Error> {
>      let job_id = format!(
>          "{}:{}:{}:{}:{}",
> -        sync_job.remote,
> +        sync_job.remote.clone().unwrap_or("localhost".to_string()),
>          sync_job.remote_store,
>          sync_job.store,
>          sync_job.ns.clone().unwrap_or_default(),
> @@ -112,7 +123,6 @@ pub fn do_sync_job(
>  
>              let worker_future = async move {
>                  let pull_params = PullParameters::try_from(&sync_job)?;
> -                let client = pull_params.client().await?;
>  
>                  task_log!(worker, "Starting datastore sync job '{}'", job_id);
>                  if let Some(event_str) = schedule {
> @@ -122,11 +132,35 @@ pub fn do_sync_job(
>                      worker,
>                      "sync datastore '{}' from '{}/{}'",
>                      sync_job.store,
> -                    sync_job.remote,
> +                    sync_job.remote.clone().unwrap_or("local".to_string()),
>                      sync_job.remote_store,
>                  );
>  
> -                pull_store(&worker, &client, pull_params).await?;
> +                if sync_job.remote.is_some() {
> +                    let client = pull_params.client().await?;
> +                    pull_store(&worker, Some(&client), pull_params).await?;
> +                } else {
> +                    match (sync_job.ns, sync_job.remote_ns) {
> +                        (Some(target_ns), Some(source_ns))
> +                            if target_ns.path().starts_with(source_ns.path())
> +                                && sync_job.store == sync_job.remote_store =>
> +                        {
> +                            task_log!(
> +                                worker,
> +                                "Can't sync namespace into one of its sub-namespaces, skipping"
> +                            );
> +                        }
> +                        (_, None) if sync_job.store == sync_job.remote_store => {
> +                            task_log!(
> +                                worker,
> +                                "Can't sync root namespace into same datastore, skipping"
> +                            );
> +                        }
> +                        _ => {
> +                            pull_store(&worker, None, pull_params).await?;
> +                        }
> +                    }
> +                }
>  
>                  task_log!(worker, "sync job '{}' end", &job_id);
>  
> @@ -178,6 +212,7 @@ pub fn do_sync_job(
>              },
>              remote: {
>                  schema: REMOTE_ID_SCHEMA,
> +                optional: true,
>              },
>              "remote-store": {
>                  schema: DATASTORE_SCHEMA,
> @@ -218,7 +253,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
>  async fn pull(
>      store: String,
>      ns: Option<BackupNamespace>,
> -    remote: String,
> +    remote: Option<String>,
>      remote_store: String,
>      remote_ns: Option<BackupNamespace>,
>      remove_vanished: Option<bool>,
> @@ -241,7 +276,7 @@ async fn pull(
>          &auth_id,
>          &store,
>          ns_str.as_deref(),
> -        &remote,
> +        remote.as_deref(),
>          &remote_store,
>          delete,
>      )?;
> @@ -249,7 +284,7 @@ async fn pull(
>      let pull_params = PullParameters::new(
>          &store,
>          ns,
> -        &remote,
> +        remote.as_deref(),
>          &remote_store,
>          remote_ns.unwrap_or_default(),
>          auth_id.clone(),
> @@ -258,7 +293,12 @@ async fn pull(
>          group_filter,
>          limit,
>      )?;
> -    let client = pull_params.client().await?;
> +
> +    let client = if remote.is_some() {
> +        Some(pull_params.client().await?)
> +    } else {
> +        None
> +    };
>  
>      // fixme: set to_stdout to false?
>      // FIXME: add namespace to worker id?
> @@ -272,11 +312,11 @@ async fn pull(
>                  worker,
>                  "pull datastore '{}' from '{}/{}'",
>                  store,
> -                remote,
> +                remote.as_deref().unwrap_or("localhost"),
>                  remote_store,
>              );
>  
> -            let pull_future = pull_store(&worker, &client, pull_params);
> +            let pull_future = pull_store(&worker, client.as_ref(), pull_params);
>              (select! {
>                  success = pull_future.fuse() => success,
>                  abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
> index b3298cf9..31a46b0f 100644
> --- a/src/server/email_notifications.rs
> +++ b/src/server/email_notifications.rs
> @@ -486,15 +486,15 @@ pub fn send_sync_status(
>          }
>      };
>  
> +    let source_str = if let Some(remote) = job.remote.clone() {
> +        format!("Sync remote '{}'", remote)
> +    } else {
> +        format!("Sync local")
> +    };
> +
>      let subject = match result {
> -        Ok(()) => format!(
> -            "Sync remote '{}' datastore '{}' successful",
> -            job.remote, job.remote_store,
> -        ),
> -        Err(_) => format!(
> -            "Sync remote '{}' datastore '{}' failed",
> -            job.remote, job.remote_store,
> -        ),
> +        Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
> +        Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
>      };
>  
>      send_job_status_mail(email, &subject, &text)?;
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull
  2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull Hannes Laimer
@ 2023-02-14 14:33   ` Fabian Grünbichler
  0 siblings, 0 replies; 10+ messages in thread
From: Fabian Grünbichler @ 2023-02-14 14:33 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On February 13, 2023 4:45 pm, Hannes Laimer wrote:
> ... since creating a HttpClient(which would be needed
> to reuse existing pull logic) without a remote was not
> possible. This also improves the speed for local
> sync-jobs.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>

a high level remark - for all the parts below where I suggest not going over the
API as root for local pulling, you need to properly filter the results by the
"owner" from PullParameters (or maybe that doesn't make sense for the source
side, and we need to pass in the "local" authid of the "caller" from the API to
replace the "remote" authid we get currently). there is a
src/backup/hierarchy.rs module that might be of help for that ;)

> ---
>  pbs-client/src/backup_reader.rs |   5 +
>  src/api2/admin/datastore.rs     |  10 +
>  src/server/pull.rs              | 499 ++++++++++++++++++++------------
>  3 files changed, 335 insertions(+), 179 deletions(-)
> 
> diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
> index 2cd4dc27..9dacef74 100644
> --- a/pbs-client/src/backup_reader.rs
> +++ b/pbs-client/src/backup_reader.rs
> @@ -20,6 +20,11 @@ use pbs_tools::sha::sha256;
>  
>  use super::{H2Client, HttpClient};
>  
> +pub enum BackupSource {
> +    Remote(Arc<BackupReader>),
> +    Local(pbs_datastore::BackupDir),

the Local variant could also take a SnapshotReader instead of BackupDir, like
pbs-tape does. not sure whether that would simplify/improve code - did you
evaluate it?

this is also only used by server::pull, so likely it should go there.. the name
is also rather generic, something like `PullReader` or even just `Reader`, since
it's a pull-internal enum that doesn't need to be pub anyway.

but see below for some suggestions!

> +}
> +
>  /// Backup Reader
>  pub struct BackupReader {
>      h2: H2Client,
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 8d3a6146..8ad78f29 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -445,6 +445,16 @@ pub async fn list_snapshots(
>      _param: Value,
>      _info: &ApiMethod,
>      rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<SnapshotListItem>, Error> {
> +    do_list_snapshots(store, ns, backup_type, backup_id, rpcenv).await
> +}
> +
> +pub async fn do_list_snapshots(
> +    store: String,
> +    ns: Option<BackupNamespace>,
> +    backup_type: Option<BackupType>,
> +    backup_id: Option<String>,
> +    rpcenv: &mut dyn RpcEnvironment,

this change is not needed (see below)

>  ) -> Result<Vec<SnapshotListItem>, Error> {
>      let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>  
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 65eedf2c..81df00c3 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,7 +1,7 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::{Seek, SeekFrom, Write};
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
> @@ -9,6 +9,7 @@ use std::time::SystemTime;
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
>  use pbs_config::CachedUserInfo;
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>  use serde_json::json;
>  
>  use proxmox_router::HttpError;
> @@ -21,7 +22,7 @@ use pbs_api_types::{
>  };
>  
>  use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> +    BackupReader, BackupRepository, BackupSource, HttpClient, HttpClientOptions, RemoteChunkReader,
>  };
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
> @@ -30,7 +31,7 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::{check_backup_owner, DataStore, LocalChunkReader, StoreProgress};
>  use pbs_tools::sha::sha256;
>  use proxmox_rest_server::WorkerTask;
>  
> @@ -40,7 +41,7 @@ use crate::tools::parallel_handler::ParallelHandler;
>  /// Parameters for a pull operation.
>  pub(crate) struct PullParameters {
>      /// Remote that is pulled from
> -    remote: Remote,
> +    remote: Option<Remote>,
>      /// Full specification of remote datastore
>      source: BackupRepository,

might make more sense to refactor this type to properly differentiate between
remote source and local source?

e.g., we could have

pub(crate) struct LocalSource {
    store: Arc<Datastore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    remote: Remote,
    repo: BackupRepository,
    ns: BackupNamespace,
}

pub(crate) enum PullSource {
    Local(LocalSource),
    Remote(RemoteSource),
}

pub(crate) struct PullTarget {
    store: Arc<Datastore>,
    ns: BackupNamespace,
}

pub(crate) struct PullParameters {
    source: PullSource,
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Option<Vec<GroupFilter>>,
    /// Rate limits for all transfers from `remote`
    limit: RateLimitConfig,
}

and as a first refactoring step, just do this switch (without RemoteSource ;))
and drop the Client from the signatures.

just a suggestion, feel free to name/structure things differently if that makes
more sense!

>      /// Local store that is pulled into
> @@ -70,7 +71,7 @@ impl PullParameters {
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,

same here - could have a fn new_remote(..) and a fn new_local(..) to keep the
signatures and parameter names sensible..
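
something like this perhaps, just to sketch the shape (exact parameter lists
up for debate):

impl PullParameters {
    /// remote pull: looks up the Remote config and builds the BackupRepository
    pub(crate) fn new_remote(
        store: &str,
        ns: BackupNamespace,
        remote: &str,
        remote_store: &str,
        remote_ns: BackupNamespace,
        // .. plus the common params (owner, remove_vanished, max_depth, ..)
    ) -> Result<Self, Error> { .. }

    /// local pull: no remote needed, and the source params can be called
    /// source_store/source_ns instead of remote_*
    pub(crate) fn new_local(
        store: &str,
        ns: BackupNamespace,
        source_store: &str,
        source_ns: BackupNamespace,
        // .. plus the common params
    ) -> Result<Self, Error> { .. }
}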

>          owner: Authid,
> @@ -86,18 +87,24 @@ impl PullParameters {
>              remote_ns.check_max_depth(max_depth)?;
>          }
>  
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> +        let (remote, source): (Option<Remote>, BackupRepository) = if let Some(remote_str) = remote
> +        {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote = remote_config.lookup::<Remote>("remote", remote_str)?;
> +            let source = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
> +            (Some(remote), source)
> +        } else {
> +            let source = BackupRepository::new(None, None, None, remote_store.to_string());
> +            (None, source)
> +        };
>  
>          let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> -
>          Ok(Self {
>              remote,
>              remote_ns,
> @@ -114,13 +121,17 @@ impl PullParameters {
>  
>      /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
>      pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +        if let Some(remote) = &self.remote {
> +            crate::api2::config::remote::remote_client(remote, Some(self.limit.clone())).await
> +        } else {
> +            bail!("No remote specified. Do not use a HttpClient for a local sync.")
> +        }
>      }
>  }
>  
> -async fn pull_index_chunks<I: IndexFile>(
> +async fn pull_index_chunks<I: IndexFile, C: AsyncReadChunk + Clone>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: C,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -256,10 +267,10 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// - Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
> -async fn pull_single_archive(
> +async fn pull_single_archive<C: AsyncReadChunk + Clone>(
>      worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> +    source: &BackupSource,
> +    chunk_reader: C,
>      snapshot: &pbs_datastore::BackupDir,
>      archive_info: &FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -279,7 +290,15 @@ async fn pull_single_archive(
>          .read(true)
>          .open(&tmp_path)?;
>  
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    match source {
> +        BackupSource::Remote(reader) => reader.download(archive_name, &mut tmpfile).await?,
> +        BackupSource::Local(dir) => {
> +            let mut source_path = dir.full_path();
> +            source_path.push(archive_name);
> +            let data = std::fs::read(source_path)?;
> +            tmpfile.write_all(&data)?;

could use std::fs::copy()
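
e.g. (rough, untested sketch):

BackupSource::Local(dir) => {
    let mut source_path = dir.full_path();
    source_path.push(archive_name);
    // copy the archive file into place directly, no need to read it into memory
    std::fs::copy(&source_path, &tmp_path)?;
}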

> +        }
> +    };
>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -289,14 +308,23 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            match source {
> +                BackupSource::Local(ref dir)
> +                    if dir.datastore().name() == snapshot.datastore().name() =>
> +                {
> +                    task_log!(worker, "skipping chunk sync for same datatsore");
> +                }

not sure if this is a good idea - I think pulling locally should check that the
referenced chunks are all there and fail otherwise.. we can always re-introduce
skipping here (possibly opt-in?) later on if we think it's a good performance improvement.

> +                _ => {
> +                    pull_index_chunks(
> +                        worker,
> +                        chunk_reader,
> +                        snapshot.datastore().clone(),
> +                        index,
> +                        downloaded_chunks,
> +                    )
> +                    .await?;
> +                }
> +            };
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -304,15 +332,23 @@ async fn pull_single_archive(
>              })?;
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
> -
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            match source {
> +                BackupSource::Local(ref dir)
> +                    if dir.datastore().name() == snapshot.datastore().name() =>
> +                {
> +                    task_log!(worker, "skipping chunk sync for same datatsore");
> +                }

same here

> +                _ => {
> +                    pull_index_chunks(
> +                        worker,
> +                        chunk_reader,
> +                        snapshot.datastore().clone(),
> +                        index,
> +                        downloaded_chunks,
> +                    )
> +                    .await?;
> +                }
> +            };
>          }
>          ArchiveType::Blob => {
>              tmpfile.seek(SeekFrom::Start(0))?;
> @@ -321,6 +357,9 @@ async fn pull_single_archive(
>          }
>      }
>      if let Err(err) = std::fs::rename(&tmp_path, &path) {
> +        task_log!(worker, "sync archive {}", archive_name);
> +        task_log!(worker, "tmpfile path {:?}", tmp_path.as_os_str());
> +        task_log!(worker, "path path {:?}", path.as_os_str());

left-over debug logs? ;)

>          bail!("Atomic rename file {:?} failed - {}", path, err);
>      }
>      Ok(())
> @@ -364,7 +403,7 @@ async fn try_client_log_download(
>  /// - Download log if not already existing
>  async fn pull_snapshot(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> +    source: BackupSource,
>      snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> @@ -377,31 +416,45 @@ async fn pull_snapshot(
>      let mut tmp_manifest_name = manifest_name.clone();
>      tmp_manifest_name.set_extension("tmp");

this whole part here could be refactored so that the new/refreshed manifest is
not stored in a tmpfile at all, but only kept in memory
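
roughly like this (untested sketch - error handling elided, and I did not
double-check the exact trait bounds of download()):

let tmp_manifest_blob = match source {
    BackupSource::Remote(ref reader) => {
        // download into a buffer instead of a tmpfile on disk
        let mut buf: Vec<u8> = Vec::new();
        reader.download(MANIFEST_BLOB_NAME, &mut buf).await?;
        DataBlob::load_from_reader(&mut &buf[..])?
    }
    BackupSource::Local(ref dir) => dir.load_blob(MANIFEST_BLOB_NAME)?,
};

the "vanished since start of sync" handling for 404s would of course need to
stay for the remote variant.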

> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> +    let tmp_manifest_blob = match source {
> +        BackupSource::Remote(ref reader) => {
> +            let mut tmp_manifest_file = match download_manifest(reader, &tmp_manifest_name).await {
> +                Ok(manifest_file) => manifest_file,
> +                Err(err) => {
> +                    match err.downcast_ref::<HttpError>() {
> +                        Some(HttpError { code, message }) => match *code {
> +                            StatusCode::NOT_FOUND => {
> +                                task_log!(
> +                                    worker,
> +                                    "skipping snapshot {} - vanished since start of sync",
> +                                    snapshot.dir(),
> +                                );
> +                                return Ok(());
> +                            }
> +                            _ => {
> +                                bail!("HTTP error {code} - {message}");
> +                            }
> +                        },
> +                        None => {
> +                            return Err(err);
> +                        }
> +                    };
>                  }
>              };
> +            DataBlob::load_from_reader(&mut tmp_manifest_file)?
> +        }
> +        BackupSource::Local(ref dir) => {
> +            let data = dir.load_blob(MANIFEST_BLOB_NAME)?;
> +            let mut tmp_manifest_file = std::fs::OpenOptions::new()
> +                .write(true)
> +                .create(true)
> +                .truncate(true)
> +                .read(true)
> +                .open(&tmp_manifest_name)?;
> +            tmp_manifest_file.write_all(data.raw_data())?;
> +            data
>          }
>      };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
>  
>      if manifest_name.exists() {
>          let manifest_blob = proxmox_lang::try_block!({
> @@ -417,13 +470,17 @@ async fn pull_snapshot(
>          })?;
>  
>          if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
> -            if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> +            if let BackupSource::Remote(ref reader) = source {
> +                if !client_log_name.exists() {
> +                    try_client_log_download(worker, reader.clone(), &client_log_name).await?;
> +                };

logs should also be fetched for local operations..

> +            }
> +            if tmp_manifest_name.exists() {
> +                let _ = std::fs::remove_file(&tmp_manifest_name);
>              }
>              task_log!(worker, "no data changes");
> -            let _ = std::fs::remove_file(&tmp_manifest_name);
>              return Ok(()); // nothing changed
> -        }
> +        };
>      }
>  
>      let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
> @@ -467,32 +524,49 @@ async fn pull_snapshot(
>              }
>          }
>  
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
> -        pull_single_archive(
> -            worker,
> -            &reader,
> -            &mut chunk_reader,
> -            snapshot,
> -            item,
> -            downloaded_chunks.clone(),
> -        )
> -        .await?;
> +        match source {
> +            BackupSource::Remote(ref reader) => {
> +                let chunk_reader = RemoteChunkReader::new(
> +                    reader.clone(),
> +                    None,
> +                    item.chunk_crypt_mode(),
> +                    HashMap::new(),
> +                );
> +                pull_single_archive(
> +                    worker,
> +                    &source,
> +                    chunk_reader,
> +                    snapshot,
> +                    item,
> +                    downloaded_chunks.clone(),
> +                )
> +                .await?
> +            }
> +            BackupSource::Local(ref dir) => {
> +                let chunk_reader =
> +                    LocalChunkReader::new(dir.datastore().clone(), None, item.chunk_crypt_mode());
> +                pull_single_archive(
> +                    worker,
> +                    &source,
> +                    chunk_reader,
> +                    snapshot,
> +                    item,
> +                    downloaded_chunks.clone(),
> +                )
> +                .await?
> +            }
> +        }
>      }
>  
>      if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
>          bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
>      }

this could become a call to update_manifest that replaces the old one with the
new one instead, if the manifest is kept in-memory only..

>  
> -    if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> +    if let BackupSource::Remote(reader) = source {
> +        if !client_log_name.exists() {
> +            try_client_log_download(worker, reader, &client_log_name).await?;
> +        };

same here - logs should also be fetched for local operations..

>      }
> -
>      snapshot
>          .cleanup_unreferenced_files(&manifest)
>          .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
> @@ -506,7 +580,7 @@ async fn pull_snapshot(
>  /// pointing to the local datastore and target namespace.
>  async fn pull_snapshot_from(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> +    source: BackupSource,
>      snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> @@ -517,7 +591,7 @@ async fn pull_snapshot_from(
>      if is_new {
>          task_log!(worker, "sync snapshot {}", snapshot.dir());
>  
> -        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
> +        if let Err(err) = pull_snapshot(worker, source, snapshot, downloaded_chunks).await {
>              if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
>                  snapshot.backup_ns(),
>                  snapshot.as_ref(),
> @@ -530,7 +604,7 @@ async fn pull_snapshot_from(
>          task_log!(worker, "sync snapshot {} done", snapshot.dir());
>      } else {
>          task_log!(worker, "re-sync snapshot {}", snapshot.dir());
> -        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
> +        pull_snapshot(worker, source, snapshot, downloaded_chunks).await?;
>          task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
>      }
>  
> @@ -600,36 +674,52 @@ impl std::fmt::Display for SkipInfo {
>  /// - local group owner is already checked by pull_store
>  async fn pull_group(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> +    client: Option<&HttpClient>,

the client here is redundant anyway, since for the remote case we can always get
it from params..

>      params: &PullParameters,
>      group: &pbs_api_types::BackupGroup,
>      remote_ns: BackupNamespace,
>      progress: &mut StoreProgress,
>  ) -> Result<(), Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/snapshots",
> -        params.source.store()
> -    );
> -
> -    let mut args = json!({
> -        "backup-type": group.ty,
> -        "backup-id": group.id,
> -    });
> -
> -    if !remote_ns.is_root() {
> -        args["ns"] = serde_json::to_value(&remote_ns)?;
> -    }
> -
>      let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
> +    let mut list: Vec<SnapshotListItem> = if let Some(client) = client {
> +        let path = format!(
> +            "api2/json/admin/datastore/{}/snapshots",
> +            params.source.store()
> +        );
>  
> -    let mut result = client.get(&path, Some(args)).await?;
> -    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> +        let mut args = json!({
> +            "backup-type": group.ty,
> +            "backup-id": group.id,
> +        });
>  
> -    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> +        if !remote_ns.is_root() {
> +            args["ns"] = serde_json::to_value(&remote_ns)?;
> +        }
>  
> -    client.login().await?; // make sure auth is complete
> +        let mut result = client.get(&path, Some(args)).await?;
> +        serde_json::from_value(result["data"].take())?
> +    } else {
> +        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
> +        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam")));

not needed (and would be wrong, since this would elevate logical privileges to root!)

> +        let source_ns = if remote_ns.is_root() {
> +            None
> +        } else {
> +            Some(remote_ns.clone())
> +        };
> +        crate::api2::admin::datastore::do_list_snapshots(
> +            params.source.store().to_string(),
> +            source_ns,
> +            Some(group.ty),
> +            Some(group.id.clone()),
> +            &mut rpcenv,
> +        )
> +        .await?

this could simply list the local snapshots via the local datastore's
store.backup_group().list_backups(), with a bit of refactoring:
- we need BackupDir and in-progress status for both remote and Local
- BackupDir is returned by both local and remote list
- skipping can be done based on size being None (remote) or manifest being
missing (local)

for example, this could be a Vec<(BackupDir, bool)> or skipping could be done
up front, and this could just become a Vec<BackupDir>
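
the local branch could then look roughly like this (untested sketch, I did not
verify the exact field/method names of BackupInfo here):

// 'datastore' being the local source datastore
let mut dir_list: Vec<pbs_api_types::BackupDir> = datastore
    .backup_group(remote_ns.clone(), group.clone())
    .list_backups()?
    .into_iter()
    // skip snapshots that are still in progress, i.e. have no manifest yet
    .filter(|info| info.files.iter().any(|f| f == MANIFEST_BLOB_NAME))
    .map(|info| info.backup_dir.dir().clone())
    .collect();
dir_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));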

> +    };
> +    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
>  
> -    let fingerprint = client.fingerprint();
> +    if let Some(client) = client {
> +        client.login().await?; // make sure auth is complete
> +    }
>  
>      let last_sync = params.store.last_successful_backup(&target_ns, group)?;
>  
> @@ -646,6 +736,13 @@ async fn pull_group(
>          count: 0,
>      };
>  
> +    let datastore: Option<Arc<DataStore>> = match client {
> +        None => Some(DataStore::lookup_datastore(
> +            params.source.store(),
> +            Some(Operation::Read),
> +        )?),
> +        _ => None,
> +    };
>      for (pos, item) in list.into_iter().enumerate() {
>          let snapshot = item.backup;
>  
> @@ -668,33 +765,47 @@ async fn pull_group(
>              }
>          }
>  
> -        // get updated auth_info (new tickets)
> -        let auth_info = client.login().await?;
> -
> -        let options =
> -            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> -                .rate_limit(params.limit.clone());
> +        let backup_source = if let Some(client) = client {

with client possibly dropped, this should rather match on the refactored param
field that tells us whether we are pulling from a local or remote source..

> +            // get updated auth_info (new tickets)
> +            let auth_info = client.login().await?;
> +            let fingerprint = client.fingerprint();
>  
> -        let new_client = HttpClient::new(
> -            params.source.host(),
> -            params.source.port(),
> -            params.source.auth_id(),
> -            options,
> -        )?;
> -
> -        let reader = BackupReader::start(
> -            new_client,
> -            None,
> -            params.source.store(),
> -            &remote_ns,
> -            &snapshot,
> -            true,
> -        )
> -        .await?;
> +            let options = HttpClientOptions::new_non_interactive(
> +                auth_info.ticket.clone(),
> +                fingerprint.clone(),
> +            )
> +            .rate_limit(params.limit.clone());
> +
> +            let new_client = HttpClient::new(
> +                params.source.host(),
> +                params.source.port(),
> +                params.source.auth_id(),
> +                options,
> +            )?;
> +
> +            BackupSource::Remote(
> +                BackupReader::start(
> +                    new_client,
> +                    None,
> +                    params.source.store(),
> +                    &remote_ns,
> +                    &snapshot,
> +                    true,
> +                )
> +                .await?,
> +            )
> +        } else {
> +            if let Some(datastore) = datastore.clone() {

this would then be the second match arm

> +                BackupSource::Local(datastore.backup_dir(remote_ns.clone(), snapshot.clone())?)
> +            } else {

and this would no longer exist, since the match can be exhaustive ;)

> +                unreachable!("if there is no client and no datastore, then the ds lookup would have failed earlier")
> +            }
> +        };

otherwise, this } else { if ... else ... } can be collapsed (there are a few
more clippy lints triggered by this series that might be worthy of cleaning up
as well).

>  
>          let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
>  
> -        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> +        let result =
> +            pull_snapshot_from(worker, backup_source, &snapshot, downloaded_chunks.clone()).await;
>  
>          progress.done_snapshots = pos as u64 + 1;
>          task_log!(worker, "percentage done: {}", progress);
> @@ -735,49 +846,64 @@ async fn pull_group(
>  // will modify params if switching to backwards mode for lack of NS support on remote end
>  async fn query_namespaces(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> +    client: Option<&HttpClient>,

the client here is redundant anyway, since for the remote case we can always get
it from params..

>      params: &mut PullParameters,
>  ) -> Result<Vec<BackupNamespace>, Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/namespace",
> -        params.source.store()
> -    );
> -    let mut data = json!({});
> -    if let Some(max_depth) = params.max_depth {
> -        data["max-depth"] = json!(max_depth);
> -    }
> +    let mut list: Vec<NamespaceListItem> = if let Some(client) = client {

same here again - this should match on params.source being Local or Remote

> +        let path = format!(
> +            "api2/json/admin/datastore/{}/namespace",
> +            params.source.store()
> +        );
> +        let mut data = json!({});
> +        if let Some(max_depth) = params.max_depth {
> +            data["max-depth"] = json!(max_depth);
> +        }
>  
> -    if !params.remote_ns.is_root() {
> -        data["parent"] = json!(params.remote_ns);
> -    }
> +        if !params.remote_ns.is_root() {
> +            data["parent"] = json!(params.remote_ns);
> +        }
>  
> -    let mut result = match client.get(&path, Some(data)).await {
> -        Ok(res) => res,
> -        Err(err) => match err.downcast_ref::<HttpError>() {
> -            Some(HttpError { code, message }) => match *code {
> -                StatusCode::NOT_FOUND => {
> -                    if params.remote_ns.is_root() && params.max_depth.is_none() {
> -                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> -                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> -                        params.max_depth = Some(0);
> -                    } else {
> -                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> -                    }
> +        let mut result = match client.get(&path, Some(data)).await {
> +            Ok(res) => res,
> +            Err(err) => match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match *code {
> +                    StatusCode::NOT_FOUND => {
> +                        if params.remote_ns.is_root() && params.max_depth.is_none() {
> +                            task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                            task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> +                            params.max_depth = Some(0);
> +                        } else {
> +                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                        }
>  
> -                    return Ok(vec![params.remote_ns.clone()]);
> -                }
> -                _ => {
> -                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                        return Ok(vec![params.remote_ns.clone()]);
> +                    }
> +                    _ => {
> +                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    bail!("Querying namespaces failed - {err}");
>                  }
>              },
> -            None => {
> -                bail!("Querying namespaces failed - {err}");
> -            }
> -        },
> -    };
> -
> -    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> +        };
>  
> +        serde_json::from_value(result["data"].take())?
> +    } else {
> +        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
> +        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam")));
> +        let parent_ns = if params.remote_ns.is_root() {
> +            None
> +        } else {
> +            Some(params.remote_ns.clone())
> +        };
> +        crate::api2::admin::namespace::list_namespaces(
> +            params.source.store().to_string(),
> +            parent_ns,
> +            params.max_depth,
> +            &mut rpcenv,
> +        )?

and here as well - instead of pretending to be root@pam and querying over the
API, this could just query directly via pbs_datastore.. we only need a
Vec<BackupNamespace> after all.
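
e.g. roughly (untested - assuming the datastore's recursive namespace iterator
can be reused here, I did not check its exact name/signature):

let store = DataStore::lookup_datastore(params.source.store(), Some(Operation::Read))?;
// collect all namespaces below params.remote_ns, limited by max_depth
let list: Vec<BackupNamespace> = store
    .recursive_iter_backup_ns_ok(params.remote_ns.clone(), params.max_depth)?
    .collect();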

> +    };
>      // parents first
>      list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
>  
> @@ -897,7 +1023,7 @@ fn check_and_remove_vanished_ns(
>  /// - access to sub-NS checked here
>  pub(crate) async fn pull_store(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> +    client: Option<&HttpClient>,

the client here is redundant anyway, since for the remote case we can always get
it from params..

>      mut params: PullParameters,
>  ) -> Result<(), Error> {
>      // explicit create shared lock to prevent GC on newly created chunks
> @@ -1000,27 +1126,42 @@ pub(crate) async fn pull_store(
>  /// - owner check for vanished groups done here
>  pub(crate) async fn pull_ns(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> +    client: Option<&HttpClient>,
>      params: &PullParameters,
>      source_ns: BackupNamespace,
>      target_ns: BackupNamespace,
>  ) -> Result<(StoreProgress, bool), Error> {
> -    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> +    let mut list: Vec<GroupListItem> = if let Some(client) = client {

and here we have the same pattern again
- should match on params.source
- we are actually only interested in a Vec<BackupGroup> down below

> +        let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
>  
> -    let args = if !source_ns.is_root() {
> -        Some(json!({
> -            "ns": source_ns,
> -        }))
> -    } else {
> -        None
> -    };
> +        let args = if !source_ns.is_root() {
> +            Some(json!({
> +                "ns": source_ns,
> +            }))
> +        } else {
> +            None
> +        };
>  
> -    let mut result = client
> -        .get(&path, args)
> -        .await
> -        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> +        let mut result = client
> +            .get(&path, args)
> +            .await
> +            .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
>  
> -    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +        serde_json::from_value(result["data"].take())?
> +    } else {
> +        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
> +        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam")));
> +        let source_ns = if source_ns.is_root() {
> +            None
> +        } else {
> +            Some(source_ns.clone())
> +        };
> +        crate::api2::admin::datastore::list_groups(
> +            params.source.store().to_string(),
> +            source_ns,
> +            &mut rpcenv,
> +        )?

so this doesn't need to query over the API as pretend-root, but can just do a local query of the store+NS.
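
e.g. roughly (untested, and the owner filtering from my remark at the top is
still missing here as well):

let store = DataStore::lookup_datastore(params.source.store(), Some(Operation::Read))?;
// iterate the groups of the source namespace directly
let list: Vec<pbs_api_types::BackupGroup> = store
    .iter_backup_groups(source_ns.clone())?
    .filter_map(Result::ok)
    .map(|group| group.group().clone())
    .collect();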

> +    };
>  
>      let total_count = list.len();
>      list.sort_unstable_by(|a, b| {
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional
  2023-02-14 14:33   ` Fabian Grünbichler
@ 2023-02-15 11:40     ` Thomas Lamprecht
  2023-02-16  8:02       ` Fabian Grünbichler
  0 siblings, 1 reply; 10+ messages in thread
From: Thomas Lamprecht @ 2023-02-15 11:40 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Fabian Grünbichler

Am 14/02/2023 um 15:33 schrieb Fabian Grünbichler:
> On February 13, 2023 4:45 pm, Hannes Laimer wrote:
>> ... and update places where it is used.
>> A SyncJob not having a remote means it is pulling
>> from a local datastore.
> high level: I wonder whether we really need this for sync jobs, or whether just
> having it for pull (or as a new API/CLI endpoint copy/move?) would be enough as
> a start? is there a use case for scheduled local syncing?
>  

Yes, e.g. existing ones could be: having a small and fast "incoming" datastore,
which avoids blocking guests on backups and has the "hot" set of snapshots (most
recent) available while using a slower, but huge second one for long term archival.

A future one would be syncing to an S3-backed object storage, which we probably
only want to do from existing data (similar to tape), while still avoiding the
media catalogue and labelling overhead that tape must have to be really useful.

Another future one is removable datastores, which this is upfront work for. While
we might not always have a time-triggered event there, it's still useful to use a
sync job for, e.g., hot-plug triggered events.

Besides that, I'm a bit reserved against adding a move that can cross datastore
boundaries, as doing that manually seems not that useful for any but the smallest
PBS instances (especially on the snapshot level) and for others a sync + prune
is normally better anyway. Moving groups and namespaces around in the same datastore
OTOH would be useful for organizational purposes, and without crossing into another
CAS it would also be simple to implement.




^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional
  2023-02-15 11:40     ` Thomas Lamprecht
@ 2023-02-16  8:02       ` Fabian Grünbichler
  0 siblings, 0 replies; 10+ messages in thread
From: Fabian Grünbichler @ 2023-02-16  8:02 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Thomas Lamprecht

On February 15, 2023 12:40 pm, Thomas Lamprecht wrote:
> Am 14/02/2023 um 15:33 schrieb Fabian Grünbichler:
>> On February 13, 2023 4:45 pm, Hannes Laimer wrote:
>>> ... and update places where it is used.
>>> A SyncJob not having a remote means it is pulling
>>> from a local datastore.
>> high level: I wonder whether we really need this for sync jobs, or whether just
>> having it for pull (or as a new API/CLI endpoint copy/move?) would be enough as
>> a start? is there a use case for scheduled local syncing?
>>  
> 
> Yes, e.g. existing ones could be: having a small and fast "incoming" datastore,
> which avoids blocking guests on backups and has the "hot" set of snapshots (most
> recent) available while using a slower, but huge second one for long term archival.

yeah, that one makes sense.

> Future ones would be syncing to S3-backed object storage, which we probably only
> want to do from existing data (similar to tape), while still avoiding the media
> catalogue and labelling overhead that tape needs to be really useful.

not sure - we'd probably also want to combine chunks into bigger objects for S3
to save costs? but that is something we can evaluate when we start designing
that feature in detail.

> Another future one is removable datastores, which this is upfront work for. While
> we might not always have time-triggered events there, it's still useful to have a
> sync job for, e.g., hot-plug triggered events.

could be implemented "inline", but yeah, having a list of "jobs to trigger" is
nicer and more flexible.

the reason I asked the question is the following:
- PullParameters is an internal implementation detail, and can be refactored
however we want (see the sketch after this list)
- SyncJobConfig is not - the naming of fields makes less sense now with
non-remote usage, we need to store an additional user there if we want
unprivileged local sync, and upgrading config files in place when it's not just
adding a new, optional field is yucky
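
for illustration, the internal side could be modelled roughly like this (sketch
with made-up names, not what this series does):

    use pbs_api_types::{Authid, BackupNamespace};

    // sketch only: PullParameters could carry a source enum internally, so
    // the on-disk SyncJobConfig would not need new or renamed fields for it
    enum PullSource {
        // pull over the network, needs a remote + HttpClient
        Remote { remote: String, store: String, ns: BackupNamespace },
        // pull from another datastore on the same host, no HttpClient involved
        Local { store: String, ns: BackupNamespace },
    }

    struct PullParameters {
        source: PullSource,
        target_store: String,
        target_ns: BackupNamespace,
        owner: Authid,
        // ... group filters, remove_vanished, limits, etc.
    }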

so I guess we could
- live with the ugly config file/API/CLI parameters having remote optional for
the local case, job_owner/.. optional for the remote case, and remote_ns and
remote_store fields that are actually local in the local case (that's just what
came up so far, probably more - see the config example after this list)
- split local and remote sync jobs (or copy and sync, or ..) into two different
configs so that each just has the fields it actually needs with names that make
sense - but also kinda meh
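
to make the first option concrete, a local job in sync.cfg would end up looking
roughly like this (made-up example, remote simply left out) - note that
"remote-store" and "remote-ns" would then actually name a local source:

    sync: s-archive-pull
        store archive
        remote-store incoming
        remote-ns prod
        owner root@pam
        schedule daily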

> Besides that, I'm a bit reserved about adding a move that can cross datastore
> boundaries, as doing that manually seems not that useful for any but the smallest
> PBS instances (especially on the snapshot level), and for the others a sync + prune
> is normally better anyway. Moving groups and namespaces around within the same
> datastore OTOH would be useful for organizational purposes, and since it doesn't
> cross into another CAS it is also simple to implement.

yeah, move/copy would just be "alternative" endpoints for local pulling that
re-use the pull code under the hood, but expose a better set of API
parameters/terminology. "move" could definitely be restricted to intra-datastore
operations, if we implement it.




^ permalink raw reply	[flat|nested] 10+ messages in thread

end of thread, other threads:[~2023-02-16  8:02 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-02-13 15:45 [pbs-devel] [PATCH proxmox-backup 0/4] native local sync-jobs Hannes Laimer
2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 1/4] api2: make remote for sync-jobs optional Hannes Laimer
2023-02-14 14:33   ` Fabian Grünbichler
2023-02-15 11:40     ` Thomas Lamprecht
2023-02-16  8:02       ` Fabian Grünbichler
2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull Hannes Laimer
2023-02-14 14:33   ` Fabian Grünbichler
2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 3/4] manager: add completion for local sync-jobs Hannes Laimer
2023-02-13 15:45 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: add ui support " Hannes Laimer
2023-02-14  8:02 ` [pbs-devel] [PATCH proxmox-backup 0/4] native " Lukas Wagner

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox