* [pbs-devel] [PATCH proxmox-backup v4 1/6] api2: make Remote for SyncJob optional
From: Hannes Laimer @ 2023-09-29 12:48 UTC
To: pbs-devel
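Make the `remote` field of SyncJobConfig an `Option<String>` (skipped during
serialization when unset), so that a sync job without a remote denotes a local
sync. A minimal standalone serde sketch (illustrative only, these are not the
PBS types) of how such an optional, kebab-case field behaves:

    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug)]
    #[serde(rename_all = "kebab-case")]
    struct JobSketch {
        store: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        remote: Option<String>,
        remote_store: String,
    }

    fn main() -> Result<(), serde_json::Error> {
        let local = JobSketch {
            store: "targetstore".into(),
            remote: None,
            remote_store: "sourcestore".into(),
        };
        // `remote` is omitted entirely when it is `None` ...
        println!("{}", serde_json::to_string(&local)?);
        // ... and input without a `remote` key deserializes back to `None`.
        let parsed: JobSketch =
            serde_json::from_str(r#"{"store":"targetstore","remote-store":"sourcestore"}"#)?;
        println!("{:?}", parsed.remote);
        Ok(())
    }

Existing remote-based job entries keep (de)serializing unchanged; only the
absence of `remote` gets the new "local" meaning.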
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-api-types/src/jobs.rs | 9 ++--
src/api2/config/remote.rs | 2 +-
src/api2/config/sync.rs | 41 +++++++++++------
src/api2/node/tasks.rs | 3 +-
src/api2/pull.rs | 76 +++++++++++++++++++++++--------
src/bin/proxmox-backup-manager.rs | 6 +--
src/server/email_notifications.rs | 18 ++++----
7 files changed, 107 insertions(+), 48 deletions(-)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 23e19b7b..aedee090 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -17,8 +17,8 @@ const_regex! {
/// Regex for verification jobs 'DATASTORE:ACTUAL_JOB_ID'
pub VERIFICATION_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):");
- /// Regex for sync jobs 'REMOTE:REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID'
- pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r")(?::(", BACKUP_NS_RE!(), r"))?:");
+ /// Regex for sync jobs '(REMOTE|\-):REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID'
+ pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"|\-):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r")(?::(", BACKUP_NS_RE!(), r"))?:");
}
pub const JOB_ID_SCHEMA: Schema = StringSchema::new("Job ID.")
@@ -467,6 +467,7 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
},
remote: {
schema: REMOTE_ID_SCHEMA,
+ optional: true,
},
"remote-store": {
schema: DATASTORE_SCHEMA,
@@ -515,7 +516,9 @@ pub struct SyncJobConfig {
pub ns: Option<BackupNamespace>,
#[serde(skip_serializing_if = "Option::is_none")]
pub owner: Option<Authid>,
- pub remote: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ /// None implies local sync.
+ pub remote: Option<String>,
pub remote_store: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub remote_ns: Option<BackupNamespace>,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 76dd3b89..307cf3cd 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
for job in job_list {
- if job.remote == name {
+ if job.remote.map_or(false, |id| id == name) {
param_bail!(
"name",
"remote '{}' is used by sync job '{}' (datastore '{}')",
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 01e5f2ce..21634bd5 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
use pbs_api_types::{
Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
- PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
- PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
+ PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
+ PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
};
use pbs_config::sync;
@@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
return false;
}
- let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
- remote_privs & PRIV_REMOTE_AUDIT != 0
+ if let Some(remote) = &job.remote {
+ let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
+ remote_privs & PRIV_REMOTE_AUDIT != 0
+ } else {
+ let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+ source_ds_privs & PRIV_DATASTORE_AUDIT != 0
+ }
}
/// checks whether user can run the corresponding pull job
@@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
return false;
}
- let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
- remote_privs & PRIV_REMOTE_READ != 0
+ if let Some(remote) = &job.remote {
+ let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
+ remote_privs & PRIV_REMOTE_READ != 0
+ } else {
+ let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+ source_ds_privs & PRIV_DATASTORE_READ != 0
+ }
}
#[api(
@@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
#[serde(rename_all = "kebab-case")]
/// Deletable property name
pub enum DeletableProperty {
+ /// Delete the remote property (the job then syncs from a local datastore).
+ Remote,
/// Delete the owner property.
Owner,
/// Delete the comment property.
@@ -275,6 +287,9 @@ pub fn update_sync_job(
if let Some(delete) = delete {
for delete_prop in delete {
match delete_prop {
+ DeletableProperty::Remote => {
+ data.remote = None;
+ }
DeletableProperty::Owner => {
data.owner = None;
}
@@ -334,7 +349,7 @@ pub fn update_sync_job(
data.ns = Some(ns);
}
if let Some(remote) = update.remote {
- data.remote = remote;
+ data.remote = Some(remote);
}
if let Some(remote_store) = update.remote_store {
data.remote_store = remote_store;
@@ -503,7 +518,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
let mut job = SyncJobConfig {
id: "regular".to_string(),
- remote: "remote0".to_string(),
+ remote: Some("remote0".to_string()),
remote_store: "remotestore1".to_string(),
remote_ns: None,
store: "localstore0".to_string(),
@@ -538,11 +553,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
// reading without proper read permissions on local end must fail
- job.remote = "remote1".to_string();
+ job.remote = Some("remote1".to_string());
assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
// reading without proper read permissions on remote end must fail
- job.remote = "remote0".to_string();
+ job.remote = Some("remote0".to_string());
job.store = "localstore1".to_string();
assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
@@ -555,10 +570,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
));
// writing without proper write permissions on local end must fail
- job.remote = "remote1".to_string();
+ job.remote = Some("remote1".to_string());
// writing without proper write permissions on remote end must fail
- job.remote = "remote0".to_string();
+ job.remote = Some("remote0".to_string());
job.store = "localstore1".to_string();
assert!(!check_sync_job_modify_access(
&user_info,
@@ -567,7 +582,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
));
// reset remote to one where users have access
- job.remote = "remote1".to_string();
+ job.remote = Some("remote1".to_string());
// user with read permission can only read, but not modify/run
assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 866361c6..8f08d3af 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -78,11 +78,12 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
if let (Some(remote), Some(remote_store), Some(local_store)) =
(remote, remote_store, local_store)
{
+ let remote_str = remote.as_str();
return check_pull_privs(
auth_id,
local_store.as_str(),
local_ns,
- remote.as_str(),
+ (remote_str != "-").then_some(remote_str),
remote_store.as_str(),
false,
);
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index daeba7cf..9ed83046 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -1,5 +1,5 @@
//! Sync datastore from remote server
-use anyhow::{format_err, Error};
+use anyhow::{bail, format_err, Error};
use futures::{future::FutureExt, select};
use proxmox_router::{Permission, Router, RpcEnvironment};
@@ -8,7 +8,7 @@ use proxmox_sys::task_log;
use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
- GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+ GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
TRANSFER_LAST_SCHEMA,
};
@@ -22,7 +22,7 @@ pub fn check_pull_privs(
auth_id: &Authid,
store: &str,
ns: Option<&str>,
- remote: &str,
+ remote: Option<&str>,
remote_store: &str,
delete: bool,
) -> Result<(), Error> {
@@ -39,12 +39,22 @@ pub fn check_pull_privs(
PRIV_DATASTORE_BACKUP,
false,
)?;
- user_info.check_privs(
- auth_id,
- &["remote", remote, remote_store],
- PRIV_REMOTE_READ,
- false,
- )?;
+
+ if let Some(remote) = remote {
+ user_info.check_privs(
+ auth_id,
+ &["remote", remote, remote_store],
+ PRIV_REMOTE_READ,
+ false,
+ )?;
+ } else {
+ user_info.check_privs(
+ auth_id,
+ &["datastore", remote_store],
+ PRIV_DATASTORE_BACKUP,
+ false,
+ )?;
+ }
if delete {
user_info.check_privs(
@@ -65,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
PullParameters::new(
&sync_job.store,
sync_job.ns.clone().unwrap_or_default(),
- &sync_job.remote,
+ sync_job.remote.as_deref().unwrap_or("local"),
&sync_job.remote_store,
sync_job.remote_ns.clone().unwrap_or_default(),
sync_job
@@ -91,7 +101,7 @@ pub fn do_sync_job(
) -> Result<String, Error> {
let job_id = format!(
"{}:{}:{}:{}:{}",
- sync_job.remote,
+ sync_job.remote.as_deref().unwrap_or("-"),
sync_job.remote_store,
sync_job.store,
sync_job.ns.clone().unwrap_or_default(),
@@ -99,6 +109,13 @@ pub fn do_sync_job(
);
let worker_type = job.jobtype().to_string();
+ if sync_job.remote.is_none()
+ && sync_job.store == sync_job.remote_store
+ && sync_job.ns == sync_job.remote_ns
+ {
+ bail!("can't sync, source equals the target");
+ }
+
let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
let upid_str = WorkerTask::spawn(
@@ -122,13 +139,33 @@ pub fn do_sync_job(
}
task_log!(
worker,
- "sync datastore '{}' from '{}/{}'",
+ "sync datastore '{}' from '{}{}'",
sync_job.store,
- sync_job.remote,
+ sync_job
+ .remote
+ .as_deref()
+ .map_or(String::new(), |remote| format!("{remote}/")),
sync_job.remote_store,
);
- pull_store(&worker, &client, pull_params).await?;
+ if sync_job.remote.is_some() {
+ pull_store(&worker, &client, pull_params).await?;
+ } else {
+ if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
+ if target_ns.path().starts_with(source_ns.path())
+ && sync_job.store == sync_job.remote_store
+ && sync_job.max_depth.map_or(true, |sync_depth| {
+ target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
+ }) {
+ task_log!(
+ worker,
+ "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
+ );
+ }
+ } else {
+ pull_store(&worker, &client, pull_params).await?;
+ }
+ }
task_log!(worker, "sync job '{}' end", &job_id);
@@ -180,6 +217,7 @@ pub fn do_sync_job(
},
remote: {
schema: REMOTE_ID_SCHEMA,
+ optional: true,
},
"remote-store": {
schema: DATASTORE_SCHEMA,
@@ -224,7 +262,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
async fn pull(
store: String,
ns: Option<BackupNamespace>,
- remote: String,
+ remote: Option<String>,
remote_store: String,
remote_ns: Option<BackupNamespace>,
remove_vanished: Option<bool>,
@@ -248,7 +286,7 @@ async fn pull(
&auth_id,
&store,
ns_str.as_deref(),
- &remote,
+ remote.as_deref(),
&remote_store,
delete,
)?;
@@ -256,7 +294,7 @@ async fn pull(
let pull_params = PullParameters::new(
&store,
ns,
- &remote,
+ remote.as_deref().unwrap_or("local"),
&remote_store,
remote_ns.unwrap_or_default(),
auth_id.clone(),
@@ -280,7 +318,7 @@ async fn pull(
worker,
"pull datastore '{}' from '{}/{}'",
store,
- remote,
+ remote.as_deref().unwrap_or("-"),
remote_store,
);
@@ -299,4 +337,4 @@ async fn pull(
Ok(upid_str)
}
-pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
\ No newline at end of file
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index b4cb6cb3..1dd51951 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -535,21 +535,21 @@ fn get_remote(param: &HashMap<String, String>) -> Option<String> {
param.get("remote").map(|r| r.to_owned()).or_else(|| {
if let Some(id) = param.get("id") {
if let Ok(job) = get_sync_job(id) {
- return Some(job.remote);
+ return job.remote;
}
}
None
})
}
-fn get_remote_store(param: &HashMap<String, String>) -> Option<(String, String)> {
+fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>, String)> {
let mut job: Option<SyncJobConfig> = None;
let remote = param.get("remote").map(|r| r.to_owned()).or_else(|| {
if let Some(id) = param.get("id") {
job = get_sync_job(id).ok();
if let Some(ref job) = job {
- return Some(job.remote.clone());
+ return job.remote.clone();
}
}
None
diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
index ea1476d7..18881782 100644
--- a/src/server/email_notifications.rs
+++ b/src/server/email_notifications.rs
@@ -484,15 +484,17 @@ pub fn send_sync_status(
}
};
+ let tmp_src_string;
+ let source_str = if let Some(remote) = &job.remote {
+ tmp_src_string = format!("Sync remote '{}'", remote);
+ &tmp_src_string
+ } else {
+ "Sync local"
+ };
+
let subject = match result {
- Ok(()) => format!(
- "Sync remote '{}' datastore '{}' successful",
- job.remote, job.remote_store,
- ),
- Err(_) => format!(
- "Sync remote '{}' datastore '{}' failed",
- job.remote, job.remote_store,
- ),
+ Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
+ Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
};
send_job_status_mail(email, &subject, &text)?;
--
2.39.2
* [pbs-devel] [PATCH proxmox-backup v4 5/6] pull: refactor pulling from a datastore
From: Hannes Laimer @ 2023-09-29 12:49 UTC
To: pbs-devel
... making the pull logic independent of the actual source by using
two traits.
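The gist of the refactor: pull_store()/pull_ns()/pull_group() only operate on
trait objects, so a remote (HTTP-backed) source and, in a follow-up, a local
datastore-backed source become interchangeable. A condensed, self-contained
sketch of that pattern (synchronous and heavily simplified, with made-up names,
not the actual PullSource/PullReader definitions below):

    use std::sync::Arc;

    trait PullSourceLike: Send + Sync {
        /// Human-readable description of where data comes from.
        fn describe(&self) -> String;
        /// List the backup groups available on this source.
        fn list_groups(&self) -> Vec<String>;
    }

    struct RemoteSourceSketch {
        host: String,
        store: String,
    }

    struct LocalSourceSketch {
        store: String,
    }

    impl PullSourceLike for RemoteSourceSketch {
        fn describe(&self) -> String {
            format!("{}/{}", self.host, self.store)
        }
        fn list_groups(&self) -> Vec<String> {
            vec!["vm/100".into(), "ct/101".into()]
        }
    }

    impl PullSourceLike for LocalSourceSketch {
        fn describe(&self) -> String {
            format!("local datastore '{}'", self.store)
        }
        fn list_groups(&self) -> Vec<String> {
            vec!["vm/100".into()]
        }
    }

    // The pull logic never needs to know which concrete source it talks to.
    fn pull_all(source: Arc<dyn PullSourceLike>) {
        println!("syncing from {}", source.describe());
        for group in source.list_groups() {
            println!("  pulling group {group}");
        }
    }

    fn main() {
        pull_all(Arc::new(RemoteSourceSketch {
            host: "pbs.example.com".into(),
            store: "store2".into(),
        }));
        pull_all(Arc::new(LocalSourceSketch {
            store: "store1".into(),
        }));
    }

The real traits are async (hence the new async-trait dependency) and return the
actual pbs-api-types, but the dispatch principle is the same.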
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
Cargo.toml | 2 +
pbs-datastore/src/read_chunk.rs | 2 +-
src/api2/config/remote.rs | 14 +-
src/api2/pull.rs | 31 +-
src/server/pull.rs | 936 ++++++++++++++++++--------------
5 files changed, 564 insertions(+), 421 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 6e848843..570e1673 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
# regular crates
anyhow = "1.0"
+async-trait = "0.1.56"
apt-pkg-native = "0.3.2"
base64 = "0.13"
bitflags = "1.2.1"
@@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
[dependencies]
anyhow.workspace = true
+async-trait.workspace = true
apt-pkg-native.workspace = true
base64.workspace = true
bitflags.workspace = true
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
}
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
/// Returns the encoded chunk data
fn read_raw_chunk<'a>(
&'a self,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 307cf3cd..2511c5d5 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
Ok(())
}
-/// Helper to get client for remote.cfg entry
-pub async fn remote_client(
+/// Helper to get client for remote.cfg entry without login, just config
+pub fn remote_client_config(
remote: &Remote,
limit: Option<RateLimitConfig>,
) -> Result<HttpClient, Error> {
@@ -320,6 +320,16 @@ pub async fn remote_client(
&remote.config.auth_id,
options,
)?;
+
+ Ok(client)
+}
+
+/// Helper to get client for remote.cfg entry
+pub async fn remote_client(
+ remote: &Remote,
+ limit: Option<RateLimitConfig>,
+) -> Result<HttpClient, Error> {
+ let client = remote_client_config(remote, limit)?;
let _auth_info = client
.login() // make sure we can auth
.await
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 9ed83046..8f1aad43 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -8,7 +8,7 @@ use proxmox_sys::task_log;
use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
- GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+ GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
TRANSFER_LAST_SCHEMA,
};
@@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
PullParameters::new(
&sync_job.store,
sync_job.ns.clone().unwrap_or_default(),
- sync_job.remote.as_deref().unwrap_or("local"),
+ sync_job.remote.as_deref(),
&sync_job.remote_store,
sync_job.remote_ns.clone().unwrap_or_default(),
sync_job
@@ -131,7 +131,6 @@ pub fn do_sync_job(
let worker_future = async move {
let pull_params = PullParameters::try_from(&sync_job)?;
- let client = pull_params.client().await?;
task_log!(worker, "Starting datastore sync job '{}'", job_id);
if let Some(event_str) = schedule {
@@ -148,24 +147,7 @@ pub fn do_sync_job(
sync_job.remote_store,
);
- if sync_job.remote.is_some() {
- pull_store(&worker, &client, pull_params).await?;
- } else {
- if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
- if target_ns.path().starts_with(source_ns.path())
- && sync_job.store == sync_job.remote_store
- && sync_job.max_depth.map_or(true, |sync_depth| {
- target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
- }) {
- task_log!(
- worker,
- "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
- );
- }
- } else {
- pull_store(&worker, &client, pull_params).await?;
- }
- }
+ pull_store(&worker, pull_params).await?;
task_log!(worker, "sync job '{}' end", &job_id);
@@ -294,7 +276,7 @@ async fn pull(
let pull_params = PullParameters::new(
&store,
ns,
- remote.as_deref().unwrap_or("local"),
+ remote.as_deref(),
&remote_store,
remote_ns.unwrap_or_default(),
auth_id.clone(),
@@ -304,7 +286,6 @@ async fn pull(
limit,
transfer_last,
)?;
- let client = pull_params.client().await?;
// fixme: set to_stdout to false?
// FIXME: add namespace to worker id?
@@ -322,7 +303,7 @@ async fn pull(
remote_store,
);
- let pull_future = pull_store(&worker, &client, pull_params);
+ let pull_future = pull_store(&worker, pull_params);
(select! {
success = pull_future.fuse() => success,
abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
@@ -337,4 +318,4 @@ async fn pull(
Ok(upid_str)
}
-pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
\ No newline at end of file
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e55452d1..ff3e6d0a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::Seek;
+use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
-use proxmox_sys::task_log;
+use proxmox_sys::{task_log, task_warn};
+use serde_json::json;
use pbs_api_types::{
- print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
- Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+ GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
};
-
-use pbs_client::{
- BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
+use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
-/// Parameters for a pull operation.
-pub(crate) struct PullParameters {
- /// Remote that is pulled from
- remote: Remote,
- /// Full specification of remote datastore
- source: BackupRepository,
- /// Local store that is pulled into
+struct RemoteReader {
+ backup_reader: Arc<BackupReader>,
+ dir: BackupDir,
+}
+
+pub(crate) struct PullTarget {
store: Arc<DataStore>,
- /// Remote namespace
- remote_ns: BackupNamespace,
- /// Local namespace (anchor)
ns: BackupNamespace,
+}
+
+pub(crate) struct RemoteSource {
+ repo: BackupRepository,
+ ns: BackupNamespace,
+ client: HttpClient,
+}
+
+#[async_trait::async_trait]
+/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source.
+trait PullSource: Send + Sync {
+ /// Lists namespaces from the source.
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupNamespace>, Error>;
+
+ /// Lists groups within a specific namespace from the source.
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error>;
+
+ /// Lists backup directories for a specific group within a specific namespace from the source.
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupDir>, Error>;
+ fn get_ns(&self) -> BackupNamespace;
+ fn print_store_and_ns(&self) -> String;
+
+ /// Returns a reader for reading data from a specific backup directory.
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn PullReader>, Error>;
+}
+
+#[async_trait::async_trait]
+impl PullSource for RemoteSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+ return Ok(vec![self.ns.clone()]);
+ }
+
+ let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+ let mut data = json!({});
+ if let Some(max_depth) = max_depth {
+ data["max-depth"] = json!(max_depth);
+ }
+
+ if !self.ns.is_root() {
+ data["parent"] = json!(self.ns);
+ }
+ self.client.login().await?;
+
+ let mut result = match self.client.get(&path, Some(data)).await {
+ Ok(res) => res,
+ Err(err) => match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match code {
+ &StatusCode::NOT_FOUND => {
+ if self.ns.is_root() && max_depth.is_none() {
+ task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+ task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+ max_depth.replace(0);
+ } else {
+ bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+ }
+
+ return Ok(vec![self.ns.clone()]);
+ }
+ _ => {
+ bail!("Querying namespaces failed - HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ bail!("Querying namespaces failed - {err}");
+ }
+ },
+ };
+
+ let list: Vec<BackupNamespace> =
+ serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+ .into_iter()
+ .map(|list_item| list_item.ns)
+ .collect();
+
+ Ok(list)
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ _owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+ let args = if !namespace.is_root() {
+ Some(json!({ "ns": namespace.clone() }))
+ } else {
+ None
+ };
+
+ self.client.login().await?;
+ let mut result =
+ self.client.get(&path, args).await.map_err(|err| {
+ format_err!("Failed to retrieve backup groups from remote - {}", err)
+ })?;
+
+ Ok(
+ serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+ .map_err(Error::from)?
+ .into_iter()
+ .map(|item| item.backup)
+ .collect::<Vec<BackupGroup>>(),
+ )
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ _namespace: &BackupNamespace,
+ group: &BackupGroup,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupDir>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+ let mut args = json!({
+ "backup-type": group.ty,
+ "backup-id": group.id,
+ });
+
+ if !self.ns.is_root() {
+ args["ns"] = serde_json::to_value(&self.ns)?;
+ }
+
+ self.client.login().await?;
+
+ let mut result = self.client.get(&path, Some(args)).await?;
+ let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+ Ok(snapshot_list
+ .into_iter()
+ .filter_map(|item: SnapshotListItem| {
+ let snapshot = item.backup;
+ // in-progress backups can't be synced
+ if item.size.is_none() {
+ task_log!(
+ worker,
+ "skipping snapshot {} - in-progress backup",
+ snapshot
+ );
+ return None;
+ }
+
+ Some(snapshot)
+ })
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn print_store_and_ns(&self) -> String {
+ print_store_and_ns(self.repo.store(), &self.ns)
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn PullReader>, Error> {
+ let backup_reader =
+ BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+ Ok(Arc::new(RemoteReader {
+ backup_reader,
+ dir: dir.clone(),
+ }))
+ }
+}
+
+#[async_trait::async_trait]
+/// `PullReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading the client log, and checking whether chunk sync should be skipped.
+trait PullReader: Send + Sync {
+ /// Returns a chunk reader with the specified encryption mode.
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+ /// Asynchronously loads a file from the source into a local file.
+ /// `filename` is the name of the file to load from the source.
+ /// `into` is the path of the local file to load the source file into.
+ async fn load_file_into(
+ &self,
+ filename: &str,
+ into: &Path,
+ worker: &WorkerTask,
+ ) -> Result<Option<DataBlob>, Error>;
+
+ /// Tries to download the client log from the source and save it into a local file.
+ async fn try_download_client_log(
+ &self,
+ to_path: &Path,
+ worker: &WorkerTask,
+ ) -> Result<(), Error>;
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+#[async_trait::async_trait]
+impl PullReader for RemoteReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(RemoteChunkReader::new(
+ self.backup_reader.clone(),
+ None,
+ crypt_mode,
+ HashMap::new(),
+ ))
+ }
+
+ async fn load_file_into(
+ &self,
+ filename: &str,
+ into: &Path,
+ worker: &WorkerTask,
+ ) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+ if let Err(err) = download_result {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ task_log!(
+ worker,
+ "skipping snapshot {} - vanished since start of sync",
+ &self.dir,
+ );
+ return Ok(None);
+ }
+ _ => {
+ bail!("HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
+ };
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(
+ &self,
+ to_path: &Path,
+ worker: &WorkerTask,
+ ) -> Result<(), Error> {
+ let mut tmp_path = to_path.to_owned();
+ tmp_path.set_extension("tmp");
+
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ // Note: be silent if there is no log - only log successful download
+ if let Ok(()) = self
+ .backup_reader
+ .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+ .await
+ {
+ if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+ bail!("Atomic rename file {:?} failed - {}", to_path, err);
+ }
+ task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+ }
+
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+ false
+ }
+}
+
+/// Parameters for a pull operation.
+pub(crate) struct PullParameters {
+ /// Where data is pulled from
+ source: Arc<dyn PullSource>,
+ /// Where data should be pulled into
+ target: PullTarget,
/// Owner of synced groups (needs to match local owner of pre-existing groups)
owner: Authid,
/// Whether to remove groups which exist locally, but not on the remote end
@@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
max_depth: Option<usize>,
/// Filters for reducing the pull scope
group_filter: Option<Vec<GroupFilter>>,
- /// Rate limits for all transfers from `remote`
- limit: RateLimitConfig,
/// How many snapshots should be transferred at most (taking the newest N snapshots)
transfer_last: Option<usize>,
}
impl PullParameters {
/// Creates a new instance of `PullParameters`.
- ///
- /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
- /// [BackupRepository] with `remote_store`.
- #[allow(clippy::too_many_arguments)]
pub(crate) fn new(
store: &str,
ns: BackupNamespace,
- remote: &str,
+ remote: Option<&str>,
remote_store: &str,
remote_ns: BackupNamespace,
owner: Authid,
@@ -82,49 +376,51 @@ impl PullParameters {
limit: RateLimitConfig,
transfer_last: Option<usize>,
) -> Result<Self, Error> {
- let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
remote_ns.check_max_depth(max_depth)?;
- }
-
- let (remote_config, _digest) = pbs_config::remote::config()?;
- let remote: Remote = remote_config.lookup("remote", remote)?;
-
+ };
let remove_vanished = remove_vanished.unwrap_or(false);
- let source = BackupRepository::new(
- Some(remote.config.auth_id.clone()),
- Some(remote.config.host.clone()),
- remote.config.port,
- remote_store.to_string(),
- );
+ let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote: Remote = remote_config.lookup("remote", remote)?;
- Ok(Self {
- remote,
- remote_ns,
+ let repo = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
+ let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
+ Arc::new(RemoteSource {
+ repo,
+ ns: remote_ns,
+ client,
+ })
+ } else {
+ bail!("local sync not implemented yet")
+ };
+ let target = PullTarget {
+ store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
ns,
+ };
+
+ Ok(Self {
source,
- store,
+ target,
owner,
remove_vanished,
max_depth,
group_filter,
- limit,
transfer_last,
})
}
-
- /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
- pub async fn client(&self) -> Result<HttpClient, Error> {
- crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
- }
}
async fn pull_index_chunks<I: IndexFile>(
worker: &WorkerTask,
- chunk_reader: RemoteChunkReader,
+ chunk_reader: Arc<dyn AsyncReadChunk>,
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -215,26 +511,6 @@ async fn pull_index_chunks<I: IndexFile>(
Ok(())
}
-async fn download_manifest(
- reader: &BackupReader,
- filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
- let mut tmp_manifest_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(filename)?;
-
- reader
- .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
- .await?;
-
- tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
- Ok(tmp_manifest_file)
-}
-
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
if size != info.size {
bail!(
@@ -255,17 +531,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
-/// - Verify tmp file checksum
+/// - Load archive file into tmp file
+/// -- Load file into tmp file
+/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
-async fn pull_single_archive(
- worker: &WorkerTask,
- reader: &BackupReader,
- chunk_reader: &mut RemoteChunkReader,
- snapshot: &pbs_datastore::BackupDir,
- archive_info: &FileInfo,
+async fn pull_single_archive<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
+ archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let archive_name = &archive_info.filename;
@@ -277,13 +552,11 @@ async fn pull_single_archive(
task_log!(worker, "sync archive {}", archive_name);
- let mut tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
+ reader
+ .load_file_into(archive_name, &tmp_path, worker)
+ .await?;
- reader.download(archive_name, &mut tmpfile).await?;
+ let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
match archive_type(archive_name)? {
ArchiveType::DynamicIndex => {
@@ -293,14 +566,18 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ if reader.skip_chunk_sync(snapshot.datastore().name()) {
+ task_log!(worker, "skipping chunk sync for same datastore");
+ } else {
+ pull_index_chunks(
+ worker,
+ reader.chunk_reader(archive_info.crypt_mode),
+ snapshot.datastore().clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -309,17 +586,21 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ if reader.skip_chunk_sync(snapshot.datastore().name()) {
+ task_log!(worker, "skipping chunk sync for same datastore");
+ } else {
+ pull_index_chunks(
+ worker,
+ reader.chunk_reader(archive_info.crypt_mode),
+ snapshot.datastore().clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
}
ArchiveType::Blob => {
- tmpfile.seek(SeekFrom::Start(0))?;
+ tmpfile.rewind()?;
let (csum, size) = sha256(&mut tmpfile)?;
verify_archive(archive_info, &csum, size)?;
}
@@ -330,33 +611,6 @@ async fn pull_single_archive(
Ok(())
}
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- path: &std::path::Path,
-) -> Result<(), Error> {
- let mut tmp_path = path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
- if let Err(err) = std::fs::rename(&tmp_path, path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
- }
- task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
- }
-
- Ok(())
-}
-
/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
@@ -366,10 +620,10 @@ async fn try_client_log_download(
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
-async fn pull_snapshot(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- snapshot: &pbs_datastore::BackupDir,
+async fn pull_snapshot<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let mut manifest_name = snapshot.full_path();
@@ -380,32 +634,15 @@ async fn pull_snapshot(
let mut tmp_manifest_name = manifest_name.clone();
tmp_manifest_name.set_extension("tmp");
-
- let download_res = download_manifest(&reader, &tmp_manifest_name).await;
- let mut tmp_manifest_file = match download_res {
- Ok(manifest_file) => manifest_file,
- Err(err) => {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- task_log!(
- worker,
- "skipping snapshot {} - vanished since start of sync",
- snapshot.dir(),
- );
- return Ok(());
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
- }
- };
- }
- };
- let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+ let tmp_manifest_blob;
+ if let Some(data) = reader
+ .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
+ .await?
+ {
+ tmp_manifest_blob = data;
+ } else {
+ return Ok(());
+ }
if manifest_name.exists() {
let manifest_blob = proxmox_lang::try_block!({
@@ -422,8 +659,10 @@ async fn pull_snapshot(
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
+ reader
+ .try_download_client_log(&client_log_name, worker)
+ .await?;
+ };
task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(()); // nothing changed
@@ -471,17 +710,9 @@ async fn pull_snapshot(
}
}
- let mut chunk_reader = RemoteChunkReader::new(
- reader.clone(),
- None,
- item.chunk_crypt_mode(),
- HashMap::new(),
- );
-
pull_single_archive(
worker,
- &reader,
- &mut chunk_reader,
+ reader.clone(),
snapshot,
item,
downloaded_chunks.clone(),
@@ -494,9 +725,10 @@ async fn pull_snapshot(
}
if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
-
+ reader
+ .try_download_client_log(&client_log_name, worker)
+ .await?;
+ };
snapshot
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,12 +738,12 @@ async fn pull_snapshot(
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
-async fn pull_snapshot_from(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- snapshot: &pbs_datastore::BackupDir,
+/// The `reader` is configured to read from the source backup directory, while the
+/// `snapshot` is pointing to the local datastore and target namespace.
+async fn pull_snapshot_from<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let (_path, is_new, _snap_lock) = snapshot
@@ -626,11 +858,10 @@ impl std::fmt::Display for SkipInfo {
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
-/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
+/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
@@ -639,117 +870,79 @@ impl std::fmt::Display for SkipInfo {
/// - local group owner is already checked by pull_store
async fn pull_group(
worker: &WorkerTask,
- client: &HttpClient,
params: &PullParameters,
- group: &pbs_api_types::BackupGroup,
- remote_ns: BackupNamespace,
+ source_namespace: &BackupNamespace,
+ group: &BackupGroup,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- task_log!(worker, "sync group {}", group);
-
- let path = format!(
- "api2/json/admin/datastore/{}/snapshots",
- params.source.store()
- );
-
- let mut args = json!({
- "backup-type": group.ty,
- "backup-id": group.id,
- });
-
- if !remote_ns.is_root() {
- args["ns"] = serde_json::to_value(&remote_ns)?;
- }
-
- let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
-
- let mut result = client.get(&path, Some(args)).await?;
- let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
- list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
- client.login().await?; // make sure auth is complete
-
- let fingerprint = client.fingerprint();
-
- let last_sync = params.store.last_successful_backup(&target_ns, group)?;
- let last_sync_time = last_sync.unwrap_or(i64::MIN);
-
- let mut remote_snapshots = std::collections::HashSet::new();
-
- // start with 65536 chunks (up to 256 GiB)
- let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
- progress.group_snapshots = list.len() as u64;
-
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
- let total_amount = list.len();
+ let mut raw_list: Vec<BackupDir> = params
+ .source
+ .list_backup_dirs(source_namespace, group, worker)
+ .await?;
+ raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
+
+ let total_amount = raw_list.len();
let cutoff = params
.transfer_last
.map(|count| total_amount.saturating_sub(count))
.unwrap_or_default();
- for (pos, item) in list.into_iter().enumerate() {
- let snapshot = item.backup;
-
- // in-progress backups can't be synced
- if item.size.is_none() {
- task_log!(
- worker,
- "skipping snapshot {} - in-progress backup",
- snapshot
- );
- continue;
- }
+ let target_ns = source_namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
- remote_snapshots.insert(snapshot.time);
-
- if last_sync_time > snapshot.time {
- already_synced_skip_info.update(snapshot.time);
- continue;
- } else if already_synced_skip_info.count > 0 {
- task_log!(worker, "{}", already_synced_skip_info);
- already_synced_skip_info.reset();
- }
-
- if pos < cutoff && last_sync_time != snapshot.time {
- transfer_last_skip_info.update(snapshot.time);
- continue;
- } else if transfer_last_skip_info.count > 0 {
- task_log!(worker, "{}", transfer_last_skip_info);
- transfer_last_skip_info.reset();
- }
-
- // get updated auth_info (new tickets)
- let auth_info = client.login().await?;
+ let mut source_snapshots = HashSet::new();
+ let last_sync_time = params
+ .target
+ .store
+ .last_successful_backup(&target_ns, group)?
+ .unwrap_or(i64::MIN);
+
+ let list: Vec<BackupDir> = raw_list
+ .into_iter()
+ .enumerate()
+ .filter(|&(pos, ref dir)| {
+ source_snapshots.insert(dir.time);
+ if last_sync_time > dir.time {
+ already_synced_skip_info.update(dir.time);
+ return false;
+ } else if already_synced_skip_info.count > 0 {
+ task_log!(worker, "{}", already_synced_skip_info);
+ already_synced_skip_info.reset();
+ return true;
+ }
- let options =
- HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
- .rate_limit(params.limit.clone());
+ if pos < cutoff && last_sync_time != dir.time {
+ transfer_last_skip_info.update(dir.time);
+ return false;
+ } else if transfer_last_skip_info.count > 0 {
+ task_log!(worker, "{}", transfer_last_skip_info);
+ transfer_last_skip_info.reset();
+ }
+ true
+ })
+ .map(|(_, dir)| dir)
+ .collect();
- let new_client = HttpClient::new(
- params.source.host(),
- params.source.port(),
- params.source.auth_id(),
- options,
- )?;
+ // start with 65536 chunks (up to 256 GiB)
+ let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
- let reader = BackupReader::start(
- &new_client,
- None,
- params.source.store(),
- &remote_ns,
- &snapshot,
- true,
- )
- .await?;
+ progress.group_snapshots = list.len() as u64;
- let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+ for (pos, from_snapshot) in list.into_iter().enumerate() {
+ let to_snapshot = params
+ .target
+ .store
+ .backup_dir(target_ns.clone(), from_snapshot.clone())?;
- let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+ let reader = params
+ .source
+ .reader(source_namespace, &from_snapshot)
+ .await?;
+ let result =
+ pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
progress.done_snapshots = pos as u64 + 1;
task_log!(worker, "percentage done: {}", progress);
@@ -758,11 +951,14 @@ async fn pull_group(
}
if params.remove_vanished {
- let group = params.store.backup_group(target_ns.clone(), group.clone());
+ let group = params
+ .target
+ .store
+ .backup_group(target_ns.clone(), group.clone());
let local_list = group.list_backups()?;
for info in local_list {
let snapshot = info.backup_dir;
- if remote_snapshots.contains(&snapshot.backup_time()) {
+ if source_snapshots.contains(&snapshot.backup_time()) {
continue;
}
if snapshot.is_protected() {
@@ -775,6 +971,7 @@ async fn pull_group(
}
task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
params
+ .target
.store
.remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
}
@@ -783,64 +980,12 @@ async fn pull_group(
Ok(())
}
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
- worker: &WorkerTask,
- client: &HttpClient,
- params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
- let path = format!(
- "api2/json/admin/datastore/{}/namespace",
- params.source.store()
- );
- let mut data = json!({});
- if let Some(max_depth) = params.max_depth {
- data["max-depth"] = json!(max_depth);
- }
-
- if !params.remote_ns.is_root() {
- data["parent"] = json!(params.remote_ns);
- }
-
- let mut result = match client.get(&path, Some(data)).await {
- Ok(res) => res,
- Err(err) => match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- if params.remote_ns.is_root() && params.max_depth.is_none() {
- task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
- task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
- params.max_depth = Some(0);
- } else {
- bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
- }
-
- return Ok(vec![params.remote_ns.clone()]);
- }
- _ => {
- bail!("Querying namespaces failed - HTTP error {code} - {message}");
- }
- },
- None => {
- bail!("Querying namespaces failed - {err}");
- }
- },
- };
-
- let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
- // parents first
- list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
- Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
let mut created = false;
- let store_ns_str = print_store_and_ns(params.store.name(), ns);
+ let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
- if !ns.is_root() && !params.store.namespace_path(ns).exists() {
- check_ns_modification_privs(params.store.name(), ns, ¶ms.owner)
+ if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+ check_ns_modification_privs(params.target.store.name(), ns, ¶ms.owner)
.map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
let name = match ns.components().last() {
@@ -850,14 +995,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
}
};
- if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+ if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
}
created = true;
}
check_ns_privs(
- params.store.name(),
+ params.target.store.name(),
ns,
¶ms.owner,
PRIV_DATASTORE_BACKUP,
@@ -868,10 +1013,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
}
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
- check_ns_modification_privs(params.store.name(), local_ns, ¶ms.owner)
+ check_ns_modification_privs(params.target.store.name(), local_ns, ¶ms.owner)
.map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
- params.store.remove_namespace_recursive(local_ns, true)
+ params
+ .target
+ .store
+ .remove_namespace_recursive(local_ns, true)
}
fn check_and_remove_vanished_ns(
@@ -885,14 +1033,15 @@ fn check_and_remove_vanished_ns(
// clamp like remote does so that we don't list more than we can ever have synced.
let max_depth = params
.max_depth
- .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+ .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
let mut local_ns_list: Vec<BackupNamespace> = params
+ .target
.store
- .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+ .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
.filter(|ns| {
let user_privs =
- user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.store.name()));
+ user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.target.store.name()));
user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
})
.collect();
@@ -901,7 +1050,7 @@ fn check_and_remove_vanished_ns(
local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
for local_ns in local_ns_list {
- if local_ns == params.ns {
+ if local_ns == params.target.ns {
continue;
}
@@ -948,29 +1097,49 @@ fn check_and_remove_vanished_ns(
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
worker: &WorkerTask,
- client: &HttpClient,
mut params: PullParameters,
) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+ let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
let old_max_depth = params.max_depth;
- let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
- vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
+ let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
+ vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
} else {
- query_namespaces(worker, client, &mut params).await?
+ params
+ .source
+ .list_namespaces(&mut params.max_depth, worker)
+ .await?
};
+
+ let ns_layers_to_be_pulled = namespaces
+ .iter()
+ .map(BackupNamespace::depth)
+ .max()
+ .map_or(0, |v| v - params.source.get_ns().depth());
+ let target_depth = params.target.ns.depth();
+
+ if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
+ bail!(
+ "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
+ ns_layers_to_be_pulled,
+ target_depth,
+ MAX_NAMESPACE_DEPTH
+ );
+ }
+
errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+ namespaces.sort_unstable_by_key(|a| a.name_len());
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
for namespace in namespaces {
- let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+ let source_store_ns_str = params.source.print_store_and_ns();
- let target_ns = namespace.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
- let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+ let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
+ let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
task_log!(worker, "----");
task_log!(
@@ -998,7 +1167,7 @@ pub(crate) async fn pull_store(
}
}
- match pull_ns(worker, client, ¶ms, namespace.clone(), target_ns).await {
+ match pull_ns(worker, &namespace, &mut params).await {
Ok((ns_progress, ns_errors)) => {
errors |= ns_errors;
@@ -1019,7 +1188,7 @@ pub(crate) async fn pull_store(
task_log!(
worker,
"Encountered errors while syncing namespace {} - {}",
- namespace,
+ &namespace,
err,
);
}
@@ -1051,48 +1220,28 @@ pub(crate) async fn pull_store(
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
worker: &WorkerTask,
- client: &HttpClient,
- params: &PullParameters,
- source_ns: BackupNamespace,
- target_ns: BackupNamespace,
+ namespace: &BackupNamespace,
+ params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
- let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
- let args = if !source_ns.is_root() {
- Some(json!({
- "ns": source_ns,
- }))
- } else {
- None
- };
-
- let mut result = client
- .get(&path, args)
- .await
- .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
- let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+ let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, ¶ms.owner).await?;
let total_count = list.len();
list.sort_unstable_by(|a, b| {
- let type_order = a.backup.ty.cmp(&b.backup.ty);
+ let type_order = a.ty.cmp(&b.ty);
if type_order == std::cmp::Ordering::Equal {
- a.backup.id.cmp(&b.backup.id)
+ a.id.cmp(&b.id)
} else {
type_order
}
});
- let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+ let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
filters.iter().any(|filter| group.matches(filter))
};
- // Get groups with target NS set
- let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
let list = if let Some(ref group_filter) = ¶ms.group_filter {
let unfiltered_count = list.len();
- let list: Vec<pbs_api_types::BackupGroup> = list
+ let list: Vec<BackupGroup> = list
.into_iter()
.filter(|group| apply_filters(group, group_filter))
.collect();
@@ -1110,13 +1259,15 @@ pub(crate) async fn pull_ns(
let mut errors = false;
- let mut new_groups = std::collections::HashSet::new();
+ let mut new_groups = HashSet::new();
for group in list.iter() {
new_groups.insert(group.clone());
}
let mut progress = StoreProgress::new(list.len() as u64);
+ let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
+
for (done, group) in list.into_iter().enumerate() {
progress.done_groups = done as u64;
progress.done_snapshots = 0;
@@ -1124,6 +1275,7 @@ pub(crate) async fn pull_ns(
let (owner, _lock_guard) =
match params
+ .target
.store
.create_locked_backup_group(&target_ns, &group, ¶ms.owner)
{
@@ -1135,7 +1287,9 @@ pub(crate) async fn pull_ns(
&group,
err
);
- errors = true; // do not stop here, instead continue
+ errors = true;
+ // do not stop here, instead continue
+ task_log!(worker, "create_locked_backup_group failed");
continue;
}
};
@@ -1151,15 +1305,7 @@ pub(crate) async fn pull_ns(
owner
);
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(
- worker,
- client,
- params,
- &group,
- source_ns.clone(),
- &mut progress,
- )
- .await
+ } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
{
task_log!(worker, "sync group {} failed - {}", &group, err,);
errors = true; // do not stop here, instead continue
@@ -1168,13 +1314,13 @@ pub(crate) async fn pull_ns(
if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
- for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+ for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
let local_group = local_group?;
let local_group = local_group.group();
if new_groups.contains(local_group) {
continue;
}
- let owner = params.store.get_owner(&target_ns, local_group)?;
+ let owner = params.target.store.get_owner(&target_ns, local_group)?;
if check_backup_owner(&owner, ¶ms.owner).is_err() {
continue;
}
@@ -1184,7 +1330,11 @@ pub(crate) async fn pull_ns(
}
}
task_log!(worker, "delete vanished group '{local_group}'",);
- match params.store.remove_backup_group(&target_ns, local_group) {
+ match params
+ .target
+ .store
+ .remove_backup_group(&target_ns, local_group)
+ {
Ok(true) => {}
Ok(false) => {
task_log!(
--
2.39.2