* [pbs-devel] [RFC proxmox-backup 01/24] datastore: data blob: fix typos in comments
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 02/24] server: pull: be more specific in module comment Christian Ebner
` (24 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC
To: pbs-devel
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-datastore/src/data_blob.rs | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/pbs-datastore/src/data_blob.rs b/pbs-datastore/src/data_blob.rs
index e3039354e..a7a55fb70 100644
--- a/pbs-datastore/src/data_blob.rs
+++ b/pbs-datastore/src/data_blob.rs
@@ -30,7 +30,7 @@ pub struct ChunkInfo {
/// ".didx").
///
pub struct DataBlob {
- raw_data: Vec<u8>, // tagged, compressed, encryped data
+ raw_data: Vec<u8>, // tagged, compressed, encrypted data
}
impl DataBlob {
@@ -212,7 +212,7 @@ impl DataBlob {
let data_start = std::mem::size_of::<DataBlobHeader>();
let mut reader = &self.raw_data[data_start..];
let data = zstd::stream::decode_all(&mut reader)?;
- // zstd::block::decompress is abou 10% slower
+ // zstd::block::decompress is about 10% slower
// let data = zstd::block::decompress(&self.raw_data[data_start..], MAX_BLOB_SIZE)?;
if let Some(digest) = digest {
Self::verify_digest(&data, None, digest)?;
@@ -472,7 +472,7 @@ impl DataBlob {
/// Builder for chunk DataBlobs
///
/// Main purpose is to centralize digest computation. Digest
-/// computation differ for encryped chunk, and this interface ensures that
+/// computation differ for encrypted chunk, and this interface ensures that
/// we always compute the correct one.
pub struct DataChunkBuilder<'a, 'b> {
config: Option<&'b CryptConfig>,
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
* [pbs-devel] [RFC proxmox-backup 02/24] server: pull: be more specific in module comment
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 01/24] datastore: data blob: fix typos in comments Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 03/24] server: pull: silence clippy too many arguments warning Christian Ebner
` (23 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC
To: pbs-devel
Describe the `pull` direction of the sync operation more precisely
before also adding a `push` direction as a synchronization operation.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 1ff9b92ab..b505976ad 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,4 +1,4 @@
-//! Sync datastore from remote server
+//! Sync datastore by pulling contents from remote server
use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 03/24] server: pull: silence clippy too many arguments warning
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 01/24] datastore: data blob: fix typos in comments Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 02/24] server: pull: be more specific in module comment Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 04/24] www: sync edit: indentation style fix Christian Ebner
` (22 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC
To: pbs-devel
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b505976ad..e8b1d0013 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -503,6 +503,7 @@ pub(crate) struct PullParameters {
impl PullParameters {
/// Creates a new instance of `PullParameters`.
+ #[allow(clippy::too_many_arguments)]
pub(crate) fn new(
store: &str,
ns: BackupNamespace,
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 04/24] www: sync edit: indentation style fix
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (2 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 03/24] server: pull: silence clippy too many arguments warning Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 05/24] server: pull: fix sync info message for root namespace Christian Ebner
` (21 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC
To: pbs-devel
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
www/window/SyncJobEdit.js | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index e06fdd81a..6543995e8 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -132,8 +132,8 @@ Ext.define('PBS.window.SyncJobEdit', {
name: 'schedule',
emptyText: gettext('none (disabled)'),
cbind: {
- deleteEmpty: '{!isCreate}',
- value: '{scheduleValue}',
+ deleteEmpty: '{!isCreate}',
+ value: '{scheduleValue}',
},
},
{
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 05/24] server: pull: fix sync info message for root namespace
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (3 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 04/24] www: sync edit: indentation style fix Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 06/24] server: sync: move sync related stats to common module Christian Ebner
` (20 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC
To: pbs-devel
The root namespace is displayed as an empty string when used in the
format string. Distinguish it and explicitly write out the root namespace
in the sync info message shown in the sync job's task log.
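The labelling rule described above can be illustrated in isolation. This is a minimal sketch, not the code from the patch: `Namespace` is a hypothetical stand-in for `BackupNamespace`, and the assumption that a non-root namespace displays as its components joined by `/` is mine, as is the helper name `namespace_label`.

```rust
use std::fmt;

/// Hypothetical stand-in for `BackupNamespace`: a list of path components,
/// where an empty list denotes the root namespace (assumption).
struct Namespace(Vec<String>);

impl Namespace {
    /// The root namespace has no components.
    fn is_root(&self) -> bool {
        self.0.is_empty()
    }
}

impl fmt::Display for Namespace {
    /// Assumed display form: components joined by '/', so the root
    /// namespace renders as an empty string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0.join("/"))
    }
}

/// Hypothetical helper: build the label used in the log message,
/// spelling out the root namespace instead of printing an empty string.
fn namespace_label(ns: &Namespace) -> String {
    if ns.is_root() {
        "root namespace".to_string()
    } else {
        format!("namespace {ns}")
    }
}
```

With such a helper the log line would read "Finished syncing root namespace, ..." rather than "Finished syncing namespace , ...".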
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e8b1d0013..823515e9a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1311,8 +1311,14 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
if params.max_depth != Some(0) {
groups += ns_progress.done_groups;
snapshots += ns_progress.done_snapshots;
+
+ let ns = if namespace.is_root() {
+ "root namespace".into()
+ } else {
+ format!("namespace {namespace}")
+ };
info!(
- "Finished syncing namespace {namespace}, current progress: {groups} groups, {snapshots} snapshots"
+ "Finished syncing {ns}, current progress: {groups} groups, {snapshots} snapshots"
);
}
}
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 06/24] server: sync: move sync related stats to common module
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (4 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 05/24] server: pull: fix sync info message for root namespace Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module Christian Ebner
` (19 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC
To: pbs-devel
Move `PullStats` to a common sync module and rename it to `SyncStats`;
move `RemovedVanishedStats` along with it, so both can be reused for sync
operations in the push direction as well as the pull direction.
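To show how the shared stats accumulate regardless of direction, here is a small self-contained sketch. The struct and method bodies mirror the ones in this patch with visibility modifiers dropped; the `total` helper is hypothetical and only added for illustration.

```rust
use std::time::Duration;

#[derive(Default)]
struct RemovedVanishedStats {
    groups: usize,
    snapshots: usize,
    namespaces: usize,
}

impl RemovedVanishedStats {
    /// Sum the removal counters field by field.
    fn add(&mut self, rhs: RemovedVanishedStats) {
        self.groups += rhs.groups;
        self.snapshots += rhs.snapshots;
        self.namespaces += rhs.namespaces;
    }
}

#[derive(Default)]
struct SyncStats {
    chunk_count: usize,
    bytes: usize,
    elapsed: Duration,
    removed: Option<RemovedVanishedStats>,
}

impl From<RemovedVanishedStats> for SyncStats {
    /// Wrap removal counters in otherwise empty stats.
    fn from(removed: RemovedVanishedStats) -> Self {
        Self {
            removed: Some(removed),
            ..Default::default()
        }
    }
}

impl SyncStats {
    /// Merge `rhs` into `self`; `removed` stays `None` until either
    /// side actually carries removal counters.
    fn add(&mut self, rhs: SyncStats) {
        self.chunk_count += rhs.chunk_count;
        self.bytes += rhs.bytes;
        self.elapsed += rhs.elapsed;

        if let Some(rhs_removed) = rhs.removed {
            if let Some(ref mut removed) = self.removed {
                removed.add(rhs_removed);
            } else {
                self.removed = Some(rhs_removed);
            }
        }
    }
}

/// Hypothetical helper (not part of the patch): fold several partial
/// results into one total.
fn total(stats: impl IntoIterator<Item = SyncStats>) -> SyncStats {
    let mut sum = SyncStats::default();
    for s in stats {
        sum.add(s);
    }
    sum
}
```

Nothing in these types refers to the transfer direction, which is presumably why they can move to the common module unchanged apart from the rename.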
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/mod.rs | 1 +
src/server/pull.rs | 121 ++++++++++++++-------------------------------
src/server/sync.rs | 51 +++++++++++++++++++
3 files changed, 89 insertions(+), 84 deletions(-)
create mode 100644 src/server/sync.rs
diff --git a/src/server/mod.rs b/src/server/mod.rs
index d2cbc931c..3acfcc1c4 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -34,6 +34,7 @@ pub use report::*;
pub mod auth;
pub(crate) mod pull;
+pub(crate) mod sync;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 823515e9a..24422ef41 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,7 +5,7 @@ use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use http::StatusCode;
@@ -34,6 +34,7 @@ use pbs_datastore::{
};
use pbs_tools::sha::sha256;
+use super::sync::{RemovedVanishedStats, SyncStats};
use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
@@ -64,54 +65,6 @@ pub(crate) struct LocalSource {
ns: BackupNamespace,
}
-#[derive(Default)]
-pub(crate) struct RemovedVanishedStats {
- pub(crate) groups: usize,
- pub(crate) snapshots: usize,
- pub(crate) namespaces: usize,
-}
-
-impl RemovedVanishedStats {
- fn add(&mut self, rhs: RemovedVanishedStats) {
- self.groups += rhs.groups;
- self.snapshots += rhs.snapshots;
- self.namespaces += rhs.namespaces;
- }
-}
-
-#[derive(Default)]
-pub(crate) struct PullStats {
- pub(crate) chunk_count: usize,
- pub(crate) bytes: usize,
- pub(crate) elapsed: Duration,
- pub(crate) removed: Option<RemovedVanishedStats>,
-}
-
-impl From<RemovedVanishedStats> for PullStats {
- fn from(removed: RemovedVanishedStats) -> Self {
- Self {
- removed: Some(removed),
- ..Default::default()
- }
- }
-}
-
-impl PullStats {
- fn add(&mut self, rhs: PullStats) {
- self.chunk_count += rhs.chunk_count;
- self.bytes += rhs.bytes;
- self.elapsed += rhs.elapsed;
-
- if let Some(rhs_removed) = rhs.removed {
- if let Some(ref mut removed) = self.removed {
- removed.add(rhs_removed);
- } else {
- self.removed = Some(rhs_removed);
- }
- }
- }
-}
-
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -569,7 +522,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
let start_time = SystemTime::now();
@@ -656,7 +609,7 @@ async fn pull_index_chunks<I: IndexFile>(
HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
);
- Ok(PullStats {
+ Ok(SyncStats {
chunk_count,
bytes,
elapsed,
@@ -694,7 +647,7 @@ async fn pull_single_archive<'a>(
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
path.push(archive_name);
@@ -702,7 +655,7 @@ async fn pull_single_archive<'a>(
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp");
- let mut pull_stats = PullStats::default();
+ let mut sync_stats = SyncStats::default();
info!("sync archive {archive_name}");
@@ -728,7 +681,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
- pull_stats.add(stats);
+ sync_stats.add(stats);
}
}
ArchiveType::FixedIndex => {
@@ -748,7 +701,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
- pull_stats.add(stats);
+ sync_stats.add(stats);
}
}
ArchiveType::Blob => {
@@ -760,7 +713,7 @@ async fn pull_single_archive<'a>(
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
- Ok(pull_stats)
+ Ok(sync_stats)
}
/// Actual implementation of pulling a snapshot.
@@ -776,8 +729,8 @@ async fn pull_snapshot<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
- let mut pull_stats = PullStats::default();
+) -> Result<SyncStats, Error> {
+ let mut sync_stats = SyncStats::default();
let mut manifest_name = snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME);
@@ -793,7 +746,7 @@ async fn pull_snapshot<'a>(
{
tmp_manifest_blob = data;
} else {
- return Ok(pull_stats);
+ return Ok(sync_stats);
}
if manifest_name.exists() {
@@ -815,7 +768,7 @@ async fn pull_snapshot<'a>(
};
info!("no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
- return Ok(pull_stats); // nothing changed
+ return Ok(sync_stats); // nothing changed
}
}
@@ -862,7 +815,7 @@ async fn pull_snapshot<'a>(
let stats =
pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
- pull_stats.add(stats);
+ sync_stats.add(stats);
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -876,7 +829,7 @@ async fn pull_snapshot<'a>(
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
- Ok(pull_stats)
+ Ok(sync_stats)
}
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@@ -887,12 +840,12 @@ async fn pull_snapshot_from<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
- let pull_stats = if is_new {
+ let sync_stats = if is_new {
info!("sync snapshot {}", snapshot.dir());
match pull_snapshot(reader, snapshot, downloaded_chunks).await {
@@ -906,9 +859,9 @@ async fn pull_snapshot_from<'a>(
}
return Err(err);
}
- Ok(pull_stats) => {
+ Ok(sync_stats) => {
info!("sync snapshot {} done", snapshot.dir());
- pull_stats
+ sync_stats
}
}
} else {
@@ -916,7 +869,7 @@ async fn pull_snapshot_from<'a>(
pull_snapshot(reader, snapshot, downloaded_chunks).await?
};
- Ok(pull_stats)
+ Ok(sync_stats)
}
#[derive(PartialEq, Eq)]
@@ -1020,7 +973,7 @@ async fn pull_group(
source_namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1077,7 +1030,7 @@ async fn pull_group(
progress.group_snapshots = list.len() as u64;
- let mut pull_stats = PullStats::default();
+ let mut sync_stats = SyncStats::default();
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
@@ -1095,7 +1048,7 @@ async fn pull_group(
info!("percentage done: {progress}");
let stats = result?; // stop on error
- pull_stats.add(stats);
+ sync_stats.add(stats);
}
if params.remove_vanished {
@@ -1121,7 +1074,7 @@ async fn pull_group(
.target
.store
.remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
- pull_stats.add(PullStats::from(RemovedVanishedStats {
+ sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: 1,
groups: 0,
namespaces: 0,
@@ -1129,7 +1082,7 @@ async fn pull_group(
}
}
- Ok(pull_stats)
+ Ok(sync_stats)
}
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
@@ -1246,7 +1199,7 @@ fn check_and_remove_vanished_ns(
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
-pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats, Error> {
+pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats, Error> {
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
@@ -1279,7 +1232,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
- let mut pull_stats = PullStats::default();
+ let mut sync_stats = SyncStats::default();
for namespace in namespaces {
let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1303,10 +1256,10 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
}
match pull_ns(&namespace, &mut params).await {
- Ok((ns_progress, ns_pull_stats, ns_errors)) => {
+ Ok((ns_progress, ns_sync_stats, ns_errors)) => {
errors |= ns_errors;
- pull_stats.add(ns_pull_stats);
+ sync_stats.add(ns_sync_stats);
if params.max_depth != Some(0) {
groups += ns_progress.done_groups;
@@ -1335,14 +1288,14 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
if params.remove_vanished {
let (has_errors, stats) = check_and_remove_vanished_ns(&params, synced_ns)?;
errors |= has_errors;
- pull_stats.add(PullStats::from(stats));
+ sync_stats.add(SyncStats::from(stats));
}
if errors {
bail!("sync failed with some errors.");
}
- Ok(pull_stats)
+ Ok(sync_stats)
}
/// Pulls a namespace according to `params`.
@@ -1360,7 +1313,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
pub(crate) async fn pull_ns(
namespace: &BackupNamespace,
params: &mut PullParameters,
-) -> Result<(StoreProgress, PullStats, bool), Error> {
+) -> Result<(StoreProgress, SyncStats, bool), Error> {
let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
list.sort_unstable_by(|a, b| {
@@ -1390,7 +1343,7 @@ pub(crate) async fn pull_ns(
}
let mut progress = StoreProgress::new(list.len() as u64);
- let mut pull_stats = PullStats::default();
+ let mut sync_stats = SyncStats::default();
let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
@@ -1425,7 +1378,7 @@ pub(crate) async fn pull_ns(
errors = true; // do not stop here, instead continue
} else {
match pull_group(params, namespace, &group, &mut progress).await {
- Ok(stats) => pull_stats.add(stats),
+ Ok(stats) => sync_stats.add(stats),
Err(err) => {
info!("sync group {} failed - {err}", &group);
errors = true; // do not stop here, instead continue
@@ -1459,13 +1412,13 @@ pub(crate) async fn pull_ns(
Ok(stats) => {
if !stats.all_removed() {
info!("kept some protected snapshots of group '{local_group}'");
- pull_stats.add(PullStats::from(RemovedVanishedStats {
+ sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 0,
namespaces: 0,
}));
} else {
- pull_stats.add(PullStats::from(RemovedVanishedStats {
+ sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 1,
namespaces: 0,
@@ -1486,5 +1439,5 @@ pub(crate) async fn pull_ns(
};
}
- Ok((progress, pull_stats, errors))
+ Ok((progress, sync_stats, errors))
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
new file mode 100644
index 000000000..5f143ef63
--- /dev/null
+++ b/src/server/sync.rs
@@ -0,0 +1,51 @@
+//! Sync datastore contents from source to target, either in push or pull direction
+
+use std::time::Duration;
+
+#[derive(Default)]
+pub(crate) struct RemovedVanishedStats {
+ pub(crate) groups: usize,
+ pub(crate) snapshots: usize,
+ pub(crate) namespaces: usize,
+}
+
+impl RemovedVanishedStats {
+ pub(crate) fn add(&mut self, rhs: RemovedVanishedStats) {
+ self.groups += rhs.groups;
+ self.snapshots += rhs.snapshots;
+ self.namespaces += rhs.namespaces;
+ }
+}
+
+#[derive(Default)]
+pub(crate) struct SyncStats {
+ pub(crate) chunk_count: usize,
+ pub(crate) bytes: usize,
+ pub(crate) elapsed: Duration,
+ pub(crate) removed: Option<RemovedVanishedStats>,
+}
+
+impl From<RemovedVanishedStats> for SyncStats {
+ fn from(removed: RemovedVanishedStats) -> Self {
+ Self {
+ removed: Some(removed),
+ ..Default::default()
+ }
+ }
+}
+
+impl SyncStats {
+ pub(crate) fn add(&mut self, rhs: SyncStats) {
+ self.chunk_count += rhs.chunk_count;
+ self.bytes += rhs.bytes;
+ self.elapsed += rhs.elapsed;
+
+ if let Some(rhs_removed) = rhs.removed {
+ if let Some(ref mut removed) = self.removed {
+ removed.add(rhs_removed);
+ } else {
+ self.removed = Some(rhs_removed);
+ }
+ }
+ }
+}
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (5 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 06/24] server: sync: move sync related stats to common module Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-16 9:53 ` Gabriel Goller
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 08/24] server: sync: move source " Christian Ebner
` (18 subsequent siblings)
25 siblings, 1 reply; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC
To: pbs-devel
Move the `PullReader` trait and the types implementing it to the
common sync module, so they can be reused for the push direction
variant of a sync job as well.
Adapt the naming to be more generic by renaming the `PullReader` trait to
`SyncSourceReader`, `LocalReader` to `LocalSourceReader` and
`RemoteReader` to `RemoteSourceReader`.
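The point of keeping the reader behind a trait is that calling code only sees `Arc<dyn ...>` and does not care whether the source is local or remote. The following is a simplified, synchronous sketch under stated assumptions: `SourceReader`, `RemoteReaderSketch`, `LocalReaderSketch` and `needs_chunk_transfer` are hypothetical names, and only the `skip_chunk_sync` method of the real (async) trait is modelled.

```rust
use std::sync::Arc;

/// Simplified stand-in for the `SyncSourceReader` trait; the real trait
/// is async and has further methods, which are omitted here.
trait SourceReader: Send + Sync {
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}

/// Hypothetical remote reader: chunks always have to be transferred.
struct RemoteReaderSketch;

/// Hypothetical local reader: remembers the name of its source datastore.
struct LocalReaderSketch {
    store_name: String,
}

impl SourceReader for RemoteReaderSketch {
    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}

impl SourceReader for LocalReaderSketch {
    /// Source and target share a chunk store when their names match,
    /// so chunk transfer can be skipped.
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.store_name == target_store_name
    }
}

/// Hypothetical caller that only depends on the trait object.
fn needs_chunk_transfer(reader: Arc<dyn SourceReader>, target: &str) -> bool {
    !reader.skip_chunk_sync(target)
}
```

A push implementation could presumably reuse the same reader types on the source side and supply only a new target abstraction.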
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 167 +++++----------------------------------------
src/server/sync.rs | 152 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 168 insertions(+), 151 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 24422ef41..5efe2d5f7 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,7 @@
//! Sync datastore by pulling contents from remote server
-use std::collections::{HashMap, HashSet};
-use std::io::{Seek, Write};
-use std::path::{Path, PathBuf};
+use std::collections::HashSet;
+use std::io::Seek;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
@@ -15,11 +14,11 @@ use serde_json::json;
use tracing::{info, warn};
use pbs_api_types::{
- print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter,
GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
-use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_client::{BackupReader, BackupRepository, HttpClient};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -29,26 +28,15 @@ use pbs_datastore::manifest::{
ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{
- check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
-};
+use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress};
use pbs_tools::sha::sha256;
-use super::sync::{RemovedVanishedStats, SyncStats};
+use super::sync::{
+ LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats,
+};
use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
-struct RemoteReader {
- backup_reader: Arc<BackupReader>,
- dir: BackupDir,
-}
-
-struct LocalReader {
- _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
- path: PathBuf,
- datastore: Arc<DataStore>,
-}
-
pub(crate) struct PullTarget {
store: Arc<DataStore>,
ns: BackupNamespace,
@@ -97,7 +85,7 @@ trait PullSource: Send + Sync {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error>;
+ ) -> Result<Arc<dyn SyncSourceReader>, Error>;
}
#[async_trait::async_trait]
@@ -230,10 +218,10 @@ impl PullSource for RemoteSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error> {
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
let backup_reader =
BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
- Ok(Arc::new(RemoteReader {
+ Ok(Arc::new(RemoteSourceReader {
backup_reader,
dir: dir.clone(),
}))
@@ -298,14 +286,14 @@ impl PullSource for LocalSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error> {
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
&dir.full_path(),
"snapshot",
"locked by another operation",
)?;
- Ok(Arc::new(LocalReader {
+ Ok(Arc::new(LocalSourceReader {
_dir_lock: Arc::new(Mutex::new(dir_lock)),
path: dir.full_path(),
datastore: dir.datastore().clone(),
@@ -313,129 +301,6 @@ impl PullSource for LocalSource {
}
}
-#[async_trait::async_trait]
-/// `PullReader` is a trait that provides an interface for reading data from a source.
-/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
-trait PullReader: Send + Sync {
- /// Returns a chunk reader with the specified encryption mode.
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
-
- /// Asynchronously loads a file from the source into a local file.
- /// `filename` is the name of the file to load from the source.
- /// `into` is the path of the local file to load the source file into.
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
-
- /// Tries to download the client log from the source and save it into a local file.
- async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
-
- fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
-}
-
-#[async_trait::async_trait]
-impl PullReader for RemoteReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(RemoteChunkReader::new(
- self.backup_reader.clone(),
- None,
- crypt_mode,
- HashMap::new(),
- ))
- }
-
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
- let mut tmp_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(into)?;
- let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
- if let Err(err) = download_result {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- info!(
- "skipping snapshot {} - vanished since start of sync",
- &self.dir,
- );
- return Ok(None);
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
- }
- };
- };
- tmp_file.rewind()?;
- Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
- }
-
- async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
- let mut tmp_path = to_path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = self
- .backup_reader
- .download(CLIENT_LOG_BLOB_NAME, tmpfile)
- .await
- {
- if let Err(err) = std::fs::rename(&tmp_path, to_path) {
- bail!("Atomic rename file {:?} failed - {}", to_path, err);
- }
- info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
- }
-
- Ok(())
- }
-
- fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
- false
- }
-}
-
-#[async_trait::async_trait]
-impl PullReader for LocalReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(LocalChunkReader::new(
- self.datastore.clone(),
- None,
- crypt_mode,
- ))
- }
-
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
- let mut tmp_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(into)?;
- let mut from_path = self.path.clone();
- from_path.push(filename);
- tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
- tmp_file.rewind()?;
- Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
- }
-
- async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
- Ok(())
- }
-
- fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
- self.datastore.name() == target_store_name
- }
-}
-
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
@@ -643,7 +508,7 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -726,7 +591,7 @@ async fn pull_single_archive<'a>(
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
@@ -837,7 +702,7 @@ async fn pull_snapshot<'a>(
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 5f143ef63..323bc1a27 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -1,7 +1,24 @@
//! Sync datastore contents from source to target, either in push or pull direction
+use std::collections::HashMap;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
use std::time::Duration;
+use anyhow::{bail, Error};
+use http::StatusCode;
+use tracing::info;
+
+use proxmox_router::HttpError;
+
+use pbs_api_types::{BackupDir, CryptMode};
+use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_datastore::data_blob::DataBlob;
+use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{DataStore, LocalChunkReader};
+
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
pub(crate) groups: usize,
@@ -49,3 +66,138 @@ impl SyncStats {
}
}
}
+
+#[async_trait::async_trait]
+/// `SyncSourceReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading client log,
+/// and checking whether chunk sync should be skipped.
+pub(crate) trait SyncSourceReader: Send + Sync {
+ /// Returns a chunk reader with the specified encryption mode.
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+ /// Asynchronously loads a file from the source into a local file.
+ /// `filename` is the name of the file to load from the source.
+ /// `into` is the path of the local file to load the source file into.
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
+
+ /// Tries to download the client log from the source and save it into a local file.
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+pub(crate) struct RemoteSourceReader {
+ pub(crate) backup_reader: Arc<BackupReader>,
+ pub(crate) dir: BackupDir,
+}
+
+pub(crate) struct LocalSourceReader {
+ pub(crate) _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+ pub(crate) path: PathBuf,
+ pub(crate) datastore: Arc<DataStore>,
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for RemoteSourceReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(RemoteChunkReader::new(
+ self.backup_reader.clone(),
+ None,
+ crypt_mode,
+ HashMap::new(),
+ ))
+ }
+
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+ if let Err(err) = download_result {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ info!(
+ "skipping snapshot {} - vanished since start of sync",
+ &self.dir
+ );
+ return Ok(None);
+ }
+ _ => {
+ bail!("HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
+ };
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
+ let mut tmp_path = to_path.to_owned();
+ tmp_path.set_extension("tmp");
+
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ // Note: be silent if there is no log - only log successful download
+ if let Ok(()) = self
+ .backup_reader
+ .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+ .await
+ {
+ if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+ bail!("Atomic rename file {to_path:?} failed - {err}");
+ }
+ info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
+ }
+
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+ false
+ }
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for LocalSourceReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(LocalChunkReader::new(
+ self.datastore.clone(),
+ None,
+ crypt_mode,
+ ))
+ }
+
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let mut from_path = self.path.clone();
+ from_path.push(filename);
+ tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+ self.datastore.name() == target_store_name
+ }
+}
--
2.39.2
* Re: [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module Christian Ebner
@ 2024-07-16 9:53 ` Gabriel Goller
2024-07-23 7:32 ` Christian Ebner
0 siblings, 1 reply; 36+ messages in thread
From: Gabriel Goller @ 2024-07-16 9:53 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
On 15.07.2024 12:15, Christian Ebner wrote:
> pub(crate) struct PullTarget {
> store: Arc<DataStore>,
> ns: BackupNamespace,
>@@ -97,7 +85,7 @@ trait PullSource: Send + Sync {
> &self,
> ns: &BackupNamespace,
> dir: &BackupDir,
>- ) -> Result<Arc<dyn PullReader>, Error>;
>+ ) -> Result<Arc<dyn SyncSourceReader>, Error>;
> }
>
> #[async_trait::async_trait]
AFAIK we are currently on Rust 1.79.0, which already has async fns in
traits stabilized. This means we can remove the async_trait crate.
https://releases.rs/docs/1.75.0/
* Re: [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module
2024-07-16 9:53 ` Gabriel Goller
@ 2024-07-23 7:32 ` Christian Ebner
2024-07-30 8:38 ` Gabriel Goller
0 siblings, 1 reply; 36+ messages in thread
From: Christian Ebner @ 2024-07-23 7:32 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Gabriel Goller
On 7/16/24 11:53, Gabriel Goller wrote:
> On 15.07.2024 12:15, Christian Ebner wrote:
>> pub(crate) struct PullTarget {
>> store: Arc<DataStore>,
>> ns: BackupNamespace,
>> @@ -97,7 +85,7 @@ trait PullSource: Send + Sync {
>> &self,
>> ns: &BackupNamespace,
>> dir: &BackupDir,
>> - ) -> Result<Arc<dyn PullReader>, Error>;
>> + ) -> Result<Arc<dyn SyncSourceReader>, Error>;
>> }
>>
>> #[async_trait::async_trait]
>
> AFAIK we are currently on Rust 1.79.0, which already has async fns in
> traits stabilized. This means we can remove the async_trait crate.
> https://releases.rs/docs/1.75.0/
Thanks for pointing this out!
However, dropping the `async_trait` macro does not work out, as these
traits are used as base for trait objects, which therefore require the
trait to be object safe, see
https://doc.rust-lang.org/reference/items/traits.html#object-safety
So the Rust compiler complains with, e.g.
```
error[E0038]: the trait `SyncSource` cannot be made into an object
```
Therefore I would opt to keep this as is, unless I'm overlooking
something here.
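For illustration, a minimal std-only sketch of why the macro still helps
here: as far as I understand it, `async_trait` rewrites each `async fn`
into a method returning a boxed future, which keeps the trait usable as
`dyn Trait`. The names `Reader`, `Local`, `block_on` and `load_via_dyn`
are hypothetical and not part of this series; the toy executor only
handles futures that are ready without real wakeups.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hand-written approximation of the shape `#[async_trait]` produces:
// the method returns a boxed, type-erased future, so the trait can
// still be used behind `dyn`.
trait Reader: Send + Sync {
    fn load<'a>(&'a self, name: &'a str) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>>;
}

// Hypothetical implementor, standing in for a local source reader.
struct Local;

impl Reader for Local {
    fn load<'a>(&'a self, name: &'a str) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>> {
        // Pretend "loading" just reports the length of the file name.
        Box::pin(async move { name.len() })
    }
}

// Waker that does nothing; enough for futures that never really suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Dynamic dispatch works because `load` returns a concrete boxed type.
fn load_via_dyn(name: &str) -> usize {
    let reader: Arc<dyn Reader> = Arc::new(Local);
    block_on(reader.load(name))
}
```

Declaring `load` as a native `async fn` in the trait instead would make
the `Arc<dyn Reader>` line fail with E0038, which matches the error
quoted above.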
* Re: [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module
2024-07-23 7:32 ` Christian Ebner
@ 2024-07-30 8:38 ` Gabriel Goller
0 siblings, 0 replies; 36+ messages in thread
From: Gabriel Goller @ 2024-07-30 8:38 UTC (permalink / raw)
To: Christian Ebner; +Cc: Proxmox Backup Server development discussion
On 23.07.2024 09:32, Christian Ebner wrote:
>On 7/16/24 11:53, Gabriel Goller wrote:
>>On 15.07.2024 12:15, Christian Ebner wrote:
>>>pub(crate) struct PullTarget {
>>> store: Arc<DataStore>,
>>> ns: BackupNamespace,
>>>@@ -97,7 +85,7 @@ trait PullSource: Send + Sync {
>>> &self,
>>> ns: &BackupNamespace,
>>> dir: &BackupDir,
>>>- ) -> Result<Arc<dyn PullReader>, Error>;
>>>+ ) -> Result<Arc<dyn SyncSourceReader>, Error>;
>>>}
>>>
>>>#[async_trait::async_trait]
>>
>>AFAIK we are currently on Rust 1.79.0, which already has async fns in
>>traits stabilized. This means we can remove the async_trait crate.
>>https://releases.rs/docs/1.75.0/
>
>Thanks for pointing this out!
>
>However, dropping the `async_trait` macro does not work out, as these
>traits are used as base for trait objects, which therefore require the
>trait to be object safe, see
>https://doc.rust-lang.org/reference/items/traits.html#object-safety
>
>So the Rust compiler complains with, e.g.
>
>```
>error[E0038]: the trait `SyncSource` cannot be made into an object
>```
>
>Therefore I would opt to keep this as is, unless I'm overlooking
>something here.
Oops, yeah missed that. This was even mentioned in the initial async fn
blog post:
https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html#dynamic-dispatch.
We could make PullParameters generic and change source to accept an
`impl PullSource`, but I agree, let's keep this as it is for now :).
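Just to sketch what I mean by the generic variant (untested against the
actual code base, std-only, all names hypothetical: `Source`,
`FakeLocal`, `Params`, `block_on`, `describe_local`). With static
dispatch the trait can use a native `async fn` and no boxing is needed:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Native `async fn` in a trait: fine for generics, but not for `dyn Source`.
trait Source {
    async fn store_name(&self) -> String;
}

// Hypothetical source implementation.
struct FakeLocal;

impl Source for FakeLocal {
    async fn store_name(&self) -> String {
        "local-store".to_string()
    }
}

// Hypothetical generic stand-in for `PullParameters`: the source type is
// fixed at compile time instead of being an `Arc<dyn ...>`.
struct Params<S: Source> {
    source: S,
}

impl<S: Source> Params<S> {
    async fn describe(&self) -> String {
        format!("pulling from {}", self.source.store_name().await)
    }
}

// Waker that does nothing; enough for futures that never really suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn describe_local() -> String {
    let params = Params { source: FakeLocal };
    block_on(params.describe())
}
```

The downside is that the choice between a remote and a local source
happens at runtime in `PullParameters::new`, so the generic parameter
would have to be pushed up to the callers or wrapped in an enum.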
* [pbs-devel] [RFC proxmox-backup 08/24] server: sync: move source to common sync module
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (6 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 09/24] client: backup writer: bundle upload stats counters Christian Ebner
` (17 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
Rename the `PullSource` trait to `SyncSource` and move the trait and
types implementing it to the common sync module, making them
reusable for both sync directions, push and pull.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 281 ++-------------------------------------------
src/server/sync.rs | 276 +++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 280 insertions(+), 277 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5efe2d5f7..c6932dcc5 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -7,18 +7,14 @@ use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
-use http::StatusCode;
use proxmox_human_byte::HumanByte;
-use proxmox_router::HttpError;
-use serde_json::json;
-use tracing::{info, warn};
+use tracing::info;
use pbs_api_types::{
- print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter,
- GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
- PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter, Operation,
+ RateLimitConfig, Remote, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
};
-use pbs_client::{BackupReader, BackupRepository, HttpClient};
+use pbs_client::BackupRepository;
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -28,13 +24,13 @@ use pbs_datastore::manifest::{
ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
use super::sync::{
- LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats,
+ LocalSource, RemoteSource, RemovedVanishedStats, SyncSource, SyncSourceReader, SyncStats,
};
-use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
+use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
pub(crate) struct PullTarget {
@@ -42,269 +38,10 @@ pub(crate) struct PullTarget {
ns: BackupNamespace,
}
-pub(crate) struct RemoteSource {
- repo: BackupRepository,
- ns: BackupNamespace,
- client: HttpClient,
-}
-
-pub(crate) struct LocalSource {
- store: Arc<DataStore>,
- ns: BackupNamespace,
-}
-
-#[async_trait::async_trait]
-/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
-/// The trait includes methods for listing namespaces, groups, and backup directories,
-/// as well as retrieving a reader for reading data from the source
-trait PullSource: Send + Sync {
- /// Lists namespaces from the source.
- async fn list_namespaces(
- &self,
- max_depth: &mut Option<usize>,
- ) -> Result<Vec<BackupNamespace>, Error>;
-
- /// Lists groups within a specific namespace from the source.
- async fn list_groups(
- &self,
- namespace: &BackupNamespace,
- owner: &Authid,
- ) -> Result<Vec<BackupGroup>, Error>;
-
- /// Lists backup directories for a specific group within a specific namespace from the source.
- async fn list_backup_dirs(
- &self,
- namespace: &BackupNamespace,
- group: &BackupGroup,
- ) -> Result<Vec<BackupDir>, Error>;
- fn get_ns(&self) -> BackupNamespace;
- fn get_store(&self) -> &str;
-
- /// Returns a reader for reading data from a specific backup directory.
- async fn reader(
- &self,
- ns: &BackupNamespace,
- dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error>;
-}
-
-#[async_trait::async_trait]
-impl PullSource for RemoteSource {
- async fn list_namespaces(
- &self,
- max_depth: &mut Option<usize>,
- ) -> Result<Vec<BackupNamespace>, Error> {
- if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
- return Ok(vec![self.ns.clone()]);
- }
-
- let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
- let mut data = json!({});
- if let Some(max_depth) = max_depth {
- data["max-depth"] = json!(max_depth);
- }
-
- if !self.ns.is_root() {
- data["parent"] = json!(self.ns);
- }
- self.client.login().await?;
-
- let mut result = match self.client.get(&path, Some(data)).await {
- Ok(res) => res,
- Err(err) => match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match code {
- &StatusCode::NOT_FOUND => {
- if self.ns.is_root() && max_depth.is_none() {
- warn!("Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
- warn!("Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
- max_depth.replace(0);
- } else {
- bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
- }
-
- return Ok(vec![self.ns.clone()]);
- }
- _ => {
- bail!("Querying namespaces failed - HTTP error {code} - {message}");
- }
- },
- None => {
- bail!("Querying namespaces failed - {err}");
- }
- },
- };
-
- let list: Vec<BackupNamespace> =
- serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
- .into_iter()
- .map(|list_item| list_item.ns)
- .collect();
-
- Ok(list)
- }
-
- async fn list_groups(
- &self,
- namespace: &BackupNamespace,
- _owner: &Authid,
- ) -> Result<Vec<BackupGroup>, Error> {
- let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
-
- let args = if !namespace.is_root() {
- Some(json!({ "ns": namespace.clone() }))
- } else {
- None
- };
-
- self.client.login().await?;
- let mut result =
- self.client.get(&path, args).await.map_err(|err| {
- format_err!("Failed to retrieve backup groups from remote - {}", err)
- })?;
-
- Ok(
- serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
- .map_err(Error::from)?
- .into_iter()
- .map(|item| item.backup)
- .collect::<Vec<BackupGroup>>(),
- )
- }
-
- async fn list_backup_dirs(
- &self,
- namespace: &BackupNamespace,
- group: &BackupGroup,
- ) -> Result<Vec<BackupDir>, Error> {
- let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
-
- let mut args = json!({
- "backup-type": group.ty,
- "backup-id": group.id,
- });
-
- if !namespace.is_root() {
- args["ns"] = serde_json::to_value(namespace)?;
- }
-
- self.client.login().await?;
-
- let mut result = self.client.get(&path, Some(args)).await?;
- let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
- Ok(snapshot_list
- .into_iter()
- .filter_map(|item: SnapshotListItem| {
- let snapshot = item.backup;
- // in-progress backups can't be synced
- if item.size.is_none() {
- info!("skipping snapshot {snapshot} - in-progress backup");
- return None;
- }
-
- Some(snapshot)
- })
- .collect::<Vec<BackupDir>>())
- }
-
- fn get_ns(&self) -> BackupNamespace {
- self.ns.clone()
- }
-
- fn get_store(&self) -> &str {
- self.repo.store()
- }
-
- async fn reader(
- &self,
- ns: &BackupNamespace,
- dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error> {
- let backup_reader =
- BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
- Ok(Arc::new(RemoteSourceReader {
- backup_reader,
- dir: dir.clone(),
- }))
- }
-}
-
-#[async_trait::async_trait]
-impl PullSource for LocalSource {
- async fn list_namespaces(
- &self,
- max_depth: &mut Option<usize>,
- ) -> Result<Vec<BackupNamespace>, Error> {
- ListNamespacesRecursive::new_max_depth(
- self.store.clone(),
- self.ns.clone(),
- max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
- )?
- .collect()
- }
-
- async fn list_groups(
- &self,
- namespace: &BackupNamespace,
- owner: &Authid,
- ) -> Result<Vec<BackupGroup>, Error> {
- Ok(ListAccessibleBackupGroups::new_with_privs(
- &self.store,
- namespace.clone(),
- 0,
- Some(PRIV_DATASTORE_READ),
- Some(PRIV_DATASTORE_BACKUP),
- Some(owner),
- )?
- .filter_map(Result::ok)
- .map(|backup_group| backup_group.group().clone())
- .collect::<Vec<pbs_api_types::BackupGroup>>())
- }
-
- async fn list_backup_dirs(
- &self,
- namespace: &BackupNamespace,
- group: &BackupGroup,
- ) -> Result<Vec<BackupDir>, Error> {
- Ok(self
- .store
- .backup_group(namespace.clone(), group.clone())
- .iter_snapshots()?
- .filter_map(Result::ok)
- .map(|snapshot| snapshot.dir().to_owned())
- .collect::<Vec<BackupDir>>())
- }
-
- fn get_ns(&self) -> BackupNamespace {
- self.ns.clone()
- }
-
- fn get_store(&self) -> &str {
- self.store.name()
- }
-
- async fn reader(
- &self,
- ns: &BackupNamespace,
- dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error> {
- let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
- let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
- &dir.full_path(),
- "snapshot",
- "locked by another operation",
- )?;
- Ok(Arc::new(LocalSourceReader {
- _dir_lock: Arc::new(Mutex::new(dir_lock)),
- path: dir.full_path(),
- datastore: dir.datastore().clone(),
- }))
- }
-}
-
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
- source: Arc<dyn PullSource>,
+ source: Arc<dyn SyncSource>,
/// Where data should be pulled into
target: PullTarget,
/// Owner of synced groups (needs to match local owner of pre-existing groups)
@@ -341,7 +78,7 @@ impl PullParameters {
};
let remove_vanished = remove_vanished.unwrap_or(false);
- let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+ let source: Arc<dyn SyncSource> = if let Some(remote) = remote {
let (remote_config, _digest) = pbs_config::remote::config()?;
let remote: Remote = remote_config.lookup("remote", remote)?;
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 323bc1a27..f8a1e133d 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -6,18 +6,24 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use anyhow::{bail, Error};
+use anyhow::{bail, format_err, Error};
use http::StatusCode;
-use tracing::info;
+use serde_json::json;
+use tracing::{info, warn};
use proxmox_router::HttpError;
-use pbs_api_types::{BackupDir, CryptMode};
-use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_api_types::{
+ Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem,
+ MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{DataStore, LocalChunkReader};
+use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader};
+
+use crate::backup::ListAccessibleBackupGroups;
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
@@ -201,3 +207,263 @@ impl SyncSourceReader for LocalSourceReader {
self.datastore.name() == target_store_name
}
}
+
+#[async_trait::async_trait]
+/// `SyncSource` is a trait that provides an interface for synchronizing data/information from a
+/// source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source.
+pub(crate) trait SyncSource: Send + Sync {
+ /// Lists namespaces from the source.
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ ) -> Result<Vec<BackupNamespace>, Error>;
+
+ /// Lists groups within a specific namespace from the source.
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error>;
+
+ /// Lists backup directories for a specific group within a specific namespace from the source.
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ ) -> Result<Vec<BackupDir>, Error>;
+ fn get_ns(&self) -> BackupNamespace;
+ fn get_store(&self) -> &str;
+
+ /// Returns a reader for reading data from a specific backup directory.
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn SyncSourceReader>, Error>;
+}
+
+pub(crate) struct RemoteSource {
+ pub(crate) repo: BackupRepository,
+ pub(crate) ns: BackupNamespace,
+ pub(crate) client: HttpClient,
+}
+
+pub(crate) struct LocalSource {
+ pub(crate) store: Arc<DataStore>,
+ pub(crate) ns: BackupNamespace,
+}
+
+#[async_trait::async_trait]
+impl SyncSource for RemoteSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+ return Ok(vec![self.ns.clone()]);
+ }
+
+ let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+ let mut data = json!({});
+ if let Some(max_depth) = max_depth {
+ data["max-depth"] = json!(max_depth);
+ }
+
+ if !self.ns.is_root() {
+ data["parent"] = json!(self.ns);
+ }
+ self.client.login().await?;
+
+ let mut result = match self.client.get(&path, Some(data)).await {
+ Ok(res) => res,
+ Err(err) => match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match code {
+ &StatusCode::NOT_FOUND => {
+ if self.ns.is_root() && max_depth.is_none() {
+ warn!("Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+ warn!("Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+ max_depth.replace(0);
+ } else {
+ bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+ }
+
+ return Ok(vec![self.ns.clone()]);
+ }
+ _ => {
+ bail!("Querying namespaces failed - HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ bail!("Querying namespaces failed - {err}");
+ }
+ },
+ };
+
+ let list: Vec<BackupNamespace> =
+ serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+ .into_iter()
+ .map(|list_item| list_item.ns)
+ .collect();
+
+ Ok(list)
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ _owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+ let args = if !namespace.is_root() {
+ Some(json!({ "ns": namespace.clone() }))
+ } else {
+ None
+ };
+
+ self.client.login().await?;
+ let mut result =
+ self.client.get(&path, args).await.map_err(|err| {
+ format_err!("Failed to retrieve backup groups from remote - {}", err)
+ })?;
+
+ Ok(
+ serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+ .map_err(Error::from)?
+ .into_iter()
+ .map(|item| item.backup)
+ .collect::<Vec<BackupGroup>>(),
+ )
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ ) -> Result<Vec<BackupDir>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+ let mut args = json!({
+ "backup-type": group.ty,
+ "backup-id": group.id,
+ });
+
+ if !namespace.is_root() {
+ args["ns"] = serde_json::to_value(namespace)?;
+ }
+
+ self.client.login().await?;
+
+ let mut result = self.client.get(&path, Some(args)).await?;
+ let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+ Ok(snapshot_list
+ .into_iter()
+ .filter_map(|item: SnapshotListItem| {
+ let snapshot = item.backup;
+ // in-progress backups can't be synced
+ if item.size.is_none() {
+ info!("skipping snapshot {snapshot} - in-progress backup");
+ return None;
+ }
+
+ Some(snapshot)
+ })
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn get_store(&self) -> &str {
+ self.repo.store()
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+ let backup_reader =
+ BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+ Ok(Arc::new(RemoteSourceReader {
+ backup_reader,
+ dir: dir.clone(),
+ }))
+ }
+}
+
+#[async_trait::async_trait]
+impl SyncSource for LocalSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ ListNamespacesRecursive::new_max_depth(
+ self.store.clone(),
+ self.ns.clone(),
+ max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+ )?
+ .collect()
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ Ok(ListAccessibleBackupGroups::new_with_privs(
+ &self.store,
+ namespace.clone(),
+ 0,
+ Some(PRIV_DATASTORE_READ),
+ Some(PRIV_DATASTORE_BACKUP),
+ Some(owner),
+ )?
+ .filter_map(Result::ok)
+ .map(|backup_group| backup_group.group().clone())
+ .collect::<Vec<pbs_api_types::BackupGroup>>())
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ ) -> Result<Vec<BackupDir>, Error> {
+ Ok(self
+ .store
+ .backup_group(namespace.clone(), group.clone())
+ .iter_snapshots()?
+ .filter_map(Result::ok)
+ .map(|snapshot| snapshot.dir().to_owned())
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn get_store(&self) -> &str {
+ self.store.name()
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+ let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+ let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+ &dir.full_path(),
+ "snapshot",
+ "locked by another operation",
+ )?;
+ Ok(Arc::new(LocalSourceReader {
+ _dir_lock: Arc::new(Mutex::new(dir_lock)),
+ path: dir.full_path(),
+ datastore: dir.datastore().clone(),
+ }))
+ }
+}
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 09/24] client: backup writer: bundle upload stats counters
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (7 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 08/24] server: sync: move source " Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload Christian Ebner
` (16 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
In preparation for push support in sync jobs.
Introduce `UploadStatsCounters` struct to hold the Arc clones of the
chunk upload statistics counters. By bundling them into the struct,
they can be passed as a single function parameter when factoring out
the common stream future implementation in the subsequent
implementation of the chunk upload for push support in sync jobs.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
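Note for reviewers: the bundling pattern in isolation, as a reduced
std-only sketch. It assumes only two counters instead of the seven in
`UploadStatsCounters`, and the names `Counters`, `record_chunk` and
`upload_all` are hypothetical, not part of this patch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Shared counters bundled into one struct; cloning the struct clones the
// `Arc` handles, so all clones update the same underlying atomics.
#[derive(Clone, Default)]
struct Counters {
    total_chunks: Arc<AtomicUsize>,
    stream_len: Arc<AtomicUsize>,
}

// A single parameter replaces one argument per counter.
fn record_chunk(counters: &Counters, len: usize) {
    counters.total_chunks.fetch_add(1, Ordering::SeqCst);
    counters.stream_len.fetch_add(len, Ordering::SeqCst);
}

// Account for all chunks on a worker thread, then read the totals back
// through the original handle.
fn upload_all(chunk_lens: Vec<usize>) -> (usize, usize) {
    let counters = Counters::default();
    let worker_counters = counters.clone();
    let worker = thread::spawn(move || {
        for len in chunk_lens {
            record_chunk(&worker_counters, len);
        }
    });
    worker.join().expect("worker thread panicked");
    (
        counters.total_chunks.load(Ordering::SeqCst),
        counters.stream_len.load(Ordering::SeqCst),
    )
}
```

The patch itself keeps the individual `Arc` bindings and builds the
struct from their clones, since the stream closures still use the
originals.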
pbs-client/src/backup_writer.rs | 52 ++++++++++++++++++---------------
1 file changed, 28 insertions(+), 24 deletions(-)
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 813c8d602..a67b471a7 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -66,6 +66,16 @@ struct UploadStats {
csum: [u8; 32],
}
+struct UploadStatsCounters {
+ injected_chunk_count: Arc<AtomicUsize>,
+ known_chunk_count: Arc<AtomicUsize>,
+ total_chunks: Arc<AtomicUsize>,
+ compressed_stream_len: Arc<AtomicU64>,
+ injected_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ stream_len: Arc<AtomicUsize>,
+}
+
type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
@@ -647,20 +657,23 @@ impl BackupWriter {
injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
) -> impl Future<Output = Result<UploadStats, Error>> {
let total_chunks = Arc::new(AtomicUsize::new(0));
- let total_chunks2 = total_chunks.clone();
let known_chunk_count = Arc::new(AtomicUsize::new(0));
- let known_chunk_count2 = known_chunk_count.clone();
let injected_chunk_count = Arc::new(AtomicUsize::new(0));
- let injected_chunk_count2 = injected_chunk_count.clone();
let stream_len = Arc::new(AtomicUsize::new(0));
- let stream_len2 = stream_len.clone();
let compressed_stream_len = Arc::new(AtomicU64::new(0));
- let compressed_stream_len2 = compressed_stream_len.clone();
let reused_len = Arc::new(AtomicUsize::new(0));
- let reused_len2 = reused_len.clone();
let injected_len = Arc::new(AtomicUsize::new(0));
- let injected_len2 = injected_len.clone();
+
+ let counters = UploadStatsCounters {
+ injected_chunk_count: injected_chunk_count.clone(),
+ known_chunk_count: known_chunk_count.clone(),
+ total_chunks: total_chunks.clone(),
+ compressed_stream_len: compressed_stream_len.clone(),
+ injected_len: injected_len.clone(),
+ reused_len: reused_len.clone(),
+ stream_len: stream_len.clone(),
+ };
let append_chunk_path = format!("{}_index", prefix);
let upload_chunk_path = format!("{}_chunk", prefix);
@@ -803,27 +816,18 @@ impl BackupWriter {
})
.then(move |result| async move { upload_result.await?.and(result) }.boxed())
.and_then(move |_| {
- let duration = start_time.elapsed();
- let chunk_count = total_chunks2.load(Ordering::SeqCst);
- let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
- let chunk_injected = injected_chunk_count2.load(Ordering::SeqCst);
- let size = stream_len2.load(Ordering::SeqCst);
- let size_reused = reused_len2.load(Ordering::SeqCst);
- let size_injected = injected_len2.load(Ordering::SeqCst);
- let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
-
let mut guard = index_csum_2.lock().unwrap();
let csum = guard.take().unwrap().finish();
futures::future::ok(UploadStats {
- chunk_count,
- chunk_reused,
- chunk_injected,
- size,
- size_reused,
- size_injected,
- size_compressed,
- duration,
+ chunk_count: counters.total_chunks.load(Ordering::SeqCst),
+ chunk_reused: counters.known_chunk_count.load(Ordering::SeqCst),
+ chunk_injected: counters.injected_chunk_count.load(Ordering::SeqCst),
+ size: counters.stream_len.load(Ordering::SeqCst),
+ size_reused: counters.reused_len.load(Ordering::SeqCst),
+ size_injected: counters.injected_len.load(Ordering::SeqCst),
+ size_compressed: counters.compressed_stream_len.load(Ordering::SeqCst) as usize,
+ duration: start_time.elapsed(),
csum,
})
})
--
2.39.2
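A minimal sketch of the bundling idea in this patch, reduced to two counters. The names `Counters`, `record_chunk` and `snapshot` are hypothetical and not part of the patch; the point is only that cloning a struct of `Arc<AtomicUsize>` fields clones handles to the same counters, which is why the separate `*2` clones can be dropped.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified stand-in for `UploadStatsCounters` (assumption: two fields only).
/// Every field is a shared atomic, so a clone refers to the same counters.
#[derive(Clone, Default)]
struct Counters {
    total_chunks: Arc<AtomicUsize>,
    stream_len: Arc<AtomicUsize>,
}

/// Hypothetical helper: account for one chunk of `len` bytes.
fn record_chunk(counters: &Counters, len: usize) {
    counters.total_chunks.fetch_add(1, Ordering::SeqCst);
    counters.stream_len.fetch_add(len, Ordering::SeqCst);
}

/// Hypothetical helper: read the final values as (chunk count, size in bytes).
fn snapshot(counters: &Counters) -> (usize, usize) {
    (
        counters.total_chunks.load(Ordering::SeqCst),
        counters.stream_len.load(Ordering::SeqCst),
    )
}
```

One clone can be moved into the stream closure while the original is read when the final stats are assembled; both observe the same values.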
* [pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (8 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 09/24] client: backup writer: bundle upload stats counters Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 11/24] client: backup writer: add chunk count and duration stats Christian Ebner
` (15 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
In preparation for implementing push support for sync jobs, factor
out the upload stream for merged chunks, so that it can be reused
to upload the local chunks to a remote target datastore during a
snapshot sync operation in push direction.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/backup_writer.rs | 47 ++++++++++++++++++++-------------
1 file changed, 29 insertions(+), 18 deletions(-)
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index a67b471a7..6daad9fde 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
@@ -675,19 +676,12 @@ impl BackupWriter {
stream_len: stream_len.clone(),
};
- let append_chunk_path = format!("{}_index", prefix);
- let upload_chunk_path = format!("{}_chunk", prefix);
let is_fixed_chunk_size = prefix == "fixed";
- let (upload_queue, upload_result) =
- Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
-
- let start_time = std::time::Instant::now();
-
let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
let index_csum_2 = index_csum.clone();
- stream
+ let stream = stream
.inject_reused_chunks(injections, stream_len.clone())
.and_then(move |chunk_info| match chunk_info {
InjectedChunksInfo::Known(chunks) => {
@@ -758,7 +752,28 @@ impl BackupWriter {
}
}
})
- .merge_known_chunks()
+ .merge_known_chunks();
+
+ Self::upload_merged_chunk_stream(h2, wid, prefix, stream, index_csum_2, counters)
+ }
+
+ fn upload_merged_chunk_stream(
+ h2: H2Client,
+ wid: u64,
+ prefix: &str,
+ stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+ index_csum: Arc<Mutex<Option<Sha256>>>,
+ counters: UploadStatsCounters,
+ ) -> impl Future<Output = Result<UploadStats, Error>> {
+ let append_chunk_path = format!("{prefix}_index");
+ let upload_chunk_path = format!("{prefix}_chunk");
+
+ let (upload_queue, upload_result) =
+ Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
+
+ let start_time = std::time::Instant::now();
+
+ stream
.try_for_each(move |merged_chunk_info| {
let upload_queue = upload_queue.clone();
@@ -768,10 +783,8 @@ impl BackupWriter {
let digest_str = hex::encode(digest);
log::trace!(
- "upload new chunk {} ({} bytes, offset {})",
- digest_str,
- chunk_info.chunk_len,
- offset
+ "upload new chunk {digest_str} ({chunk_len} bytes, offset {offset})",
+ chunk_len = chunk_info.chunk_len,
);
let chunk_data = chunk_info.chunk.into_inner();
@@ -800,9 +813,7 @@ impl BackupWriter {
upload_queue
.send((new_info, Some(response)))
.await
- .map_err(|err| {
- format_err!("failed to send to upload queue: {}", err)
- })
+ .map_err(|err| format_err!("failed to send to upload queue: {err}"))
},
))
} else {
@@ -810,13 +821,13 @@ impl BackupWriter {
upload_queue
.send((merged_chunk_info, None))
.await
- .map_err(|err| format_err!("failed to send to upload queue: {}", err))
+ .map_err(|err| format_err!("failed to send to upload queue: {err}"))
})
}
})
.then(move |result| async move { upload_result.await?.and(result) }.boxed())
.and_then(move |_| {
- let mut guard = index_csum_2.lock().unwrap();
+ let mut guard = index_csum.lock().unwrap();
let csum = guard.take().unwrap().finish();
futures::future::ok(UploadStats {
--
2.39.2
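A rough sketch of the shape of this refactoring, under simplifying assumptions: a synchronous iterator stands in for the async stream, and `Merged` and `consume_merged` are hypothetical names that only loosely mirror `MergedChunkInfo` and `upload_merged_chunk_stream`. It shows a shared consumer that any producer of merged chunk items can feed.

```rust
/// Simplified stand-in for `MergedChunkInfo`.
enum Merged {
    /// Offsets of chunks the target already has.
    Known(Vec<u64>),
    /// Offset of a chunk that still has to be uploaded.
    New(u64),
}

/// Shared consumer: builds the index path from the prefix and walks the
/// stream, returning (index path, uploaded chunks, reused chunks).
/// The first error in the stream aborts the whole operation.
fn consume_merged(
    prefix: &str,
    stream: impl Iterator<Item = Result<Merged, String>>,
) -> Result<(String, usize, usize), String> {
    let append_chunk_path = format!("{prefix}_index");
    let mut uploaded = 0;
    let mut reused = 0;
    for item in stream {
        match item? {
            Merged::Known(offsets) => reused += offsets.len(),
            Merged::New(_offset) => uploaded += 1,
        }
    }
    Ok((append_chunk_path, uploaded, reused))
}
```

Because the consumer only depends on the item type, both the regular backup path and a push sync path could hand it their own stream.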
* [pbs-devel] [RFC proxmox-backup 11/24] client: backup writer: add chunk count and duration stats
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (9 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 12/24] client: backup writer: allow push uploading index and chunks Christian Ebner
` (14 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
In addition to size and checksum, also return the chunk count and
duration in the backup stats, so that this information can be shown
in the task log of sync jobs in push direction.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/backup_writer.rs | 23 ++++++++++++++++++++---
1 file changed, 20 insertions(+), 3 deletions(-)
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 6daad9fde..e36c43569 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -3,6 +3,7 @@ use std::future::Future;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
@@ -44,6 +45,8 @@ impl Drop for BackupWriter {
pub struct BackupStats {
pub size: u64,
pub csum: [u8; 32],
+ pub duration: Duration,
+ pub chunk_count: u64,
}
/// Options for uploading blobs/streams to the server
@@ -63,7 +66,7 @@ struct UploadStats {
size_reused: usize,
size_injected: usize,
size_compressed: usize,
- duration: std::time::Duration,
+ duration: Duration,
csum: [u8; 32],
}
@@ -200,6 +203,7 @@ impl BackupWriter {
mut reader: R,
file_name: &str,
) -> Result<BackupStats, Error> {
+ let start_time = Instant::now();
let mut raw_data = Vec::new();
// fixme: avoid loading into memory
reader.read_to_end(&mut raw_data)?;
@@ -217,7 +221,12 @@ impl BackupWriter {
raw_data,
)
.await?;
- Ok(BackupStats { size, csum })
+ Ok(BackupStats {
+ size,
+ csum,
+ duration: start_time.elapsed(),
+ chunk_count: 0,
+ })
}
pub async fn upload_blob_from_data(
@@ -226,6 +235,7 @@ impl BackupWriter {
file_name: &str,
options: UploadOptions,
) -> Result<BackupStats, Error> {
+ let start_time = Instant::now();
let blob = match (options.encrypt, &self.crypt_config) {
(false, _) => DataBlob::encode(&data, None, options.compress)?,
(true, None) => bail!("requested encryption without a crypt config"),
@@ -249,7 +259,12 @@ impl BackupWriter {
raw_data,
)
.await?;
- Ok(BackupStats { size, csum })
+ Ok(BackupStats {
+ size,
+ csum,
+ duration: start_time.elapsed(),
+ chunk_count: 0,
+ })
}
pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
@@ -428,6 +443,8 @@ impl BackupWriter {
Ok(BackupStats {
size: upload_stats.size as u64,
csum: upload_stats.csum,
+ duration: upload_stats.duration,
+ chunk_count: upload_stats.chunk_count as u64,
})
}
--
2.39.2
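A minimal sketch of the timing pattern used for the blob uploads above. `Stats` and `timed_upload` are hypothetical names, and the upload itself is replaced by a placeholder; only the use of `Instant::now()` and `elapsed()` follows the patch.

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in for `BackupStats` (checksum omitted).
struct Stats {
    size: u64,
    chunk_count: u64,
    duration: Duration,
}

/// Hypothetical helper: measure how long handling a blob takes.
fn timed_upload(data: &[u8]) -> Stats {
    let start_time = Instant::now();
    // Placeholder for the actual upload; a blob is not chunked,
    // so the chunk count stays at zero as in the patch.
    let size = data.len() as u64;
    Stats {
        size,
        chunk_count: 0,
        duration: start_time.elapsed(),
    }
}
```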
* [pbs-devel] [RFC proxmox-backup 12/24] client: backup writer: allow push uploading index and chunks
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (10 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 11/24] client: backup writer: add chunk count and duration stats Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 13/24] api: backup: add ignore-previous flag to backup endpoint Christian Ebner
` (13 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
Add a method `upload_index_chunk_info` to be used for uploading an
existing index and the corresponding chunk stream.
Instead of taking an input stream of raw bytes like `upload_stream`,
this takes a stream of `ChunkInfo` objects provided by the local
chunk reader of the sync job's source.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/backup_writer.rs | 106 ++++++++++++++++++++++++++++++++
1 file changed, 106 insertions(+)
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index e36c43569..e5d217608 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -289,6 +289,112 @@ impl BackupWriter {
.await
}
+ /// Upload chunks and index
+ pub async fn upload_index_chunk_info(
+ &self,
+ archive_name: &str,
+ stream: impl Stream<Item = Result<ChunkInfo, Error>>,
+ options: UploadOptions,
+ known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ ) -> Result<BackupStats, Error> {
+ let mut param = json!({ "archive-name": archive_name });
+ let prefix = if let Some(size) = options.fixed_size {
+ param["size"] = size.into();
+ "fixed"
+ } else {
+ "dynamic"
+ };
+
+ if options.encrypt && self.crypt_config.is_none() {
+ bail!("requested encryption without a crypt config");
+ }
+
+ let wid = self
+ .h2
+ .post(&format!("{prefix}_index"), Some(param))
+ .await?
+ .as_u64()
+ .unwrap();
+
+ let total_chunks = Arc::new(AtomicUsize::new(0));
+ let known_chunk_count = Arc::new(AtomicUsize::new(0));
+
+ let stream_len = Arc::new(AtomicUsize::new(0));
+ let compressed_stream_len = Arc::new(AtomicU64::new(0));
+ let reused_len = Arc::new(AtomicUsize::new(0));
+
+ let counters = UploadStatsCounters {
+ injected_chunk_count: Arc::new(AtomicUsize::new(0)),
+ known_chunk_count: known_chunk_count.clone(),
+ total_chunks: total_chunks.clone(),
+ compressed_stream_len: compressed_stream_len.clone(),
+ injected_len: Arc::new(AtomicUsize::new(0)),
+ reused_len: reused_len.clone(),
+ stream_len: stream_len.clone(),
+ };
+
+ let is_fixed_chunk_size = prefix == "fixed";
+
+ let index_csum = Arc::new(Mutex::new(Some(Sha256::new())));
+ let index_csum_2 = index_csum.clone();
+
+ let stream = stream
+ .and_then(move |chunk_info| {
+ total_chunks.fetch_add(1, Ordering::SeqCst);
+ reused_len.fetch_add(chunk_info.chunk_len as usize, Ordering::SeqCst);
+ let offset = stream_len.fetch_add(chunk_info.chunk_len as usize, Ordering::SeqCst);
+
+ let end_offset = offset as u64 + chunk_info.chunk_len;
+ let mut guard = index_csum.lock().unwrap();
+ let csum = guard.as_mut().unwrap();
+ if !is_fixed_chunk_size {
+ csum.update(&end_offset.to_le_bytes());
+ }
+ csum.update(&chunk_info.digest);
+
+ let mut known_chunks = known_chunks.lock().unwrap();
+ if known_chunks.contains(&chunk_info.digest) {
+ known_chunk_count.fetch_add(1, Ordering::SeqCst);
+ future::ok(MergedChunkInfo::Known(vec![(
+ chunk_info.offset,
+ chunk_info.digest,
+ )]))
+ } else {
+ known_chunks.insert(chunk_info.digest);
+ future::ok(MergedChunkInfo::New(chunk_info))
+ }
+ })
+ .merge_known_chunks();
+
+ let upload_stats = Self::upload_merged_chunk_stream(
+ self.h2.clone(),
+ wid,
+ prefix,
+ stream,
+ index_csum_2,
+ counters,
+ )
+ .await?;
+
+ let param = json!({
+ "wid": wid ,
+ "chunk-count": upload_stats.chunk_count,
+ "size": upload_stats.size,
+ "csum": hex::encode(upload_stats.csum),
+ });
+ let _value = self
+ .h2
+ .post(&format!("{prefix}_close"), Some(param))
+ .await?;
+
+ Ok(BackupStats {
+ size: upload_stats.size as u64,
+ csum: upload_stats.csum,
+ duration: upload_stats.duration,
+ chunk_count: upload_stats.chunk_count as u64,
+ })
+ }
+
pub async fn upload_stream(
&self,
archive_name: &str,
--
2.39.2
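A minimal sketch of the known-chunk bookkeeping in `upload_index_chunk_info`, without the async stream, the checksum and the mutex. `Classified` and `classify` are hypothetical names. The sketch relies on `HashSet::insert` returning `false` for an already present value, which is equivalent to the `contains` check followed by `insert` in the patch.

```rust
use std::collections::HashSet;

/// Simplified stand-in for the two `MergedChunkInfo` cases.
#[derive(Debug, PartialEq)]
enum Classified {
    /// Digest already seen: only the index entry has to be appended.
    Known { offset: u64, digest: [u8; 32] },
    /// Digest seen for the first time: the chunk data has to be uploaded.
    New { offset: u64, digest: [u8; 32] },
}

/// Hypothetical helper: classify (offset, digest) pairs against the set of
/// known digests, recording every new digest on the way.
fn classify(
    chunks: &[(u64, [u8; 32])],
    known_chunks: &mut HashSet<[u8; 32]>,
) -> Vec<Classified> {
    chunks
        .iter()
        .map(|&(offset, digest)| {
            if known_chunks.insert(digest) {
                Classified::New { offset, digest }
            } else {
                Classified::Known { offset, digest }
            }
        })
        .collect()
}
```

A digest that repeats within the same stream is therefore uploaded only once; later occurrences are treated as known.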
* [pbs-devel] [RFC proxmox-backup 13/24] api: backup: add ignore-previous flag to backup endpoint
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (11 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 12/24] client: backup writer: allow push uploading index and chunks Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 14/24] server: sync: move skip info/reason to common sync module Christian Ebner
` (12 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
By setting the optional `ignore-previous` flag, the previous
backup in the group is disregarded. This will be used when pushing
backups to a target, where a monotonic increase of the backup time
cannot be guaranteed.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
examples/upload-speed.rs | 1 +
pbs-client/src/backup_writer.rs | 4 +++-
proxmox-backup-client/src/benchmark.rs | 1 +
proxmox-backup-client/src/main.rs | 1 +
src/api2/backup/mod.rs | 6 +++++-
5 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/examples/upload-speed.rs b/examples/upload-speed.rs
index e4b570ec5..8a6594a47 100644
--- a/examples/upload-speed.rs
+++ b/examples/upload-speed.rs
@@ -25,6 +25,7 @@ async fn upload_speed() -> Result<f64, Error> {
&(BackupType::Host, "speedtest".to_string(), backup_time).into(),
false,
true,
+ false,
)
.await?;
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index e5d217608..8a148cb07 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -102,6 +102,7 @@ impl BackupWriter {
backup: &BackupDir,
debug: bool,
benchmark: bool,
+ ignore_previous: bool,
) -> Result<Arc<BackupWriter>, Error> {
let mut param = json!({
"backup-type": backup.ty(),
@@ -109,7 +110,8 @@ impl BackupWriter {
"backup-time": backup.time,
"store": datastore,
"debug": debug,
- "benchmark": benchmark
+ "benchmark": benchmark,
+ "ignore-previous": ignore_previous,
});
if !ns.is_root() {
diff --git a/proxmox-backup-client/src/benchmark.rs b/proxmox-backup-client/src/benchmark.rs
index 1262fb46d..f4b084d8d 100644
--- a/proxmox-backup-client/src/benchmark.rs
+++ b/proxmox-backup-client/src/benchmark.rs
@@ -236,6 +236,7 @@ async fn test_upload_speed(
&(BackupType::Host, "benchmark".to_string(), backup_time).into(),
false,
true,
+ false,
)
.await?;
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 6a7d09047..e9d00a223 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -963,6 +963,7 @@ async fn create_backup(
&snapshot,
true,
false,
+ false,
)
.await?;
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index ea0d0292e..c7d402ed0 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -52,6 +52,7 @@ pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
("backup-time", false, &BACKUP_TIME_SCHEMA),
("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
("benchmark", true, &BooleanSchema::new("Job is a benchmark (do not keep data).").schema()),
+ ("ignore-previous", true, &BooleanSchema::new("Do not lookup previous snapshot.").schema()),
]),
)
).access(
@@ -78,6 +79,7 @@ fn upgrade_to_backup_protocol(
async move {
let debug = param["debug"].as_bool().unwrap_or(false);
let benchmark = param["benchmark"].as_bool().unwrap_or(false);
+ let ignore_previous = param["ignore-previous"].as_bool().unwrap_or(false);
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -155,7 +157,7 @@ fn upgrade_to_backup_protocol(
bail!("backup owner check failed ({} != {})", auth_id, owner);
}
- let last_backup = {
+ let last_backup = if !ignore_previous {
let info = backup_group.last_backup(true).unwrap_or(None);
if let Some(info) = info {
let (manifest, _) = info.backup_dir.load_manifest()?;
@@ -173,6 +175,8 @@ fn upgrade_to_backup_protocol(
} else {
None
}
+ } else {
+ None
};
let backup_dir = backup_group.backup_dir(backup_dir_arg.time)?;
--
2.39.2
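A minimal sketch of the control flow added to `upgrade_to_backup_protocol`. `lookup_previous` is a hypothetical helper, and the newest timestamp in a slice stands in for the real lookup of the last finished backup in the group.

```rust
/// Hypothetical helper: return the newest snapshot time unless the caller
/// asked to ignore previous snapshots.
fn lookup_previous(ignore_previous: bool, snapshot_times: &[i64]) -> Option<i64> {
    if !ignore_previous {
        // Stand-in for looking up the last backup of the group.
        snapshot_times.iter().copied().max()
    } else {
        None
    }
}
```

With the flag set, the session behaves as if the group had no previous snapshot, so a pushed snapshot may be older than the newest one on the target.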
* [pbs-devel] [RFC proxmox-backup 14/24] server: sync: move skip info/reason to common sync module
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (12 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 13/24] api: backup: add ignore-previous flag to backup endpoint Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 15/24] server: sync: make skip reason message more genenric Christian Ebner
` (11 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
Make `SkipReason` and `SkipInfo` accessible for sync operations of
both direction variants, push and pull.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 82 ++--------------------------------------------
src/server/sync.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 81 insertions(+), 80 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index c6932dcc5..d18c6d643 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -28,7 +28,8 @@ use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
use super::sync::{
- LocalSource, RemoteSource, RemovedVanishedStats, SyncSource, SyncSourceReader, SyncStats,
+ LocalSource, RemoteSource, RemovedVanishedStats, SkipInfo, SkipReason, SyncSource,
+ SyncSourceReader, SyncStats,
};
use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
@@ -474,85 +475,6 @@ async fn pull_snapshot_from<'a>(
Ok(sync_stats)
}
-#[derive(PartialEq, Eq)]
-enum SkipReason {
- AlreadySynced,
- TransferLast,
-}
-
-impl std::fmt::Display for SkipReason {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "{}",
- match self {
- SkipReason::AlreadySynced => "older than the newest local snapshot",
- SkipReason::TransferLast => "due to transfer-last",
- }
- )
- }
-}
-
-struct SkipInfo {
- oldest: i64,
- newest: i64,
- count: u64,
- skip_reason: SkipReason,
-}
-
-impl SkipInfo {
- fn new(skip_reason: SkipReason) -> Self {
- SkipInfo {
- oldest: i64::MAX,
- newest: i64::MIN,
- count: 0,
- skip_reason,
- }
- }
-
- fn reset(&mut self) {
- self.count = 0;
- self.oldest = i64::MAX;
- self.newest = i64::MIN;
- }
-
- fn update(&mut self, backup_time: i64) {
- self.count += 1;
-
- if backup_time < self.oldest {
- self.oldest = backup_time;
- }
-
- if backup_time > self.newest {
- self.newest = backup_time;
- }
- }
-
- fn affected(&self) -> Result<String, Error> {
- match self.count {
- 0 => Ok(String::new()),
- 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
- _ => Ok(format!(
- "{} .. {}",
- proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
- proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
- )),
- }
- }
-}
-
-impl std::fmt::Display for SkipInfo {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "skipped: {} snapshot(s) ({}) - {}",
- self.count,
- self.affected().map_err(|_| std::fmt::Error)?,
- self.skip_reason,
- )
- }
-}
-
/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
diff --git a/src/server/sync.rs b/src/server/sync.rs
index f8a1e133d..ffc32f45f 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -467,3 +467,82 @@ impl SyncSource for LocalSource {
}))
}
}
+
+#[derive(PartialEq, Eq)]
+pub(crate) enum SkipReason {
+ AlreadySynced,
+ TransferLast,
+}
+
+impl std::fmt::Display for SkipReason {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{}",
+ match self {
+ SkipReason::AlreadySynced => "older than the newest local snapshot",
+ SkipReason::TransferLast => "due to transfer-last",
+ }
+ )
+ }
+}
+
+pub(crate) struct SkipInfo {
+ oldest: i64,
+ newest: i64,
+ pub(crate) count: u64,
+ skip_reason: SkipReason,
+}
+
+impl SkipInfo {
+ pub(crate) fn new(skip_reason: SkipReason) -> Self {
+ SkipInfo {
+ oldest: i64::MAX,
+ newest: i64::MIN,
+ count: 0,
+ skip_reason,
+ }
+ }
+
+ pub(crate) fn reset(&mut self) {
+ self.count = 0;
+ self.oldest = i64::MAX;
+ self.newest = i64::MIN;
+ }
+
+ pub(crate) fn update(&mut self, backup_time: i64) {
+ self.count += 1;
+
+ if backup_time < self.oldest {
+ self.oldest = backup_time;
+ }
+
+ if backup_time > self.newest {
+ self.newest = backup_time;
+ }
+ }
+
+ fn affected(&self) -> Result<String, Error> {
+ match self.count {
+ 0 => Ok(String::new()),
+ 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
+ _ => Ok(format!(
+ "{} .. {}",
+ proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
+ proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
+ )),
+ }
+ }
+}
+
+impl std::fmt::Display for SkipInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "skipped: {} snapshot(s) ({}) - {}",
+ self.count,
+ self.affected().map_err(|_| std::fmt::Error)?,
+ self.skip_reason,
+ )
+ }
+}
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 15/24] server: sync: make skip reason message more genenric
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (13 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 14/24] server: sync: move skip info/reason to common sync module Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 16/24] server: sync: factor out namespace depth check into sync module Christian Ebner
` (10 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
By specifying that the snapshot is being skipped because of a
condition met on the sync target instead of 'local', the same message
can be reused for sync jobs in push direction without losing its
meaning.
---
src/server/sync.rs | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/src/server/sync.rs b/src/server/sync.rs
index ffc32f45f..ee40d0b9d 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -480,7 +480,8 @@ impl std::fmt::Display for SkipReason {
f,
"{}",
match self {
- SkipReason::AlreadySynced => "older than the newest local snapshot",
+ SkipReason::AlreadySynced =>
+ "older than the newest snapshot present on sync target",
SkipReason::TransferLast => "due to transfer-last",
}
)
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 36+ messages in thread
* [pbs-devel] [RFC proxmox-backup 16/24] server: sync: factor out namespace depth check into sync module
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (14 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 15/24] server: sync: make skip reason message more genenric Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 17/24] api types: define remote permissions and roles for push sync Christian Ebner
` (9 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
By moving and refactoring the check for a sync job exceeding the
global maximum namespace depth limit, the same function can be reused
for sync jobs in push direction.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 20 +++-----------------
src/server/sync.rs | 21 +++++++++++++++++++++
2 files changed, 24 insertions(+), 17 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index d18c6d643..3117f7d2c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -28,8 +28,8 @@ use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
use super::sync::{
- LocalSource, RemoteSource, RemovedVanishedStats, SkipInfo, SkipReason, SyncSource,
- SyncSourceReader, SyncStats,
+ check_namespace_depth_limit, LocalSource, RemoteSource, RemovedVanishedStats, SkipInfo,
+ SkipReason, SyncSource, SyncSourceReader, SyncStats,
};
use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
@@ -735,21 +735,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
params.source.list_namespaces(&mut params.max_depth).await?
};
- let ns_layers_to_be_pulled = namespaces
- .iter()
- .map(BackupNamespace::depth)
- .max()
- .map_or(0, |v| v - params.source.get_ns().depth());
- let target_depth = params.target.ns.depth();
-
- if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
- bail!(
- "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
- ns_layers_to_be_pulled,
- target_depth,
- MAX_NAMESPACE_DEPTH
- );
- }
+ check_namespace_depth_limit(¶ms.source.get_ns(), ¶ms.target.ns, &namespaces)?;
errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
namespaces.sort_unstable_by_key(|a| a.name_len());
diff --git a/src/server/sync.rs b/src/server/sync.rs
index ee40d0b9d..bd68dda46 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -547,3 +547,24 @@ impl std::fmt::Display for SkipInfo {
)
}
}
+
+/// Check if a sync from source to target of given namespaces exceeds the global namespace depth limit
+pub(crate) fn check_namespace_depth_limit(
+ source_namespace: &BackupNamespace,
+ target_namespace: &BackupNamespace,
+ namespaces: &[BackupNamespace],
+) -> Result<(), Error> {
+ let target_ns_depth = target_namespace.depth();
+ let sync_ns_depth = namespaces
+ .iter()
+ .map(BackupNamespace::depth)
+ .max()
+ .map_or(0, |v| v - source_namespace.depth());
+
+ if sync_ns_depth + target_ns_depth > MAX_NAMESPACE_DEPTH {
+ bail!(
+ "Syncing would exceed max allowed namespace depth. ({sync_ns_depth}+{target_ns_depth} > {MAX_NAMESPACE_DEPTH})",
+ );
+ }
+ Ok(())
+}
--
2.39.2
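A minimal sketch of the depth arithmetic in `check_namespace_depth_limit`, using plain `usize` depths instead of `BackupNamespace`. `check_depth` and `MAX_DEPTH` are hypothetical names, the value 7 is an assumption, and `saturating_sub` is a deviation from the patch that guards the sketch against underflow.

```rust
/// Assumed stand-in for `MAX_NAMESPACE_DEPTH`.
const MAX_DEPTH: usize = 7;

/// Hypothetical helper: the deepest namespace to sync, taken relative to
/// the source namespace, must still fit below the target namespace.
fn check_depth(
    source_depth: usize,
    target_depth: usize,
    namespace_depths: &[usize],
) -> Result<(), String> {
    let sync_depth = namespace_depths
        .iter()
        .copied()
        .max()
        .map_or(0, |v| v.saturating_sub(source_depth));

    if sync_depth + target_depth > MAX_DEPTH {
        return Err(format!(
            "Syncing would exceed max allowed namespace depth. ({sync_depth}+{target_depth} > {MAX_DEPTH})"
        ));
    }
    Ok(())
}
```

For example, syncing namespaces up to three levels deep from the root into a target namespace at depth five would need eight levels and is rejected.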
* [pbs-devel] [RFC proxmox-backup 17/24] api types: define remote permissions and roles for push sync
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (15 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 16/24] server: sync: factor out namespace depth check into sync module Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 18/24] fix #3044: server: implement push support for sync operations Christian Ebner
` (8 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
Add the privileges that allow backup and prune on remote targets, to
be used for sync jobs in push direction.
Also add a dedicated role collecting the required privileges.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-api-types/src/acl.rs | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git a/pbs-api-types/src/acl.rs b/pbs-api-types/src/acl.rs
index ef6398629..f644029fa 100644
--- a/pbs-api-types/src/acl.rs
+++ b/pbs-api-types/src/acl.rs
@@ -58,6 +58,12 @@ constnamedbitmap! {
PRIV_REMOTE_MODIFY("Remote.Modify");
/// Remote.Read allows reading data from a configured `Remote`
PRIV_REMOTE_READ("Remote.Read");
+ /// Remote.Backup allows Remote.Read and creating new snapshots on a configured `Remote`,
+ /// but also requires backup ownership
+ PRIV_REMOTE_BACKUP("Remote.Backup");
+ /// Remote.Prune allows deleting snapshots on a configured `Remote`,
+ /// but also requires backup ownership
+ PRIV_REMOTE_PRUNE("Remote.Prune");
/// Sys.Console allows access to the system's console
PRIV_SYS_CONSOLE("Sys.Console");
@@ -151,6 +157,7 @@ pub const ROLE_REMOTE_AUDIT: u64 = 0
pub const ROLE_REMOTE_ADMIN: u64 = 0
| PRIV_REMOTE_AUDIT
| PRIV_REMOTE_MODIFY
+ | PRIV_REMOTE_BACKUP
| PRIV_REMOTE_READ;
#[rustfmt::skip]
@@ -160,6 +167,14 @@ pub const ROLE_REMOTE_SYNC_OPERATOR: u64 = 0
| PRIV_REMOTE_AUDIT
| PRIV_REMOTE_READ;
+#[rustfmt::skip]
+#[allow(clippy::identity_op)]
+/// Remote.SyncPushOperator can do read, backup and prune on the remote.
+pub const ROLE_REMOTE_SYNC_PUSH_OPERATOR: u64 = 0
+ | PRIV_REMOTE_AUDIT
+ | PRIV_REMOTE_BACKUP
+ | PRIV_REMOTE_READ;
+
#[rustfmt::skip]
#[allow(clippy::identity_op)]
/// Tape.Audit can audit the tape backup configuration and media content
@@ -225,6 +240,8 @@ pub enum Role {
RemoteAdmin = ROLE_REMOTE_ADMIN,
/// Syncronisation Opertator
RemoteSyncOperator = ROLE_REMOTE_SYNC_OPERATOR,
+ /// Syncronisation Opertator (push direction)
+ RemoteSyncPushOperator = ROLE_REMOTE_SYNC_PUSH_OPERATOR,
/// Tape Auditor
TapeAudit = ROLE_TAPE_AUDIT,
/// Tape Administrator
--
2.39.2
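A minimal sketch of how roles composed from privilege bits can be checked. The bit positions, the shortened constant names and `has_privs` are assumptions for illustration; the real values are generated by `constnamedbitmap!`. The role below combines the same three privileges as `ROLE_REMOTE_SYNC_PUSH_OPERATOR` in the diff above.

```rust
// Hypothetical bit positions, one bit per privilege.
const PRIV_AUDIT: u64 = 1 << 0;
const PRIV_MODIFY: u64 = 1 << 1;
const PRIV_READ: u64 = 1 << 2;
const PRIV_BACKUP: u64 = 1 << 3;
const PRIV_PRUNE: u64 = 1 << 4;

/// A role is the bitwise OR of the privileges it grants.
const ROLE_SYNC_PUSH_OPERATOR: u64 = PRIV_AUDIT | PRIV_BACKUP | PRIV_READ;

/// Hypothetical helper: true if `role` grants every bit set in `required`.
fn has_privs(role: u64, required: u64) -> bool {
    role & required == required
}
```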
* [pbs-devel] [RFC proxmox-backup 18/24] fix #3044: server: implement push support for sync operations
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (16 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 17/24] api types: define remote permissions and roles for push sync Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 19/24] api: config: extend sync job config by sync direction Christian Ebner
` (7 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
Adds the functionality required to push datastore contents from a
source to a remote target.
This includes syncing of the namespaces, backup groups and snapshots
based on the provided filters as well as removing vanished contents
from the target when requested.
While this mimics the pull direction of sync jobs, the
implementation differs: access to the remote target must be performed
via the REST API, whereas a pull job can access the local datastore
directly via the filesystem.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
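Note: the snapshot selection done per group (skip what the target already
has, then apply the transfer-last cutoff) can be sketched in isolation as
below. This is a simplified illustration only, not the code from this patch:
`select_snapshots` and its parameters are hypothetical names, snapshots are
reduced to their backup time, and the skip-info logging is left out.

```rust
/// Decide which source snapshots to push.
///
/// `times` holds the source backup times sorted in ascending order,
/// `last_synced` is the newest backup time already present on the target,
/// `transfer_last` limits the push to the newest N source snapshots.
fn select_snapshots(
    times: &[i64],
    last_synced: Option<i64>,
    transfer_last: Option<usize>,
) -> Vec<i64> {
    // Without a previous backup on the target, nothing counts as synced.
    let last_synced = last_synced.unwrap_or(i64::MIN);
    // Index of the first snapshot that is within the transfer-last window.
    let cutoff = transfer_last
        .map(|count| times.len().saturating_sub(count))
        .unwrap_or_default();

    times
        .iter()
        .copied()
        .enumerate()
        .filter(|&(pos, time)| {
            // Older than the newest snapshot on the target: already synced.
            if time < last_synced {
                return false;
            }
            // Keep everything inside the window, plus the snapshot matching
            // the newest one on the target so it is considered again.
            pos >= cutoff || time == last_synced
        })
        .map(|(_, time)| time)
        .collect()
}
```

The snapshot matching the newest one on the target is kept on purpose, so an
interrupted transfer gets another look on the next run.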
src/server/mod.rs | 1 +
src/server/push.rs | 835 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 836 insertions(+)
create mode 100644 src/server/push.rs
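Note: most helpers in push.rs translate a source namespace into the matching
target namespace before talking to the remote. A minimal sketch of that
prefix mapping, assuming namespaces are plain lists of path components (the
free function `map_prefix` below is a hypothetical stand-in, not the
`BackupNamespace` method used by the patch):

```rust
/// Re-anchor `ns` from below `source_prefix` to below `target_prefix`.
///
/// Returns an error if `ns` is not located below `source_prefix`.
fn map_prefix(
    ns: &[&str],
    source_prefix: &[&str],
    target_prefix: &[&str],
) -> Result<Vec<String>, String> {
    // Components of `ns` that remain after removing the source anchor.
    let rest = ns
        .strip_prefix(source_prefix)
        .ok_or_else(|| format!("{ns:?} is not below {source_prefix:?}"))?;

    // Target anchor first, followed by the remaining components.
    Ok(target_prefix
        .iter()
        .chain(rest)
        .map(|component| component.to_string())
        .collect())
}
```

With this, a source namespace `a/b/c` synced from anchor `a` into target
anchor `x/y` ends up as `x/y/b/c` on the remote.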
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 3acfcc1c4..83620c133 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -34,6 +34,7 @@ pub use report::*;
pub mod auth;
pub(crate) mod pull;
+pub(crate) mod push;
pub(crate) mod sync;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
diff --git a/src/server/push.rs b/src/server/push.rs
new file mode 100644
index 000000000..bab7a5dd1
--- /dev/null
+++ b/src/server/push.rs
@@ -0,0 +1,835 @@
+//! Sync datastore by pushing contents to remote server
+
+use std::cmp::Ordering;
+use std::collections::HashSet;
+use std::sync::{Arc, Mutex};
+use std::time::SystemTime;
+
+use anyhow::{bail, Error};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tracing::info;
+
+use pbs_api_types::{
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+ GroupListItem, NamespaceListItem, Operation, RateLimitConfig, Remote, SnapshotListItem,
+};
+use pbs_client::{BackupRepository, BackupWriter, HttpClient, UploadOptions};
+use pbs_datastore::data_blob::ChunkInfo;
+use pbs_datastore::dynamic_index::DynamicIndexReader;
+use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::manifest::{ArchiveType, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME};
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{BackupManifest, DataStore, StoreProgress};
+
+use super::sync::{
+ check_namespace_depth_limit, LocalSource, RemovedVanishedStats, SkipInfo, SkipReason,
+ SyncSource, SyncStats,
+};
+use crate::api2::config::remote;
+
+/// Target for backups to be pushed to
+pub(crate) struct PushTarget {
+ // Target repository on remote
+ repo: BackupRepository,
+ // Target namespace on remote
+ ns: BackupNamespace,
+ // Http client to connect to remote
+ client: HttpClient,
+}
+
+/// Parameters for a push operation
+pub(crate) struct PushParameters {
+ /// Source of backups to be pushed to remote
+ source: Arc<LocalSource>,
+ /// Target for backups to be pushed to
+ target: PushTarget,
+ /// Owner of synced groups (needs to match owner of pre-existing groups on target)
+ owner: Authid,
+ /// Whether to remove groups which exist on the remote target, but not locally
+ remove_vanished: bool,
+ /// How many levels of sub-namespaces to push (0 == no recursion, None == maximum recursion)
+ max_depth: Option<usize>,
+ /// Filters for reducing the push scope
+ group_filter: Vec<GroupFilter>,
+ /// How many snapshots should be transferred at most (taking the newest N snapshots)
+ transfer_last: Option<usize>,
+}
+
+impl PushParameters {
+ /// Creates a new instance of `PushParameters`.
+ #[allow(clippy::too_many_arguments)]
+ pub(crate) fn new(
+ store: &str,
+ ns: BackupNamespace,
+ remote: &str,
+ remote_store: &str,
+ remote_ns: BackupNamespace,
+ owner: Authid,
+ remove_vanished: Option<bool>,
+ max_depth: Option<usize>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
+ transfer_last: Option<usize>,
+ ) -> Result<Self, Error> {
+ if let Some(max_depth) = max_depth {
+ ns.check_max_depth(max_depth)?;
+ remote_ns.check_max_depth(max_depth)?;
+ };
+ let remove_vanished = remove_vanished.unwrap_or(false);
+
+ let source = Arc::new(LocalSource {
+ store: DataStore::lookup_datastore(store, Some(Operation::Read))?,
+ ns,
+ });
+
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote: Remote = remote_config.lookup("remote", remote)?;
+
+ let repo = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
+
+ let client = remote::remote_client_config(&remote, Some(limit))?;
+ let target = PushTarget {
+ repo,
+ ns: remote_ns,
+ client,
+ };
+ let group_filter = group_filter.unwrap_or_default();
+
+ Ok(Self {
+ source,
+ target,
+ owner,
+ remove_vanished,
+ max_depth,
+ group_filter,
+ transfer_last,
+ })
+ }
+}
+
+// Fetch the list of namespaces found on target
+async fn fetch_target_namespaces(params: &PushParameters) -> Result<Vec<BackupNamespace>, Error> {
+ let api_path = format!(
+ "api2/json/admin/datastore/{store}/namespace",
+ store = params.target.repo.store(),
+ );
+ let mut result = params.target.client.get(&api_path, None).await?;
+ let namespaces: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
+ let mut namespaces: Vec<BackupNamespace> = namespaces
+ .into_iter()
+ .map(|namespace| namespace.ns)
+ .collect();
+ namespaces.sort_unstable_by_key(|a| a.name_len());
+
+ Ok(namespaces)
+}
+
+// Remove the provided namespace from the target
+async fn remove_target_namespace(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+) -> Result<(), Error> {
+ if namespace.is_root() {
+ bail!("cannot remove root namespace from target");
+ }
+
+ let api_path = format!(
+ "api2/json/admin/datastore/{store}/namespace",
+ store = params.target.repo.store(),
+ );
+
+ let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
+ let args = serde_json::json!({
+ "ns": target_ns.name(),
+ "delete-groups": true,
+ });
+
+ params.target.client.delete(&api_path, Some(args)).await?;
+
+ Ok(())
+}
+
+// Fetch the list of groups found on target in given namespace
+async fn fetch_target_groups(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+) -> Result<Vec<BackupGroup>, Error> {
+ let api_path = format!(
+ "api2/json/admin/datastore/{store}/groups",
+ store = params.target.repo.store(),
+ );
+
+ let args = if !namespace.is_root() {
+ let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
+ Some(serde_json::json!({ "ns": target_ns.name() }))
+ } else {
+ None
+ };
+
+ let mut result = params.target.client.get(&api_path, args).await?;
+ let groups: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+ let mut groups: Vec<BackupGroup> = groups.into_iter().map(|group| group.backup).collect();
+
+ groups.sort_unstable_by(|a, b| {
+ let type_order = a.ty.cmp(&b.ty);
+ if type_order == Ordering::Equal {
+ a.id.cmp(&b.id)
+ } else {
+ type_order
+ }
+ });
+
+ Ok(groups)
+}
+
+// Remove the provided backup group in given namespace from the target
+async fn remove_target_group(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+ backup_group: &BackupGroup,
+) -> Result<(), Error> {
+ let api_path = format!(
+ "api2/json/admin/datastore/{store}/groups",
+ store = params.target.repo.store(),
+ );
+
+ let mut args = serde_json::json!({
+ "backup-id": backup_group.id,
+ "backup-type": backup_group.ty,
+ });
+ if !namespace.is_root() {
+ let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
+ args["ns"] = serde_json::to_value(target_ns.name())?;
+ }
+
+ params.target.client.delete(&api_path, Some(args)).await?;
+
+ Ok(())
+}
+
+// Check if the namespace is already present on the target, create it otherwise
+async fn check_or_create_target_namespace(
+ params: &PushParameters,
+ target_namespaces: &[BackupNamespace],
+ namespace: &BackupNamespace,
+) -> Result<bool, Error> {
+ let mut created = false;
+
+ if !namespace.is_root() && !target_namespaces.contains(namespace) {
+ // Namespace not present on target, create namespace.
+ // Sub-namespaces have to be created by creating parent components first.
+ let mut parent = BackupNamespace::root();
+ for namespace_component in namespace.components() {
+ let namespace = BackupNamespace::new(namespace_component)?;
+ let api_path = format!(
+ "api2/json/admin/datastore/{store}/namespace",
+ store = params.target.repo.store(),
+ );
+ let mut args = serde_json::json!({ "name": namespace.name() });
+ if !parent.is_root() {
+ args["parent"] = serde_json::to_value(parent.clone())?;
+ }
+ if let Err(err) = params.target.client.post(&api_path, Some(args)).await {
+ let target_store_and_ns =
+ print_store_and_ns(params.target.repo.store(), &namespace);
+ bail!("sync into {target_store_and_ns} failed - namespace creation failed: {err}");
+ }
+ parent.push(namespace.name())?;
+ }
+
+ created = true;
+ }
+
+ Ok(created)
+}
+
+/// Push contents of source datastore matched by given push parameters to target.
+pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats, Error> {
+ let mut errors = false;
+
+ // Generate list of source namespaces to push to target, limited by max-depth
+ let mut namespaces = params.source.list_namespaces(&mut params.max_depth).await?;
+
+ check_namespace_depth_limit(&params.source.get_ns(), &params.target.ns, &namespaces)?;
+
+ namespaces.sort_unstable_by_key(|a| a.name_len());
+
+ // Fetch all accessible namespaces already present on the target
+ let target_namespaces = fetch_target_namespaces(&params).await?;
+ // Avoid double upload penalty by remembering already seen chunks
+ let known_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 1024)));
+ // Remember synced namespaces, removing non-synced ones when remove vanished flag is set
+ let mut synced_namespaces = HashSet::with_capacity(namespaces.len());
+
+ let (mut groups, mut snapshots) = (0, 0);
+ let mut stats = SyncStats::default();
+ for namespace in namespaces {
+ let source_store_and_ns = print_store_and_ns(params.source.store.name(), &namespace);
+ let target_namespace = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
+ let target_store_and_ns = print_store_and_ns(params.target.repo.store(), &target_namespace);
+
+ info!("----");
+ info!("Syncing {source_store_and_ns} into {target_store_and_ns}");
+
+ synced_namespaces.insert(target_namespace.clone());
+
+ match check_or_create_target_namespace(&params, &target_namespaces, &target_namespace).await
+ {
+ Ok(true) => info!("Created namespace {target_namespace}"),
+ Ok(false) => {}
+ Err(err) => {
+ info!("Cannot sync {source_store_and_ns} into {target_store_and_ns} - {err}");
+ errors = true;
+ continue;
+ }
+ }
+
+ match push_namespace(&namespace, &params, &known_chunks).await {
+ Ok((sync_progress, sync_stats, sync_errors)) => {
+ errors |= sync_errors;
+ stats.add(sync_stats);
+
+ if params.max_depth != Some(0) {
+ groups += sync_progress.done_groups;
+ snapshots += sync_progress.done_snapshots;
+
+ let ns = if namespace.is_root() {
+ "root namespace".into()
+ } else {
+ format!("namespace {namespace}")
+ };
+ info!(
+ "Finished syncing {ns}, current progress: {groups} groups, {snapshots} snapshots"
+ );
+ }
+ }
+ Err(err) => {
+ errors = true;
+ info!("Encountered errors while syncing namespace {namespace} - {err}");
+ }
+ }
+ }
+
+ if params.remove_vanished {
+ for target_namespace in target_namespaces {
+ if synced_namespaces.contains(&target_namespace) {
+ continue;
+ }
+ if let Err(err) = remove_target_namespace(&params, &target_namespace).await {
+ info!("failed to remove vanished namespace {target_namespace} - {err}");
+ continue;
+ }
+ info!("removed vanished namespace {target_namespace}");
+ }
+ }
+
+ if errors {
+ bail!("sync failed with some errors.");
+ }
+
+ Ok(stats)
+}
+
+/// Push namespace including all backup groups to target
+///
+/// Iterate over all backup groups in the namespace and push them to the target.
+pub(crate) async fn push_namespace(
+ namespace: &BackupNamespace,
+ params: &PushParameters,
+ known_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<(StoreProgress, SyncStats, bool), Error> {
+ let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
+
+ list.sort_unstable_by(|a, b| {
+ let type_order = a.ty.cmp(&b.ty);
+ if type_order == Ordering::Equal {
+ a.id.cmp(&b.id)
+ } else {
+ type_order
+ }
+ });
+
+ let total = list.len();
+ let list: Vec<BackupGroup> = list
+ .into_iter()
+ .filter(|group| group.apply_filters(&params.group_filter))
+ .collect();
+
+ info!(
+ "found {filtered} groups to sync (out of {total} total)",
+ filtered = list.len()
+ );
+
+ let target_groups = if params.remove_vanished {
+ fetch_target_groups(params, namespace).await?
+ } else {
+ // avoid fetching of groups, not required if remove vanished not set
+ Vec::new()
+ };
+
+ let mut errors = false;
+ // Remember synced groups, remove others when the remove vanished flag is set
+ let mut synced_groups = HashSet::new();
+ let mut progress = StoreProgress::new(list.len() as u64);
+ let mut stats = SyncStats::default();
+
+ for (done, group) in list.into_iter().enumerate() {
+ progress.done_groups = done as u64;
+ progress.done_snapshots = 0;
+ progress.group_snapshots = 0;
+ synced_groups.insert(group.clone());
+
+ match push_group(params, namespace, &group, &mut progress, known_chunks).await {
+ Ok(sync_stats) => stats.add(sync_stats),
+ Err(err) => {
+ info!("sync group '{group}' failed - {err}");
+ errors = true;
+ }
+ }
+ }
+
+ if params.remove_vanished {
+ for target_group in target_groups {
+ if synced_groups.contains(&target_group) {
+ continue;
+ }
+ if !target_group.apply_filters(&params.group_filter) {
+ continue;
+ }
+
+ info!("delete vanished group '{target_group}'");
+
+ let count_before = match fetch_target_snapshots(params, namespace, &target_group).await {
+ Ok(snapshots) => snapshots.len(),
+ Err(_err) => 0, // ignore errors
+ };
+
+ if let Err(err) = remove_target_group(params, namespace, &target_group).await {
+ info!("{err}");
+ errors = true;
+ continue;
+ }
+
+ let mut count_after = match fetch_target_snapshots(params, namespace, &target_group).await {
+ Ok(snapshots) => snapshots.len(),
+ Err(_err) => 0, // ignore errors
+ };
+
+ let deleted_groups = if count_after > 0 {
+ info!("kept some protected snapshots of group '{target_group}'");
+ 0
+ } else {
+ 1
+ };
+
+ if count_after > count_before {
+ count_after = count_before;
+ }
+
+ stats.add(SyncStats::from(RemovedVanishedStats {
+ snapshots: count_before - count_after,
+ groups: deleted_groups,
+ namespaces: 0,
+ }));
+ }
+ }
+
+ Ok((progress, stats, errors))
+}
+
+async fn fetch_target_snapshots(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+) -> Result<Vec<SnapshotListItem>, Error> {
+ let api_path = format!(
+ "api2/json/admin/datastore/{store}/snapshots",
+ store = params.target.repo.store(),
+ );
+ let mut args = serde_json::to_value(group)?;
+ if !namespace.is_root() {
+ let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
+ args["ns"] = serde_json::to_value(target_ns)?;
+ }
+ let mut result = params.target.client.get(&api_path, Some(args)).await?;
+ let snapshots: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+
+ Ok(snapshots)
+}
+
+async fn fetch_previous_backup_time(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+) -> Result<Option<i64>, Error> {
+ let mut snapshots = fetch_target_snapshots(params, namespace, group).await?;
+ snapshots.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
+ Ok(snapshots.last().map(|snapshot| snapshot.backup.time))
+}
+
+async fn forget_target_snapshot(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+ snapshot: &BackupDir,
+) -> Result<(), Error> {
+ let api_path = format!(
+ "api2/json/admin/datastore/{store}/snapshots",
+ store = params.target.repo.store(),
+ );
+ let mut args = serde_json::to_value(snapshot)?;
+ if !namespace.is_root() {
+ let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
+ args["ns"] = serde_json::to_value(target_ns)?;
+ }
+ params.target.client.delete(&api_path, Some(args)).await?;
+
+ Ok(())
+}
+
+/// Push group including all snapshots to target
+///
+/// Iterate over all snapshots in the group and push them to the target.
+/// The group sync operation consists of the following steps:
+/// - Query snapshots of given group from the source
+/// - Sort snapshots by time
+/// - Apply transfer last cutoff and filters to list
+/// - Iterate the snapshot list and push each snapshot individually
+/// - (Optional): Remove vanished snapshots if `remove_vanished` flag is set
+pub(crate) async fn push_group(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ progress: &mut StoreProgress,
+ known_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<SyncStats, Error> {
+ let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
+ let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
+
+ let mut snapshots: Vec<BackupDir> = params.source.list_backup_dirs(namespace, group).await?;
+ snapshots.sort_unstable_by(|a, b| a.time.cmp(&b.time));
+
+ let total_snapshots = snapshots.len();
+ let cutoff = params
+ .transfer_last
+ .map(|count| total_snapshots.saturating_sub(count))
+ .unwrap_or_default();
+
+ let last_snapshot_time = fetch_previous_backup_time(params, namespace, group)
+ .await?
+ .unwrap_or(i64::MIN);
+
+ let mut source_snapshots = HashSet::new();
+ let snapshots: Vec<BackupDir> = snapshots
+ .into_iter()
+ .enumerate()
+ .filter(|&(pos, ref snapshot)| {
+ source_snapshots.insert(snapshot.time);
+ if last_snapshot_time > snapshot.time {
+ already_synced_skip_info.update(snapshot.time);
+ return false;
+ } else if already_synced_skip_info.count > 0 {
+ info!("{already_synced_skip_info}");
+ already_synced_skip_info.reset();
+ return true;
+ }
+
+ if pos < cutoff && last_snapshot_time != snapshot.time {
+ transfer_last_skip_info.update(snapshot.time);
+ return false;
+ } else if transfer_last_skip_info.count > 0 {
+ info!("{transfer_last_skip_info}");
+ transfer_last_skip_info.reset();
+ }
+ true
+ })
+ .map(|(_, dir)| dir)
+ .collect();
+
+ progress.group_snapshots = snapshots.len() as u64;
+
+ let target_snapshots = fetch_target_snapshots(params, namespace, group).await?;
+ let target_snapshots: Vec<BackupDir> = target_snapshots
+ .into_iter()
+ .map(|snapshot| snapshot.backup)
+ .collect();
+
+ let mut stats = SyncStats::default();
+ for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+ if target_snapshots.contains(&source_snapshot) {
+ progress.done_snapshots = pos as u64 + 1;
+ info!("percentage done: {progress}");
+ continue;
+ }
+ let result = push_snapshot(params, namespace, &source_snapshot, known_chunks).await;
+
+ progress.done_snapshots = pos as u64 + 1;
+ info!("percentage done: {progress}");
+
+ // stop on error
+ let sync_stats = result?;
+ stats.add(sync_stats);
+ }
+
+ if params.remove_vanished {
+ let target_snapshots = fetch_target_snapshots(params, namespace, group).await?;
+ for snapshot in target_snapshots {
+ if source_snapshots.contains(&snapshot.backup.time) {
+ continue;
+ }
+ if snapshot.protected {
+ info!(
+ "don't delete vanished snapshot {name} (protected)",
+ name = snapshot.backup
+ );
+ continue;
+ }
+ if let Err(err) = forget_target_snapshot(params, namespace, &snapshot.backup).await {
+ info!(
+ "could not delete vanished snapshot {name} - {err}",
+ name = snapshot.backup
+ );
+ }
+ info!("delete vanished snapshot {name}", name = snapshot.backup);
+ stats.add(SyncStats::from(RemovedVanishedStats {
+ snapshots: 1,
+ groups: 0,
+ namespaces: 0,
+ }));
+ }
+ }
+
+ Ok(stats)
+}
+
+/// Push snapshot to target
+///
+/// Creates a new snapshot on the target and pushes the content of the source snapshot to the
+/// target by creating a new manifest file and connecting to the remote as backup writer client.
+/// Chunks are written by recreating the index by uploading the chunk stream as read from the
+/// source. Data blobs are uploaded as such.
+pub(crate) async fn push_snapshot(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+ snapshot: &BackupDir,
+ known_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<SyncStats, Error> {
+ let mut stats = SyncStats::default();
+
+ let upload_options = UploadOptions {
+ compress: true,
+ encrypt: false,
+ ..UploadOptions::default()
+ };
+ let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
+ let backup_dir = params
+ .source
+ .store
+ .backup_dir(target_ns.clone(), snapshot.clone())?;
+
+ let reader = params.source.reader(namespace, snapshot).await?;
+
+ // Load the source manifest, needed to find crypt mode for files
+ let mut tmp_source_manifest_path = backup_dir.full_path();
+ tmp_source_manifest_path.push(MANIFEST_BLOB_NAME);
+ tmp_source_manifest_path.set_extension("tmp");
+ let source_manifest = if let Some(manifest_blob) = reader
+ .load_file_into(MANIFEST_BLOB_NAME, &tmp_source_manifest_path)
+ .await?
+ {
+ BackupManifest::try_from(manifest_blob)?
+ } else {
+ // no manifest in snapshot, skip
+ return Ok(stats);
+ };
+
+ // Manifest to be created on target, referencing all the source archives after upload.
+ let mut manifest = BackupManifest::new(snapshot.clone());
+
+ // writer instance locks the snapshot on the remote side
+ let backup_writer = BackupWriter::start(
+ &params.target.client,
+ None,
+ params.target.repo.store(),
+ &target_ns,
+ snapshot,
+ false,
+ false,
+ true,
+ )
+ .await?;
+
+ for entry in source_manifest.files() {
+ let mut path = backup_dir.full_path();
+ path.push(&entry.filename);
+ if path.try_exists()? {
+ match ArchiveType::from_path(&entry.filename)? {
+ ArchiveType::Blob => {
+ let file = std::fs::File::open(path.clone())?;
+ let backup_stats = backup_writer.upload_blob(file, &entry.filename).await?;
+ manifest.add_file(
+ entry.filename.to_string(),
+ backup_stats.size,
+ backup_stats.csum,
+ entry.chunk_crypt_mode(),
+ )?;
+ stats.add(SyncStats {
+ chunk_count: backup_stats.chunk_count as usize,
+ bytes: backup_stats.size as usize,
+ elapsed: backup_stats.duration,
+ removed: None,
+ });
+ }
+ ArchiveType::DynamicIndex => {
+ let index = DynamicIndexReader::open(&path)?;
+ let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
+ let sync_stats = push_index(
+ &entry.filename,
+ index,
+ chunk_reader,
+ &backup_writer,
+ &mut manifest,
+ entry.chunk_crypt_mode(),
+ None,
+ known_chunks,
+ )
+ .await?;
+ stats.add(sync_stats);
+ }
+ ArchiveType::FixedIndex => {
+ let index = FixedIndexReader::open(&path)?;
+ let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
+ let size = index.index_bytes();
+ let sync_stats = push_index(
+ &entry.filename,
+ index,
+ chunk_reader,
+ &backup_writer,
+ &mut manifest,
+ entry.chunk_crypt_mode(),
+ Some(size),
+ known_chunks,
+ )
+ .await?;
+ stats.add(sync_stats);
+ }
+ }
+ } else {
+ info!("{path:?} does not exist, skipped.");
+ }
+ }
+
+ // Fetch client log from source and push to target
+ // this has to be handled individually since the log is never part of the manifest
+ let mut client_log_path = backup_dir.full_path();
+ client_log_path.push(CLIENT_LOG_BLOB_NAME);
+ if client_log_path.is_file() {
+ backup_writer
+ .upload_blob_from_file(
+ &client_log_path,
+ CLIENT_LOG_BLOB_NAME,
+ upload_options.clone(),
+ )
+ .await?;
+ } else {
+ info!("Client log at {client_log_path:?} does not exist or is not a file, skipped.");
+ }
+
+ // Rewrite manifest for pushed snapshot, re-adding the existing fingerprint and signature
+ let mut manifest_json = serde_json::to_value(manifest)?;
+ manifest_json["unprotected"] = source_manifest.unprotected;
+ if let Some(signature) = source_manifest.signature {
+ manifest_json["signature"] = serde_json::to_value(signature)?;
+ }
+ let manifest_string = serde_json::to_string_pretty(&manifest_json).unwrap();
+ let backup_stats = backup_writer
+ .upload_blob_from_data(
+ manifest_string.into_bytes(),
+ MANIFEST_BLOB_NAME,
+ upload_options,
+ )
+ .await?;
+ backup_writer.finish().await?;
+
+ stats.add(SyncStats {
+ chunk_count: backup_stats.chunk_count as usize,
+ bytes: backup_stats.size as usize,
+ elapsed: backup_stats.duration,
+ removed: None,
+ });
+
+ Ok(stats)
+}
+
+// Read fixed or dynamic index and push to target by uploading via the backup writer instance
+//
+// For fixed indexes, the size must be provided as given by the index reader.
+#[allow(clippy::too_many_arguments)]
+async fn push_index<'a>(
+ filename: &'a str,
+ index: impl IndexFile + Send + 'static,
+ chunk_reader: Arc<dyn AsyncReadChunk>,
+ backup_writer: &BackupWriter,
+ manifest: &mut BackupManifest,
+ crypt_mode: CryptMode,
+ size: Option<u64>,
+ known_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<SyncStats, Error> {
+ let (upload_channel_tx, upload_channel_rx) = mpsc::channel(20);
+ let mut chunk_infos =
+ stream::iter(0..index.index_count()).map(move |pos| index.chunk_info(pos).unwrap());
+
+ tokio::spawn(async move {
+ while let Some(chunk_info) = chunk_infos.next().await {
+ let chunk_info = chunk_reader
+ .read_raw_chunk(&chunk_info.digest)
+ .await
+ .map(|chunk| ChunkInfo {
+ chunk,
+ digest: chunk_info.digest,
+ chunk_len: chunk_info.size(),
+ offset: chunk_info.range.start,
+ });
+ let _ = upload_channel_tx.send(chunk_info).await;
+ }
+ });
+
+ let chunk_info_stream = ReceiverStream::new(upload_channel_rx).map_err(Error::from);
+
+ let upload_options = UploadOptions {
+ compress: true,
+ encrypt: false,
+ fixed_size: size,
+ ..UploadOptions::default()
+ };
+
+ let upload_stats = backup_writer
+ .upload_index_chunk_info(
+ filename,
+ chunk_info_stream,
+ upload_options,
+ known_chunks.clone(),
+ )
+ .await?;
+
+ manifest.add_file(
+ filename.to_string(),
+ upload_stats.size,
+ upload_stats.csum,
+ crypt_mode,
+ )?;
+
+ Ok(SyncStats {
+ chunk_count: upload_stats.chunk_count as usize,
+ bytes: upload_stats.size as usize,
+ elapsed: upload_stats.duration,
+ removed: None,
+ })
+}
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 19/24] api: config: extend sync job config by sync direction
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (17 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 18/24] fix #3044: server: implement push support for sync operations Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 20/24] api: push: implement endpoint for sync in push direction Christian Ebner
` (6 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
In order for sync jobs to be either pull or push jobs, allow
configuring the direction of the job.
This implements the types and methods required to parse, update and
delete the configured sync direction.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
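Note: the string round-trip and the fallback to the default direction can
be illustrated with a std-only sketch. It mirrors the `Display`/`FromStr`
pair from this patch, but uses `String` as the error type instead of
`anyhow::Error`; the helper `effective_direction` is a hypothetical name
that is not part of the patch.

```rust
use std::fmt;
use std::str::FromStr;

/// Direction of a sync job; pull is the default when nothing is configured.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum SyncDirection {
    #[default]
    Pull,
    Push,
}

impl fmt::Display for SyncDirection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SyncDirection::Pull => f.write_str("pull"),
            SyncDirection::Push => f.write_str("push"),
        }
    }
}

impl FromStr for SyncDirection {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "pull" => Ok(SyncDirection::Pull),
            "push" => Ok(SyncDirection::Push),
            other => Err(format!("invalid sync direction '{other}'")),
        }
    }
}

/// Resolve an optional config value: unset means default, invalid is an error.
fn effective_direction(configured: Option<&str>) -> Result<SyncDirection, String> {
    configured
        .map(|value| value.parse::<SyncDirection>())
        .transpose()
        .map(|direction| direction.unwrap_or_default())
}
```

Keeping the config field optional means existing job configs without a
`sync-direction` entry keep behaving as pull jobs.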
pbs-api-types/src/jobs.rs | 49 +++++++++++++++++++++++++++++++++++++++
src/api2/config/sync.rs | 9 +++++++
2 files changed, 58 insertions(+)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 868702bc0..bdc557787 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -20,6 +20,8 @@ const_regex! {
pub VERIFICATION_JOB_WORKER_ID_REGEX = concatcp!(r"^(", PROXMOX_SAFE_ID_REGEX_STR, r"):");
/// Regex for sync jobs '(REMOTE|\-):REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID'
pub SYNC_JOB_WORKER_ID_REGEX = concatcp!(r"^(", PROXMOX_SAFE_ID_REGEX_STR, r"|\-):(", PROXMOX_SAFE_ID_REGEX_STR, r"):(", PROXMOX_SAFE_ID_REGEX_STR, r")(?::(", BACKUP_NS_RE, r"))?:");
+ /// Regex for sync direction '(pull|push)'
+ pub SYNC_DIRECTION_REGEX = r"^(pull|push)$";
}
pub const JOB_ID_SCHEMA: Schema = StringSchema::new("Job ID.")
@@ -498,6 +500,47 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
.minimum(1)
.schema();
+pub const SYNC_DIRECTION_SCHEMA: Schema = StringSchema::new("Sync job direction (pull|push)")
+ .format(&ApiStringFormat::Pattern(&SYNC_DIRECTION_REGEX))
+ .schema();
+
+/// Direction of the sync job, push or pull
+#[derive(Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash, UpdaterType)]
+pub enum SyncDirection {
+ #[default]
+ Pull,
+ Push,
+}
+
+impl ApiType for SyncDirection {
+ const API_SCHEMA: Schema = SYNC_DIRECTION_SCHEMA;
+}
+
+// used for serialization using `proxmox_serde::forward_serialize_to_display` macro
+impl std::fmt::Display for SyncDirection {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ SyncDirection::Pull => f.write_str("pull"),
+ SyncDirection::Push => f.write_str("push"),
+ }
+ }
+}
+
+impl std::str::FromStr for SyncDirection {
+ type Err = anyhow::Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "pull" => Ok(SyncDirection::Pull),
+ "push" => Ok(SyncDirection::Push),
+ _ => bail!("invalid sync direction"),
+ }
+ }
+}
+
+proxmox_serde::forward_deserialize_to_from_str!(SyncDirection);
+proxmox_serde::forward_serialize_to_display!(SyncDirection);
+
#[api(
properties: {
id: {
@@ -552,6 +595,10 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
schema: TRANSFER_LAST_SCHEMA,
optional: true,
},
+ "sync-direction": {
+ schema: SYNC_DIRECTION_SCHEMA,
+ optional: true,
+ },
}
)]
#[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -585,6 +632,8 @@ pub struct SyncJobConfig {
pub limit: RateLimitConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub transfer_last: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub sync_direction: Option<SyncDirection>,
}
impl SyncJobConfig {
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 6fdc69a9e..fc5e9c148 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -231,6 +231,8 @@ pub enum DeletableProperty {
MaxDepth,
/// Delete the transfer_last property,
TransferLast,
+ /// Delete the sync_direction property (when not set, falls back to the default)
+ SyncDirection,
}
#[api(
@@ -331,6 +333,9 @@ pub fn update_sync_job(
DeletableProperty::TransferLast => {
data.transfer_last = None;
}
+ DeletableProperty::SyncDirection => {
+ data.sync_direction = None;
+ }
}
}
}
@@ -368,6 +373,9 @@ pub fn update_sync_job(
if let Some(transfer_last) = update.transfer_last {
data.transfer_last = Some(transfer_last);
}
+ if let Some(sync_direction) = update.sync_direction {
+ data.sync_direction = Some(sync_direction);
+ }
if update.limit.rate_in.is_some() {
data.limit.rate_in = update.limit.rate_in;
@@ -533,6 +541,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
schedule: None,
limit: pbs_api_types::RateLimitConfig::default(), // no limit
transfer_last: None,
+ sync_direction: Default::default(),
};
// should work without ACLs
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 20/24] api: push: implement endpoint for sync in push direction
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (18 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 19/24] api: config: extend sync job config by sync direction Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 21/24] api: sync: move sync job invocation to common module Christian Ebner
` (5 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
Expose the sync job in push direction via a dedicated API endpoint,
analogous to the pull direction.
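As a rough illustration of the privilege checks this endpoint performs (see check_push_privs in the diff), the following standalone sketch lists which ACL path needs which privilege. The privilege bit values and the helper name required_push_privs are assumptions made for this example only; the real constants come from pbs_api_types and the real check goes through CachedUserInfo.

```rust
/// Hypothetical privilege bits; the real values live in `pbs_api_types`.
const PRIV_DATASTORE_READ: u64 = 1 << 0;
const PRIV_REMOTE_BACKUP: u64 = 1 << 1;
const PRIV_REMOTE_PRUNE: u64 = 1 << 2;

/// Build the list of (ACL path, required privilege) pairs for a push.
/// Hypothetical helper, not part of the patch.
fn required_push_privs(
    store: &str,
    namespace: Option<&str>,
    remote: &str,
    remote_store: &str,
    delete: bool,
) -> Vec<(String, u64)> {
    // Reading happens on the local source datastore (and namespace, if set).
    let source_path = match namespace {
        Some(ns) => format!("/datastore/{store}/{ns}"),
        None => format!("/datastore/{store}"),
    };
    // Writing (and optionally pruning) happens on the remote target.
    let target_path = format!("/remote/{remote}/{remote_store}");

    let mut required = vec![
        (source_path, PRIV_DATASTORE_READ),
        (target_path.clone(), PRIV_REMOTE_BACKUP),
    ];
    if delete {
        // remove-vanished additionally needs prune rights on the target.
        required.push((target_path, PRIV_REMOTE_PRUNE));
    }
    required
}

fn main() {
    let required = required_push_privs("store1", Some("ns1"), "remote1", "remotestore1", true);
    for (path, privs) in required {
        println!("{path}: {privs:#05b}");
    }
}
```

Compared to pull, the roles are swapped: the local datastore is only read, while the remote is the side that gets written to and pruned.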
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/api2/mod.rs | 2 +
src/api2/push.rs | 207 +++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 209 insertions(+)
create mode 100644 src/api2/push.rs
diff --git a/src/api2/mod.rs b/src/api2/mod.rs
index a83e4c205..03596326b 100644
--- a/src/api2/mod.rs
+++ b/src/api2/mod.rs
@@ -12,6 +12,7 @@ pub mod helpers;
pub mod node;
pub mod ping;
pub mod pull;
+pub mod push;
pub mod reader;
pub mod status;
pub mod tape;
@@ -29,6 +30,7 @@ const SUBDIRS: SubdirMap = &sorted!([
("nodes", &node::ROUTER),
("ping", &ping::ROUTER),
("pull", &pull::ROUTER),
+ ("push", &push::ROUTER),
("reader", &reader::ROUTER),
("status", &status::ROUTER),
("tape", &tape::ROUTER),
diff --git a/src/api2/push.rs b/src/api2/push.rs
new file mode 100644
index 000000000..8c39f2e8b
--- /dev/null
+++ b/src/api2/push.rs
@@ -0,0 +1,207 @@
+use anyhow::{format_err, Context, Error};
+use futures::{future::FutureExt, select};
+use tracing::info;
+
+use pbs_api_types::{
+ Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
+ GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_READ, PRIV_REMOTE_BACKUP,
+ PRIV_REMOTE_PRUNE, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
+};
+use proxmox_rest_server::WorkerTask;
+use proxmox_router::{Permission, Router, RpcEnvironment};
+use proxmox_schema::api;
+
+use pbs_config::CachedUserInfo;
+
+use crate::server::push::{push_store, PushParameters};
+
+pub fn check_push_privs(
+ auth_id: &Authid,
+ store: &str,
+ namespace: Option<&str>,
+ remote: &str,
+ remote_store: &str,
+ delete: bool,
+) -> Result<(), Error> {
+ let user_info = CachedUserInfo::new()?;
+
+ let source_store_ns_acl_path = match namespace {
+ Some(namespace) => vec!["datastore", store, namespace],
+ None => vec!["datastore", store],
+ };
+
+ user_info.check_privs(
+ auth_id,
+ &source_store_ns_acl_path,
+ PRIV_DATASTORE_READ,
+ false,
+ )?;
+
+ user_info.check_privs(
+ auth_id,
+ &["remote", remote, remote_store],
+ PRIV_REMOTE_BACKUP,
+ false,
+ )?;
+
+ if delete {
+ user_info.check_privs(
+ auth_id,
+ &["remote", remote, remote_store],
+ PRIV_REMOTE_PRUNE,
+ false,
+ )?;
+ }
+
+ Ok(())
+}
+
+impl TryFrom<&SyncJobConfig> for PushParameters {
+ type Error = Error;
+
+ fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
+ PushParameters::new(
+ &sync_job.store,
+ sync_job.ns.clone().unwrap_or_default(),
+ sync_job
+ .remote
+ .as_deref()
+ .context("missing required remote")?,
+ &sync_job.remote_store,
+ sync_job.remote_ns.clone().unwrap_or_default(),
+ sync_job
+ .owner
+ .as_ref()
+ .unwrap_or_else(|| Authid::root_auth_id())
+ .clone(),
+ sync_job.remove_vanished,
+ sync_job.max_depth,
+ sync_job.group_filter.clone(),
+ sync_job.limit.clone(),
+ sync_job.transfer_last,
+ )
+ }
+}
+
+#[api(
+ input: {
+ properties: {
+ store: {
+ schema: DATASTORE_SCHEMA,
+ },
+ ns: {
+ type: BackupNamespace,
+ optional: true,
+ },
+ remote: {
+ schema: REMOTE_ID_SCHEMA,
+ },
+ "remote-store": {
+ schema: DATASTORE_SCHEMA,
+ },
+ "remote-ns": {
+ type: BackupNamespace,
+ optional: true,
+ },
+ "remove-vanished": {
+ schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
+ optional: true,
+ },
+ "max-depth": {
+ schema: NS_MAX_DEPTH_REDUCED_SCHEMA,
+ optional: true,
+ },
+ "group-filter": {
+ schema: GROUP_FILTER_LIST_SCHEMA,
+ optional: true,
+ },
+ limit: {
+ type: RateLimitConfig,
+ flatten: true,
+ },
+ "transfer-last": {
+ schema: TRANSFER_LAST_SCHEMA,
+ optional: true,
+ },
+ },
+ },
+ access: {
+ description: r###"The user needs Remote.Backup privilege on '/remote/{remote}/{remote-store}'
+and needs to own the backup group. Datastore.Read is required on '/datastore/{store}'.
+The delete flag additionally requires the Remote.Prune privilege on '/remote/{remote}/{remote-store}'.
+"###,
+ permission: &Permission::Anybody,
+ },
+)]
+/// Push store to other repository
+#[allow(clippy::too_many_arguments)]
+async fn push(
+ store: String,
+ ns: Option<BackupNamespace>,
+ remote: String,
+ remote_store: String,
+ remote_ns: Option<BackupNamespace>,
+ remove_vanished: Option<bool>,
+ max_depth: Option<usize>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
+ transfer_last: Option<usize>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<String, Error> {
+ let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+ let delete = remove_vanished.unwrap_or(false);
+
+ let ns = ns.unwrap_or_default();
+ let ns_str = if ns.is_root() {
+ None
+ } else {
+ Some(ns.to_string())
+ };
+
+ check_push_privs(
+ &auth_id,
+ &store,
+ ns_str.as_deref(),
+ &remote,
+ &remote_store,
+ delete,
+ )?;
+
+ let push_params = PushParameters::new(
+ &store,
+ ns,
+ &remote,
+ &remote_store,
+ remote_ns.unwrap_or_default(),
+ auth_id.clone(),
+ remove_vanished,
+ max_depth,
+ group_filter,
+ limit,
+ transfer_last,
+ )?;
+
+ let upid_str = WorkerTask::spawn(
+ "sync",
+ Some(store.clone()),
+ auth_id.to_string(),
+ true,
+ move |worker| async move {
+ info!("push datastore '{store}' to '{remote}/{remote_store}'");
+
+ let push_future = push_store(push_params);
+ (select! {
+ success = push_future.fuse() => success,
+ abort = worker.abort_future().map(|_| Err(format_err!("push aborted"))) => abort,
+ })?;
+
+ info!("push datastore '{store}' end");
+
+ Ok(())
+ },
+ )?;
+
+ Ok(upid_str)
+}
+
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PUSH);
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 21/24] api: sync: move sync job invocation to common module
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (19 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 20/24] api: push: implement endpoint for sync in push direction Christian Ebner
@ 2024-07-15 10:15 ` Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 22/24] bin: manager: add datastore push cli command Christian Ebner
` (4 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:15 UTC (permalink / raw)
To: pbs-devel
Move and refactor the do_sync_job function into a common sync
module so that it can be reused for both sync directions, pull and
push.
The sync direction for the job is determined based on the sync job
configuration, with pull being the fallback default if not
explicitly set.
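The fallback described above can be sketched in isolation as follows. The SyncDirection enum and the reduced JobConfig struct are simplified stand-ins written for this example, not the actual pbs_api_types definitions, and the helper names are hypothetical.

```rust
/// Simplified stand-in for the `SyncDirection` type from `pbs_api_types`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum SyncDirection {
    #[default]
    Pull,
    Push,
}

/// Hypothetical, reduced job config: only the field relevant here.
struct JobConfig {
    sync_direction: Option<SyncDirection>,
}

/// Resolve the effective direction, falling back to pull when unset.
fn effective_direction(job: &JobConfig) -> SyncDirection {
    job.sync_direction.unwrap_or_default()
}

/// Pick the verb used in the summary log line for the given direction.
fn summary_verb(direction: SyncDirection) -> &'static str {
    match direction {
        SyncDirection::Pull => "pulled",
        SyncDirection::Push => "pushed",
    }
}

fn main() {
    let unset = JobConfig { sync_direction: None };
    let push = JobConfig { sync_direction: Some(SyncDirection::Push) };
    println!("{}", summary_verb(effective_direction(&unset)));
    println!("{}", summary_verb(effective_direction(&push)));
}
```

Keeping the field optional means existing job configurations without a sync direction continue to behave as pull jobs.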
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/api2/admin/sync.rs | 2 +-
src/api2/mod.rs | 1 +
src/api2/pull.rs | 108 ------------------------
src/api2/sync.rs | 142 ++++++++++++++++++++++++++++++++
src/bin/proxmox-backup-proxy.rs | 2 +-
5 files changed, 145 insertions(+), 110 deletions(-)
create mode 100644 src/api2/sync.rs
diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 4e2ba0be8..8def14c72 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -17,7 +17,7 @@ use pbs_config::CachedUserInfo;
use crate::{
api2::{
config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
- pull::do_sync_job,
+ sync::do_sync_job,
},
server::jobstate::{compute_schedule_status, Job, JobState},
};
diff --git a/src/api2/mod.rs b/src/api2/mod.rs
index 03596326b..44e3776a4 100644
--- a/src/api2/mod.rs
+++ b/src/api2/mod.rs
@@ -15,6 +15,7 @@ pub mod pull;
pub mod push;
pub mod reader;
pub mod status;
+pub mod sync;
pub mod tape;
pub mod types;
pub mod version;
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e733c9839..d039dab59 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -13,10 +13,8 @@ use pbs_api_types::{
TRANSFER_LAST_SCHEMA,
};
use pbs_config::CachedUserInfo;
-use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
-use crate::server::jobstate::Job;
use crate::server::pull::{pull_store, PullParameters};
pub fn check_pull_privs(
@@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
}
}
-pub fn do_sync_job(
- mut job: Job,
- sync_job: SyncJobConfig,
- auth_id: &Authid,
- schedule: Option<String>,
- to_stdout: bool,
-) -> Result<String, Error> {
- let job_id = format!(
- "{}:{}:{}:{}:{}",
- sync_job.remote.as_deref().unwrap_or("-"),
- sync_job.remote_store,
- sync_job.store,
- sync_job.ns.clone().unwrap_or_default(),
- job.jobname()
- );
- let worker_type = job.jobtype().to_string();
-
- if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
- bail!("can't sync to same datastore");
- }
-
- let upid_str = WorkerTask::spawn(
- &worker_type,
- Some(job_id.clone()),
- auth_id.to_string(),
- to_stdout,
- move |worker| async move {
- job.start(&worker.upid().to_string())?;
-
- let worker2 = worker.clone();
- let sync_job2 = sync_job.clone();
-
- let worker_future = async move {
- let pull_params = PullParameters::try_from(&sync_job)?;
-
- info!("Starting datastore sync job '{job_id}'");
- if let Some(event_str) = schedule {
- info!("task triggered by schedule '{event_str}'");
- }
-
- info!(
- "sync datastore '{}' from '{}{}'",
- sync_job.store,
- sync_job
- .remote
- .as_deref()
- .map_or(String::new(), |remote| format!("{remote}/")),
- sync_job.remote_store,
- );
-
- let pull_stats = pull_store(pull_params).await?;
-
- if pull_stats.bytes != 0 {
- let amount = HumanByte::from(pull_stats.bytes);
- let rate = HumanByte::new_binary(
- pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
- );
- info!(
- "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
- pull_stats.chunk_count,
- );
- } else {
- info!("Summary: sync job found no new data to pull");
- }
-
- if let Some(removed) = pull_stats.removed {
- info!(
- "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
- removed.snapshots, removed.groups, removed.namespaces,
- );
- }
-
- info!("sync job '{}' end", &job_id);
-
- Ok(())
- };
-
- let mut abort_future = worker2
- .abort_future()
- .map(|_| Err(format_err!("sync aborted")));
-
- let result = select! {
- worker = worker_future.fuse() => worker,
- abort = abort_future => abort,
- };
-
- let status = worker2.create_state(&result);
-
- match job.finish(status) {
- Ok(_) => {}
- Err(err) => {
- eprintln!("could not finish job state: {}", err);
- }
- }
-
- if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
- eprintln!("send sync notification failed: {err}");
- }
-
- result
- },
- )?;
-
- Ok(upid_str)
-}
-
#[api(
input: {
properties: {
diff --git a/src/api2/sync.rs b/src/api2/sync.rs
new file mode 100644
index 000000000..3972c6ead
--- /dev/null
+++ b/src/api2/sync.rs
@@ -0,0 +1,142 @@
+//! Sync datastore from remote server
+use anyhow::{bail, format_err, Error};
+use futures::{future::FutureExt, select};
+use tracing::info;
+
+use pbs_api_types::{Authid, SyncDirection, SyncJobConfig};
+use proxmox_human_byte::HumanByte;
+use proxmox_rest_server::WorkerTask;
+
+use crate::server::jobstate::Job;
+use crate::server::pull::{pull_store, PullParameters};
+use crate::server::push::{push_store, PushParameters};
+
+pub fn do_sync_job(
+ mut job: Job,
+ sync_job: SyncJobConfig,
+ auth_id: &Authid,
+ schedule: Option<String>,
+ to_stdout: bool,
+) -> Result<String, Error> {
+ let job_id = format!(
+ "{}:{}:{}:{}:{}",
+ sync_job.remote.as_deref().unwrap_or("-"),
+ sync_job.remote_store,
+ sync_job.store,
+ sync_job.ns.clone().unwrap_or_default(),
+ job.jobname(),
+ );
+ let worker_type = job.jobtype().to_string();
+
+ if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+ bail!("can't sync to same datastore");
+ }
+
+ let upid_str = WorkerTask::spawn(
+ &worker_type,
+ Some(job_id.clone()),
+ auth_id.to_string(),
+ to_stdout,
+ move |worker| async move {
+ job.start(&worker.upid().to_string())?;
+
+ let worker2 = worker.clone();
+ let sync_job2 = sync_job.clone();
+
+ let worker_future = async move {
+ let direction = sync_job
+ .sync_direction
+ .as_ref()
+ .unwrap_or(&SyncDirection::Pull);
+ info!("Starting datastore sync job '{job_id}'");
+ if let Some(event_str) = schedule {
+ info!("task triggered by schedule '{event_str}'");
+ }
+ info!(
+ "sync datastore '{}' from '{}{}'",
+ sync_job.store,
+ sync_job
+ .remote
+ .as_deref()
+ .map_or(String::new(), |remote| format!("{remote}/")),
+ sync_job.remote_store,
+ );
+
+ let sync_stats = match direction {
+ SyncDirection::Pull => {
+ let pull_params = PullParameters::try_from(&sync_job)?;
+ pull_store(pull_params).await?
+ }
+ SyncDirection::Push => {
+ let push_params = PushParameters::try_from(&sync_job)?;
+ push_store(push_params).await?
+ }
+ };
+
+ if sync_stats.bytes != 0 {
+ let amount = HumanByte::from(sync_stats.bytes);
+ let rate = HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(),
+ );
+
+ match direction {
+ SyncDirection::Pull =>
+ info!(
+ "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
+ sync_stats.chunk_count,
+ ),
+ SyncDirection::Push =>
+ info!(
+ "Summary: sync job pushed {amount} in {} chunks (average rate: {rate}/s)",
+ sync_stats.chunk_count,
+ ),
+ }
+ } else {
+ match direction {
+ SyncDirection::Pull => {
+ info!("Summary: sync job found no new data to pull")
+ }
+ SyncDirection::Push => {
+ info!("Summary: sync job found no new data to push")
+ }
+ }
+ }
+
+ if let Some(removed) = sync_stats.removed {
+ info!(
+ "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+ removed.snapshots, removed.groups, removed.namespaces,
+ );
+ }
+
+ info!("sync job '{job_id}' end");
+
+ Ok(())
+ };
+
+ let mut abort_future = worker2
+ .abort_future()
+ .map(|_| Err(format_err!("sync aborted")));
+
+ let result = select! {
+ worker = worker_future.fuse() => worker,
+ abort = abort_future => abort,
+ };
+
+ let status = worker2.create_state(&result);
+
+ match job.finish(status) {
+ Ok(_) => {}
+ Err(err) => eprintln!("could not finish job state: {err}"),
+ }
+
+ if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
+ eprintln!("send sync notification failed: {err}");
+ }
+
+ result
+ },
+ )?;
+
+ Ok(upid_str)
+}
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index e0bd47b45..2dd0c9d00 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -58,7 +58,7 @@ use proxmox_backup::tools::{
PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
};
-use proxmox_backup::api2::pull::do_sync_job;
+use proxmox_backup::api2::sync::do_sync_job;
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
use proxmox_backup::server::do_prune_job;
use proxmox_backup::server::do_verification_job;
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 22/24] bin: manager: add datastore push cli command
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (20 preceding siblings ...)
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 21/24] api: sync: move sync job invocation to common module Christian Ebner
@ 2024-07-15 10:16 ` Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 23/24] form: group filter: allow to set namespace for local datastore Christian Ebner
` (3 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:16 UTC (permalink / raw)
To: pbs-devel
Expose the push api endpoint to be callable via the command line
interface.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/bin/proxmox-backup-manager.rs | 216 +++++++++++++++++++++++-------
1 file changed, 169 insertions(+), 47 deletions(-)
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index b57f60223..768701829 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -11,7 +11,7 @@ use proxmox_sys::fs::CreateOptions;
use pbs_api_types::percent_encoding::percent_encode_component;
use pbs_api_types::{
- BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
+ BackupNamespace, GroupFilter, RateLimitConfig, SyncDirection, SyncJobConfig, DATASTORE_SCHEMA,
GROUP_FILTER_LIST_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, NS_MAX_DEPTH_SCHEMA,
REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA, UPID_SCHEMA,
VERIFICATION_OUTDATED_AFTER_SCHEMA,
@@ -293,6 +293,72 @@ fn task_mgmt_cli() -> CommandLineInterface {
cmd_def.into()
}
+/// Sync datastore with another repository in the given sync direction
+#[allow(clippy::too_many_arguments)]
+async fn sync_datastore(
+ remote: String,
+ remote_store: String,
+ remote_ns: Option<BackupNamespace>,
+ store: String,
+ ns: Option<BackupNamespace>,
+ remove_vanished: Option<bool>,
+ max_depth: Option<usize>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
+ transfer_last: Option<usize>,
+ param: Value,
+ sync_direction: SyncDirection,
+) -> Result<Value, Error> {
+ let output_format = get_output_format(&param);
+
+ let client = connect_to_localhost()?;
+ let mut args = json!({
+ "store": store,
+ "remote": remote,
+ "remote-store": remote_store,
+ });
+
+ if remote_ns.is_some() {
+ args["remote-ns"] = json!(remote_ns);
+ }
+
+ if ns.is_some() {
+ args["ns"] = json!(ns);
+ }
+
+ if max_depth.is_some() {
+ args["max-depth"] = json!(max_depth);
+ }
+
+ if group_filter.is_some() {
+ args["group-filter"] = json!(group_filter);
+ }
+
+ if let Some(remove_vanished) = remove_vanished {
+ args["remove-vanished"] = Value::from(remove_vanished);
+ }
+
+ if transfer_last.is_some() {
+ args["transfer-last"] = json!(transfer_last)
+ }
+
+ let mut limit_json = json!(limit);
+ let limit_map = limit_json
+ .as_object_mut()
+ .ok_or_else(|| format_err!("limit is not an Object"))?;
+
+ args.as_object_mut().unwrap().append(limit_map);
+
+ let result = match sync_direction {
+ SyncDirection::Pull => client.post("api2/json/pull", Some(args)).await?,
+ SyncDirection::Push => client.post("api2/json/push", Some(args)).await?,
+ };
+
+ view_task_result(&client, result, &output_format).await?;
+
+ Ok(Value::Null)
+}
+
// fixme: avoid API redefinition
#[api(
input: {
@@ -341,7 +407,7 @@ fn task_mgmt_cli() -> CommandLineInterface {
}
}
)]
-/// Sync datastore from another repository
+/// Sync datastore by pulling from another repository
#[allow(clippy::too_many_arguments)]
async fn pull_datastore(
remote: String,
@@ -356,52 +422,100 @@ async fn pull_datastore(
transfer_last: Option<usize>,
param: Value,
) -> Result<Value, Error> {
- let output_format = get_output_format(&param);
-
- let client = connect_to_localhost()?;
-
- let mut args = json!({
- "store": store,
- "remote": remote,
- "remote-store": remote_store,
- });
-
- if remote_ns.is_some() {
- args["remote-ns"] = json!(remote_ns);
- }
-
- if ns.is_some() {
- args["ns"] = json!(ns);
- }
-
- if max_depth.is_some() {
- args["max-depth"] = json!(max_depth);
- }
-
- if group_filter.is_some() {
- args["group-filter"] = json!(group_filter);
- }
-
- if let Some(remove_vanished) = remove_vanished {
- args["remove-vanished"] = Value::from(remove_vanished);
- }
-
- if transfer_last.is_some() {
- args["transfer-last"] = json!(transfer_last)
- }
-
- let mut limit_json = json!(limit);
- let limit_map = limit_json
- .as_object_mut()
- .ok_or_else(|| format_err!("limit is not an Object"))?;
-
- args.as_object_mut().unwrap().append(limit_map);
-
- let result = client.post("api2/json/pull", Some(args)).await?;
-
- view_task_result(&client, result, &output_format).await?;
+ sync_datastore(
+ remote,
+ remote_store,
+ remote_ns,
+ store,
+ ns,
+ remove_vanished,
+ max_depth,
+ group_filter,
+ limit,
+ transfer_last,
+ param,
+ SyncDirection::Pull,
+ )
+ .await
+}
- Ok(Value::Null)
+#[api(
+ input: {
+ properties: {
+ "store": {
+ schema: DATASTORE_SCHEMA,
+ },
+ "ns": {
+ type: BackupNamespace,
+ optional: true,
+ },
+ remote: {
+ schema: REMOTE_ID_SCHEMA,
+ },
+ "remote-store": {
+ schema: DATASTORE_SCHEMA,
+ },
+ "remote-ns": {
+ type: BackupNamespace,
+ optional: true,
+ },
+ "remove-vanished": {
+ schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
+ optional: true,
+ },
+ "max-depth": {
+ schema: NS_MAX_DEPTH_SCHEMA,
+ optional: true,
+ },
+ "group-filter": {
+ schema: GROUP_FILTER_LIST_SCHEMA,
+ optional: true,
+ },
+ limit: {
+ type: RateLimitConfig,
+ flatten: true,
+ },
+ "output-format": {
+ schema: OUTPUT_FORMAT,
+ optional: true,
+ },
+ "transfer-last": {
+ schema: TRANSFER_LAST_SCHEMA,
+ optional: true,
+ },
+ }
+ }
+)]
+/// Sync datastore by pushing to another repository
+#[allow(clippy::too_many_arguments)]
+async fn push_datastore(
+ remote: String,
+ remote_store: String,
+ remote_ns: Option<BackupNamespace>,
+ store: String,
+ ns: Option<BackupNamespace>,
+ remove_vanished: Option<bool>,
+ max_depth: Option<usize>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
+ transfer_last: Option<usize>,
+ param: Value,
+) -> Result<Value, Error> {
+ sync_datastore(
+ remote,
+ remote_store,
+ remote_ns,
+ store,
+ ns,
+ remove_vanished,
+ max_depth,
+ group_filter,
+ limit,
+ transfer_last,
+ param,
+ SyncDirection::Push,
+ )
+ .await
}
#[api(
@@ -527,6 +641,14 @@ async fn run() -> Result<(), Error> {
.completion_cb("group-filter", complete_remote_datastore_group_filter)
.completion_cb("remote-ns", complete_remote_datastore_namespace),
)
+ .insert(
+ "push",
+ CliCommand::new(&API_METHOD_PUSH_DATASTORE)
+ .arg_param(&["store", "remote", "remote-store"])
+ .completion_cb("store", pbs_config::datastore::complete_datastore_name)
+ .completion_cb("remote", pbs_config::remote::complete_remote_name)
+ .completion_cb("remote-store", complete_remote_datastore_name),
+ )
.insert(
"verify",
CliCommand::new(&API_METHOD_VERIFY)
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 23/24] form: group filter: allow to set namespace for local datastore
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (21 preceding siblings ...)
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 22/24] bin: manager: add datastore push cli command Christian Ebner
@ 2024-07-15 10:16 ` Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 24/24] www: sync edit: allow to set sync direction for sync jobs Christian Ebner
` (2 subsequent siblings)
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:16 UTC (permalink / raw)
To: pbs-devel
The namespace has to be set in order to get the correct groups for
use of the group filter with a local datastore as source, as
required for sync jobs in push direction.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
www/form/GroupFilter.js | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
diff --git a/www/form/GroupFilter.js b/www/form/GroupFilter.js
index c9c2d913e..7275b00ed 100644
--- a/www/form/GroupFilter.js
+++ b/www/form/GroupFilter.js
@@ -258,7 +258,11 @@ Ext.define('PBS.form.GroupFilter', {
return;
}
if (me.namespace) {
- url += `?namespace=${me.namespace}`;
+ if (me.remote) {
+ url += `?namespace=${me.namespace}`;
+ } else {
+ url += `?ns=${me.namespace}`;
+ }
}
me.setDsStoreUrl(url);
me.dsStore.load({
@@ -279,6 +283,18 @@ Ext.define('PBS.form.GroupFilter', {
}
me.remote = undefined;
me.datastore = datastore;
+ me.namespace = undefined;
+ me.updateGroupSelectors();
+ },
+
+ setLocalNamespace: function(datastore, namespace) {
+ let me = this;
+ if (me.datastore === datastore && me.namespace === namespace) {
+ return;
+ }
+ me.remote = undefined;
+ me.datastore = datastore;
+ me.namespace = namespace;
me.updateGroupSelectors();
},
--
2.39.2
* [pbs-devel] [RFC proxmox-backup 24/24] www: sync edit: allow to set sync direction for sync jobs
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (22 preceding siblings ...)
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 23/24] form: group filter: allow to set namespace for local datastore Christian Ebner
@ 2024-07-15 10:16 ` Christian Ebner
2024-07-16 14:09 ` [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Gabriel Goller
2024-07-17 15:48 ` Thomas Lamprecht
25 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-15 10:16 UTC (permalink / raw)
To: pbs-devel
Add a radio button that allows setting the sync direction when
creating or editing a sync job.
For the push direction, require a remote and switch fetching of the
groups offered in the filters from the remote to the local
datastore.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
www/window/SyncJobEdit.js | 60 ++++++++++++++++++++++++++++++++++++++-
1 file changed, 59 insertions(+), 1 deletion(-)
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index 6543995e8..6b291cdb4 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -144,6 +144,57 @@ Ext.define('PBS.window.SyncJobEdit', {
submitAutoScaledSizeUnit: true,
// NOTE: handle deleteEmpty in onGetValues due to bandwidth field having a cbind too
},
+ {
+ xtype: 'radiogroup',
+ fieldLabel: gettext('Sync Direction'),
+ defaultType: 'radiofield',
+ items: [
+ {
+ boxLabel: 'Pull',
+ name: 'sync-direction',
+ inputValue: 'pull',
+ checked: true,
+ },
+ {
+ boxLabel: 'Push',
+ name: 'sync-direction',
+ inputValue: 'push',
+ },
+ ],
+ listeners: {
+ change: function(_group, radio) {
+ let me = this;
+ let form = me.up('pbsSyncJobEdit');
+ let remoteField = form.down('field[name=remote]');
+ let remoteNsField = form.down('field[name=remote-ns]');
+ let remoteStoreField = form.down('field[name=remote-store]');
+ let locationField = form.down('field[name=location]');
+ let groupFilter = form.down('pbsGroupFilter');
+
+ let isSyncDirectionPull = radio['sync-direction'] === 'pull';
+ locationField.setDisabled(!isSyncDirectionPull);
+
+ if (isSyncDirectionPull) {
+ remoteField.setFieldLabel(gettext("Source Remote"));
+ remoteNsField.setFieldLabel(gettext("Source Namespace"));
+ remoteStoreField.setFieldLabel(gettext("Source Store"));
+ let remote = remoteField.getValue();
+ let remoteStore = remoteStoreField.getValue();
+ let remoteNs = remoteNsField.getValue();
+ groupFilter.setRemoteNamespace(remote, remoteStore, remoteNs);
+ } else {
+ remoteField.setDisabled(false);
+ remoteField.setFieldLabel(gettext("Target Remote"));
+ remoteNsField.setFieldLabel(gettext("Target Namespace"));
+ remoteStoreField.setFieldLabel(gettext("Target Store"));
+ locationField.setValue("remote");
+ let localStore = form.down('field[name=store]').getValue();
+ let localNamespace = form.down('field[name=ns]').getValue();
+ groupFilter.setLocalNamespace(localStore, localNamespace);
+ }
+ },
+ },
+ },
],
column2: [
@@ -238,7 +289,14 @@ Ext.define('PBS.window.SyncJobEdit', {
let remoteNamespaceField = me.up('pbsSyncJobEdit').down('field[name=remote-ns]');
remoteNamespaceField.setRemote(remote);
remoteNamespaceField.setRemoteStore(value);
- me.up('tabpanel').down('pbsGroupFilter').setRemoteDatastore(remote, value);
+
+ let syncDirection = me.up('pbsSyncJobEdit').down('field[name=sync-direction]').getValue();
+ if (syncDirection === 'pull') {
+ me.up('tabpanel').down('pbsGroupFilter').setRemoteDatastore(remote, value);
+ } else {
+ let localStore = me.up('pbsSyncJobEdit').down('field[name=store]').getValue();
+ me.up('tabpanel').down('pbsGroupFilter').setLocalDatastore(localStore);
+ }
},
},
},
--
2.39.2
* Re: [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (23 preceding siblings ...)
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 24/24] www: sync edit: allow to set sync direction for sync jobs Christian Ebner
@ 2024-07-16 14:09 ` Gabriel Goller
2024-07-16 14:28 ` Christian Ebner
2024-07-23 14:00 ` Christian Ebner
2024-07-17 15:48 ` Thomas Lamprecht
25 siblings, 2 replies; 36+ messages in thread
From: Gabriel Goller @ 2024-07-16 14:09 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
Just a high level overview:
- In the frontend, there is no way to see the difference between a
push/pull sync job when the datastore names are the same. I think
it's better if we add a second table for the push jobs (on the same
page, something like in the "Prune & GC Jobs" tab.)
- Sometimes a group fails to sync, I get a "sync group 'vm/114' failed
pipelined request failed: no such chunk
bb9f8df61474d25e71fa00722318cd387396ca1736605e1248821cc0de3d3af8".
It seems like some chunks get added to the fixed index but haven't
been uploaded yet. (Because sometimes, I also get this error on the
receiving end: "POST /fixed_chunk: 400 Bad Request: backup already
marked as finished.".)
* Re: [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote
2024-07-16 14:09 ` [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Gabriel Goller
@ 2024-07-16 14:28 ` Christian Ebner
2024-07-16 14:51 ` Gabriel Goller
2024-07-23 14:00 ` Christian Ebner
1 sibling, 1 reply; 36+ messages in thread
From: Christian Ebner @ 2024-07-16 14:28 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Gabriel Goller
On 7/16/24 16:09, Gabriel Goller wrote:
> Just a high level overview:
> - In the frontend, there is no way to see the difference between a
> push/pull sync job when the datastore names are the same. I think
> it's better if we add a second table for the push jobs (on the same
> page, something like in the "Prune & GC Jobs" tab.)
I would rather expose the configured sync direction as a dedicated
column in the sync job listing? Since they are logically both sync jobs,
the sync direction just being a config option.
But of course splitting the listing into separate ones for push and for
pull could also be an option to separate them visually even more.
> - Sometimes a group fails to sync, I get an "sync group 'vm/114'
> failed pipelined request failed: no such chunk
> bb9f8df61474d25e71fa00722318cd387396ca1736605e1248821cc0de3d3af8".
> It seems like some chunks get added to the fixed index but haven't
> been uploaded yet. (Because sometimes, I also get this error on the
> receiving end: "POST /fixed_chunk: 400 Bad Request: backup already
> marked as finished.".)
Oh, thanks for testing this!
As already mentioned off-list, I have the suspicion that here a chunk
might get uploaded as `KnownChunk` although not present on the server
yet, as the push job shares the known chunk list for all snapshots.
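To illustrate the suspected failure mode, here is a minimal, purely hypothetical model. It assumes the server only accepts known-chunk references for chunks registered within the current backup session; the ServerSession type, the push_snapshot helper and the u32 digests are invented for this sketch and do not mirror the actual implementation.

```rust
use std::collections::HashSet;

/// Stand-in for a chunk digest (real digests are 32-byte SHA-256 values).
type Digest = u32;

/// Hypothetical model of a server-side backup session: it only accepts
/// known-chunk references for chunks registered within this session.
#[derive(Default)]
struct ServerSession {
    registered: HashSet<Digest>,
}

impl ServerSession {
    /// Upload the chunk data, which registers it for this session.
    fn upload_chunk(&mut self, digest: Digest) {
        self.registered.insert(digest);
    }

    /// Reference a chunk without uploading it; fails if the session
    /// has never seen the digest.
    fn append_known(&self, digest: Digest) -> Result<(), String> {
        if self.registered.contains(&digest) {
            Ok(())
        } else {
            Err(format!("no such chunk {digest:08x}"))
        }
    }
}

/// Push one snapshot; chunks found in `known` are referenced, not uploaded.
fn push_snapshot(
    chunks: &[Digest],
    known: &mut HashSet<Digest>,
    session: &mut ServerSession,
) -> Result<(), String> {
    for &digest in chunks {
        if known.contains(&digest) {
            session.append_known(digest)?;
        } else {
            session.upload_chunk(digest);
            known.insert(digest);
        }
    }
    Ok(())
}

fn main() {
    // Client-side known chunk list, shared across all snapshots.
    let mut shared_known = HashSet::new();

    // First snapshot: chunk 0xaa is uploaded and remembered client-side.
    let mut first = ServerSession::default();
    println!("{:?}", push_snapshot(&[0xaa], &mut shared_known, &mut first));

    // Second snapshot in a new session: the shared list claims 0xaa is
    // known, but this session never registered it.
    let mut second = ServerSession::default();
    println!("{:?}", push_snapshot(&[0xaa, 0xbb], &mut shared_known, &mut second));
}
```

Under these assumptions, keeping a separate known chunk list per snapshot, or seeding the session with the chunks of the previous snapshot, would avoid the error.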
Thank you for the feedback!
* Re: [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote
2024-07-16 14:28 ` Christian Ebner
@ 2024-07-16 14:51 ` Gabriel Goller
2024-07-16 14:54 ` Christian Ebner
0 siblings, 1 reply; 36+ messages in thread
From: Gabriel Goller @ 2024-07-16 14:51 UTC (permalink / raw)
To: Christian Ebner; +Cc: Proxmox Backup Server development discussion
On 16.07.2024 16:28, Christian Ebner wrote:
>On 7/16/24 16:09, Gabriel Goller wrote:
>>Just a high level overview:
>> - In the frontend, there is no way to see the difference between a
>> push/pull sync job when the datastore names are the same. I think
>> it's better if we add a second table for the push jobs (on the same
>> page, something like in the "Prune & GC Jobs" tab.)
>
>I would rather expose the configured sync direction as a dedicated
>column in the sync job listing? Since they are logically both sync
>jobs, the sync direction just being a config option.
I would still split the list because the sync jobs already have 14
columns and if someone has only push or pull jobs, the field is kind of
a wasted column :/
>But of course splitting the listing into separate ones for push and
>for pull could also be an option to separate them visually even more.
* Re: [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote
2024-07-16 14:51 ` Gabriel Goller
@ 2024-07-16 14:54 ` Christian Ebner
0 siblings, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-16 14:54 UTC (permalink / raw)
To: Gabriel Goller; +Cc: Proxmox Backup Server development discussion
> On 16.07.2024 16:51 CEST Gabriel Goller <g.goller@proxmox.com> wrote:
>
>
> On 16.07.2024 16:28, Christian Ebner wrote:
> >On 7/16/24 16:09, Gabriel Goller wrote:
> >>Just a high level overview:
> >> - In the frontend, there is no way to see the difference between a
> >> push/pull sync job when the datastore names are the same. I think
> >> it's better if we add a second table for the push jobs (on the same
> >> page, something like in the "Prune & GC Jobs" tab.)
> >
> >I would rather expose the configured sync direction as a dedicated
> >column in the sync job listing? Since they are logically both sync
> >jobs, the sync direction just being a config option.
>
> I would still split the list because the sync jobs already have 14
> columns and if someone has only push or pull jobs, the field is kind of
> a wasted column :/
Fair point!
* Re: [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote
2024-07-16 14:09 ` [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Gabriel Goller
2024-07-16 14:28 ` Christian Ebner
@ 2024-07-23 14:00 ` Christian Ebner
1 sibling, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-23 14:00 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Gabriel Goller
On 7/16/24 16:09, Gabriel Goller wrote:
> Just a high level overview:
> - In the frontend, there is no way to see the difference between a
> push/pull sync job when the datastore names are the same. I think
> it's better if we add a second table for the push jobs (on the same
> page, something like in the "Prune & GC Jobs" tab.)
> - Sometimes a group fails to sync, I get an "sync group 'vm/114'
> failed pipelined request failed: no such chunk
> bb9f8df61474d25e71fa00722318cd387396ca1736605e1248821cc0de3d3af8".
> It seems like some chunks get added to the fixed index but haven't
> been uploaded yet. (Because sometimes, I also get this error on the
> receiving end: "POST /fixed_chunk: 400 Bad Request: backup already
> marked as finished.".)
For the record, the issue causing these seemingly unknown chunks is that
the server side also keeps track of the known chunks, as stored in the
`BackupEnvironment` state. This state, however, only lives as long as the
backup writer instance's connection. Therefore, it is not possible to keep
all the known chunks for the full sync job.
The upcoming version of the patches will therefore use only the
orthogonal approach and download the previous snapshot manifest and
indexes and use that instead to avoid duplicate chunk upload. This also
mimics more closely a regular backup run.
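To illustrate the failure mode described above, here is a minimal sketch.
It is not the actual server code: `SessionState`, `upload_chunk` and
`index_chunk` are hypothetical stand-ins for the per-connection state held
in `BackupEnvironment`, and the digest is a dummy value. It only models the
assumption that the server forgets its known chunks with each new
connection while the client keeps one list for the whole job.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the per-connection server state; the real
/// `BackupEnvironment` is more involved.
#[derive(Default)]
struct SessionState {
    known_chunks: HashSet<[u8; 32]>,
}

impl SessionState {
    /// Register a chunk as uploaded during this session.
    fn upload_chunk(&mut self, digest: [u8; 32]) {
        self.known_chunks.insert(digest);
    }

    /// Append a chunk to an index; fails if this session never saw the chunk.
    fn index_chunk(&self, digest: &[u8; 32]) -> Result<(), String> {
        if self.known_chunks.contains(digest) {
            Ok(())
        } else {
            Err(format!("no such chunk {}", hex(digest)))
        }
    }
}

/// Render a digest as lowercase hex.
fn hex(digest: &[u8; 32]) -> String {
    digest.iter().map(|b| format!("{b:02x}")).collect()
}

/// Push two snapshots that share one chunk, each over its own session,
/// while the client keeps a single known-chunk set for the whole job.
fn push_two_snapshots_with_shared_client_list() -> Vec<Result<(), String>> {
    let shared_digest = [0xbb; 32];
    let mut client_known: HashSet<[u8; 32]> = HashSet::new();
    let mut results = Vec::new();

    for _snapshot in 0..2 {
        // A new backup writer connection means fresh server-side state.
        let mut session = SessionState::default();
        // The client only uploads chunks it has not seen in this job yet.
        if client_known.insert(shared_digest) {
            session.upload_chunk(shared_digest);
        }
        results.push(session.index_chunk(&shared_digest));
    }
    results
}
```

In this model the first snapshot succeeds and the second one fails, since
the client skips the upload but the new session has no record of the chunk.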
* Re: [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
` (24 preceding siblings ...)
2024-07-16 14:09 ` [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Gabriel Goller
@ 2024-07-17 15:48 ` Thomas Lamprecht
2024-07-18 7:36 ` Christian Ebner
2024-07-30 10:42 ` Christian Ebner
25 siblings, 2 replies; 36+ messages in thread
From: Thomas Lamprecht @ 2024-07-17 15:48 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Christian Ebner
On 15/07/2024 at 12:15, Christian Ebner wrote:
> While being mostly implemented, there are still some implementation
> details to be clarified, therefore requesting comments on the current
> state of the patch series.
>
> This patch series implements the functionality to extend the current
> sync jobs in pull direction by an additional push direction, allowing
> to push contents of a local source datastore to a remote target.
nice!
> The series implements this by using the REST API of the remote target
> for fetching, creating and/or deleting namespaces, groups and backups,
> and reuses the client's backup writer functionality to create snapshots
> by writing a manifest on the remote target and sync the fixed index,
> dynamic index or blobs contained in the source manifest to the remote,
> preserving also encryption information.
>
> The patch series is structured as follows:
> - patches 1 to 5 are cleanup patches
> - patches 6 to 11 are patches restructuring the current code so that
> functionality of the current pull implementation can be reused for
> the push implementation as well
> - patches 12 and 13 extend the backup writers functionality to be able
> to push snapshots to the target
> - patches 14 to 16 are once again preparatory patches for shared
> implementation of sync jobs in pull and push direction
> - patch 17 defines the required permission acls and roles
> - patch 18 implements almost all of the logic required for the push,
> including pushing of the datastore, namespace, groups and snapshots,
> taking into account also filters and additional sync flags
> - patch 19 extends the current sync job configuration by a flag
> allowing to set the direction the sync job should operate, defaulting
> to pull.
> - patches 20 to 24 finally expose the new sync job direction via the
> API, CLI and WebUI.
>
> While most of the functionality is already in place, some open
> questions remain:
> - Remove vanished stats would require to expose additional information
> to the REST API endpoints for deletion of namespaces and groups
might be fine to add, but that could be also done later.
> - Performance for push: The current implementation only allows to
> re-upload known chunks which have been read from the local datastore,
> there is no download of previous snapshots to optimize this.
not 100% sure what you mean here. Is it that you always send all chunks
of affected backup snapshots as you cannot know which chunks are already
in the pool on the target side?
And with "no download of previous snapshots to optimize this" you mean
that you do not get the index of the previously synced snapshot to check
that for which chunks you can skip for now, but that could be done in
the future?
> - Permissions and roles: Currently, a dedicated role is implemented for
> a push operator. It remains to be clarified if the given permissions
> are too open, or if additional subsets of permissions might be
> warranted, e.g. to allow for removed vanished.
I mean, in the end the remote can already reduce the privileges to lock
usage to specific NS or forbid creating backups completely, so I think
that we'd be fine w.r.t. not being too open. That might be something to
mention in the docs though, as users might wonder why they get a
permission error when a sync job runs even though they had enough rights
to create it.
Besides that, while I did not think this through all too closely, the new
privileges seem OK to have.
They allow admins to give users access to a remote that uses rather
powerful credentials while still controlling roughly what a user can do
with it. While in untrusted environments one probably wants to avoid
that situation, in (semi-)trusted environments this can be nice to avoid
error potential of some automation while not requiring an admin to
configure many remotes for the same PBS, each using separate credentials
with a minimal set of privs granted.
>
> Christian Ebner (24):
> datastore: data blob: fix typos in comments
> server: pull: be more specific in module comment
> server: pull: silence clippy to many arguments warning
> www: sync edit: indetation style fix
> server: pull: fix sync info message for root namespace
applied the above 5 clean-up commits already, thanks!
btw. I agree with Gabriel w.r.t. having this separated a bit more
explicitly in the user interface; while for the implementation it might
not be that different, there's a big difference for the user. Making an
error and getting source and target swapped by mistake might lead to
some problematic results, like pruning some important snapshots as the
(wrongly) chosen source is empty.
So I'd not only show the jobs in separated grids (can be the same
panel though) but also use different add, edit, and remove dialogues and
buttons.
It might even make sense to evaluate using a different sync job section
type, and thus config, for these; not saying that's a must, but it maybe
could additionally help to avoid mistakes.
* Re: [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote
2024-07-17 15:48 ` Thomas Lamprecht
@ 2024-07-18 7:36 ` Christian Ebner
2024-07-30 10:42 ` Christian Ebner
1 sibling, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-18 7:36 UTC (permalink / raw)
To: Thomas Lamprecht, Proxmox Backup Server development discussion
On 7/17/24 17:48, Thomas Lamprecht wrote:
> On 15/07/2024 at 12:15, Christian Ebner wrote:
>> While being mostly implemented, there are still some implementation
>> details to be clarified, therefore requesting comments on the current
>> state of the patch series.
>>
>> This patch series implements the functionality to extend the current
>> sync jobs in pull direction by an additional push direction, allowing
>> to push contents of a local source datastore to a remote target.
>
> nice!
>
>> The series implements this by using the REST API of the remote target
>> for fetching, creating and/or deleting namespaces, groups and backups,
>> and reuses the client's backup writer functionality to create snapshots
>> by writing a manifest on the remote target and sync the fixed index,
>> dynamic index or blobs contained in the source manifest to the remote,
>> preserving also encryption information.
>>
>> The patch series is structured as follows:
>> - patches 1 to 5 are cleanup patches
>> - patches 6 to 11 are patches restructuring the current code so that
>> functionality of the current pull implementation can be reused for
>> the push implementation as well
>> - patches 12 and 13 extend the backup writers functionality to be able
>> to push snapshots to the target
>> - patches 14 to 16 are once again preparatory patches for shared
>> implementation of sync jobs in pull and push direction
>> - patch 17 defines the required permission acls and roles
>> - patch 18 implements almost all of the logic required for the push,
>> including pushing of the datastore, namespace, groups and snapshots,
>> taking into account also filters and additional sync flags
>> - patch 19 extends the current sync job configuration by a flag
>> allowing to set the direction the sync job should operate, defaulting
>> to pull.
>> - patches 20 to 24 finally expose the new sync job direction via the
>> API, CLI and WebUI.
>>
>> While most of the functionality is already in place, some open
>> questions remain:
>> - Remove vanished stats would require to expose additional information
>> to the REST API endpoints for deletion of namespaces and groups
>
> might be fine to add, but that could be also done later.
Okay, then I will see how to add this as well, but make sure to keep
these patches independent so this can also be left out until later.
After all, this would break the API's return values.
>
>> - Performance for push: The current implementation only allows to
>> re-upload known chunks which have been read from the local datastore,
>> there is no download of previous snapshots to optimize this.
>
> not 100% sure what you mean here. Is it that you always send all chunks
> of affected backup snapshots as you cannot know which chunks are already
> in the pool on the target side?
>
> And with "no download of previous snapshots to optimize this" you mean
> that you do not get the index of the previously synced snapshot to check
> that for which chunks you can skip for now, but that could be done in
> the future?
Sorry if I created confusion here, this is more of an open point rather
than a question (also my wording was not fully correct, should have been
re-index, not re-upload). But your understanding is correct.
Let me try to explain in a bit more detail: in the current
implementation, the sync job in push direction keeps track of all the
chunks uploaded by the backup writer upload streams for the various
snapshots. This is to avoid double upload of chunks, while still
re-indexing the chunks in the corresponding index files (however, here
still lingers the bug reported by Gabriel).
The next optimization would be to also fetch the previous snapshot for
each backup group already present on the sync target, and use its index
file to avoid upload of the chunks already known to the target's
datastore as well, only re-indexing them for the snapshot to be pushed.
This implementation is currently still lacking, my intention is however
to add this as well.
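A rough sketch of how such a decision could look, assuming the digests of
the previous snapshot's index on the target are already at hand. The names
`ChunkAction` and `plan_push` are made up for illustration and are not part
of the series; the real code would work on index readers and upload
streams rather than plain slices.

```rust
use std::collections::HashSet;

type Digest = [u8; 32];

/// Outcome for one chunk of the snapshot being pushed.
#[derive(Debug, PartialEq)]
enum ChunkAction {
    /// Chunk must be sent to the target, then indexed.
    Upload(Digest),
    /// Chunk is already known to the target; only add it to the index.
    ReindexOnly(Digest),
}

/// Decide per chunk whether it has to be uploaded or only re-indexed.
///
/// `previous_index` holds the digests referenced by the previous snapshot
/// on the target, `new_index` the digests of the snapshot to be pushed.
fn plan_push(previous_index: &[Digest], new_index: &[Digest]) -> Vec<ChunkAction> {
    // Start with what the target already has from the previous snapshot.
    let mut known: HashSet<Digest> = previous_index.iter().copied().collect();
    new_index
        .iter()
        .map(|digest| {
            // `insert` returns true only for digests not seen before, so a
            // chunk repeated within the new snapshot is uploaded just once.
            if known.insert(*digest) {
                ChunkAction::Upload(*digest)
            } else {
                ChunkAction::ReindexOnly(*digest)
            }
        })
        .collect()
}
```

Every chunk still ends up in the new index; only the upload is skipped for
digests that the previous snapshot or an earlier position in the same
snapshot already covered.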
>
>> - Permissions and roles: Currently, a dedicated role is implemented for
>> a push operator. It remains to be clarified if the given permissions
>> are too open, or if additional subsets of permissions might be
>> warranted, e.g. to allow for removed vanished.
>
> I mean, in the end the remote can already reduce the privileges to lock
> usage to specific NS or forbid creating backups completely, so I think
> that we'd be fine w.r.t. not being too open. That might be something to
> mention in the docs though, as users might wonder why they get a
> permission error when a sync job runs even though they had enough rights
> to create it.
>
> Besides that, while I did not think this through all too closely, the new
> privileges seem OK to have.
> They allow admins to give users access to a remote that uses rather
> powerful credentials while still controlling roughly what a user can do
> with it. While in untrusted environments one probably wants to avoid
> that situation, in (semi-)trusted environments this can be nice to avoid
> error potential of some automation while not requiring an admin to
> configure many remotes for the same PBS, each using separate credentials
> with a minimal set of privs granted.
Yes, true. In the end the privs on the target are those that matter and
win, so constraints configured on the source are less critical. Will also
add a draft section to the documentation in the next version of the series.
>
>>
>> Christian Ebner (24):
>> datastore: data blob: fix typos in comments
>> server: pull: be more specific in module comment
>> server: pull: silence clippy to many arguments warning
>> www: sync edit: indetation style fix
>> server: pull: fix sync info message for root namespace
>
> applied the above 5 clean-up commits already, thanks!
Nice, thanks!
>
> btw. I agree with Gabriel w.r.t. having this separated a bit more
> explicitly in the user interface; while for the implementation it might
> not be that different, there's a big difference for the user. Making an
> error and getting source and target swapped by mistake might lead to
> some problematic results, like pruning some important snapshots as the
> (wrongly) chosen source is empty.
Yes, both of your arguments convinced me that clearly separating the UI
makes more sense.
>
> So I'd not only show the jobs in separated grids (can be the same
> panel though) but also use different add, edit, and remove dialogues and
> buttons.
Okay, will go that route then.
>
> It might even make sense to evaluate using a different sync job section
> type, and thus config, for these; not saying that's a must, but it maybe
> could additionally help to avoid mistakes.
Will have a look at what this would imply as well.
Thanks a lot for the feedback!
* Re: [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote
2024-07-17 15:48 ` Thomas Lamprecht
2024-07-18 7:36 ` Christian Ebner
@ 2024-07-30 10:42 ` Christian Ebner
1 sibling, 0 replies; 36+ messages in thread
From: Christian Ebner @ 2024-07-30 10:42 UTC (permalink / raw)
To: Thomas Lamprecht, Proxmox Backup Server development discussion
On 7/17/24 17:48, Thomas Lamprecht wrote:
>
> It might even make sense to evaluate using a different sync job section
> type, and thus config, for these; not saying that's a must, but it maybe
> could additionally help to avoid mistakes.
Had a closer look at this: If I see correctly, this would imply either
creating duplicate API endpoints for interacting with the config or
passing an additional sync direction variant flag to the preexisting
endpoints, in order to distinguish the section types and therefore sync
job variants while keeping the same `SyncJobConfig`. The latter seems to
be the more promising approach.
So I will go for adding a new section type `sync-push` and add the
`SyncDirection` parameters where required.
This separates the sync job variants cleanly in the config, making it
impossible to switch sync direction by simply deleting a line (switching
over the config type can not happen as easily, therefore better
protecting from accidental misconfiguration).
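As a rough sketch of the idea, the mapping between direction and section
type could look like the following. The `section_type` and
`from_section_type` helpers are hypothetical, and using `sync` as the
section type for the pull direction is an assumption here; only the
`sync-push` name and the `SyncDirection` parameter are taken from the plan
above.

```rust
/// Direction a sync job operates in.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SyncDirection {
    Pull,
    Push,
}

impl SyncDirection {
    /// Config section type used for jobs of this direction.
    /// "sync" for pull is assumed, "sync-push" is the proposed new type.
    fn section_type(self) -> &'static str {
        match self {
            SyncDirection::Pull => "sync",
            SyncDirection::Push => "sync-push",
        }
    }

    /// Derive the direction from a section type found in the config.
    /// Unknown section types are rejected instead of defaulting to pull.
    fn from_section_type(section_type: &str) -> Option<Self> {
        match section_type {
            "sync" => Some(SyncDirection::Pull),
            "sync-push" => Some(SyncDirection::Push),
            _ => None,
        }
    }
}
```

With the direction tied to the section type, an endpoint that receives a
`SyncDirection` parameter can look up only the matching section type, so a
push job is never picked up where a pull job was requested.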