public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs
@ 2024-07-25 10:19 Christian Ebner
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property Christian Ebner
                   ` (3 more replies)
  0 siblings, 4 replies; 9+ messages in thread
From: Christian Ebner @ 2024-07-25 10:19 UTC (permalink / raw)
  To: pbs-devel

Pulling contents from a remote source via a sync job suffers from low
throughput on high-latency networks because of limitations of the
HTTP/2 connection, as described in [0]. As a workaround, pulling
multiple groups in parallel by establishing multiple reader instances
has been suggested.

This patch series therefore adds a configuration property
`group-sync-tasks` to sync jobs, which allows defining the number of
concurrent group pull tasks for each job. This is currently not
exposed in the UI. A valid config would look like this:

```
sync: s-d2755441-cca9
	ns
	owner root@pam
	group-sync-tasks 4
	remote pbs-remote-source
	remote-ns
	remote-store store
	remove-vanished false
	schedule daily
	store pullstore
```

This brings improvements, as roughly tested by artificially
increasing the latency on the bridge of the pull target host to
150 ms via

`tc qdisc add dev vmbr0 root netem delay 150ms`

and verifying via ping to the remote source host that the latency
was applied.

Pulling with 2 concurrent tasks reduced the task runtime by about
25% compared to a single task; 4 configured tasks reduced the runtime
by about 30%.

The current approach however interferes with the status logging of a
sync job, as the ordering of the log messages is no longer
guaranteed. Therefore, the logs are buffered instead and only shown
after the corresponding group pull task has run to completion.
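
For reference, the buffering boils down to a small helper (simplified
sketch of the approach in patch 4/4; in the actual patch the buffer
travels inside `PullStats` and is replayed once the corresponding
group task finishes):

```
// Sketch only: either emit the message right away (unchanged behaviour
// for a single task) or collect it in the per-task buffer for later
// replay. Assumes the tracing `info!` macro already used in pull.rs.
use tracing::info;

fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
    if buffer_logs {
        buffer.push(msg);
    } else {
        info!("{msg}");
    }
}
```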

Sending this as RFC as I am not too happy with how logging is
handled; maybe somebody has a better idea.

[0] https://bugzilla.proxmox.com/show_bug.cgi?id=4182

Christian Ebner (4):
  api: config/sync: add optional group-sync-tasks property
  server: pull: factor out group pull task into helper
  fix #4182: server: sync: allow pulling groups concurrently
  server: pull: conditionally buffer parallel tasks log output

 pbs-api-types/src/jobs.rs           |  14 ++
 pbs-datastore/src/store_progress.rs |   2 +-
 src/api2/config/sync.rs             |  10 +
 src/api2/pull.rs                    |  13 +-
 src/server/pull.rs                  | 311 +++++++++++++++++++++-------
 5 files changed, 274 insertions(+), 76 deletions(-)

-- 
2.39.2




* [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property
  2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
@ 2024-07-25 10:19 ` Christian Ebner
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper Christian Ebner
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 9+ messages in thread
From: Christian Ebner @ 2024-07-25 10:19 UTC (permalink / raw)
  To: pbs-devel

Allow configuring from 1 up to 8 concurrent tasks to perform
multiple group syncs in parallel.
The property is exposed via the sync job config and passed to the
pull parameters for the sync job to set up and execute the tasks
accordingly.

Implements the schema definition and adds the new property to
`SyncJobConfig` and `PullParameters`.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-api-types/src/jobs.rs | 14 ++++++++++++++
 src/api2/config/sync.rs   | 10 ++++++++++
 src/api2/pull.rs          | 13 ++++++++++---
 src/server/pull.rs        |  4 ++++
 4 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 868702bc0..5e58787f7 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -64,6 +64,14 @@ pub const REMOVE_VANISHED_BACKUPS_SCHEMA: Schema = BooleanSchema::new(
 .default(false)
 .schema();
 
+const MAX_GROUP_SYNC_TASKS: usize = 8;
+pub const GROUP_SYNC_TASKS_SCHEMA: Schema =
+    IntegerSchema::new("Number of concurrent group pull tasks for sync jobs")
+        .minimum(1)
+        .maximum(MAX_GROUP_SYNC_TASKS as isize)
+        .default(1)
+        .schema();
+
 #[api(
     properties: {
         "next-run": {
@@ -552,6 +560,10 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
             schema: TRANSFER_LAST_SCHEMA,
             optional: true,
         },
+        "group-sync-tasks": {
+            schema: GROUP_SYNC_TASKS_SCHEMA,
+            optional: true,
+        },
     }
 )]
 #[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -585,6 +597,8 @@ pub struct SyncJobConfig {
     pub limit: RateLimitConfig,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub transfer_last: Option<usize>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub group_sync_tasks: Option<usize>,
 }
 
 impl SyncJobConfig {
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 6fdc69a9e..b6cf81328 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -231,6 +231,8 @@ pub enum DeletableProperty {
     MaxDepth,
     /// Delete the transfer_last property,
     TransferLast,
+    /// Delete the group_sync_tasks property,
+    GroupSyncTasks,
 }
 
 #[api(
@@ -331,6 +333,9 @@ pub fn update_sync_job(
                 DeletableProperty::TransferLast => {
                     data.transfer_last = None;
                 }
+                DeletableProperty::GroupSyncTasks => {
+                    data.group_sync_tasks = None;
+                }
             }
         }
     }
@@ -369,6 +374,10 @@ pub fn update_sync_job(
         data.transfer_last = Some(transfer_last);
     }
 
+    if let Some(group_sync_tasks) = update.group_sync_tasks {
+        data.group_sync_tasks = Some(group_sync_tasks);
+    }
+
     if update.limit.rate_in.is_some() {
         data.limit.rate_in = update.limit.rate_in;
     }
@@ -533,6 +542,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
         schedule: None,
         limit: pbs_api_types::RateLimitConfig::default(), // no limit
         transfer_last: None,
+        group_sync_tasks: None,
     };
 
     // should work without ACLs
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e733c9839..0756e0a51 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -8,9 +8,9 @@ use proxmox_schema::api;
 
 use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
-    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
-    PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
-    TRANSFER_LAST_SCHEMA,
+    GROUP_FILTER_LIST_SCHEMA, GROUP_SYNC_TASKS_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA,
+    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA,
+    REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
 };
 use pbs_config::CachedUserInfo;
 use proxmox_human_byte::HumanByte;
@@ -89,6 +89,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
             sync_job.group_filter.clone(),
             sync_job.limit.clone(),
             sync_job.transfer_last,
+            sync_job.group_sync_tasks,
         )
     }
 }
@@ -240,6 +241,10 @@ pub fn do_sync_job(
                 schema: TRANSFER_LAST_SCHEMA,
                 optional: true,
             },
+            "group-sync-tasks": {
+                schema: GROUP_SYNC_TASKS_SCHEMA,
+                optional: true,
+            },
         },
     },
     access: {
@@ -264,6 +269,7 @@ async fn pull(
     group_filter: Option<Vec<GroupFilter>>,
     limit: RateLimitConfig,
     transfer_last: Option<usize>,
+    group_sync_tasks: Option<usize>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -301,6 +307,7 @@ async fn pull(
         group_filter,
         limit,
         transfer_last,
+        group_sync_tasks,
     )?;
 
     // fixme: set to_stdout to false?
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 823515e9a..80443132e 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -499,6 +499,8 @@ pub(crate) struct PullParameters {
     group_filter: Vec<GroupFilter>,
     /// How many snapshots should be transferred at most (taking the newest N snapshots)
     transfer_last: Option<usize>,
+    /// Number of concurrent group pull tasks for sync job
+    group_sync_tasks: Option<usize>,
 }
 
 impl PullParameters {
@@ -516,6 +518,7 @@ impl PullParameters {
         group_filter: Option<Vec<GroupFilter>>,
         limit: RateLimitConfig,
         transfer_last: Option<usize>,
+        group_sync_tasks: Option<usize>,
     ) -> Result<Self, Error> {
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
@@ -560,6 +563,7 @@ impl PullParameters {
             max_depth,
             group_filter,
             transfer_last,
+            group_sync_tasks,
         })
     }
 }
-- 
2.39.2




* [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper
  2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property Christian Ebner
@ 2024-07-25 10:19 ` Christian Ebner
  2024-07-30 15:56   ` Gabriel Goller
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output Christian Ebner
  3 siblings, 1 reply; 9+ messages in thread
From: Christian Ebner @ 2024-07-25 10:19 UTC (permalink / raw)
  To: pbs-devel

Make the error handling and accounting logic for each group pull task
reusable by moving it into its own helper function, returning the
future.
The store progress is placed behind a reference-counted mutex to
allow concurrent status updates.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-datastore/src/store_progress.rs |   2 +-
 src/server/pull.rs                  | 102 +++++++++++++++++-----------
 2 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 /// Tracker for progress of operations iterating over `Datastore` contents.
 pub struct StoreProgress {
     /// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 80443132e..e2d155c78 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,10 @@
 //! Sync datastore by pulling contents from remote server
 
 use std::collections::{HashMap, HashSet};
+use std::future::Future;
 use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime};
@@ -1023,7 +1025,7 @@ async fn pull_group(
     params: &PullParameters,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<PullStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1079,7 +1081,10 @@ async fn pull_group(
     // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-    progress.group_snapshots = list.len() as u64;
+    {
+        let mut progress = progress.lock().unwrap();
+        progress.group_snapshots = list.len() as u64;
+    }
 
     let mut pull_stats = PullStats::default();
 
@@ -1095,8 +1100,11 @@ async fn pull_group(
             .await?;
         let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        {
+            let mut progress = progress.lock().unwrap();
+            progress.done_snapshots = pos as u64 + 1;
+            info!("percentage done: {progress}");
+        }
 
         let stats = result?; // stop on error
         pull_stats.add(stats);
@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
     Ok(pull_stats)
 }
 
+fn pull_group_task<'future>(
+    params: &'future PullParameters,
+    group: &'future BackupGroup,
+    namespace: &'future BackupNamespace,
+    target_namespace: &'future BackupNamespace,
+    progress: StoreProgress,
+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>
+{
+    Box::pin(async move {
+        let progress = Arc::new(Mutex::new(progress));
+        let mut pull_stats = PullStats::default();
+        let mut errors = false;
+
+        let (owner, _lock_guard) = match params.target.store.create_locked_backup_group(
+            target_namespace,
+            group,
+            &params.owner,
+        ) {
+            Ok(result) => result,
+            Err(err) => {
+                info!("sync group {group} failed - group lock failed: {err}");
+                errors = true;
+                // do not stop here, instead continue
+                info!("create_locked_backup_group failed");
+                return Ok((progress.lock().unwrap().clone(), pull_stats, errors));
+            }
+        };
+
+        // permission check
+        if params.owner != owner {
+            // only the owner is allowed to create additional snapshots
+            info!(
+                "sync group {group} failed - owner check failed ({} != {owner})",
+                params.owner,
+            );
+            errors = true; // do not stop here, instead continue
+        } else {
+            match pull_group(params, namespace, group, progress.clone()).await {
+                Ok(stats) => pull_stats.add(stats),
+                Err(err) => {
+                    info!("sync group {group} failed - {err}");
+                    errors = true; // do not bail here, instead continue
+                }
+            }
+        }
+
+        let progress = progress.lock().unwrap().clone();
+        Ok((progress, pull_stats, errors))
+    })
+}
+
 /// Pulls a namespace according to `params`.
 ///
 /// Pulling a namespace consists of the following steps:
@@ -1402,40 +1461,7 @@ pub(crate) async fn pull_ns(
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
-
-        let (owner, _lock_guard) =
-            match params
-                .target
-                .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
-            {
-                Ok(result) => result,
-                Err(err) => {
-                    info!("sync group {} failed - group lock failed: {err}", &group);
-                    errors = true;
-                    // do not stop here, instead continue
-                    info!("create_locked_backup_group failed");
-                    continue;
-                }
-            };
-
-        // permission check
-        if params.owner != owner {
-            // only the owner is allowed to create additional snapshots
-            info!(
-                "sync group {} failed - owner check failed ({} != {owner})",
-                &group, params.owner
-            );
-            errors = true; // do not stop here, instead continue
-        } else {
-            match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => pull_stats.add(stats),
-                Err(err) => {
-                    info!("sync group {} failed - {err}", &group);
-                    errors = true; // do not stop here, instead continue
-                }
-            }
-        }
+        pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
     }
 
     if params.remove_vanished {
-- 
2.39.2




* [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently
  2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property Christian Ebner
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper Christian Ebner
@ 2024-07-25 10:19 ` Christian Ebner
  2024-07-30 15:54   ` Gabriel Goller
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output Christian Ebner
  3 siblings, 1 reply; 9+ messages in thread
From: Christian Ebner @ 2024-07-25 10:19 UTC (permalink / raw)
  To: pbs-devel

Currently, a sync job pulls the backup groups and the snapshots
contained within them sequentially, and is therefore limited in
download speed by the HTTP/2 connection of the source reader instance
in case of remote syncs. This is especially noticeable on
high-latency networks.

Improve the throughput by allowing up to a configured number of
backup groups to be pulled concurrently, creating tasks that connect
to and pull from the remote source in parallel.
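
The scheduling follows the usual bounded task pool pattern on top of
`FuturesUnordered`: seed up to N futures, then queue one more for
each one that finishes. Stripped of the pull-specific details, this
is roughly the following generic sketch (`run_bounded` and its
signature are for illustration only, they are not part of this
patch):

```
use futures::stream::{FuturesUnordered, StreamExt};

// Generic sketch of the bounded-concurrency pattern; in the patch, `f`
// corresponds to pull_group_task() and `limit` to
// params.group_sync_tasks.unwrap_or(1).
async fn run_bounded<I, F, Fut, T>(items: I, limit: usize, f: F) -> Vec<T>
where
    I: IntoIterator,
    F: Fn(I::Item) -> Fut,
    Fut: std::future::Future<Output = T>,
{
    let mut iter = items.into_iter();
    let mut in_flight = FuturesUnordered::new();
    let mut results = Vec::new();

    // seed the pool with up to `limit` futures
    for _ in 0..limit.max(1) {
        if let Some(item) = iter.next() {
            in_flight.push(f(item));
        }
    }

    // every time a future resolves, queue the next pending item
    while let Some(output) = in_flight.next().await {
        results.push(output);
        if let Some(item) = iter.next() {
            in_flight.push(f(item));
        }
    }

    results
}
```
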

Link to issue in bugtracker:
https://bugzilla.proxmox.com/show_bug.cgi?id=4182

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 50 ++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 42 insertions(+), 8 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index e2d155c78..0a54217d4 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -10,6 +10,8 @@ use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Error};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use http::StatusCode;
 use proxmox_human_byte::HumanByte;
 use proxmox_router::HttpError;
@@ -1452,16 +1454,48 @@ pub(crate) async fn pull_ns(
         new_groups.insert(group.clone());
     }
 
-    let mut progress = StoreProgress::new(list.len() as u64);
-    let mut pull_stats = PullStats::default();
+    let mut store_progress = StoreProgress::new(list.len() as u64);
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
-        pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
+    let mut pull_group_tasks = FuturesUnordered::new();
+
+    let mut list_iter = list.iter();
+    // queue up to requested number of initial group sync tasks to the task pool
+    for _ in 0..params.group_sync_tasks.unwrap_or(1) {
+        if let Some(group) = list_iter.next() {
+            let task_progress = StoreProgress::new(list.len() as u64);
+            pull_group_tasks.push(pull_group_task(
+                params,
+                group,
+                namespace,
+                &target_ns,
+                task_progress,
+            ));
+        }
+    }
+
+    let mut pull_stats = PullStats::default();
+    // poll to initiate tasks, queue another remaining tasks for each finished one
+    while let Some(result) = pull_group_tasks.next().await {
+        let (progress, stats, has_errors) = result?;
+        errors |= has_errors;
+        pull_stats.add(stats);
+        store_progress.done_groups += progress.done_groups;
+        store_progress.done_snapshots += progress.done_snapshots;
+
+        matches!(params.group_sync_tasks, Some(n) if n > 1);
+        // queue another remaining group sync to the task pool
+        if let Some(group) = list_iter.next() {
+            let task_progress = StoreProgress::new(list.len() as u64);
+            pull_group_tasks.push(pull_group_task(
+                params,
+                group,
+                namespace,
+                &target_ns,
+                task_progress,
+            ));
+        }
     }
 
     if params.remove_vanished {
@@ -1516,5 +1550,5 @@ pub(crate) async fn pull_ns(
         };
     }
 
-    Ok((progress, pull_stats, errors))
+    Ok((store_progress, pull_stats, errors))
 }
-- 
2.39.2




* [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output
  2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
                   ` (2 preceding siblings ...)
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
@ 2024-07-25 10:19 ` Christian Ebner
  3 siblings, 0 replies; 9+ messages in thread
From: Christian Ebner @ 2024-07-25 10:19 UTC (permalink / raw)
  To: pbs-devel

In order to keep the log messages in a meaningful order when syncing
backup groups over parallel connections, buffer them in the sync
stats and only display them once the corresponding task has finished.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 165 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 134 insertions(+), 31 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0a54217d4..109cd3d1c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -89,6 +89,7 @@ pub(crate) struct PullStats {
     pub(crate) bytes: usize,
     pub(crate) elapsed: Duration,
     pub(crate) removed: Option<RemovedVanishedStats>,
+    pub(crate) log_buffer: Vec<String>,
 }
 
 impl From<RemovedVanishedStats> for PullStats {
@@ -101,10 +102,11 @@ impl From<RemovedVanishedStats> for PullStats {
 }
 
 impl PullStats {
-    fn add(&mut self, rhs: PullStats) {
+    fn add(&mut self, mut rhs: PullStats) {
         self.chunk_count += rhs.chunk_count;
         self.bytes += rhs.bytes;
         self.elapsed += rhs.elapsed;
+        self.log_buffer.append(&mut rhs.log_buffer);
 
         if let Some(rhs_removed) = rhs.removed {
             if let Some(ref mut removed) = self.removed {
@@ -443,7 +445,6 @@ impl PullReader for RemoteReader {
             if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                 bail!("Atomic rename file {:?} failed - {}", to_path, err);
             }
-            info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
         }
 
         Ok(())
@@ -577,6 +578,7 @@ async fn pull_index_chunks<I: IndexFile>(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -658,17 +660,20 @@ async fn pull_index_chunks<I: IndexFile>(
     let bytes = bytes.load(Ordering::SeqCst);
     let chunk_count = chunk_count.load(Ordering::SeqCst);
 
-    info!(
+    let mut log_buffer = Vec::new();
+    let msg = format!(
         "downloaded {} ({}/s)",
         HumanByte::from(bytes),
         HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
     );
+    log_info_buffer(msg, buffer_logs, &mut log_buffer);
 
     Ok(PullStats {
         chunk_count,
         bytes,
         elapsed,
         removed: None,
+        log_buffer,
     })
 }
 
@@ -702,6 +707,7 @@ async fn pull_single_archive<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
@@ -712,7 +718,11 @@ async fn pull_single_archive<'a>(
 
     let mut pull_stats = PullStats::default();
 
-    info!("sync archive {archive_name}");
+    log_info_buffer(
+        format!("sync archive {archive_name}"),
+        buffer_logs,
+        &mut pull_stats.log_buffer,
+    );
 
     reader.load_file_into(archive_name, &tmp_path).await?;
 
@@ -727,13 +737,18 @@ async fn pull_single_archive<'a>(
             verify_archive(archive_info, &csum, size)?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                log_info_buffer(
+                    "skipping chunk sync for same datastore".to_string(),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             } else {
                 let stats = pull_index_chunks(
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    buffer_logs,
                 )
                 .await?;
                 pull_stats.add(stats);
@@ -747,13 +762,18 @@ async fn pull_single_archive<'a>(
             verify_archive(archive_info, &csum, size)?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                log_info_buffer(
+                    "skipping chunk sync for same datastore".to_string(),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             } else {
                 let stats = pull_index_chunks(
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    buffer_logs,
                 )
                 .await?;
                 pull_stats.add(stats);
@@ -784,6 +804,7 @@ async fn pull_snapshot<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let mut pull_stats = PullStats::default();
     let mut manifest_name = snapshot.full_path();
@@ -820,8 +841,17 @@ async fn pull_snapshot<'a>(
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
                 reader.try_download_client_log(&client_log_name).await?;
+                log_info_buffer(
+                    format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             };
-            info!("no data changes");
+            log_info_buffer(
+                "no data changes".to_string(),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(pull_stats); // nothing changed
         }
@@ -841,7 +871,11 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
@@ -851,7 +885,11 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
@@ -861,15 +899,25 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
             }
         }
 
-        let stats =
-            pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
+        let stats = pull_single_archive(
+            reader.clone(),
+            snapshot,
+            item,
+            downloaded_chunks.clone(),
+            buffer_logs,
+        )
+        .await?;
         pull_stats.add(stats);
     }
 
@@ -879,6 +927,11 @@ async fn pull_snapshot<'a>(
 
     if !client_log_name.exists() {
         reader.try_download_client_log(&client_log_name).await?;
+        log_info_buffer(
+            format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
     };
     snapshot
         .cleanup_unreferenced_files(&manifest)
@@ -895,15 +948,21 @@ async fn pull_snapshot_from<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
         .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
 
-    let pull_stats = if is_new {
-        info!("sync snapshot {}", snapshot.dir());
+    let mut pull_stats = PullStats::default();
+    if is_new {
+        log_info_buffer(
+            format!("sync snapshot {}", snapshot.dir()),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
 
-        match pull_snapshot(reader, snapshot, downloaded_chunks).await {
+        match pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await {
             Err(err) => {
                 if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                     snapshot.backup_ns(),
@@ -914,14 +973,23 @@ async fn pull_snapshot_from<'a>(
                 }
                 return Err(err);
             }
-            Ok(pull_stats) => {
-                info!("sync snapshot {} done", snapshot.dir());
-                pull_stats
+            Ok(stats) => {
+                pull_stats.add(stats);
+                log_info_buffer(
+                    format!("sync snapshot {}", snapshot.dir()),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             }
         }
     } else {
-        info!("re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(reader, snapshot, downloaded_chunks).await?
+        log_info_buffer(
+            format!("re-sync snapshot {}", snapshot.dir()),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
+        let stats = pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await?;
+        pull_stats.add(stats);
     };
 
     Ok(pull_stats)
@@ -1054,6 +1122,8 @@ async fn pull_group(
         .last_successful_backup(&target_ns, group)?
         .unwrap_or(i64::MIN);
 
+    let mut pull_stats = PullStats::default();
+    let buffer_logs = matches!(params.group_sync_tasks, Some(n) if n > 1);
     let list: Vec<BackupDir> = raw_list
         .into_iter()
         .enumerate()
@@ -1063,7 +1133,11 @@ async fn pull_group(
                 already_synced_skip_info.update(dir.time);
                 return false;
             } else if already_synced_skip_info.count > 0 {
-                info!("{already_synced_skip_info}");
+                log_info_buffer(
+                    format!("{already_synced_skip_info}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
                 already_synced_skip_info.reset();
                 return true;
             }
@@ -1072,7 +1146,11 @@ async fn pull_group(
                 transfer_last_skip_info.update(dir.time);
                 return false;
             } else if transfer_last_skip_info.count > 0 {
-                info!("{transfer_last_skip_info}");
+                log_info_buffer(
+                    format!("{transfer_last_skip_info}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
                 transfer_last_skip_info.reset();
             }
             true
@@ -1088,8 +1166,6 @@ async fn pull_group(
         progress.group_snapshots = list.len() as u64;
     }
 
-    let mut pull_stats = PullStats::default();
-
     for (pos, from_snapshot) in list.into_iter().enumerate() {
         let to_snapshot = params
             .target
@@ -1100,12 +1176,17 @@ async fn pull_group(
             .source
             .reader(source_namespace, &from_snapshot)
             .await?;
-        let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
+        let result =
+            pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), buffer_logs).await;
 
         {
             let mut progress = progress.lock().unwrap();
             progress.done_snapshots = pos as u64 + 1;
-            info!("percentage done: {progress}");
+            log_info_buffer(
+                format!("percentage done: {progress}"),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
         }
 
         let stats = result?; // stop on error
@@ -1124,13 +1205,21 @@ async fn pull_group(
                 continue;
             }
             if snapshot.is_protected() {
-                info!(
-                    "don't delete vanished snapshot {} (protected)",
-                    snapshot.dir()
+                log_info_buffer(
+                    format!(
+                        "don't delete vanished snapshot {} (protected)",
+                        snapshot.dir()
+                    ),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
                 );
                 continue;
             }
-            info!("delete vanished snapshot {}", snapshot.dir());
+            log_info_buffer(
+                format!("delete vanished snapshot {}", snapshot.dir()),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
             params
                 .target
                 .store
@@ -1478,8 +1567,14 @@ pub(crate) async fn pull_ns(
     let mut pull_stats = PullStats::default();
     // poll to initiate tasks, queue another remaining tasks for each finished one
     while let Some(result) = pull_group_tasks.next().await {
-        let (progress, stats, has_errors) = result?;
+        let (progress, mut stats, has_errors) = result?;
         errors |= has_errors;
+        // Generate log output
+        for log_line in stats.log_buffer.iter() {
+            info!("{log_line}");
+        }
+        // clear log buffer before adding, don't need the logs anymore
+        stats.log_buffer.clear();
         pull_stats.add(stats);
         store_progress.done_groups += progress.done_groups;
         store_progress.done_snapshots += progress.done_snapshots;
@@ -1552,3 +1647,11 @@ pub(crate) async fn pull_ns(
 
     Ok((store_progress, pull_stats, errors))
 }
+
+fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
+    if buffer_logs {
+        buffer.push(msg);
+    } else {
+        info!("{msg}");
+    }
+}
-- 
2.39.2




* Re: [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
@ 2024-07-30 15:54   ` Gabriel Goller
  2024-07-31  7:35     ` Christian Ebner
  0 siblings, 1 reply; 9+ messages in thread
From: Gabriel Goller @ 2024-07-30 15:54 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On 25.07.2024 12:19, Christian Ebner wrote:
>Currently, a sync job pulls the backup groups and the snapshots
>contained within them sequentially, and is therefore limited in
>download speed by the HTTP/2 connection of the source reader instance
>in case of remote syncs. This is especially noticeable on
>high-latency networks.
>
>Improve the throughput by allowing up to a configured number of
>backup groups to be pulled concurrently, creating tasks that connect
>to and pull from the remote source in parallel.
>
>Link to issue in bugtracker:
>https://bugzilla.proxmox.com/show_bug.cgi?id=4182
>
>Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
>---
> src/server/pull.rs | 50 ++++++++++++++++++++++++++++++++++++++--------
> 1 file changed, 42 insertions(+), 8 deletions(-)
>
>diff --git a/src/server/pull.rs b/src/server/pull.rs
>index e2d155c78..0a54217d4 100644
>--- a/src/server/pull.rs
>+++ b/src/server/pull.rs
>@@ -10,6 +10,8 @@ use std::sync::{Arc, Mutex};
> use std::time::{Duration, SystemTime};
>
> use anyhow::{bail, format_err, Error};
>+use futures::stream::FuturesUnordered;
>+use futures::StreamExt;
> use http::StatusCode;
> use proxmox_human_byte::HumanByte;
> use proxmox_router::HttpError;
>@@ -1452,16 +1454,48 @@ pub(crate) async fn pull_ns(
>         new_groups.insert(group.clone());
>     }
>
>-    let mut progress = StoreProgress::new(list.len() as u64);
>-    let mut pull_stats = PullStats::default();
>+    let mut store_progress = StoreProgress::new(list.len() as u64);
>
>     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
>
>-    for (done, group) in list.into_iter().enumerate() {
>-        progress.done_groups = done as u64;
>-        progress.done_snapshots = 0;
>-        progress.group_snapshots = 0;
>-        pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
>+    let mut pull_group_tasks = FuturesUnordered::new();
>+
>+    let mut list_iter = list.iter();
>+    // queue up to requested number of initial group sync tasks to the task pool
>+    for _ in 0..params.group_sync_tasks.unwrap_or(1) {
>+        if let Some(group) = list_iter.next() {
>+            let task_progress = StoreProgress::new(list.len() as u64);
>+            pull_group_tasks.push(pull_group_task(
>+                params,
>+                group,
>+                namespace,
>+                &target_ns,
>+                task_progress,
>+            ));
>+        }
>+    }
>+
>+    let mut pull_stats = PullStats::default();
>+    // poll to initiate tasks, queue another remaining tasks for each finished one
>+    while let Some(result) = pull_group_tasks.next().await {
>+        let (progress, stats, has_errors) = result?;
>+        errors |= has_errors;
>+        pull_stats.add(stats);
>+        store_progress.done_groups += progress.done_groups;
>+        store_progress.done_snapshots += progress.done_snapshots;
>+
>+        matches!(params.group_sync_tasks, Some(n) if n > 1);

This can be removed, it does nothing.

>+        // queue another remaining group sync to the task pool
>+        if let Some(group) = list_iter.next() {
>+            let task_progress = StoreProgress::new(list.len() as u64);
>+            pull_group_tasks.push(pull_group_task(
>+                params,
>+                group,
>+                namespace,
>+                &target_ns,
>+                task_progress,
>+            ));
>+        }
>     }
>
>     if params.remove_vanished {
>@@ -1516,5 +1550,5 @@ pub(crate) async fn pull_ns(
>         };
>     }
>
>-    Ok((progress, pull_stats, errors))
>+    Ok((store_progress, pull_stats, errors))
> }
>-- 
>2.39.2



* Re: [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper
  2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper Christian Ebner
@ 2024-07-30 15:56   ` Gabriel Goller
  2024-07-31  7:38     ` Christian Ebner
  0 siblings, 1 reply; 9+ messages in thread
From: Gabriel Goller @ 2024-07-30 15:56 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On 25.07.2024 12:19, Christian Ebner wrote:
>Make the error handling and accounting logic for each group pull task
>reusable by moving it into its own helper function, returning the
>future.
>The store progress is placed behind a reference-counted mutex to
>allow concurrent status updates.
>
>Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
>---
> pbs-datastore/src/store_progress.rs |   2 +-
> src/server/pull.rs                  | 102 +++++++++++++++++-----------
> 2 files changed, 65 insertions(+), 39 deletions(-)
>
>diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
>index a32bb9a9d..8afa60ace 100644
>--- a/pbs-datastore/src/store_progress.rs
>+++ b/pbs-datastore/src/store_progress.rs
>@@ -1,4 +1,4 @@
>-#[derive(Debug, Default)]
>+#[derive(Clone, Debug, Default)]
> /// Tracker for progress of operations iterating over `Datastore` contents.
> pub struct StoreProgress {
>     /// Completed groups
>diff --git a/src/server/pull.rs b/src/server/pull.rs
>index 80443132e..e2d155c78 100644
>--- a/src/server/pull.rs
>+++ b/src/server/pull.rs
>@@ -1,8 +1,10 @@
> //! Sync datastore by pulling contents from remote server
>
> use std::collections::{HashMap, HashSet};
>+use std::future::Future;
> use std::io::{Seek, Write};
> use std::path::{Path, PathBuf};
>+use std::pin::Pin;
> use std::sync::atomic::{AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> use std::time::{Duration, SystemTime};
>@@ -1023,7 +1025,7 @@ async fn pull_group(
>     params: &PullParameters,
>     source_namespace: &BackupNamespace,
>     group: &BackupGroup,
>-    progress: &mut StoreProgress,
>+    progress: Arc<Mutex<StoreProgress>>,
> ) -> Result<PullStats, Error> {
>     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
>     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>@@ -1079,7 +1081,10 @@ async fn pull_group(
>     // start with 65536 chunks (up to 256 GiB)
>     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>
>-    progress.group_snapshots = list.len() as u64;
>+    {
>+        let mut progress = progress.lock().unwrap();
>+        progress.group_snapshots = list.len() as u64;
>+    }
>
>     let mut pull_stats = PullStats::default();
>
>@@ -1095,8 +1100,11 @@ async fn pull_group(
>             .await?;
>         let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
>
>-        progress.done_snapshots = pos as u64 + 1;
>-        info!("percentage done: {progress}");
>+        {
>+            let mut progress = progress.lock().unwrap();
>+            progress.done_snapshots = pos as u64 + 1;
>+            info!("percentage done: {progress}");
>+        }
>
>         let stats = result?; // stop on error
>         pull_stats.add(stats);
>@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
>     Ok(pull_stats)
> }
>
>+fn pull_group_task<'future>(
>+    params: &'future PullParameters,
>+    group: &'future BackupGroup,
>+    namespace: &'future BackupNamespace,
>+    target_namespace: &'future BackupNamespace,
>+    progress: StoreProgress,
>+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>

This should be the same as making the function async:

async fn pull_group_task(...) -> Result<(...), Error> {}
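
A rough sketch of what that could look like (signature only, the body
from your patch stays the same, minus the Box::pin(async move { ... })
wrapper):

```
async fn pull_group_task(
    params: &PullParameters,
    group: &BackupGroup,
    namespace: &BackupNamespace,
    target_namespace: &BackupNamespace,
    progress: StoreProgress,
) -> Result<(StoreProgress, PullStats, bool), Error> {
    // ... body unchanged; the compiler generates the future type and
    // ties its lifetime to the borrowed parameters automatically ...
}
```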


Just posted these two things on the mailing list so that I don't forget
them; will follow up with a more detailed review. Will also have a look
at how we can improve the logging, as it's quite janky atm :).



* Re: [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently
  2024-07-30 15:54   ` Gabriel Goller
@ 2024-07-31  7:35     ` Christian Ebner
  0 siblings, 0 replies; 9+ messages in thread
From: Christian Ebner @ 2024-07-31  7:35 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Gabriel Goller

On 7/30/24 17:54, Gabriel Goller wrote:
> On 25.07.2024 12:19, Christian Ebner wrote:
>> +        matches!(params.group_sync_tasks, Some(n) if n > 1);
> 
> This can be removed, it does nothing.
> 

True, will be removed in the next iteration of the patches.




* Re: [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper
  2024-07-30 15:56   ` Gabriel Goller
@ 2024-07-31  7:38     ` Christian Ebner
  0 siblings, 0 replies; 9+ messages in thread
From: Christian Ebner @ 2024-07-31  7:38 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Gabriel Goller

On 7/30/24 17:56, Gabriel Goller wrote:
> On 25.07.2024 12:19, Christian Ebner wrote:
>>
>> +fn pull_group_task<'future>(
>> +    params: &'future PullParameters,
>> +    group: &'future BackupGroup,
>> +    namespace: &'future BackupNamespace,
>> +    target_namespace: &'future BackupNamespace,
>> +    progress: StoreProgress,
>> +) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, 
>> bool), Error>> + Send + 'future>>
> 
> This should be the same as making the function async:
> 
> async fn pull_group_task(...) -> Result<(...), Error> {}
> 
> 
> Just posted these two things on the mailing list so that I don't forget
> them; will follow up with a more detailed review. Will also have a look
> at how we can improve the logging, as it's quite janky atm :).

Yes, thanks for the suggestion, that makes the above declaration much
more readable and concise.

Also, thanks a lot for the second pair of eyes on the logging, very much 
appreciated!




