* [pbs-devel] [PATCH proxmox-backup v5 0/6] local sync-jobs
@ 2023-10-06 14:05 Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 1/6] api: make Remote for SyncJob optional Hannes Laimer
` (5 more replies)
0 siblings, 6 replies; 11+ messages in thread
From: Hannes Laimer @ 2023-10-06 14:05 UTC (permalink / raw)
To: pbs-devel
Add support for local sync. SyncJobs without a remote are considered local and
use different pull logic. In the course of adding the extra pull logic, the
pull code was rewritten to be largely source-independent. CLI completion and
the UI were also updated to allow the Remote in SyncJobs to be optional.
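As a rough sketch of the central change (trimmed down to two fields; the real
SyncJobConfig has many more), a missing remote switches the job to the local
pull path:

```rust
// Trimmed-down sketch of the config change: `remote` becomes Option<String>,
// and None selects the local sync path.
struct SyncJobConfig {
    remote: Option<String>, // None implies a local sync job
    remote_store: String,
}

fn sync_source_description(job: &SyncJobConfig) -> String {
    match &job.remote {
        Some(remote) => format!("{}/{}", remote, job.remote_store),
        None => format!("local datastore '{}'", job.remote_store),
    }
}

fn main() {
    let local = SyncJobConfig { remote: None, remote_store: "ds1".into() };
    println!("{}", sync_source_description(&local)); // local datastore 'ds1'
}
```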
v2: thanks @Fabian for the feedback
* make pull logic more source independent
v3: thanks @Fabian and @Wolfgang for the feedback
* remove enums for Local/Remote
* use traits, pull logic now expects a `dyn PullSource` (or `dyn PullReader`)
* add lock to dir for local sync
* split refactoring of pull logic and implementation of
local pulling into two commits
v4: thanks @Wolfgang and @Lukas for the feedback
* ui: disable rate limit for local sync jobs
* ui: rename `Source Remote` -> `Source PBS`
* update SYNC_JOB_WORKER_ID_REGEX, use '-' as remote for local
* fix problem with groups not being synced to the correct ns
* add check for source == target
* moved two changes from patch 3 to patch 1
v5: thanks @Thomas for the feedback
* ui: add radio buttons to make local vs remote more clear
* move ui patch to the end + api2 -> api
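The '-' placeholder from the v4 changes can be illustrated with a small sketch
(simplified; the real worker ID also embeds an optional namespace anchor and is
validated by SYNC_JOB_WORKER_ID_REGEX):

```rust
// Simplified sketch of the worker ID scheme after the change: the remote slot
// carries "-" for local jobs, so the same ID layout works for both cases.
fn sync_worker_id(remote: Option<&str>, remote_store: &str, store: &str, job_id: &str) -> String {
    format!(
        "{}:{}:{}:{}",
        remote.unwrap_or("-"), // "-" marks a local sync job
        remote_store,
        store,
        job_id
    )
}

fn main() {
    println!("{}", sync_worker_id(None, "src-store", "dst-store", "job1"));
    println!("{}", sync_worker_id(Some("pbs2"), "src-store", "dst-store", "job1"));
}
```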
Hannes Laimer (6):
api: make Remote for SyncJob optional
manager: add completion for opt. Remote in SyncJob
accept a ref to a HttpClient
pull: refactor pulling from a datastore
pull: add support for pulling from local datastore
ui: add support for optional Remote in SyncJob
Cargo.toml | 2 +
examples/download-speed.rs | 2 +-
pbs-api-types/src/jobs.rs | 9 +-
pbs-client/src/backup_reader.rs | 2 +-
pbs-datastore/src/read_chunk.rs | 2 +-
proxmox-backup-client/src/catalog.rs | 4 +-
proxmox-backup-client/src/main.rs | 2 +-
proxmox-backup-client/src/mount.rs | 2 +-
proxmox-file-restore/src/main.rs | 4 +-
src/api2/config/remote.rs | 16 +-
src/api2/config/sync.rs | 41 +-
src/api2/node/tasks.rs | 3 +-
src/api2/pull.rs | 59 +-
src/bin/proxmox-backup-manager.rs | 67 +-
src/bin/proxmox_backup_debug/diff.rs | 2 +-
src/server/email_notifications.rs | 18 +-
src/server/pull.rs | 1073 ++++++++++++++++----------
www/form/RemoteTargetSelector.js | 29 +-
www/window/SyncJobEdit.js | 72 +-
19 files changed, 919 insertions(+), 490 deletions(-)
--
2.39.2
^ permalink raw reply [flat|nested] 11+ messages in thread
* [pbs-devel] [PATCH proxmox-backup v5 1/6] api: make Remote for SyncJob optional
2023-10-06 14:05 [pbs-devel] [PATCH proxmox-backup v5 0/6] local sync-jobs Hannes Laimer
@ 2023-10-06 14:05 ` Hannes Laimer
2023-11-08 10:53 ` Thomas Lamprecht
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 2/6] manager: add completion for opt. Remote in SyncJob Hannes Laimer
` (4 subsequent siblings)
5 siblings, 1 reply; 11+ messages in thread
From: Hannes Laimer @ 2023-10-06 14:05 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-api-types/src/jobs.rs | 9 ++--
src/api2/config/remote.rs | 2 +-
src/api2/config/sync.rs | 41 +++++++++++------
src/api2/node/tasks.rs | 3 +-
src/api2/pull.rs | 76 +++++++++++++++++++++++--------
src/bin/proxmox-backup-manager.rs | 6 +--
src/server/email_notifications.rs | 18 ++++----
7 files changed, 107 insertions(+), 48 deletions(-)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 23e19b7b..aedee090 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -17,8 +17,8 @@ const_regex! {
/// Regex for verification jobs 'DATASTORE:ACTUAL_JOB_ID'
pub VERIFICATION_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):");
- /// Regex for sync jobs 'REMOTE:REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID'
- pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r")(?::(", BACKUP_NS_RE!(), r"))?:");
+ /// Regex for sync jobs '(REMOTE|\-):REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID'
+ pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"|\-):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r")(?::(", BACKUP_NS_RE!(), r"))?:");
}
pub const JOB_ID_SCHEMA: Schema = StringSchema::new("Job ID.")
@@ -467,6 +467,7 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
},
remote: {
schema: REMOTE_ID_SCHEMA,
+ optional: true,
},
"remote-store": {
schema: DATASTORE_SCHEMA,
@@ -515,7 +516,9 @@ pub struct SyncJobConfig {
pub ns: Option<BackupNamespace>,
#[serde(skip_serializing_if = "Option::is_none")]
pub owner: Option<Authid>,
- pub remote: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ /// None implies local sync.
+ pub remote: Option<String>,
pub remote_store: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub remote_ns: Option<BackupNamespace>,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 76dd3b89..307cf3cd 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
for job in job_list {
- if job.remote == name {
+ if job.remote.map_or(false, |id| id == name) {
param_bail!(
"name",
"remote '{}' is used by sync job '{}' (datastore '{}')",
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 01e5f2ce..21634bd5 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
use pbs_api_types::{
Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
- PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
- PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
+ PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
+ PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
};
use pbs_config::sync;
@@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
return false;
}
- let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
- remote_privs & PRIV_REMOTE_AUDIT != 0
+ if let Some(remote) = &job.remote {
+ let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
+ remote_privs & PRIV_REMOTE_AUDIT != 0
+ } else {
+ let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+ source_ds_privs & PRIV_DATASTORE_AUDIT != 0
+ }
}
/// checks whether user can run the corresponding pull job
@@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
return false;
}
- let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
- remote_privs & PRIV_REMOTE_READ != 0
+ if let Some(remote) = &job.remote {
+ let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
+ remote_privs & PRIV_REMOTE_READ != 0
+ } else {
+ let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+ source_ds_privs & PRIV_DATASTORE_READ != 0
+ }
}
#[api(
@@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
#[serde(rename_all = "kebab-case")]
/// Deletable property name
pub enum DeletableProperty {
+ /// Delete the remote property(-> meaning local).
+ Remote,
/// Delete the owner property.
Owner,
/// Delete the comment property.
@@ -275,6 +287,9 @@ pub fn update_sync_job(
if let Some(delete) = delete {
for delete_prop in delete {
match delete_prop {
+ DeletableProperty::Remote => {
+ data.remote = None;
+ }
DeletableProperty::Owner => {
data.owner = None;
}
@@ -334,7 +349,7 @@ pub fn update_sync_job(
data.ns = Some(ns);
}
if let Some(remote) = update.remote {
- data.remote = remote;
+ data.remote = Some(remote);
}
if let Some(remote_store) = update.remote_store {
data.remote_store = remote_store;
@@ -503,7 +518,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
let mut job = SyncJobConfig {
id: "regular".to_string(),
- remote: "remote0".to_string(),
+ remote: Some("remote0".to_string()),
remote_store: "remotestore1".to_string(),
remote_ns: None,
store: "localstore0".to_string(),
@@ -538,11 +553,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
// reading without proper read permissions on local end must fail
- job.remote = "remote1".to_string();
+ job.remote = Some("remote1".to_string());
assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
// reading without proper read permissions on remote end must fail
- job.remote = "remote0".to_string();
+ job.remote = Some("remote0".to_string());
job.store = "localstore1".to_string();
assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
@@ -555,10 +570,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
));
// writing without proper write permissions on local end must fail
- job.remote = "remote1".to_string();
+ job.remote = Some("remote1".to_string());
// writing without proper write permissions on remote end must fail
- job.remote = "remote0".to_string();
+ job.remote = Some("remote0".to_string());
job.store = "localstore1".to_string();
assert!(!check_sync_job_modify_access(
&user_info,
@@ -567,7 +582,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
));
// reset remote to one where users have access
- job.remote = "remote1".to_string();
+ job.remote = Some("remote1".to_string());
// user with read permission can only read, but not modify/run
assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 866361c6..8f08d3af 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -78,11 +78,12 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
if let (Some(remote), Some(remote_store), Some(local_store)) =
(remote, remote_store, local_store)
{
+ let remote_str = remote.as_str();
return check_pull_privs(
auth_id,
local_store.as_str(),
local_ns,
- remote.as_str(),
+ (remote_str != "-").then_some(remote_str),
remote_store.as_str(),
false,
);
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index daeba7cf..9ed83046 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -1,5 +1,5 @@
//! Sync datastore from remote server
-use anyhow::{format_err, Error};
+use anyhow::{bail, format_err, Error};
use futures::{future::FutureExt, select};
use proxmox_router::{Permission, Router, RpcEnvironment};
@@ -8,7 +8,7 @@ use proxmox_sys::task_log;
use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
- GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+ GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
TRANSFER_LAST_SCHEMA,
};
@@ -22,7 +22,7 @@ pub fn check_pull_privs(
auth_id: &Authid,
store: &str,
ns: Option<&str>,
- remote: &str,
+ remote: Option<&str>,
remote_store: &str,
delete: bool,
) -> Result<(), Error> {
@@ -39,12 +39,22 @@ pub fn check_pull_privs(
PRIV_DATASTORE_BACKUP,
false,
)?;
- user_info.check_privs(
- auth_id,
- &["remote", remote, remote_store],
- PRIV_REMOTE_READ,
- false,
- )?;
+
+ if let Some(remote) = remote {
+ user_info.check_privs(
+ auth_id,
+ &["remote", remote, remote_store],
+ PRIV_REMOTE_READ,
+ false,
+ )?;
+ } else {
+ user_info.check_privs(
+ auth_id,
+ &["datastore", remote_store],
+ PRIV_DATASTORE_BACKUP,
+ false,
+ )?;
+ }
if delete {
user_info.check_privs(
@@ -65,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
PullParameters::new(
&sync_job.store,
sync_job.ns.clone().unwrap_or_default(),
- &sync_job.remote,
+ sync_job.remote.as_deref().unwrap_or("local"),
&sync_job.remote_store,
sync_job.remote_ns.clone().unwrap_or_default(),
sync_job
@@ -91,7 +101,7 @@ pub fn do_sync_job(
) -> Result<String, Error> {
let job_id = format!(
"{}:{}:{}:{}:{}",
- sync_job.remote,
+ sync_job.remote.as_deref().unwrap_or("-"),
sync_job.remote_store,
sync_job.store,
sync_job.ns.clone().unwrap_or_default(),
@@ -99,6 +109,13 @@ pub fn do_sync_job(
);
let worker_type = job.jobtype().to_string();
+ if sync_job.remote.is_none()
+ && sync_job.store == sync_job.remote_store
+ && sync_job.ns == sync_job.remote_ns
+ {
+ bail!("can't sync, source equals the target");
+ }
+
let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
let upid_str = WorkerTask::spawn(
@@ -122,13 +139,33 @@ pub fn do_sync_job(
}
task_log!(
worker,
- "sync datastore '{}' from '{}/{}'",
+ "sync datastore '{}' from '{}{}'",
sync_job.store,
- sync_job.remote,
+ sync_job
+ .remote
+ .as_deref()
+ .map_or(String::new(), |remote| format!("{remote}/")),
sync_job.remote_store,
);
- pull_store(&worker, &client, pull_params).await?;
+ if sync_job.remote.is_some() {
+ pull_store(&worker, &client, pull_params).await?;
+ } else {
+ if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
+ if target_ns.path().starts_with(source_ns.path())
+ && sync_job.store == sync_job.remote_store
+ && sync_job.max_depth.map_or(true, |sync_depth| {
+ target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
+ }) {
+ task_log!(
+ worker,
+ "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
+ );
+ }
+ } else {
+ pull_store(&worker, &client, pull_params).await?;
+ }
+ }
task_log!(worker, "sync job '{}' end", &job_id);
@@ -180,6 +217,7 @@ pub fn do_sync_job(
},
remote: {
schema: REMOTE_ID_SCHEMA,
+ optional: true,
},
"remote-store": {
schema: DATASTORE_SCHEMA,
@@ -224,7 +262,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
async fn pull(
store: String,
ns: Option<BackupNamespace>,
- remote: String,
+ remote: Option<String>,
remote_store: String,
remote_ns: Option<BackupNamespace>,
remove_vanished: Option<bool>,
@@ -248,7 +286,7 @@ async fn pull(
&auth_id,
&store,
ns_str.as_deref(),
- &remote,
+ remote.as_deref(),
&remote_store,
delete,
)?;
@@ -256,7 +294,7 @@ async fn pull(
let pull_params = PullParameters::new(
&store,
ns,
- &remote,
+ remote.as_deref().unwrap_or("local"),
&remote_store,
remote_ns.unwrap_or_default(),
auth_id.clone(),
@@ -280,7 +318,7 @@ async fn pull(
worker,
"pull datastore '{}' from '{}/{}'",
store,
- remote,
+ remote.as_deref().unwrap_or("-"),
remote_store,
);
@@ -299,4 +337,4 @@ async fn pull(
Ok(upid_str)
}
-pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
\ No newline at end of file
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index b4cb6cb3..1dd51951 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -535,21 +535,21 @@ fn get_remote(param: &HashMap<String, String>) -> Option<String> {
param.get("remote").map(|r| r.to_owned()).or_else(|| {
if let Some(id) = param.get("id") {
if let Ok(job) = get_sync_job(id) {
- return Some(job.remote);
+ return job.remote;
}
}
None
})
}
-fn get_remote_store(param: &HashMap<String, String>) -> Option<(String, String)> {
+fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>, String)> {
let mut job: Option<SyncJobConfig> = None;
let remote = param.get("remote").map(|r| r.to_owned()).or_else(|| {
if let Some(id) = param.get("id") {
job = get_sync_job(id).ok();
if let Some(ref job) = job {
- return Some(job.remote.clone());
+ return job.remote.clone();
}
}
None
diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
index ea1476d7..18881782 100644
--- a/src/server/email_notifications.rs
+++ b/src/server/email_notifications.rs
@@ -484,15 +484,17 @@ pub fn send_sync_status(
}
};
+ let tmp_src_string;
+ let source_str = if let Some(remote) = &job.remote {
+ tmp_src_string = format!("Sync remote '{}'", remote);
+ &tmp_src_string
+ } else {
+ "Sync local"
+ };
+
let subject = match result {
- Ok(()) => format!(
- "Sync remote '{}' datastore '{}' successful",
- job.remote, job.remote_store,
- ),
- Err(_) => format!(
- "Sync remote '{}' datastore '{}' failed",
- job.remote, job.remote_store,
- ),
+ Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
+ Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
};
send_job_status_mail(email, &subject, &text)?;
--
2.39.2
* [pbs-devel] [PATCH proxmox-backup v5 2/6] manager: add completion for opt. Remote in SyncJob
2023-10-06 14:05 [pbs-devel] [PATCH proxmox-backup v5 0/6] local sync-jobs Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 1/6] api: make Remote for SyncJob optional Hannes Laimer
@ 2023-10-06 14:05 ` Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 3/6] accept a ref to a HttpClient Hannes Laimer
` (3 subsequent siblings)
5 siblings, 0 replies; 11+ messages in thread
From: Hannes Laimer @ 2023-10-06 14:05 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/bin/proxmox-backup-manager.rs | 63 +++++++++++++++++++------------
1 file changed, 39 insertions(+), 24 deletions(-)
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index 1dd51951..eadfe547 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -555,15 +555,13 @@ fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>,
None
});
- if let Some(remote) = remote {
- let store = param
- .get("remote-store")
- .map(|r| r.to_owned())
- .or_else(|| job.map(|job| job.remote_store));
-
- if let Some(store) = store {
- return Some((remote, store));
- }
+ let store = param
+ .get("remote-store")
+ .map(|r| r.to_owned())
+ .or_else(|| job.map(|job| job.remote_store));
+
+ if let Some(store) = store {
+ return Some((remote, store));
}
None
@@ -584,7 +582,7 @@ fn get_remote_ns(param: &HashMap<String, String>) -> Option<BackupNamespace> {
}
// shell completion helper
-pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
+pub fn complete_remote_datastore_name(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
let mut list = Vec::new();
if let Some(remote) = get_remote(param) {
@@ -595,7 +593,9 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
list.push(item.store);
}
}
- }
+ } else {
+ list = pbs_config::datastore::complete_datastore_name(arg, param);
+ };
list
}
@@ -607,17 +607,25 @@ pub fn complete_remote_datastore_namespace(
) -> Vec<String> {
let mut list = Vec::new();
- if let Some((remote, remote_store)) = get_remote_store(param) {
- if let Ok(data) = proxmox_async::runtime::block_on(async move {
+ if let Some(data) = match get_remote_store(param) {
+ Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
crate::api2::config::remote::scan_remote_namespaces(
remote.clone(),
remote_store.clone(),
)
.await
- }) {
- for item in data {
- list.push(item.ns.name());
- }
+ .ok()
+ }),
+ Some((None, source_store)) => {
+ let mut rpcenv = CliEnvironment::new();
+ rpcenv.set_auth_id(Some(String::from("root@pam")));
+ crate::api2::admin::namespace::list_namespaces(source_store, None, None, &mut rpcenv)
+ .ok()
+ }
+ _ => None,
+ } {
+ for item in data {
+ list.push(item.ns.name());
}
}
@@ -662,19 +670,26 @@ pub fn complete_sync_local_datastore_namespace(
pub fn complete_remote_datastore_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
let mut list = Vec::new();
- if let Some((remote, remote_store)) = get_remote_store(param) {
- let ns = get_remote_ns(param);
- if let Ok(data) = proxmox_async::runtime::block_on(async move {
+ let ns = get_remote_ns(param);
+ if let Some(data) = match get_remote_store(param) {
+ Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
crate::api2::config::remote::scan_remote_groups(
remote.clone(),
remote_store.clone(),
ns,
)
.await
- }) {
- for item in data {
- list.push(format!("{}/{}", item.backup.ty, item.backup.id));
- }
+ .ok()
+ }),
+ Some((None, source_store)) => {
+ let mut rpcenv = CliEnvironment::new();
+ rpcenv.set_auth_id(Some(String::from("root@pam")));
+ crate::api2::admin::datastore::list_groups(source_store, ns, &mut rpcenv).ok()
+ }
+ _ => None,
+ } {
+ for item in data {
+ list.push(format!("{}/{}", item.backup.ty, item.backup.id));
}
}
--
2.39.2
* [pbs-devel] [PATCH proxmox-backup v5 3/6] accept a ref to a HttpClient
2023-10-06 14:05 [pbs-devel] [PATCH proxmox-backup v5 0/6] local sync-jobs Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 1/6] api: make Remote for SyncJob optional Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 2/6] manager: add completion for opt. Remote in SyncJob Hannes Laimer
@ 2023-10-06 14:05 ` Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 4/6] pull: refactor pulling from a datastore Hannes Laimer
` (2 subsequent siblings)
5 siblings, 0 replies; 11+ messages in thread
From: Hannes Laimer @ 2023-10-06 14:05 UTC (permalink / raw)
To: pbs-devel
... since the functions don't actually need to own the value.
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
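A minimal illustration of why borrowing suffices (using a stand-in struct, not
the real pbs-client HttpClient): the caller keeps the client usable after
starting a reader.

```rust
// Stand-in for the real HttpClient; only the borrowing pattern matters here.
struct Client {
    server: String,
}

// Takes &Client instead of Client, mirroring the BackupReader::start() change.
fn start_reader(client: &Client, store: &str) -> String {
    format!("reader for {}/{}", client.server, store)
}

fn main() {
    let client = Client { server: "pbs.example".to_string() };
    let r1 = start_reader(&client, "store1");
    // The client was only borrowed, so a second reader can reuse it.
    let r2 = start_reader(&client, "store2");
    println!("{r1}\n{r2}");
}
```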
examples/download-speed.rs | 2 +-
pbs-client/src/backup_reader.rs | 2 +-
proxmox-backup-client/src/catalog.rs | 4 ++--
proxmox-backup-client/src/main.rs | 2 +-
proxmox-backup-client/src/mount.rs | 2 +-
proxmox-file-restore/src/main.rs | 4 ++--
src/bin/proxmox_backup_debug/diff.rs | 2 +-
src/server/pull.rs | 2 +-
8 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/examples/download-speed.rs b/examples/download-speed.rs
index c487d704..fe700982 100644
--- a/examples/download-speed.rs
+++ b/examples/download-speed.rs
@@ -34,7 +34,7 @@ async fn run() -> Result<(), Error> {
let backup_time = proxmox_time::parse_rfc3339("2019-06-28T10:49:48Z")?;
let client = BackupReader::start(
- client,
+ &client,
None,
"store2",
&BackupNamespace::root(),
diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
index 2cd4dc27..36d8ebcf 100644
--- a/pbs-client/src/backup_reader.rs
+++ b/pbs-client/src/backup_reader.rs
@@ -44,7 +44,7 @@ impl BackupReader {
/// Create a new instance by upgrading the connection at '/api2/json/reader'
pub async fn start(
- client: HttpClient,
+ client: &HttpClient,
crypt_config: Option<Arc<CryptConfig>>,
datastore: &str,
ns: &BackupNamespace,
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index 8c8c1458..72b22e67 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -75,7 +75,7 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
let client = connect(&repo)?;
let client = BackupReader::start(
- client,
+ &client,
crypt_config.clone(),
repo.store(),
&backup_ns,
@@ -187,7 +187,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
};
let client = BackupReader::start(
- client,
+ &client,
crypt_config.clone(),
repo.store(),
&backup_ns,
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 1a13291a..b66e3fdb 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -1313,7 +1313,7 @@ async fn restore(
};
let client = BackupReader::start(
- client,
+ &client,
crypt_config.clone(),
repo.store(),
&ns,
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 242556d0..4a2f8335 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -234,7 +234,7 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
};
let client = BackupReader::start(
- client,
+ &client,
crypt_config.clone(),
repo.store(),
&backup_ns,
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index 9c74a476..50875a63 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -107,7 +107,7 @@ async fn list_files(
) -> Result<Vec<ArchiveEntry>, Error> {
let client = connect(&repo)?;
let client = BackupReader::start(
- client,
+ &client,
crypt_config.clone(),
repo.store(),
&namespace,
@@ -430,7 +430,7 @@ async fn extract(
let client = connect(&repo)?;
let client = BackupReader::start(
- client,
+ &client,
crypt_config.clone(),
repo.store(),
&namespace,
diff --git a/src/bin/proxmox_backup_debug/diff.rs b/src/bin/proxmox_backup_debug/diff.rs
index 9924fb7b..1c64b27a 100644
--- a/src/bin/proxmox_backup_debug/diff.rs
+++ b/src/bin/proxmox_backup_debug/diff.rs
@@ -294,7 +294,7 @@ async fn create_backup_reader(
};
let client = connect(&params.repo)?;
let backup_reader = BackupReader::start(
- client,
+ &client,
params.crypt_config.clone(),
params.repo.store(),
&params.namespace,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index a973a10e..e55452d1 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -738,7 +738,7 @@ async fn pull_group(
)?;
let reader = BackupReader::start(
- new_client,
+ &new_client,
None,
params.source.store(),
&remote_ns,
--
2.39.2
* [pbs-devel] [PATCH proxmox-backup v5 4/6] pull: refactor pulling from a datastore
2023-10-06 14:05 [pbs-devel] [PATCH proxmox-backup v5 0/6] local sync-jobs Hannes Laimer
` (2 preceding siblings ...)
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 3/6] accept a ref to a HttpClient Hannes Laimer
@ 2023-10-06 14:05 ` Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 5/6] pull: add support for pulling from local datastore Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 6/6] ui: add support for optional Remote in SyncJob Hannes Laimer
5 siblings, 0 replies; 11+ messages in thread
From: Hannes Laimer @ 2023-10-06 14:05 UTC (permalink / raw)
To: pbs-devel
... making the pull logic independent of the actual source
by introducing two traits.
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
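The two-trait split could be sketched roughly like this (trait names taken from
the series, method signatures and bodies purely illustrative):

```rust
// Illustrative only: the pull logic talks to trait objects, so local and
// remote sources share one code path.
trait PullReader {
    fn read_chunk(&self, digest: &str) -> Vec<u8>;
}

trait PullSource {
    fn list_groups(&self) -> Vec<String>;
    fn reader(&self) -> Box<dyn PullReader>;
}

struct LocalSource;
struct LocalReader;

impl PullReader for LocalReader {
    fn read_chunk(&self, digest: &str) -> Vec<u8> {
        digest.as_bytes().to_vec() // stand-in for reading from a datastore
    }
}

impl PullSource for LocalSource {
    fn list_groups(&self) -> Vec<String> {
        vec!["vm/100".to_string()]
    }
    fn reader(&self) -> Box<dyn PullReader> {
        Box::new(LocalReader)
    }
}

// The pull logic only sees `dyn PullSource`, not the concrete source type.
fn pull(source: &dyn PullSource) -> usize {
    let reader = source.reader();
    source
        .list_groups()
        .iter()
        .map(|g| reader.read_chunk(g).len())
        .sum()
}

fn main() {
    println!("pulled {} bytes", pull(&LocalSource));
}
```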
Cargo.toml | 2 +
pbs-datastore/src/read_chunk.rs | 2 +-
src/api2/config/remote.rs | 14 +-
src/api2/pull.rs | 31 +-
src/server/pull.rs | 936 ++++++++++++++++++--------------
5 files changed, 564 insertions(+), 421 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index cfbf2ba1..1e4e030f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
# regular crates
anyhow = "1.0"
+async-trait = "0.1.56"
apt-pkg-native = "0.3.2"
base64 = "0.13"
bitflags = "1.2.1"
@@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
[dependencies]
anyhow.workspace = true
+async-trait.workspace = true
apt-pkg-native.workspace = true
base64.workspace = true
bitflags.workspace = true
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
}
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
/// Returns the encoded chunk data
fn read_raw_chunk<'a>(
&'a self,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 307cf3cd..2511c5d5 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
Ok(())
}
-/// Helper to get client for remote.cfg entry
-pub async fn remote_client(
+/// Helper to get client for remote.cfg entry without login, just config
+pub fn remote_client_config(
remote: &Remote,
limit: Option<RateLimitConfig>,
) -> Result<HttpClient, Error> {
@@ -320,6 +320,16 @@ pub async fn remote_client(
&remote.config.auth_id,
options,
)?;
+
+ Ok(client)
+}
+
+/// Helper to get client for remote.cfg entry
+pub async fn remote_client(
+ remote: &Remote,
+ limit: Option<RateLimitConfig>,
+) -> Result<HttpClient, Error> {
+ let client = remote_client_config(remote, limit)?;
let _auth_info = client
.login() // make sure we can auth
.await
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 9ed83046..8f1aad43 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -8,7 +8,7 @@ use proxmox_sys::task_log;
use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
- GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+ GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
TRANSFER_LAST_SCHEMA,
};
@@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
PullParameters::new(
&sync_job.store,
sync_job.ns.clone().unwrap_or_default(),
- sync_job.remote.as_deref().unwrap_or("local"),
+ sync_job.remote.as_deref(),
&sync_job.remote_store,
sync_job.remote_ns.clone().unwrap_or_default(),
sync_job
@@ -131,7 +131,6 @@ pub fn do_sync_job(
let worker_future = async move {
let pull_params = PullParameters::try_from(&sync_job)?;
- let client = pull_params.client().await?;
task_log!(worker, "Starting datastore sync job '{}'", job_id);
if let Some(event_str) = schedule {
@@ -148,24 +147,7 @@ pub fn do_sync_job(
sync_job.remote_store,
);
- if sync_job.remote.is_some() {
- pull_store(&worker, &client, pull_params).await?;
- } else {
- if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
- if target_ns.path().starts_with(source_ns.path())
- && sync_job.store == sync_job.remote_store
- && sync_job.max_depth.map_or(true, |sync_depth| {
- target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
- }) {
- task_log!(
- worker,
- "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
- );
- }
- } else {
- pull_store(&worker, &client, pull_params).await?;
- }
- }
+ pull_store(&worker, pull_params).await?;
task_log!(worker, "sync job '{}' end", &job_id);
@@ -294,7 +276,7 @@ async fn pull(
let pull_params = PullParameters::new(
&store,
ns,
- remote.as_deref().unwrap_or("local"),
+ remote.as_deref(),
&remote_store,
remote_ns.unwrap_or_default(),
auth_id.clone(),
@@ -304,7 +286,6 @@ async fn pull(
limit,
transfer_last,
)?;
- let client = pull_params.client().await?;
// fixme: set to_stdout to false?
// FIXME: add namespace to worker id?
@@ -322,7 +303,7 @@ async fn pull(
remote_store,
);
- let pull_future = pull_store(&worker, &client, pull_params);
+ let pull_future = pull_store(&worker, pull_params);
(select! {
success = pull_future.fuse() => success,
abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
@@ -337,4 +318,4 @@ async fn pull(
Ok(upid_str)
}
-pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
\ No newline at end of file
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e55452d1..ff3e6d0a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::Seek;
+use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
-use proxmox_sys::task_log;
+use proxmox_sys::{task_log, task_warn};
+use serde_json::json;
use pbs_api_types::{
- print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
- Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+ GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
};
-
-use pbs_client::{
- BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
+use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
-/// Parameters for a pull operation.
-pub(crate) struct PullParameters {
- /// Remote that is pulled from
- remote: Remote,
- /// Full specification of remote datastore
- source: BackupRepository,
- /// Local store that is pulled into
+struct RemoteReader {
+ backup_reader: Arc<BackupReader>,
+ dir: BackupDir,
+}
+
+pub(crate) struct PullTarget {
store: Arc<DataStore>,
- /// Remote namespace
- remote_ns: BackupNamespace,
- /// Local namespace (anchor)
ns: BackupNamespace,
+}
+
+pub(crate) struct RemoteSource {
+ repo: BackupRepository,
+ ns: BackupNamespace,
+ client: HttpClient,
+}
+
+#[async_trait::async_trait]
+/// `PullSource` is a trait that provides an interface for pulling data and metadata from a source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source.
+trait PullSource: Send + Sync {
+ /// Lists namespaces from the source.
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupNamespace>, Error>;
+
+ /// Lists groups within a specific namespace from the source.
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error>;
+
+ /// Lists backup directories for a specific group within a specific namespace from the source.
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupDir>, Error>;
+ fn get_ns(&self) -> BackupNamespace;
+ fn print_store_and_ns(&self) -> String;
+
+ /// Returns a reader for reading data from a specific backup directory.
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn PullReader>, Error>;
+}
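The value of the `PullSource` trait is that the pull logic only ever handles a trait object, so remote and (upcoming) local sources dispatch through the same interface. A minimal standalone sketch of that design, using hypothetical simplified stand-ins for the pbs types (not the real API, and with the async methods omitted):

```rust
use std::sync::Arc;

// Illustrative stand-in for pbs_api_types::BackupNamespace.
type BackupNamespace = String;

// Reduced version of the trait: just the two synchronous accessors.
trait PullSource: Send + Sync {
    fn get_ns(&self) -> BackupNamespace;
    fn print_store_and_ns(&self) -> String;
}

struct RemoteSource { store: String, ns: BackupNamespace }
struct LocalSource { store: String, ns: BackupNamespace }

impl PullSource for RemoteSource {
    fn get_ns(&self) -> BackupNamespace { self.ns.clone() }
    fn print_store_and_ns(&self) -> String {
        format!("remote store '{}', ns '{}'", self.store, self.ns)
    }
}

impl PullSource for LocalSource {
    fn get_ns(&self) -> BackupNamespace { self.ns.clone() }
    fn print_store_and_ns(&self) -> String {
        format!("local store '{}', ns '{}'", self.store, self.ns)
    }
}

// The pull logic only ever sees `Arc<dyn PullSource>`, so it cannot
// tell (and does not care) whether the source is remote or local.
fn describe(source: &Arc<dyn PullSource>) -> String {
    source.print_store_and_ns()
}
```

The real trait additionally needs `#[async_trait::async_trait]` because trait methods returning futures are not object-safe in plain Rust.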
+
+#[async_trait::async_trait]
+impl PullSource for RemoteSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+ return Ok(vec![self.ns.clone()]);
+ }
+
+ let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+ let mut data = json!({});
+ if let Some(max_depth) = max_depth {
+ data["max-depth"] = json!(max_depth);
+ }
+
+ if !self.ns.is_root() {
+ data["parent"] = json!(self.ns);
+ }
+ self.client.login().await?;
+
+ let mut result = match self.client.get(&path, Some(data)).await {
+ Ok(res) => res,
+ Err(err) => match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match code {
+ &StatusCode::NOT_FOUND => {
+ if self.ns.is_root() && max_depth.is_none() {
+ task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+ task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+ max_depth.replace(0);
+ } else {
+ bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+ }
+
+ return Ok(vec![self.ns.clone()]);
+ }
+ _ => {
+ bail!("Querying namespaces failed - HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ bail!("Querying namespaces failed - {err}");
+ }
+ },
+ };
+
+ let list: Vec<BackupNamespace> =
+ serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+ .into_iter()
+ .map(|list_item| list_item.ns)
+ .collect();
+
+ Ok(list)
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ _owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+ let args = if !namespace.is_root() {
+ Some(json!({ "ns": namespace.clone() }))
+ } else {
+ None
+ };
+
+ self.client.login().await?;
+ let mut result =
+ self.client.get(&path, args).await.map_err(|err| {
+ format_err!("Failed to retrieve backup groups from remote - {}", err)
+ })?;
+
+ Ok(
+ serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+ .map_err(Error::from)?
+ .into_iter()
+ .map(|item| item.backup)
+ .collect::<Vec<BackupGroup>>(),
+ )
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ _namespace: &BackupNamespace,
+ group: &BackupGroup,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupDir>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+ let mut args = json!({
+ "backup-type": group.ty,
+ "backup-id": group.id,
+ });
+
+ if !self.ns.is_root() {
+ args["ns"] = serde_json::to_value(&self.ns)?;
+ }
+
+ self.client.login().await?;
+
+ let mut result = self.client.get(&path, Some(args)).await?;
+ let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+ Ok(snapshot_list
+ .into_iter()
+ .filter_map(|item: SnapshotListItem| {
+ let snapshot = item.backup;
+ // in-progress backups can't be synced
+ if item.size.is_none() {
+ task_log!(
+ worker,
+ "skipping snapshot {} - in-progress backup",
+ snapshot
+ );
+ return None;
+ }
+
+ Some(snapshot)
+ })
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn print_store_and_ns(&self) -> String {
+ print_store_and_ns(self.repo.store(), &self.ns)
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn PullReader>, Error> {
+ let backup_reader =
+ BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+ Ok(Arc::new(RemoteReader {
+ backup_reader,
+ dir: dir.clone(),
+ }))
+ }
+}
+
+#[async_trait::async_trait]
+/// `PullReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading the client log, and checking whether the chunk sync should be skipped.
+trait PullReader: Send + Sync {
+ /// Returns a chunk reader with the specified encryption mode.
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+ /// Asynchronously loads a file from the source into a local file.
+ /// `filename` is the name of the file to load from the source.
+ /// `into` is the path of the local file to load the source file into.
+ async fn load_file_into(
+ &self,
+ filename: &str,
+ into: &Path,
+ worker: &WorkerTask,
+ ) -> Result<Option<DataBlob>, Error>;
+
+ /// Tries to download the client log from the source and save it into a local file.
+ async fn try_download_client_log(
+ &self,
+ to_path: &Path,
+ worker: &WorkerTask,
+ ) -> Result<(), Error>;
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+#[async_trait::async_trait]
+impl PullReader for RemoteReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(RemoteChunkReader::new(
+ self.backup_reader.clone(),
+ None,
+ crypt_mode,
+ HashMap::new(),
+ ))
+ }
+
+ async fn load_file_into(
+ &self,
+ filename: &str,
+ into: &Path,
+ worker: &WorkerTask,
+ ) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+ if let Err(err) = download_result {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ task_log!(
+ worker,
+ "skipping snapshot {} - vanished since start of sync",
+ &self.dir,
+ );
+ return Ok(None);
+ }
+ _ => {
+ bail!("HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
+ };
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(
+ &self,
+ to_path: &Path,
+ worker: &WorkerTask,
+ ) -> Result<(), Error> {
+ let mut tmp_path = to_path.to_owned();
+ tmp_path.set_extension("tmp");
+
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ // Note: be silent if there is no log - only log successful download
+ if let Ok(()) = self
+ .backup_reader
+ .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+ .await
+ {
+ if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+ bail!("Atomic rename file {:?} failed - {}", to_path, err);
+ }
+ task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+ }
+
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+ false
+ }
+}
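The `skip_chunk_sync` hook is what lets a local reader later avoid copying chunks that are already present: a remote reader must always fetch chunks, while a reader backed by the same datastore as the target can skip the transfer entirely. A hedged sketch of that decision, with invented struct names (the real local reader only lands in the follow-up patch):

```rust
// Remote chunks always have to be fetched over the network; a local
// reader can skip the chunk transfer when source and target are the
// same datastore, because the chunks already live in that store.
struct RemoteReader;
struct LocalReader { source_store_name: String }

trait SkipChunkSync {
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}

impl SkipChunkSync for RemoteReader {
    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false // never skip for remote sources
    }
}

impl SkipChunkSync for LocalReader {
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        // same datastore -> the deduplicated chunks are already there
        self.source_store_name == target_store_name
    }
}
```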
+
+/// Parameters for a pull operation.
+pub(crate) struct PullParameters {
+ /// Where data is pulled from
+ source: Arc<dyn PullSource>,
+ /// Where data should be pulled into
+ target: PullTarget,
/// Owner of synced groups (needs to match local owner of pre-existing groups)
owner: Authid,
/// Whether to remove groups which exist locally, but not on the remote end
@@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
max_depth: Option<usize>,
/// Filters for reducing the pull scope
group_filter: Option<Vec<GroupFilter>>,
- /// Rate limits for all transfers from `remote`
- limit: RateLimitConfig,
/// How many snapshots should be transferred at most (taking the newest N snapshots)
transfer_last: Option<usize>,
}
impl PullParameters {
/// Creates a new instance of `PullParameters`.
- ///
- /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
- /// [BackupRepository] with `remote_store`.
- #[allow(clippy::too_many_arguments)]
pub(crate) fn new(
store: &str,
ns: BackupNamespace,
- remote: &str,
+ remote: Option<&str>,
remote_store: &str,
remote_ns: BackupNamespace,
owner: Authid,
@@ -82,49 +376,51 @@ impl PullParameters {
limit: RateLimitConfig,
transfer_last: Option<usize>,
) -> Result<Self, Error> {
- let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
remote_ns.check_max_depth(max_depth)?;
- }
-
- let (remote_config, _digest) = pbs_config::remote::config()?;
- let remote: Remote = remote_config.lookup("remote", remote)?;
-
+ };
let remove_vanished = remove_vanished.unwrap_or(false);
- let source = BackupRepository::new(
- Some(remote.config.auth_id.clone()),
- Some(remote.config.host.clone()),
- remote.config.port,
- remote_store.to_string(),
- );
+ let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote: Remote = remote_config.lookup("remote", remote)?;
- Ok(Self {
- remote,
- remote_ns,
+ let repo = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
+ let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
+ Arc::new(RemoteSource {
+ repo,
+ ns: remote_ns,
+ client,
+ })
+ } else {
+ bail!("local sync not implemented yet")
+ };
+ let target = PullTarget {
+ store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
ns,
+ };
+
+ Ok(Self {
source,
- store,
+ target,
owner,
remove_vanished,
max_depth,
group_filter,
- limit,
transfer_last,
})
}
-
- /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
- pub async fn client(&self) -> Result<HttpClient, Error> {
- crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
- }
}
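The dispatch in `PullParameters::new` hinges on the now-optional `remote` argument: `Some(remote)` builds a `RemoteSource`, while `None` selects the local path (which this patch still rejects with `bail!`; the local implementation follows in the next patch). A simplified, hypothetical sketch of that selection, stripped of the config lookup and client setup:

```rust
// Illustrative only -- the real code returns `Arc<dyn PullSource>` and
// builds a BackupRepository plus HttpClient for the remote case.
#[derive(Debug, PartialEq)]
enum SourceKind {
    Remote(String),
    Local,
}

fn select_source(remote: Option<&str>) -> SourceKind {
    match remote {
        Some(name) => SourceKind::Remote(name.to_string()),
        None => SourceKind::Local,
    }
}
```

This mirrors how the CLI/API layer passes `remote.as_deref()` instead of the former `unwrap_or("local")` placeholder.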
async fn pull_index_chunks<I: IndexFile>(
worker: &WorkerTask,
- chunk_reader: RemoteChunkReader,
+ chunk_reader: Arc<dyn AsyncReadChunk>,
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -215,26 +511,6 @@ async fn pull_index_chunks<I: IndexFile>(
Ok(())
}
-async fn download_manifest(
- reader: &BackupReader,
- filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
- let mut tmp_manifest_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(filename)?;
-
- reader
- .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
- .await?;
-
- tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
- Ok(tmp_manifest_file)
-}
-
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
if size != info.size {
bail!(
@@ -255,17 +531,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
-/// - Verify tmp file checksum
+/// - Load archive file into tmp file
+/// -- Download/copy the file into the tmp file via the reader
+/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
-async fn pull_single_archive(
- worker: &WorkerTask,
- reader: &BackupReader,
- chunk_reader: &mut RemoteChunkReader,
- snapshot: &pbs_datastore::BackupDir,
- archive_info: &FileInfo,
+async fn pull_single_archive<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
+ archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let archive_name = &archive_info.filename;
@@ -277,13 +552,11 @@ async fn pull_single_archive(
task_log!(worker, "sync archive {}", archive_name);
- let mut tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
+ reader
+ .load_file_into(archive_name, &tmp_path, worker)
+ .await?;
- reader.download(archive_name, &mut tmpfile).await?;
+ let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
match archive_type(archive_name)? {
ArchiveType::DynamicIndex => {
@@ -293,14 +566,18 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ if reader.skip_chunk_sync(snapshot.datastore().name()) {
+ task_log!(worker, "skipping chunk sync for same datastore");
+ } else {
+ pull_index_chunks(
+ worker,
+ reader.chunk_reader(archive_info.crypt_mode),
+ snapshot.datastore().clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -309,17 +586,21 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ if reader.skip_chunk_sync(snapshot.datastore().name()) {
+ task_log!(worker, "skipping chunk sync for same datastore");
+ } else {
+ pull_index_chunks(
+ worker,
+ reader.chunk_reader(archive_info.crypt_mode),
+ snapshot.datastore().clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
}
ArchiveType::Blob => {
- tmpfile.seek(SeekFrom::Start(0))?;
+ tmpfile.rewind()?;
let (csum, size) = sha256(&mut tmpfile)?;
verify_archive(archive_info, &csum, size)?;
}
@@ -330,33 +611,6 @@ async fn pull_single_archive(
Ok(())
}
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- path: &std::path::Path,
-) -> Result<(), Error> {
- let mut tmp_path = path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
- if let Err(err) = std::fs::rename(&tmp_path, path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
- }
- task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
- }
-
- Ok(())
-}
-
/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
@@ -366,10 +620,10 @@ async fn try_client_log_download(
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
-async fn pull_snapshot(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- snapshot: &pbs_datastore::BackupDir,
+async fn pull_snapshot<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let mut manifest_name = snapshot.full_path();
@@ -380,32 +634,15 @@ async fn pull_snapshot(
let mut tmp_manifest_name = manifest_name.clone();
tmp_manifest_name.set_extension("tmp");
-
- let download_res = download_manifest(&reader, &tmp_manifest_name).await;
- let mut tmp_manifest_file = match download_res {
- Ok(manifest_file) => manifest_file,
- Err(err) => {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- task_log!(
- worker,
- "skipping snapshot {} - vanished since start of sync",
- snapshot.dir(),
- );
- return Ok(());
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
- }
- };
- }
- };
- let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+ let tmp_manifest_blob;
+ if let Some(data) = reader
+ .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
+ .await?
+ {
+ tmp_manifest_blob = data;
+ } else {
+ return Ok(());
+ }
if manifest_name.exists() {
let manifest_blob = proxmox_lang::try_block!({
@@ -422,8 +659,10 @@ async fn pull_snapshot(
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
+ reader
+ .try_download_client_log(&client_log_name, worker)
+ .await?;
+ };
task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(()); // nothing changed
@@ -471,17 +710,9 @@ async fn pull_snapshot(
}
}
- let mut chunk_reader = RemoteChunkReader::new(
- reader.clone(),
- None,
- item.chunk_crypt_mode(),
- HashMap::new(),
- );
-
pull_single_archive(
worker,
- &reader,
- &mut chunk_reader,
+ reader.clone(),
snapshot,
item,
downloaded_chunks.clone(),
@@ -494,9 +725,10 @@ async fn pull_snapshot(
}
if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
-
+ reader
+ .try_download_client_log(&client_log_name, worker)
+ .await?;
+ };
snapshot
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,12 +738,12 @@ async fn pull_snapshot(
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
-async fn pull_snapshot_from(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- snapshot: &pbs_datastore::BackupDir,
+/// The `reader` is configured to read from the source backup directory, while the
+/// `snapshot` is pointing to the local datastore and target namespace.
+async fn pull_snapshot_from<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let (_path, is_new, _snap_lock) = snapshot
@@ -626,11 +858,10 @@ impl std::fmt::Display for SkipInfo {
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
-/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
+/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
@@ -639,117 +870,79 @@ impl std::fmt::Display for SkipInfo {
/// - local group owner is already checked by pull_store
async fn pull_group(
worker: &WorkerTask,
- client: &HttpClient,
params: &PullParameters,
- group: &pbs_api_types::BackupGroup,
- remote_ns: BackupNamespace,
+ source_namespace: &BackupNamespace,
+ group: &BackupGroup,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- task_log!(worker, "sync group {}", group);
-
- let path = format!(
- "api2/json/admin/datastore/{}/snapshots",
- params.source.store()
- );
-
- let mut args = json!({
- "backup-type": group.ty,
- "backup-id": group.id,
- });
-
- if !remote_ns.is_root() {
- args["ns"] = serde_json::to_value(&remote_ns)?;
- }
-
- let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
-
- let mut result = client.get(&path, Some(args)).await?;
- let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
- list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
- client.login().await?; // make sure auth is complete
-
- let fingerprint = client.fingerprint();
-
- let last_sync = params.store.last_successful_backup(&target_ns, group)?;
- let last_sync_time = last_sync.unwrap_or(i64::MIN);
-
- let mut remote_snapshots = std::collections::HashSet::new();
-
- // start with 65536 chunks (up to 256 GiB)
- let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
- progress.group_snapshots = list.len() as u64;
-
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
- let total_amount = list.len();
+ let mut raw_list: Vec<BackupDir> = params
+ .source
+ .list_backup_dirs(source_namespace, group, worker)
+ .await?;
+ raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
+
+ let total_amount = raw_list.len();
let cutoff = params
.transfer_last
.map(|count| total_amount.saturating_sub(count))
.unwrap_or_default();
- for (pos, item) in list.into_iter().enumerate() {
- let snapshot = item.backup;
-
- // in-progress backups can't be synced
- if item.size.is_none() {
- task_log!(
- worker,
- "skipping snapshot {} - in-progress backup",
- snapshot
- );
- continue;
- }
+ let target_ns = source_namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
- remote_snapshots.insert(snapshot.time);
-
- if last_sync_time > snapshot.time {
- already_synced_skip_info.update(snapshot.time);
- continue;
- } else if already_synced_skip_info.count > 0 {
- task_log!(worker, "{}", already_synced_skip_info);
- already_synced_skip_info.reset();
- }
-
- if pos < cutoff && last_sync_time != snapshot.time {
- transfer_last_skip_info.update(snapshot.time);
- continue;
- } else if transfer_last_skip_info.count > 0 {
- task_log!(worker, "{}", transfer_last_skip_info);
- transfer_last_skip_info.reset();
- }
-
- // get updated auth_info (new tickets)
- let auth_info = client.login().await?;
+ let mut source_snapshots = HashSet::new();
+ let last_sync_time = params
+ .target
+ .store
+ .last_successful_backup(&target_ns, group)?
+ .unwrap_or(i64::MIN);
+
+ let list: Vec<BackupDir> = raw_list
+ .into_iter()
+ .enumerate()
+ .filter(|&(pos, ref dir)| {
+ source_snapshots.insert(dir.time);
+ if last_sync_time > dir.time {
+ already_synced_skip_info.update(dir.time);
+ return false;
+ } else if already_synced_skip_info.count > 0 {
+ task_log!(worker, "{}", already_synced_skip_info);
+ already_synced_skip_info.reset();
+ return true;
+ }
- let options =
- HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
- .rate_limit(params.limit.clone());
+ if pos < cutoff && last_sync_time != dir.time {
+ transfer_last_skip_info.update(dir.time);
+ return false;
+ } else if transfer_last_skip_info.count > 0 {
+ task_log!(worker, "{}", transfer_last_skip_info);
+ transfer_last_skip_info.reset();
+ }
+ true
+ })
+ .map(|(_, dir)| dir)
+ .collect();
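The filter above combines two rules: snapshots older than the last successful sync are dropped, and `transfer-last` trims everything below the cutoff except the last-synced snapshot itself (so it can be re-checked). A standalone sketch of that selection on bare timestamps, omitting the `SkipInfo` accounting and log messages:

```rust
// Simplified model of the snapshot filter: `times_sorted` is the
// source snapshot list sorted ascending by backup time.
fn filter_snapshots(
    times_sorted: &[i64],
    last_sync_time: i64,
    transfer_last: Option<usize>,
) -> Vec<i64> {
    // keep at most the newest `transfer_last` snapshots
    let cutoff = transfer_last
        .map(|count| times_sorted.len().saturating_sub(count))
        .unwrap_or_default();

    times_sorted
        .iter()
        .copied()
        .enumerate()
        .filter(|&(pos, time)| {
            if last_sync_time > time {
                return false; // already synced earlier
            }
            if pos < cutoff && last_sync_time != time {
                return false; // trimmed by transfer-last
            }
            true
        })
        .map(|(_, time)| time)
        .collect()
}
```

With four snapshots, a last sync at `20` and `transfer-last = 1`, this keeps the previously synced snapshot plus the newest one.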
- let new_client = HttpClient::new(
- params.source.host(),
- params.source.port(),
- params.source.auth_id(),
- options,
- )?;
+ // start with 65536 chunks (up to 256 GiB)
+ let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
- let reader = BackupReader::start(
- &new_client,
- None,
- params.source.store(),
- &remote_ns,
- &snapshot,
- true,
- )
- .await?;
+ progress.group_snapshots = list.len() as u64;
- let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+ for (pos, from_snapshot) in list.into_iter().enumerate() {
+ let to_snapshot = params
+ .target
+ .store
+ .backup_dir(target_ns.clone(), from_snapshot.clone())?;
- let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+ let reader = params
+ .source
+ .reader(source_namespace, &from_snapshot)
+ .await?;
+ let result =
+ pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
progress.done_snapshots = pos as u64 + 1;
task_log!(worker, "percentage done: {}", progress);
@@ -758,11 +951,14 @@ async fn pull_group(
}
if params.remove_vanished {
- let group = params.store.backup_group(target_ns.clone(), group.clone());
+ let group = params
+ .target
+ .store
+ .backup_group(target_ns.clone(), group.clone());
let local_list = group.list_backups()?;
for info in local_list {
let snapshot = info.backup_dir;
- if remote_snapshots.contains(&snapshot.backup_time()) {
+ if source_snapshots.contains(&snapshot.backup_time()) {
continue;
}
if snapshot.is_protected() {
@@ -775,6 +971,7 @@ async fn pull_group(
}
task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
params
+ .target
.store
.remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
}
@@ -783,64 +980,12 @@ async fn pull_group(
Ok(())
}
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
- worker: &WorkerTask,
- client: &HttpClient,
- params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
- let path = format!(
- "api2/json/admin/datastore/{}/namespace",
- params.source.store()
- );
- let mut data = json!({});
- if let Some(max_depth) = params.max_depth {
- data["max-depth"] = json!(max_depth);
- }
-
- if !params.remote_ns.is_root() {
- data["parent"] = json!(params.remote_ns);
- }
-
- let mut result = match client.get(&path, Some(data)).await {
- Ok(res) => res,
- Err(err) => match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- if params.remote_ns.is_root() && params.max_depth.is_none() {
- task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
- task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
- params.max_depth = Some(0);
- } else {
- bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
- }
-
- return Ok(vec![params.remote_ns.clone()]);
- }
- _ => {
- bail!("Querying namespaces failed - HTTP error {code} - {message}");
- }
- },
- None => {
- bail!("Querying namespaces failed - {err}");
- }
- },
- };
-
- let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
- // parents first
- list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
- Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
let mut created = false;
- let store_ns_str = print_store_and_ns(params.store.name(), ns);
+ let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
- if !ns.is_root() && !params.store.namespace_path(ns).exists() {
- check_ns_modification_privs(params.store.name(), ns, ¶ms.owner)
+ if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+ check_ns_modification_privs(params.target.store.name(), ns, ¶ms.owner)
.map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
let name = match ns.components().last() {
@@ -850,14 +995,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
}
};
- if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+ if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
}
created = true;
}
check_ns_privs(
- params.store.name(),
+ params.target.store.name(),
ns,
¶ms.owner,
PRIV_DATASTORE_BACKUP,
@@ -868,10 +1013,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
}
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
- check_ns_modification_privs(params.store.name(), local_ns, ¶ms.owner)
+ check_ns_modification_privs(params.target.store.name(), local_ns, ¶ms.owner)
.map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
- params.store.remove_namespace_recursive(local_ns, true)
+ params
+ .target
+ .store
+ .remove_namespace_recursive(local_ns, true)
}
fn check_and_remove_vanished_ns(
@@ -885,14 +1033,15 @@ fn check_and_remove_vanished_ns(
// clamp like remote does so that we don't list more than we can ever have synced.
let max_depth = params
.max_depth
- .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+ .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
let mut local_ns_list: Vec<BackupNamespace> = params
+ .target
.store
- .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+ .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
.filter(|ns| {
let user_privs =
- user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.store.name()));
+ user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.target.store.name()));
user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
})
.collect();
@@ -901,7 +1050,7 @@ fn check_and_remove_vanished_ns(
local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
for local_ns in local_ns_list {
- if local_ns == params.ns {
+ if local_ns == params.target.ns {
continue;
}
@@ -948,29 +1097,49 @@ fn check_and_remove_vanished_ns(
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
worker: &WorkerTask,
- client: &HttpClient,
mut params: PullParameters,
) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+ let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
let old_max_depth = params.max_depth;
- let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
- vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
+ let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
+ vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
} else {
- query_namespaces(worker, client, &mut params).await?
+ params
+ .source
+ .list_namespaces(&mut params.max_depth, worker)
+ .await?
};
+
+ let ns_layers_to_be_pulled = namespaces
+ .iter()
+ .map(BackupNamespace::depth)
+ .max()
+ .map_or(0, |v| v - params.source.get_ns().depth());
+ let target_depth = params.target.ns.depth();
+
+ if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
+ bail!(
+ "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
+ ns_layers_to_be_pulled,
+ target_depth,
+ MAX_NAMESPACE_DEPTH
+ );
+ }
+
errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+ namespaces.sort_unstable_by_key(|a| a.name_len());
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
for namespace in namespaces {
- let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+ let source_store_ns_str = params.source.print_store_and_ns();
- let target_ns = namespace.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
- let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+ let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
+ let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
task_log!(worker, "----");
task_log!(
@@ -998,7 +1167,7 @@ pub(crate) async fn pull_store(
}
}
- match pull_ns(worker, client, ¶ms, namespace.clone(), target_ns).await {
+ match pull_ns(worker, &namespace, &mut params).await {
Ok((ns_progress, ns_errors)) => {
errors |= ns_errors;
@@ -1019,7 +1188,7 @@ pub(crate) async fn pull_store(
task_log!(
worker,
"Encountered errors while syncing namespace {} - {}",
- namespace,
+ &namespace,
err,
);
}
@@ -1051,48 +1220,28 @@ pub(crate) async fn pull_store(
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
worker: &WorkerTask,
- client: &HttpClient,
- params: &PullParameters,
- source_ns: BackupNamespace,
- target_ns: BackupNamespace,
+ namespace: &BackupNamespace,
+ params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
- let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
- let args = if !source_ns.is_root() {
- Some(json!({
- "ns": source_ns,
- }))
- } else {
- None
- };
-
- let mut result = client
- .get(&path, args)
- .await
- .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
- let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+ let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, ¶ms.owner).await?;
let total_count = list.len();
list.sort_unstable_by(|a, b| {
- let type_order = a.backup.ty.cmp(&b.backup.ty);
+ let type_order = a.ty.cmp(&b.ty);
if type_order == std::cmp::Ordering::Equal {
- a.backup.id.cmp(&b.backup.id)
+ a.id.cmp(&b.id)
} else {
type_order
}
});
- let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+ let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
filters.iter().any(|filter| group.matches(filter))
};
- // Get groups with target NS set
- let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
let list = if let Some(ref group_filter) = ¶ms.group_filter {
let unfiltered_count = list.len();
- let list: Vec<pbs_api_types::BackupGroup> = list
+ let list: Vec<BackupGroup> = list
.into_iter()
.filter(|group| apply_filters(group, group_filter))
.collect();
@@ -1110,13 +1259,15 @@ pub(crate) async fn pull_ns(
let mut errors = false;
- let mut new_groups = std::collections::HashSet::new();
+ let mut new_groups = HashSet::new();
for group in list.iter() {
new_groups.insert(group.clone());
}
let mut progress = StoreProgress::new(list.len() as u64);
+ let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
+
for (done, group) in list.into_iter().enumerate() {
progress.done_groups = done as u64;
progress.done_snapshots = 0;
@@ -1124,6 +1275,7 @@ pub(crate) async fn pull_ns(
let (owner, _lock_guard) =
match params
+ .target
.store
.create_locked_backup_group(&target_ns, &group, ¶ms.owner)
{
@@ -1135,7 +1287,9 @@ pub(crate) async fn pull_ns(
&group,
err
);
- errors = true; // do not stop here, instead continue
+ errors = true;
+ // do not stop here, instead continue
+ task_log!(worker, "create_locked_backup_group failed");
continue;
}
};
@@ -1151,15 +1305,7 @@ pub(crate) async fn pull_ns(
owner
);
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(
- worker,
- client,
- params,
- &group,
- source_ns.clone(),
- &mut progress,
- )
- .await
+ } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
{
task_log!(worker, "sync group {} failed - {}", &group, err,);
errors = true; // do not stop here, instead continue
@@ -1168,13 +1314,13 @@ pub(crate) async fn pull_ns(
if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
- for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+ for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
let local_group = local_group?;
let local_group = local_group.group();
if new_groups.contains(local_group) {
continue;
}
- let owner = params.store.get_owner(&target_ns, local_group)?;
+ let owner = params.target.store.get_owner(&target_ns, local_group)?;
if check_backup_owner(&owner, ¶ms.owner).is_err() {
continue;
}
@@ -1184,7 +1330,11 @@ pub(crate) async fn pull_ns(
}
}
task_log!(worker, "delete vanished group '{local_group}'",);
- match params.store.remove_backup_group(&target_ns, local_group) {
+ match params
+ .target
+ .store
+ .remove_backup_group(&target_ns, local_group)
+ {
Ok(true) => {}
Ok(false) => {
task_log!(
--
2.39.2
^ permalink raw reply [flat|nested] 11+ messages in thread
* [pbs-devel] [PATCH proxmox-backup v5 5/6] pull: add support for pulling from local datastore
2023-10-06 14:05 [pbs-devel] [PATCH proxmox-backup v5 0/6] local sync-jobs Hannes Laimer
` (3 preceding siblings ...)
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 4/6] pull: refactor pulling from a datastore Hannes Laimer
@ 2023-10-06 14:05 ` Hannes Laimer
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 6/6] ui: add support for optional Remote in SyncJob Hannes Laimer
5 siblings, 0 replies; 11+ messages in thread
From: Hannes Laimer @ 2023-10-06 14:05 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/server/pull.rs | 143 +++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 138 insertions(+), 5 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index ff3e6d0a..1403c7a7 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,8 @@
//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
-use std::io::Seek;
-use std::path::Path;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
@@ -29,10 +29,12 @@ use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{
+ check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
use pbs_tools::sha::sha256;
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
struct RemoteReader {
@@ -40,6 +42,12 @@ struct RemoteReader {
dir: BackupDir,
}
+struct LocalReader {
+ _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+ path: PathBuf,
+ datastore: Arc<DataStore>,
+}
+
pub(crate) struct PullTarget {
store: Arc<DataStore>,
ns: BackupNamespace,
@@ -51,6 +59,11 @@ pub(crate) struct RemoteSource {
client: HttpClient,
}
+pub(crate) struct LocalSource {
+ store: Arc<DataStore>,
+ ns: BackupNamespace,
+}
+
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -234,6 +247,81 @@ impl PullSource for RemoteSource {
}
}
+#[async_trait::async_trait]
+impl PullSource for LocalSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ _worker: &WorkerTask,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ ListNamespacesRecursive::new_max_depth(
+ self.store.clone(),
+ self.ns.clone(),
+ max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+ )?
+ .collect()
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ Ok(ListAccessibleBackupGroups::new_with_privs(
+ &self.store,
+ namespace.clone(),
+ 0,
+ None,
+ None,
+ Some(owner),
+ )?
+ .filter_map(Result::ok)
+ .map(|backup_group| backup_group.group().clone())
+ .collect::<Vec<pbs_api_types::BackupGroup>>())
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ _worker: &WorkerTask,
+ ) -> Result<Vec<BackupDir>, Error> {
+ Ok(self
+ .store
+ .backup_group(namespace.clone(), group.clone())
+ .iter_snapshots()?
+ .filter_map(Result::ok)
+ .map(|snapshot| snapshot.dir().to_owned())
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn print_store_and_ns(&self) -> String {
+ print_store_and_ns(self.store.name(), &self.ns)
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn PullReader>, Error> {
+ let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+ let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+ &dir.full_path(),
+ "snapshot",
+ "locked by another operation",
+ )?;
+ Ok(Arc::new(LocalReader {
+ _dir_lock: Arc::new(Mutex::new(dir_lock)),
+ path: dir.full_path(),
+ datastore: dir.datastore().clone(),
+ }))
+ }
+}
+
#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
@@ -343,6 +431,48 @@ impl PullReader for RemoteReader {
}
}
+#[async_trait::async_trait]
+impl PullReader for LocalReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(LocalChunkReader::new(
+ self.datastore.clone(),
+ None,
+ crypt_mode,
+ ))
+ }
+
+ async fn load_file_into(
+ &self,
+ filename: &str,
+ into: &Path,
+ _worker: &WorkerTask,
+ ) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let mut from_path = self.path.clone();
+ from_path.push(filename);
+ tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(
+ &self,
+ _to_path: &Path,
+ _worker: &WorkerTask,
+ ) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+ self.datastore.name() == target_store_name
+ }
+}
+
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
@@ -399,7 +529,10 @@ impl PullParameters {
client,
})
} else {
- bail!("local sync not implemented yet")
+ Arc::new(LocalSource {
+ store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+ ns: remote_ns,
+ })
};
let target = PullTarget {
store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
--
2.39.2
* [pbs-devel] [PATCH proxmox-backup v5 6/6] ui: add support for optional Remote in SyncJob
2023-10-06 14:05 [pbs-devel] [PATCH proxmox-backup v5 0/6] local sync-jobs Hannes Laimer
` (4 preceding siblings ...)
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 5/6] pull: add support for pulling from local datastore Hannes Laimer
@ 2023-10-06 14:05 ` Hannes Laimer
2023-11-08 11:06 ` Thomas Lamprecht
2023-11-09 9:34 ` Gabriel Goller
5 siblings, 2 replies; 11+ messages in thread
From: Hannes Laimer @ 2023-10-06 14:05 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
www/form/RemoteTargetSelector.js | 29 ++++++++-----
www/window/SyncJobEdit.js | 72 +++++++++++++++++++++++++++++++-
2 files changed, 90 insertions(+), 11 deletions(-)
diff --git a/www/form/RemoteTargetSelector.js b/www/form/RemoteTargetSelector.js
index 2a94c4d7..e7b822d7 100644
--- a/www/form/RemoteTargetSelector.js
+++ b/www/form/RemoteTargetSelector.js
@@ -44,20 +44,18 @@ Ext.define('PBS.form.RemoteStoreSelector', {
me.store.removeAll();
+ me.setDisabled(false);
+ if (!me.firstLoad) {
+ me.clearValue();
+ }
if (me.remote) {
- me.setDisabled(false);
- if (!me.firstLoad) {
- me.clearValue();
- }
-
me.store.proxy.url = `/api2/json/config/remote/${encodeURIComponent(me.remote)}/scan`;
- me.store.load();
-
- me.firstLoad = false;
} else {
- me.setDisabled(true);
- me.clearValue();
+ me.store.proxy.url = '/api2/json/admin/datastore';
}
+ me.store.load();
+
+ me.firstLoad = false;
},
initComponent: function() {
@@ -175,6 +173,17 @@ Ext.define('PBS.form.RemoteNamespaceSelector', {
me.store.proxy.url = `/api2/json/config/remote/${encodedRemote}/scan/${encodedStore}/namespaces`;
me.store.load();
+ me.firstLoad = false;
+ } else if (me.remoteStore) {
+ me.setDisabled(false);
+ if (!me.firstLoad) {
+ me.clearValue();
+ }
+ let encodedStore = encodeURIComponent(me.remoteStore);
+
+ me.store.proxy.url = `/api2/json/admin/datastore/${encodedStore}/namespace`;
+ me.store.load();
+
me.firstLoad = false;
} else if (previousStore) {
me.setDisabled(true);
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index 48a0c7a9..d18f4a91 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -47,6 +47,15 @@ Ext.define('PBS.window.SyncJobEdit', {
},
},
+ setValues: function(values) {
+ let me = this;
+ if (values.id && !values.remote) {
+ values.location = 'local';
+ } else {
+ values.location = 'remote';
+ }
+ me.callParent([values]);
+ },
items: {
xtype: 'tabpanel',
@@ -134,16 +143,78 @@ Ext.define('PBS.window.SyncJobEdit', {
],
column2: [
+ {
+ xtype: 'radiogroup',
+ fieldLabel: gettext('Location'),
+ defaultType: 'radiofield',
+ items: [
+ {
+ boxLabel: 'Local',
+ name: 'location',
+ inputValue: 'local',
+ submitValue: false,
+ },
+ {
+ boxLabel: 'Remote',
+ name: 'location',
+ inputValue: 'remote',
+ submitValue: false,
+ },
+ ],
+ listeners: {
+ change: function(_group, radio) {
+ let me = this;
+ let form = me.up('pbsSyncJobEdit');
+ let remoteField = form.down('field[name=remote]');
+ let storeField = form.down('field[name=remote-store]');
+ let nsField = form.down('field[name=remote-ns]');
+
+ if (!storeField.value) {
+ nsField.clearValue();
+ nsField.setDisabled(true);
+ }
+
+ if (radio.location === 'local') {
+ remoteField.setDisabled(true);
+ remoteField.setValue(null);
+ remoteField.allowBlank = true;
+ if (remoteField.value) {
+ storeField.clearValue();
+ }
+ storeField.setDisabled(false);
+ let rateLimitField = form.down('field[name=rate-in]');
+ rateLimitField.setValue(null);
+ } else {
+ if (!remoteField.value) {
+ remoteField.setValue(null);
+ storeField.clearValue();
+ storeField.setDisabled(true);
+ }
+ remoteField.setDisabled(false);
+ remoteField.allowBlank = false;
+ }
+ },
+ },
+ },
{
fieldLabel: gettext('Source Remote'),
xtype: 'pbsRemoteSelector',
allowBlank: false,
name: 'remote',
+ cbind: {
+ deleteEmpty: '{!isCreate}',
+ },
+ skipEmptyText: true,
listeners: {
change: function(f, value) {
let me = this;
let remoteStoreField = me.up('pbsSyncJobEdit').down('field[name=remote-store]');
remoteStoreField.setRemote(value);
+ let rateLimitField = me.up('pbsSyncJobEdit').down('field[name=rate-in]');
+ rateLimitField.setDisabled(!value);
+ if (!value) {
+ rateLimitField.setValue(null);
+ }
let remoteNamespaceField = me.up('pbsSyncJobEdit').down('field[name=remote-ns]');
remoteNamespaceField.setRemote(value);
},
@@ -155,7 +226,6 @@ Ext.define('PBS.window.SyncJobEdit', {
allowBlank: false,
autoSelect: false,
name: 'remote-store',
- disabled: true,
listeners: {
change: function(field, value) {
let me = this;
--
2.39.2
* Re: [pbs-devel] [PATCH proxmox-backup v5 1/6] api: make Remote for SyncJob optional
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 1/6] api: make Remote for SyncJob optional Hannes Laimer
@ 2023-11-08 10:53 ` Thomas Lamprecht
2023-11-08 13:26 ` Hannes Laimer
0 siblings, 1 reply; 11+ messages in thread
From: Thomas Lamprecht @ 2023-11-08 10:53 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Hannes Laimer
Am 06/10/2023 um 16:05 schrieb Hannes Laimer:
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index daeba7cf..9ed83046 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -65,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
> PullParameters::new(
> &sync_job.store,
> sync_job.ns.clone().unwrap_or_default(),
> - &sync_job.remote,
> + sync_job.remote.as_deref().unwrap_or("local"),
isn't this still the same issue that Wolfgang asked about in his reviews for
v3 and v2? (Please be a bit more communicative and reply to reviews on the
list too, else it's hard to know if you overlooked it or if it's actually OK the
way it is...)
faking a remote ID is probably never a good idea; should this stay None here?
But I actually checked the remaining code, and it fixes this in commit 4/6
"pull: refactor pulling from a datastore", so the question is: why do we add API
support before it can even work correctly? Shouldn't this commit be ordered
after the aforementioned refactor one, and the "pull: add support for pulling
from local datastore" one, avoiding such intermediate "bugs" and making review
easier due to a more causal order?
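For illustration, the Option-based handling hinted at above could look roughly like this; the types below are simplified stand-ins, not the actual proxmox-backup API:

```rust
// Illustrative sketch only: derive the pull-source kind from the optional
// remote instead of substituting a fake remote name like "local".
// `SourceKind` is a made-up stand-in for the real PullSource selection.

#[derive(Debug, PartialEq)]
pub enum SourceKind {
    Remote(String),
    Local,
}

pub fn source_from_remote(remote: Option<&str>) -> SourceKind {
    match remote {
        Some(name) => SourceKind::Remote(name.to_string()),
        None => SourceKind::Local, // no faked remote ID needed
    }
}
```

With something along these lines, the `unwrap_or("local")` placeholder (and any later need to special-case that string) would go away.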
> &sync_job.remote_store,
> sync_job.remote_ns.clone().unwrap_or_default(),
> sync_job
> @@ -91,7 +101,7 @@ pub fn do_sync_job(
> ) -> Result<String, Error> {
> let job_id = format!(
> "{}:{}:{}:{}:{}",
> - sync_job.remote,
> + sync_job.remote.as_deref().unwrap_or("-"),
> sync_job.remote_store,
> sync_job.store,
> sync_job.ns.clone().unwrap_or_default(),
> @@ -99,6 +109,13 @@ pub fn do_sync_job(
> );
> let worker_type = job.jobtype().to_string();
>
> + if sync_job.remote.is_none()
> + && sync_job.store == sync_job.remote_store
> + && sync_job.ns == sync_job.remote_ns
I'd disallow syncing to the same store even with a different NS, as they still
might overlap and it's IMO just an odd use-case.
If anything, we should allow moving groups and namespaces around inside the same
datastore as separate functionality, without any jobs or syncing involved,
but that'd be unrelated to this series anyway.
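A minimal sketch of that stricter check (a simplified stand-in for the real SyncJobConfig, field names assumed):

```rust
// Hedged sketch: reject a local sync whenever source and target datastore
// are identical, regardless of the namespaces involved, since namespaces
// inside one store may still overlap. Stand-in struct, not SyncJobConfig.

pub struct JobCfg {
    pub remote: Option<String>,
    pub store: String,
    pub remote_store: String,
}

pub fn validate_local_sync(job: &JobCfg) -> Result<(), String> {
    if job.remote.is_none() && job.store == job.remote_store {
        return Err("source and target datastore must differ for local sync".into());
    }
    Ok(())
}
```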
> + {
> + bail!("can't sync, source equals the target");
> + }
> +
> let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
>
> let upid_str = WorkerTask::spawn(
> @@ -122,13 +139,33 @@ pub fn do_sync_job(
> }
> task_log!(
> worker,
> - "sync datastore '{}' from '{}/{}'",
> + "sync datastore '{}' from '{}{}'",
> sync_job.store,
> - sync_job.remote,
> + sync_job
> + .remote
> + .as_deref()
> + .map_or(String::new(), |remote| format!("{remote}/")),
nit: might be nicer to pull this out via a `let source_prefix = match ...`
statement up front.
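Roughly what that nit could look like (illustrative only):

```rust
// Pull the prefix computation out of the task_log! call, as suggested,
// so the log line itself stays readable.

fn source_prefix(remote: Option<&str>) -> String {
    match remote {
        Some(remote) => format!("{remote}/"),
        None => String::new(),
    }
}
```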
> sync_job.remote_store,
> );
>
> - pull_store(&worker, &client, pull_params).await?;
> + if sync_job.remote.is_some() {
> + pull_store(&worker, &client, pull_params).await?;
> + } else {
> + if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
> + if target_ns.path().starts_with(source_ns.path())
> + && sync_job.store == sync_job.remote_store
> + && sync_job.max_depth.map_or(true, |sync_depth| {
> + target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
> + }) {
> + task_log!(
> + worker,
> + "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
> + );
> + }
> + } else {
> + pull_store(&worker, &client, pull_params).await?;
> + }
> + }
>
> task_log!(worker, "sync job '{}' end", &job_id);
>
> @@ -256,7 +294,7 @@ async fn pull(
> let pull_params = PullParameters::new(
> &store,
> ns,
> - &remote,
> + remote.as_deref().unwrap_or("local"),
same here w.r.t. intermediate bug (or at least hard to follow commit order)
> &remote_store,
> remote_ns.unwrap_or_default(),
> auth_id.clone(),
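As an aside, the sub-namespace guard quoted further above boils down to a depth check along these lines (a simplified sketch: namespaces are modeled as path segments, and the maximum-depth constant is assumed here; the real one lives in pbs-api-types):

```rust
// Hedged sketch of the "can't sync a namespace into one of its own
// sub-namespaces" guard: if the target is a sub-namespace of the source,
// the combined depth of target plus sync depth would eventually exceed
// the maximum allowed namespace depth.

const MAX_NAMESPACE_DEPTH: usize = 7; // assumed value for illustration

fn would_exceed_depth(source: &[&str], target: &[&str], sync_depth: usize) -> bool {
    let target_is_sub_ns = target.starts_with(source);
    target_is_sub_ns && target.len() + sync_depth > MAX_NAMESPACE_DEPTH
}
```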
* Re: [pbs-devel] [PATCH proxmox-backup v5 6/6] ui: add support for optional Remote in SyncJob
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 6/6] ui: add support for optional Remote in SyncJob Hannes Laimer
@ 2023-11-08 11:06 ` Thomas Lamprecht
2023-11-09 9:34 ` Gabriel Goller
1 sibling, 0 replies; 11+ messages in thread
From: Thomas Lamprecht @ 2023-11-08 11:06 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Hannes Laimer
works quite nicely now, a few smaller comments inline.
Am 06/10/2023 um 16:05 schrieb Hannes Laimer:
> @@ -134,16 +143,78 @@ Ext.define('PBS.window.SyncJobEdit', {
> ],
>
> column2: [
> + {
> + xtype: 'radiogroup',
> + fieldLabel: gettext('Location'),
> + defaultType: 'radiofield',
> + items: [
> + {
> + boxLabel: 'Local',
> + name: 'location',
> + inputValue: 'local',
> + submitValue: false,
> + },
> + {
> + boxLabel: 'Remote',
> + name: 'location',
> + inputValue: 'remote',
> + submitValue: false
Seems to miss a
checked: true,
here to correctly indicate the default from the start.
> + },
> + ],
> + listeners: {
> + change: function(_group, radio) {
> + let me = this;
> + let form = me.up('pbsSyncJobEdit');
> + let remoteField = form.down('field[name=remote]');
> + let storeField = form.down('field[name=remote-store]');
> + let nsField = form.down('field[name=remote-ns]');
> +
> + if (!storeField.value) {
> + nsField.clearValue();
> + nsField.setDisabled(true);
> + }
> +
no hard feelings, but for below you could go for a pattern like:
let isLocalSync = radio.location === 'local';
remoteField.setDisabled(isLocalSync);
remoteField.allowBlank = isLocalSync;
and so on, as that might save us some duplication here, but as said, just a nit.
And I observed a small glitch: when switching the radio-group selection from local
to remote, the "Source Remote" field is not marked as required, i.e., a validate
call might be missing.
* Re: [pbs-devel] [PATCH proxmox-backup v5 1/6] api: make Remote for SyncJob optional
2023-11-08 10:53 ` Thomas Lamprecht
@ 2023-11-08 13:26 ` Hannes Laimer
0 siblings, 0 replies; 11+ messages in thread
From: Hannes Laimer @ 2023-11-08 13:26 UTC (permalink / raw)
To: Thomas Lamprecht, Proxmox Backup Server development discussion
On 11/8/23 11:53, Thomas Lamprecht wrote:
> Am 06/10/2023 um 16:05 schrieb Hannes Laimer:
>> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
>> index daeba7cf..9ed83046 100644
>> --- a/src/api2/pull.rs
>> +++ b/src/api2/pull.rs
>
>> @@ -65,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>> PullParameters::new(
>> &sync_job.store,
>> sync_job.ns.clone().unwrap_or_default(),
>> - &sync_job.remote,
>> + sync_job.remote.as_deref().unwrap_or("local"),
>
> isn't this still the same issue that Wolfgang asked about in his reviews for
> v3 and v2? (Please be a bit more communicative and reply to reviews on the
> list too, else it's hard to know if you overlooked it or if it's actually OK the
> way it is...)
>
> faking a remote ID is probably never a good idea; should this stay None here?
>
> But I actually checked the remaining code, and it fixes this in commit 4/6
> "pull: refactor pulling from a datastore", so the question is: why do we add API
> support before it can even work correctly? Shouldn't this commit be ordered
> after the aforementioned refactor one, and the "pull: add support for pulling
> from local datastore" one, avoiding such intermediate "bugs" and making review
> easier due to a more causal order?
>
Makes sense; making it optional felt like the first
logical step, which is why they are ordered this way. But sure, I can
re-order.
>
>> &sync_job.remote_store,
>> sync_job.remote_ns.clone().unwrap_or_default(),
>> sync_job
>> @@ -91,7 +101,7 @@ pub fn do_sync_job(
>> ) -> Result<String, Error> {
>> let job_id = format!(
>> "{}:{}:{}:{}:{}",
>> - sync_job.remote,
>> + sync_job.remote.as_deref().unwrap_or("-"),
>> sync_job.remote_store,
>> sync_job.store,
>> sync_job.ns.clone().unwrap_or_default(),
>> @@ -99,6 +109,13 @@ pub fn do_sync_job(
>> );
>> let worker_type = job.jobtype().to_string();
>>
>> + if sync_job.remote.is_none()
>> + && sync_job.store == sync_job.remote_store
>> + && sync_job.ns == sync_job.remote_ns
>
> I'd disallow syncing to the same store even with a different NS, as they still
> might overlap and it's IMO just an odd use-case.
> If anything, we should allow moving groups and namespaces around inside the same
> datastore as separate functionality, without any jobs or syncing involved,
> but that'd be unrelated to this series anyway.
It is kind of odd; the idea was that it's currently not possible to move
backups within a datastore (using the UI). But yes, a sync job is
probably not the right place for this.
>
>> + {
>> + bail!("can't sync, source equals the target");
>> + }
>> +
>> let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
>>
>> let upid_str = WorkerTask::spawn(
>> @@ -122,13 +139,33 @@ pub fn do_sync_job(
>> }
>> task_log!(
>> worker,
>> - "sync datastore '{}' from '{}/{}'",
>> + "sync datastore '{}' from '{}{}'",
>> sync_job.store,
>> - sync_job.remote,
>> + sync_job
>> + .remote
>> + .as_deref()
>> + .map_or(String::new(), |remote| format!("{remote}/")),
>
> nit: might be nicer to pull this out via a `let source_prefix = match ...`
> statement up front.
>
>> sync_job.remote_store,
>> );
>>
>> - pull_store(&worker, &client, pull_params).await?;
>> + if sync_job.remote.is_some() {
>> + pull_store(&worker, &client, pull_params).await?;
>> + } else {
>> + if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
>> + if target_ns.path().starts_with(source_ns.path())
>> + && sync_job.store == sync_job.remote_store
>> + && sync_job.max_depth.map_or(true, |sync_depth| {
>> + target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
>> + }) {
>> + task_log!(
>> + worker,
>> + "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
>> + );
>> + }
>> + } else {
>> + pull_store(&worker, &client, pull_params).await?;
>> + }
>> + }
>>
>> task_log!(worker, "sync job '{}' end", &job_id);
>>
>
>> @@ -256,7 +294,7 @@ async fn pull(
>> let pull_params = PullParameters::new(
>> &store,
>> ns,
>> - &remote,
>> + remote.as_deref().unwrap_or("local"),
>
> same here w.r.t. intermediate bug (or at least hard to follow commit order)
>
>> &remote_store,
>> remote_ns.unwrap_or_default(),
>> auth_id.clone(),
>
* Re: [pbs-devel] [PATCH proxmox-backup v5 6/6] ui: add support for optional Remote in SyncJob
2023-10-06 14:05 ` [pbs-devel] [PATCH proxmox-backup v5 6/6] ui: add support for optional Remote in SyncJob Hannes Laimer
2023-11-08 11:06 ` Thomas Lamprecht
@ 2023-11-09 9:34 ` Gabriel Goller
1 sibling, 0 replies; 11+ messages in thread
From: Gabriel Goller @ 2023-11-09 9:34 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Hannes Laimer
Hi!
I tested it a bit and it looks good so far, a few small UI issues though:
* When listing the sync jobs, a local sync job has an empty 'Remote ID',
  which doesn't look that good. We could add a defaultValue that shows
  'Local' or something?
* We don't support syncs from and to the same datastore, so it also
  shouldn't be selectable in the UI. Currently all the datastores are
  shown in the dropdown. These syncs also run successfully but don't sync
  anything; we should probably throw an error in the API if such a sync is
  requested, so that UI and API both disallow same-datastore syncs.
On 10/6/23 16:05, Hannes Laimer wrote:
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
> www/form/RemoteTargetSelector.js | 29 ++++++++-----
> www/window/SyncJobEdit.js | 72 +++++++++++++++++++++++++++++++-
> 2 files changed, 90 insertions(+), 11 deletions(-)
> [..]