* [PATCH proxmox v8 01/10] pbs api types: add `worker-threads` to sync job config
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox-backup v8 02/10] tools: implement buffered logger for concurrent log messages Christian Ebner
` (9 subsequent siblings)
10 siblings, 0 replies; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
Allow specifying the number of concurrent worker threads used to sync
groups for sync jobs. Values range from 1 (the current behavior) to 32,
although performance gains will saturate at higher thread counts.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
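For illustration only, a minimal sketch of how a consumer might resolve
the optional value. The helper `effective_worker_threads` and the two
constants are hypothetical and not part of this series. The bounds are
taken from the commit message, and the fallback of 1 assumes that an
unset value keeps the current sequential behavior. The clamp is purely
defensive, assuming the schema already rejects out-of-range values.

```rust
/// Lower bound from the commit message (the current, sequential behavior).
const MIN_WORKER_THREADS: usize = 1;
/// Upper bound from the commit message.
const MAX_WORKER_THREADS: usize = 32;

/// Hypothetical helper: resolve the optional `worker-threads` value to the
/// number of workers to use, falling back to 1 if unset.
fn effective_worker_threads(configured: Option<usize>) -> usize {
    configured
        .unwrap_or(MIN_WORKER_THREADS)
        .clamp(MIN_WORKER_THREADS, MAX_WORKER_THREADS)
}
```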
pbs-api-types/src/jobs.rs | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 7e6dfb94..c4e6dda6 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -88,6 +88,11 @@ pub const VERIFY_JOB_VERIFY_THREADS_SCHEMA: Schema = threads_schema(
4,
);
+pub const SYNC_WORKER_THREADS_SCHEMA: Schema = threads_schema(
+ "The number of worker threads to process groups in parallel.",
+ 1,
+);
+
#[api(
properties: {
"next-run": {
@@ -664,6 +669,10 @@ pub const UNMOUNT_ON_SYNC_DONE_SCHEMA: Schema =
type: SyncDirection,
optional: true,
},
+ "worker-threads": {
+ schema: SYNC_WORKER_THREADS_SCHEMA,
+ optional: true,
+ },
}
)]
#[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -709,6 +718,8 @@ pub struct SyncJobConfig {
pub unmount_on_done: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sync_direction: Option<SyncDirection>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub worker_threads: Option<usize>,
}
impl SyncJobConfig {
--
2.47.3
* [PATCH proxmox-backup v8 02/10] tools: implement buffered logger for concurrent log messages
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox v8 01/10] pbs api types: add `worker-threads` to sync job config Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 17:25 ` Thomas Lamprecht
2026-04-22 13:18 ` [PATCH proxmox-backup v8 03/10] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
` (8 subsequent siblings)
10 siblings, 1 reply; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
The BufferedLogger instance collects messages sent from different
sender instances via an async tokio channel and buffers them. Senders
identify themselves by label and provide a log level for each log line
to be buffered and flushed.
On collection, log lines are grouped by label and buffered in order of
arrival per label, up to the configured maximum number of lines per
group. In addition, the last-logged timestamp is updated, which is used
to periodically flush log lines per label at the configured interval.
There is no guarantee on the order of labels when flushing.
Senders can also request flushing at any point, and all logs are
flushed when closing the sender builder.
Log output is written according to the level of each log line and
prefixed by the label.
Co-developed-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
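To illustrate the grouping described above, here is a simplified,
synchronous sketch using only the standard library. `LabelBuffer` is a
hypothetical stand-in, not the type added by this patch: it has no
channel, no timer and no log levels, and it collects flushed lines in a
`Vec` instead of writing them to the log. It only mirrors the per-label
buffering and the flush once the line limit would be exceeded.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the buffering logic (no channel, no timer).
struct LabelBuffer {
    /// Maximum number of lines buffered per label before flushing.
    max_buffered_lines: usize,
    /// Buffered lines, grouped by label in order of arrival.
    buffer_map: HashMap<String, Vec<String>>,
    /// Flushed output, collected here instead of being written to a log.
    output: Vec<String>,
}

impl LabelBuffer {
    fn new(max_buffered_lines: usize) -> Self {
        Self {
            max_buffered_lines,
            buffer_map: HashMap::new(),
            output: Vec::new(),
        }
    }

    /// Buffer a line. Once the per-label limit would be exceeded, flush the
    /// buffered lines of that label followed by the new line.
    fn message(&mut self, label: &str, line: &str) {
        let lines = self.buffer_map.entry(label.to_string()).or_default();
        if lines.len() + 1 > self.max_buffered_lines {
            for buffered in lines.drain(..) {
                self.output.push(format!("[{label}]: {buffered}"));
            }
            self.output.push(format!("[{label}]: {line}"));
        } else {
            lines.push(line.to_string());
        }
    }

    /// Flush all buffered lines of the given label, in order of arrival.
    fn flush(&mut self, label: &str) {
        if let Some(lines) = self.buffer_map.remove(label) {
            for buffered in lines {
                self.output.push(format!("[{label}]: {buffered}"));
            }
        }
    }
}
```

Lines from different labels never interleave within one flush, which is
the property that keeps concurrent task output readable.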
pbs-tools/Cargo.toml | 3 +
pbs-tools/src/buffered_logger.rs | 238 +++++++++++++++++++++++++++++++
pbs-tools/src/lib.rs | 1 +
3 files changed, 242 insertions(+)
create mode 100644 pbs-tools/src/buffered_logger.rs
diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
index 998e3077e..8bc01e85d 100644
--- a/pbs-tools/Cargo.toml
+++ b/pbs-tools/Cargo.toml
@@ -8,6 +8,7 @@ description = "common tools used throughout pbs"
# This must not depend on any subcrates more closely related to pbs itself.
[dependencies]
anyhow.workspace = true
+async-trait.workspace = true
bytes.workspace = true
foreign-types.workspace = true
hex.workspace = true
@@ -17,10 +18,12 @@ openssl.workspace = true
serde_json.workspace = true
# rt-multi-thread is required for block_in_place
tokio = { workspace = true, features = [ "fs", "io-util", "rt", "rt-multi-thread", "sync" ] }
+tracing.workspace = true
proxmox-async.workspace = true
proxmox-io = { workspace = true, features = [ "tokio" ] }
proxmox-human-byte.workspace = true
+proxmox-log.workspace = true
proxmox-sys.workspace = true
proxmox-time.workspace = true
diff --git a/pbs-tools/src/buffered_logger.rs b/pbs-tools/src/buffered_logger.rs
new file mode 100644
index 000000000..4e9bf8651
--- /dev/null
+++ b/pbs-tools/src/buffered_logger.rs
@@ -0,0 +1,238 @@
+//! Log aggregator to collect and group messages sent from concurrent tasks via
+//! a tokio channel.
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::time::Duration;
+
+use anyhow::Error;
+use tokio::sync::mpsc;
+use tokio::time::Instant;
+use tracing::{debug, error, info, trace, warn, Level};
+
+use proxmox_log::LogContext;
+
+/// Label to be used to group buffered messages when flushing.
+pub type SenderLabel = String;
+
+/// Requested action for the log collection task
+enum SenderRequest {
+ /// New log line to be buffered
+ Message(SenderLabel, LogLine),
+ /// Flush currently buffered log lines associated by sender label
+ Flush(SenderLabel),
+ /// Closes the channel, flushing all buffered log lines and preventing further logging
+ Close,
+}
+
+/// Receives log lines tagged with a label, and buffers them grouped
+/// by the label value. Buffered messages are flushed either after
+/// reaching a certain timeout or capacity limit, or when explicitly
+/// requested.
+pub struct BufferedLogger {
+ /// Buffered log lines and instant of the last received message, keyed by sender label
+ buffer_map: HashMap<SenderLabel, (Instant, Vec<LogLine>)>,
+ /// Maximum number of received lines for an individual sender instance before
+ /// flushing
+ max_buffered_lines: usize,
+ /// Maximum aggregation duration of received lines for an individual sender
+ /// instance before flushing
+ max_aggregation_time: Duration,
+ /// Channel to receive log messages
+ receiver: mpsc::Receiver<SenderRequest>,
+}
+
+/// Instance to create new sender instances by cloning the channel sender
+pub struct LogLineSenderBuilder {
+ // Used to clone new senders if requested
+ sender: Option<mpsc::Sender<SenderRequest>>,
+}
+
+impl LogLineSenderBuilder {
+ /// Create new sender instance to send log messages, to be grouped by given label
+ ///
+ /// The label is not checked for uniqueness (another instance may use the same label),
+ /// it is the caller's responsibility to ensure this if required.
+ pub fn sender_with_label(&self, label: SenderLabel) -> LogLineSender {
+ LogLineSender {
+ label,
+ sender: self.sender.clone(),
+ }
+ }
+
+ /// Closes the channel, flushing all pending messages and closing the receiver
+ pub async fn close(self) -> Result<(), Error> {
+ if let Some(sender) = self.sender {
+ sender.send(SenderRequest::Close).await?;
+ // wait for flushing and dropping of receiver
+ sender.closed().await;
+ }
+
+ Ok(())
+ }
+}
+
+/// Sender to send new log messages to buffered log aggregator
+pub struct LogLineSender {
+ /// Label used to group log lines
+ label: SenderLabel,
+ /// Sender to publish new log lines to buffered log aggregator task
+ sender: Option<mpsc::Sender<SenderRequest>>,
+}
+
+impl LogLineSender {
+ /// Send a new log message with given level to the buffered logger task
+ pub async fn log(&self, level: Level, message: String) -> Result<(), Error> {
+ let line = LogLine { level, message };
+ if let Some(sender) = &self.sender {
+ sender
+ .send(SenderRequest::Message(self.label.clone(), line))
+ .await?;
+ } else {
+ BufferedLogger::log_by_level(&self.label, &line);
+ }
+ Ok(())
+ }
+
+ /// Request flushing of all buffered messages with this sender's label
+ pub async fn flush(&self) -> Result<(), Error> {
+ if let Some(sender) = &self.sender {
+ sender
+ .send(SenderRequest::Flush(self.label.clone()))
+ .await?;
+ }
+ Ok(())
+ }
+}
+
+/// Log message entity
+struct LogLine {
+ /// Log level of the log message
+ level: Level,
+ /// Log message
+ message: String,
+}
+
+impl BufferedLogger {
+ /// New instance of a buffered logger
+ pub fn new(max_buffered_lines: usize, max_aggregation_time: Duration) -> LogLineSenderBuilder {
+ if max_buffered_lines > 0 || !max_aggregation_time.is_zero() {
+ let (sender, receiver) = mpsc::channel(100);
+
+ let logger = Self {
+ buffer_map: HashMap::new(),
+ max_buffered_lines,
+ max_aggregation_time,
+ receiver,
+ };
+ logger.run_log_collection();
+ LogLineSenderBuilder {
+ sender: Some(sender),
+ }
+ } else {
+ LogLineSenderBuilder { sender: None }
+ }
+ }
+
+ /// Starts the collection loop, spawned on a new tokio task.
+ /// Finishes when all senders belonging to the channel have been dropped.
+ fn run_log_collection(mut self) {
+ let future = async move {
+ loop {
+ for (label, (last_logged, log_lines)) in self.buffer_map.iter_mut() {
+ if !log_lines.is_empty()
+ && Instant::now().duration_since(*last_logged) > self.max_aggregation_time
+ {
+ Self::flush_by_label(label, log_lines);
+ }
+ }
+
+ match tokio::time::timeout(self.max_aggregation_time, self.receive_log_line()).await
+ {
+ Ok(finished) => {
+ if finished {
+ break;
+ }
+ }
+ Err(_timeout) => self.flush_all_buffered(),
+ }
+ }
+ };
+ match LogContext::current() {
+ None => tokio::spawn(future),
+ Some(context) => tokio::spawn(context.scope(future)),
+ };
+ }
+
+ /// Collects new log lines, buffers and flushes them if max lines limit exceeded.
+ ///
+ /// Returns `true` if all the senders have been dropped and the task should no
+ /// longer wait for new messages and finish.
+ async fn receive_log_line(&mut self) -> bool {
+ match self.receiver.recv().await {
+ Some(SenderRequest::Flush(label)) => {
+ if let Some((_last_logged, mut log_lines)) = self.buffer_map.remove(&label) {
+ Self::flush_by_label(&label, &mut log_lines);
+ }
+ false
+ }
+ Some(SenderRequest::Message(label, log_line)) => {
+ match self.buffer_map.entry(label.clone()) {
+ Entry::Occupied(mut occupied) => {
+ let (last_logged, log_lines) = occupied.get_mut();
+ if log_lines.len() + 1 > self.max_buffered_lines {
+ // reached limit for this label,
+ // flush all buffered and new log line
+ Self::flush_by_label(&label, log_lines);
+ Self::log_by_level(&label, &log_line);
+ } else {
+ // below limit, push to buffer to flush later
+ log_lines.push(log_line);
+ *last_logged = Instant::now();
+ }
+ }
+ Entry::Vacant(vacant) => {
+ vacant.insert((Instant::now(), vec![log_line]));
+ }
+ }
+ false
+ }
+ Some(SenderRequest::Close) => {
+ self.receiver.close();
+ self.flush_all_buffered();
+ true
+ }
+ None => {
+ // no more senders, all LogLineSenders and LogLineSenderBuilder have been dropped
+ self.flush_all_buffered();
+ true
+ }
+ }
+ }
+
+ /// Flush all currently buffered contents without ordering, but grouped by label
+ fn flush_all_buffered(&mut self) {
+ for (label, (_last_logged, log_lines)) in self.buffer_map.iter_mut() {
+ Self::flush_by_label(label, log_lines);
+ }
+ }
+
+ /// Flush all buffered log lines of the given label, in order of arrival
+ fn flush_by_label(label: &str, log_lines: &mut Vec<LogLine>) {
+ for log_line in log_lines.iter() {
+ Self::log_by_level(label, log_line);
+ }
+ log_lines.clear();
+ }
+
+ /// Write the given log line prefixed by label
+ fn log_by_level(label: &str, log_line: &LogLine) {
+ match log_line.level {
+ Level::ERROR => error!("[{label}]: {}", log_line.message),
+ Level::WARN => warn!("[{label}]: {}", log_line.message),
+ Level::INFO => info!("[{label}]: {}", log_line.message),
+ Level::DEBUG => debug!("[{label}]: {}", log_line.message),
+ Level::TRACE => trace!("[{label}]: {}", log_line.message),
+ }
+ }
+}
diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
index f41aef6df..1e3972c92 100644
--- a/pbs-tools/src/lib.rs
+++ b/pbs-tools/src/lib.rs
@@ -1,4 +1,5 @@
pub mod async_lru_cache;
+pub mod buffered_logger;
pub mod cert;
pub mod crypt_config;
pub mod format;
--
2.47.3
* Re: [PATCH proxmox-backup v8 02/10] tools: implement buffered logger for concurrent log messages
2026-04-22 13:18 ` [PATCH proxmox-backup v8 02/10] tools: implement buffered logger for concurrent log messages Christian Ebner
@ 2026-04-22 17:25 ` Thomas Lamprecht
0 siblings, 0 replies; 15+ messages in thread
From: Thomas Lamprecht @ 2026-04-22 17:25 UTC (permalink / raw)
To: Christian Ebner, pbs-devel
Am 22.04.26 um 15:17 schrieb Christian Ebner:
> The BufferedLogger instance collects messages sent from different
> sender instances via an async tokio channel and buffers them. Senders
> identify themselves by label and provide a log level for each log line
> to be buffered and flushed.
>
> On collection, log lines are grouped by label and buffered in
> sequence of arrival per label, up to the configured maximum number of
> per group lines. In addition, the last logged timestamp is updated,
> which will be used to periodically flush log lines per-label with the
> configured interval. There is no guarantee on the order of labels when
> flushing.
>
> In addition, senders can request flushing at any given point, all logs
> are flushed when closing the sender builder.
>
> Log output is written based on provided log line level and prefixed
> by the label.
>
> Co-developed-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> pbs-tools/Cargo.toml | 3 +
> pbs-tools/src/buffered_logger.rs | 238 +++++++++++++++++++++++++++++++
> pbs-tools/src/lib.rs | 1 +
> 3 files changed, 242 insertions(+)
> create mode 100644 pbs-tools/src/buffered_logger.rs
>
> diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
> index 998e3077e..8bc01e85d 100644
> --- a/pbs-tools/Cargo.toml
> +++ b/pbs-tools/Cargo.toml
> @@ -8,6 +8,7 @@ description = "common tools used throughout pbs"
> # This must not depend on any subcrates more closely related to pbs itself.
> [dependencies]
> anyhow.workspace = true
> +async-trait.workspace = true
> bytes.workspace = true
> foreign-types.workspace = true
> hex.workspace = true
> @@ -17,10 +18,12 @@ openssl.workspace = true
> serde_json.workspace = true
> # rt-multi-thread is required for block_in_place
> tokio = { workspace = true, features = [ "fs", "io-util", "rt", "rt-multi-thread", "sync" ] }
> +tracing.workspace = true
>
> proxmox-async.workspace = true
> proxmox-io = { workspace = true, features = [ "tokio" ] }
> proxmox-human-byte.workspace = true
> +proxmox-log.workspace = true
> proxmox-sys.workspace = true
> proxmox-time.workspace = true
>
> diff --git a/pbs-tools/src/buffered_logger.rs b/pbs-tools/src/buffered_logger.rs
> new file mode 100644
> index 000000000..4e9bf8651
> --- /dev/null
> +++ b/pbs-tools/src/buffered_logger.rs
> @@ -0,0 +1,238 @@
> +//! Log aggregator to collect and group messages sent from concurrent tasks via
> +//! a tokio channel.
Fine for now to introduce this here, especially given the lower friction
for fixes and improvements, which are also more likely to happen initially,
but in the longer term this might be better off in its own micro-crate or
in proxmox-log as an opt-in feature.
While the pbs-tools workspace crate can be fine as a staging ground, and
maybe for some stuff that's very specific to PBS, in general I'd like
to see most of its (somewhat generic) code move into the common
proxmox.git workspace in some form - existing crate or new (micro) crate.
* [PATCH proxmox-backup v8 03/10] tools: add bounded join set to run concurrent tasks bound by limit
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox v8 01/10] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox-backup v8 02/10] tools: implement buffered logger for concurrent log messages Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 17:32 ` Thomas Lamprecht
2026-04-22 13:18 ` [PATCH proxmox-backup v8 04/10] api: config/sync: add optional `worker-threads` property Christian Ebner
` (7 subsequent siblings)
10 siblings, 1 reply; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
The BoundedJoinSet runs tasks concurrently via a JoinSet, but
constrains the number of concurrent tasks by an upper limit.
In contrast to the ParallelHandler implementation, which is purely
synchronous and does not provide easy handling of returned results,
this allows executing tasks in an async context with straightforward
handling of results, as required e.g. for pulling/pushing backup
groups in parallel for sync jobs. The log context is also easily
preserved, which is important for task logging.
Co-developed-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
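For readers who want the bounding idea in isolation, here is a sketch
that deliberately swaps the technique: it uses OS threads and
`std::sync::mpsc` instead of tokio tasks and `JoinSet`, so that it runs
without any dependencies. `BoundedThreads` is a hypothetical name and
not the type added by this patch. Unlike the real `JoinSet`, which
reports a failed task as a `JoinError`, this sketch would block forever
if a task panicked. It also raises a limit of 0 to 1, which is an
assumption of the sketch and not taken from the patch.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Runs closures on OS threads, never more than `max_tasks` at a time.
struct BoundedThreads<T> {
    max_tasks: usize,
    /// Number of spawned tasks whose result has not been collected yet.
    running: usize,
    sender: Sender<T>,
    receiver: Receiver<T>,
}

impl<T: Send + 'static> BoundedThreads<T> {
    fn new(max_tasks: usize) -> Self {
        let (sender, receiver) = channel();
        Self {
            // Assumption of this sketch: treat a limit of 0 as 1.
            max_tasks: max_tasks.max(1),
            running: 0,
            sender,
            receiver,
        }
    }

    /// Spawn the task, blocking until there is capacity. Returns the results
    /// collected while making room, in order of completion.
    fn spawn_task<F>(&mut self, task: F) -> Vec<T>
    where
        F: FnOnce() -> T + Send + 'static,
    {
        let mut results = Vec::new();
        // Collect results of already finished tasks without blocking.
        while let Ok(result) = self.receiver.try_recv() {
            self.running -= 1;
            results.push(result);
        }
        // Capacity reached, wait for a running task to complete.
        while self.running >= self.max_tasks {
            let result = self.receiver.recv().expect("sender is owned by self");
            self.running -= 1;
            results.push(result);
        }
        let sender = self.sender.clone();
        thread::spawn(move || {
            // The receiver outlives the workers, ignore a failed send.
            let _ = sender.send(task());
        });
        self.running += 1;
        results
    }

    /// Wait for all remaining tasks, returning results in order of completion.
    fn join_spawned_tasks(&mut self) -> Vec<T> {
        let mut results = Vec::with_capacity(self.running);
        while self.running > 0 {
            let result = self.receiver.recv().expect("sender is owned by self");
            self.running -= 1;
            results.push(result);
        }
        results
    }
}
```

As with the patch, the caller has to gather results from both
`spawn_task` and the final join, since a spawn may hand back results of
tasks that finished while it was waiting for a free slot.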
pbs-tools/src/bounded_join_set.rs | 81 +++++++++++++++++++++++++++++++
pbs-tools/src/lib.rs | 1 +
2 files changed, 82 insertions(+)
create mode 100644 pbs-tools/src/bounded_join_set.rs
diff --git a/pbs-tools/src/bounded_join_set.rs b/pbs-tools/src/bounded_join_set.rs
new file mode 100644
index 000000000..07500c66b
--- /dev/null
+++ b/pbs-tools/src/bounded_join_set.rs
@@ -0,0 +1,81 @@
+//! JoinSet with an upper bound of concurrent tasks.
+//!
+//! Allows to run up to the configured number of tasks concurrently in an async
+//! context.
+
+use std::future::Future;
+
+use tokio::task::{JoinError, JoinSet};
+
+use proxmox_log::LogContext;
+
+/// Run up to preconfigured number of futures concurrently on tokio tasks.
+pub struct BoundedJoinSet<T> {
+ /// Upper bound for concurrent task execution
+ max_tasks: usize,
+ /// Handles to currently spawned tasks
+ workers: JoinSet<T>,
+}
+
+impl<T: Send + 'static> BoundedJoinSet<T> {
+ /// Create a new join set with up to `max_tasks` concurrently executed tasks.
+ pub fn new(max_tasks: usize) -> Self {
+ Self {
+ max_tasks,
+ workers: JoinSet::new(),
+ }
+ }
+
+ /// Spawn the given task on the workers, waiting until there is capacity to do so.
+ ///
+ /// If there is no capacity, this awaits until a running slot frees up, returning the
+ /// results of the task(s) that finished in the meantime in order of completion,
+ /// or a `JoinError` if joining failed.
+ pub async fn spawn_task<F>(&mut self, task: F) -> Result<Vec<T>, JoinError>
+ where
+ F: Future<Output = T>,
+ F: Send + 'static,
+ {
+ let mut results = Vec::with_capacity(self.workers.len());
+
+ // Collect already finished task results if there are some
+ while let Some(result) = self.workers.try_join_next() {
+ results.push(result?);
+ }
+
+ while self.workers.len() >= self.max_tasks {
+ // capacity reached, wait for an active task to complete
+ if let Some(result) = self.workers.join_next().await {
+ results.push(result?);
+ }
+ }
+
+ match LogContext::current() {
+ Some(context) => self.workers.spawn(context.scope(task)),
+ None => self.workers.spawn(task),
+ };
+
+ Ok(results)
+ }
+
+ /// Waits until one of the tasks in the set completes and returns its output.
+ ///
+ /// Returns None if the set is empty.
+ pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
+ self.workers.join_next().await
+ }
+
+ /// Wait on all spawned tasks to run to completion.
+ ///
+ /// Returns the results for each task in order of completion or a `JoinError`
+ /// if joining failed.
+ pub async fn join_spawned_tasks(&mut self) -> Result<Vec<T>, JoinError> {
+ let mut results = Vec::with_capacity(self.workers.len());
+
+ while let Some(result) = self.workers.join_next().await {
+ results.push(result?);
+ }
+
+ Ok(results)
+ }
+}
diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
index 1e3972c92..dc55366b6 100644
--- a/pbs-tools/src/lib.rs
+++ b/pbs-tools/src/lib.rs
@@ -1,4 +1,5 @@
pub mod async_lru_cache;
+pub mod bounded_join_set;
pub mod buffered_logger;
pub mod cert;
pub mod crypt_config;
--
2.47.3
* Re: [PATCH proxmox-backup v8 03/10] tools: add bounded join set to run concurrent tasks bound by limit
2026-04-22 13:18 ` [PATCH proxmox-backup v8 03/10] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
@ 2026-04-22 17:32 ` Thomas Lamprecht
2026-04-22 20:19 ` Thomas Lamprecht
0 siblings, 1 reply; 15+ messages in thread
From: Thomas Lamprecht @ 2026-04-22 17:32 UTC (permalink / raw)
To: Christian Ebner, pbs-devel
Am 22.04.26 um 15:17 schrieb Christian Ebner:
> The BoundedJoinSet runs tasks concurrently via a JoinSet, but
> constrains the number of concurrent tasks by an upper limit.
>
> In contrast to the ParallelHandler implementation, which is purely
> synchronous and does not provide easy handling of returned results,
> this allows executing tasks in an async context with straightforward
> handling of results, as required e.g. for pulling/pushing backup
> groups in parallel for sync jobs. The log context is also easily
> preserved, which is important for task logging.
>
> Co-developed-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> pbs-tools/src/bounded_join_set.rs | 81 +++++++++++++++++++++++++++++++
> pbs-tools/src/lib.rs | 1 +
> 2 files changed, 82 insertions(+)
> create mode 100644 pbs-tools/src/bounded_join_set.rs
>
> diff --git a/pbs-tools/src/bounded_join_set.rs b/pbs-tools/src/bounded_join_set.rs
> new file mode 100644
> index 000000000..07500c66b
> --- /dev/null
> +++ b/pbs-tools/src/bounded_join_set.rs
> @@ -0,0 +1,81 @@
> +//! JoinSet with an upper bound of concurrent tasks.
> +//!
> +//! Allows to run up to the configured number of tasks concurrently in an async
> +//! context.
similar to my reply to the buffered log module: Might be nicer if this lives
in the proxmox rust workspace, and in fact Lukas factored out the
parallel-handler recently into its own crate, depending on the semantic and
dependency overlap this might also go into the newish proxmox-parallel-handler
crate or its own nano crate.
But fine for now to be here, no need for another revision.
* Re: [PATCH proxmox-backup v8 03/10] tools: add bounded join set to run concurrent tasks bound by limit
2026-04-22 17:32 ` Thomas Lamprecht
@ 2026-04-22 20:19 ` Thomas Lamprecht
0 siblings, 0 replies; 15+ messages in thread
From: Thomas Lamprecht @ 2026-04-22 20:19 UTC (permalink / raw)
To: Christian Ebner, pbs-devel
Am 22.04.26 um 19:31 schrieb Thomas Lamprecht:
>> diff --git a/pbs-tools/src/bounded_join_set.rs b/pbs-tools/src/bounded_join_set.rs
>> new file mode 100644
>> index 000000000..07500c66b
>> --- /dev/null
>> +++ b/pbs-tools/src/bounded_join_set.rs
>> @@ -0,0 +1,81 @@
>> +//! JoinSet with an upper bound of concurrent tasks.
>> +//!
>> +//! Allows to run up to the configured number of tasks concurrently in an async
>> +//! context.
>
> similar to my reply to the buffered log module: Might be nicer if this lives
> in the proxmox rust workspace, and in fact Lukas factored out the
> parallel-handler recently into its own crate, depending on the semantic and
> dependency overlap this might also go into the newish proxmox-parallel-handler
> crate or its own nano crate.
>
> But fine for now to be here, no need for another revision.
>
On second thought, if we move this into an existing crate it might be a
better fit for proxmox-async, or really its own nano-crate, but we can
omit this until actually needed, it's a minimal wrapper after all.
* [PATCH proxmox-backup v8 04/10] api: config/sync: add optional `worker-threads` property
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (2 preceding siblings ...)
2026-04-22 13:18 ` [PATCH proxmox-backup v8 03/10] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox-backup v8 05/10] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
` (6 subsequent siblings)
10 siblings, 0 replies; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
Allow configuring from 1 to 32 worker threads to perform multiple
group syncs in parallel.
The property is exposed via the sync job config and passed to
the pull/push parameters for the sync job to set up and execute the
thread pool accordingly.
Implements the schema definitions and adds the new property to
`SyncJobConfig`, `PullParameters` and `PushParameters`.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
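The update semantics added to `update_sync_job` can be sketched in
isolation as follows. `JobConfig`, `Deletable` and `apply_update` are
hypothetical stand-ins for illustration; only the order (deletions
first, then values from the update) and the handling of the
`worker_threads` field follow the hunks of this patch.

```rust
/// Stand-in for the relevant part of the sync job config.
#[derive(Debug, Default, PartialEq)]
struct JobConfig {
    worker_threads: Option<usize>,
}

/// Stand-in for the deletable properties of the update call.
enum Deletable {
    WorkerThreads,
}

/// Apply deletions first, then take over values set in the update.
fn apply_update(config: &mut JobConfig, update: &JobConfig, delete: &[Deletable]) {
    for property in delete {
        match property {
            Deletable::WorkerThreads => config.worker_threads = None,
        }
    }

    // An unset value in the update leaves the stored value untouched.
    if let Some(worker_threads) = update.worker_threads {
        config.worker_threads = Some(worker_threads);
    }
}
```

An update without the property therefore keeps the stored value, and
only an explicit delete resets it to unset.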
src/api2/config/sync.rs | 10 ++++++++++
src/api2/pull.rs | 9 ++++++++-
src/api2/push.rs | 8 +++++++-
src/server/pull.rs | 4 ++++
src/server/push.rs | 4 ++++
src/server/sync.rs | 1 +
6 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 67fa3182c..0f073ca54 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -344,6 +344,8 @@ pub enum DeletableProperty {
UnmountOnDone,
/// Delete the sync_direction property,
SyncDirection,
+ /// Delete the worker_threads property,
+ WorkerThreads,
}
#[api(
@@ -467,6 +469,9 @@ pub fn update_sync_job(
DeletableProperty::SyncDirection => {
data.sync_direction = None;
}
+ DeletableProperty::WorkerThreads => {
+ data.worker_threads = None;
+ }
}
}
}
@@ -526,6 +531,10 @@ pub fn update_sync_job(
data.sync_direction = Some(sync_direction);
}
+ if let Some(worker_threads) = update.worker_threads {
+ data.worker_threads = Some(worker_threads);
+ }
+
if update.limit.rate_in.is_some() {
data.limit.rate_in = update.limit.rate_in;
}
@@ -698,6 +707,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
run_on_mount: None,
unmount_on_done: None,
sync_direction: None, // use default
+ worker_threads: None,
};
// should work without ACLs
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 4b1fd5e60..7cf165f91 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -11,7 +11,7 @@ use pbs_api_types::{
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
RESYNC_CORRUPT_SCHEMA, SYNC_ENCRYPTED_ONLY_SCHEMA, SYNC_VERIFIED_ONLY_SCHEMA,
- TRANSFER_LAST_SCHEMA,
+ SYNC_WORKER_THREADS_SCHEMA, TRANSFER_LAST_SCHEMA,
};
use pbs_config::CachedUserInfo;
use proxmox_rest_server::WorkerTask;
@@ -91,6 +91,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
sync_job.encrypted_only,
sync_job.verified_only,
sync_job.resync_corrupt,
+ sync_job.worker_threads,
)
}
}
@@ -148,6 +149,10 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
schema: RESYNC_CORRUPT_SCHEMA,
optional: true,
},
+ "worker-threads": {
+ schema: SYNC_WORKER_THREADS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -175,6 +180,7 @@ async fn pull(
encrypted_only: Option<bool>,
verified_only: Option<bool>,
resync_corrupt: Option<bool>,
+ worker_threads: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -215,6 +221,7 @@ async fn pull(
encrypted_only,
verified_only,
resync_corrupt,
+ worker_threads,
)?;
// fixme: set to_stdout to false?
diff --git a/src/api2/push.rs b/src/api2/push.rs
index e5edc13e0..f27f4ea1a 100644
--- a/src/api2/push.rs
+++ b/src/api2/push.rs
@@ -6,7 +6,7 @@ use pbs_api_types::{
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_READ, PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_PRUNE,
REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, SYNC_ENCRYPTED_ONLY_SCHEMA,
- SYNC_VERIFIED_ONLY_SCHEMA, TRANSFER_LAST_SCHEMA,
+ SYNC_VERIFIED_ONLY_SCHEMA, SYNC_WORKER_THREADS_SCHEMA, TRANSFER_LAST_SCHEMA,
};
use proxmox_rest_server::WorkerTask;
use proxmox_router::{Permission, Router, RpcEnvironment};
@@ -108,6 +108,10 @@ fn check_push_privs(
schema: TRANSFER_LAST_SCHEMA,
optional: true,
},
+ "worker-threads": {
+ schema: SYNC_WORKER_THREADS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -133,6 +137,7 @@ async fn push(
verified_only: Option<bool>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ worker_threads: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -164,6 +169,7 @@ async fn push(
verified_only,
limit,
transfer_last,
+ worker_threads,
)
.await?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 688f95574..5beca6b8d 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -65,6 +65,8 @@ pub(crate) struct PullParameters {
verified_only: bool,
/// Whether to re-sync corrupted snapshots
resync_corrupt: bool,
+ /// Maximum number of worker threads to pull during sync job
+ worker_threads: Option<usize>,
}
impl PullParameters {
@@ -85,6 +87,7 @@ impl PullParameters {
encrypted_only: Option<bool>,
verified_only: Option<bool>,
resync_corrupt: Option<bool>,
+ worker_threads: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -137,6 +140,7 @@ impl PullParameters {
encrypted_only,
verified_only,
resync_corrupt,
+ worker_threads,
})
}
}
diff --git a/src/server/push.rs b/src/server/push.rs
index e69973b4f..1e4651d78 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -83,6 +83,8 @@ pub(crate) struct PushParameters {
verified_only: bool,
/// How many snapshots should be transferred at most (taking the newest N snapshots)
transfer_last: Option<usize>,
+ /// Maximum number of worker threads for push during sync job
+ worker_threads: Option<usize>,
}
impl PushParameters {
@@ -102,6 +104,7 @@ impl PushParameters {
verified_only: Option<bool>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ worker_threads: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -165,6 +168,7 @@ impl PushParameters {
encrypted_only,
verified_only,
transfer_last,
+ worker_threads,
})
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
index aedf4a271..9e6aeb9b0 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -675,6 +675,7 @@ pub fn do_sync_job(
sync_job.verified_only,
sync_job.limit.clone(),
sync_job.transfer_last,
+ sync_job.worker_threads,
)
.await?;
push_store(push_params).await?
--
2.47.3
* [PATCH proxmox-backup v8 05/10] fix #4182: server: sync: allow pulling backup groups in parallel
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (3 preceding siblings ...)
2026-04-22 13:18 ` [PATCH proxmox-backup v8 04/10] api: config/sync: add optional `worker-threads` property Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox-backup v8 06/10] server: pull: prefix log messages and add error context Christian Ebner
` (5 subsequent siblings)
10 siblings, 0 replies; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
Currently, a sync job sequentially pulls the backup groups and the
snapshots contained within them. It is therefore limited in download
speed by the single HTTP/2 connection of the source reader instance
in case of remote syncs. For high latency networks, this suffer from
limited download speed due to head of line blocking.
Improve the throughput by allowing to pull up to a configured number
of backup groups in parallel. A bounded join set concurrently pulls
from the remote source with up to the configured number of tokio
tasks. Since these are dedicated tasks, they can run independently
and in parallel on the tokio runtime.
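The bounded fan-out described above can be illustrated with a minimal,
std-only sketch. It uses OS threads and a channel as a stand-in for tokio
tasks, so it is not the `BoundedJoinSet` implementation from this series;
`run_bounded` and its parameters are hypothetical names chosen for
illustration only.

```rust
use std::sync::mpsc;
use std::thread;

/// Run `job` for every item with at most `limit` jobs in flight at once.
/// Results are returned in completion order, not in input order.
/// (Hypothetical helper, threads stand in for tokio tasks.)
fn run_bounded<T, R, F>(items: Vec<T>, limit: usize, job: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Send + Clone + 'static,
{
    // Treat a limit of 0 like 1, so that work always makes progress.
    let limit = limit.max(1);
    let (tx, rx) = mpsc::channel();
    let mut in_flight = 0;
    let mut results: Vec<R> = Vec::with_capacity(items.len());

    for item in items {
        // At capacity: wait for one running job before spawning the next.
        if in_flight == limit {
            // `recv` cannot fail here, `tx` is still alive in this scope.
            results.push(rx.recv().unwrap());
            in_flight -= 1;
        }
        let tx = tx.clone();
        let job = job.clone();
        thread::spawn(move || {
            // A send error only means the receiver is gone, ignore it.
            let _ = tx.send(job(item));
        });
        in_flight += 1;
    }

    // Drain the jobs still running after the last item was submitted.
    for _ in 0..in_flight {
        results.push(rx.recv().unwrap());
    }
    results
}
```

Calling `run_bounded(groups, 4, pull_one)` with some per-group function
would keep at most four pulls running. Note that this sketch would wait
forever if a job panics; the patch instead maps join errors to an error
return.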
Store progress output is now prefixed by the group, as it depends on
the group being pulled since the snapshot count differs. To update
the output on a per-group level, the shared group progress count is
passed as an atomic counter, and the store progress is accounted
globally as well as per group.
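As a rough illustration of the shared progress counter, the following
std-only sketch condenses the idea of `SharedGroupProgress` into a small
type. `GroupCounter`, `progress_line` and `simulate` are hypothetical
names, threads stand in for tokio tasks, and the line format merely
mimics the sample output shown elsewhere in this series.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Condensed counter in the spirit of `SharedGroupProgress` (hypothetical).
struct GroupCounter {
    done: AtomicUsize,
    total: usize,
}

impl GroupCounter {
    fn new(total: usize) -> Self {
        Self { done: AtomicUsize::new(0), total }
    }

    /// Mark one group as finished and return the new number of done groups.
    fn increment_done(&self) -> usize {
        // `fetch_add` returns the previous value, hence the `+ 1`.
        self.done.fetch_add(1, Ordering::AcqRel) + 1
    }

    /// Read the number of groups finished so far.
    fn load_done(&self) -> usize {
        self.done.load(Ordering::Acquire)
    }
}

/// Format one per-group progress line.
fn progress_line(group: &str, done: usize, total: usize) -> String {
    let percent = if total == 0 {
        100.0
    } else {
        done as f64 * 100.0 / total as f64
    };
    format!("[{group}]: percentage done: {percent:.2}% ({done}/{total} groups)")
}

/// Pretend to pull every group on its own thread and collect the log lines.
fn simulate(groups: &[&str]) -> Vec<String> {
    let counter = Arc::new(GroupCounter::new(groups.len()));
    let handles: Vec<_> = groups
        .iter()
        .map(|group| {
            let group = group.to_string();
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                // Each worker gets a distinct value from the atomic counter.
                let done = counter.increment_done();
                progress_line(&group, done, counter.total)
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Because every worker obtains its value from a single atomic
read-modify-write, no two groups report the same "done" count, regardless
of the order in which they finish.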
Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 71 ++++++++++++++++++++++++++++++++--------------
src/server/sync.rs | 32 +++++++++++++++++++++
2 files changed, 82 insertions(+), 21 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5beca6b8d..8d75af168 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -26,6 +26,7 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
+use pbs_tools::bounded_join_set::BoundedJoinSet;
use pbs_tools::sha::sha256;
use super::sync::{
@@ -34,6 +35,7 @@ use super::sync::{
SkipReason, SyncSource, SyncSourceReader, SyncStats,
};
use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::server::sync::SharedGroupProgress;
use crate::tools::parallel_handler::ParallelHandler;
pub(crate) struct PullTarget {
@@ -619,7 +621,7 @@ async fn pull_group(
params: Arc<PullParameters>,
source_namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -782,7 +784,8 @@ async fn pull_group(
}
}
- progress.group_snapshots = list.len() as u64;
+ let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+ local_progress.group_snapshots = list.len() as u64;
let mut sync_stats = SyncStats::default();
@@ -805,8 +808,10 @@ async fn pull_group(
)
.await;
- progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: {progress}");
+ // Update done groups progress by other parallel running pulls
+ local_progress.done_groups = shared_group_progress.load_done();
+ local_progress.done_snapshots = pos as u64 + 1;
+ info!("percentage done: group {group}: {local_progress}");
let stats = result?; // stop on error
sync_stats.add(stats);
@@ -843,6 +848,8 @@ async fn pull_group(
}
}
+ shared_group_progress.increment_done();
+
Ok(sync_stats)
}
@@ -1058,7 +1065,7 @@ async fn lock_and_pull_group(
group: &BackupGroup,
namespace: &BackupNamespace,
target_namespace: &BackupNamespace,
- progress: &mut StoreProgress,
+ shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, Error> {
let (owner, _lock_guard) =
match params
@@ -1083,7 +1090,7 @@ async fn lock_and_pull_group(
return Err(format_err!("owner check failed"));
}
- match pull_group(params, namespace, group, progress).await {
+ match pull_group(params, namespace, group, shared_group_progress).await {
Ok(stats) => Ok(stats),
Err(err) => {
info!("sync group {group} failed - {err:#}");
@@ -1135,23 +1142,45 @@ async fn pull_ns(
let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
+ let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
+ let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
- match lock_and_pull_group(
- Arc::clone(&params),
- &group,
- namespace,
- &target_ns,
- &mut progress,
- )
- .await
- {
- Ok(stats) => sync_stats.add(stats),
- Err(_err) => errors = true,
+ let mut process_results = |results| {
+ for result in results {
+ progress.done_groups = shared_group_progress.increment_done();
+ match result {
+ Ok(stats) => {
+ sync_stats.add(stats);
+ }
+ Err(_err) => errors = true,
+ }
}
+ };
+
+ for group in list.into_iter() {
+ let namespace = namespace.clone();
+ let target_ns = target_ns.clone();
+ let params = Arc::clone(&params);
+ let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let results = group_workers
+ .spawn_task(async move {
+ lock_and_pull_group(
+ Arc::clone(&params),
+ &group,
+ &namespace,
+ &target_ns,
+ group_progress_cloned,
+ )
+ .await
+ })
+ .await
+ .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+ process_results(results);
+ }
+
+ while let Some(result) = group_workers.join_next().await {
+ let result = result.map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+ process_results(vec![result]);
}
if params.remove_vanished {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 9e6aeb9b0..78c232bf9 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -4,6 +4,7 @@ use std::collections::HashMap;
use std::io::{Seek, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -792,3 +793,34 @@ pub(super) fn exclude_not_verified_or_encrypted(
false
}
+
+/// Track group progress during parallel push/pull in sync jobs
+pub(crate) struct SharedGroupProgress {
+ done: AtomicUsize,
+ total: usize,
+}
+
+impl SharedGroupProgress {
+ /// Create a new instance to track group progress with expected total number of groups
+ pub(crate) fn with_total_groups(total: usize) -> Self {
+ Self {
+ done: AtomicUsize::new(0),
+ total,
+ }
+ }
+
+ /// Return current counter value for done groups
+ pub(crate) fn load_done(&self) -> u64 {
+ self.done.load(Ordering::Acquire) as u64
+ }
+
+ /// Increment counter for done groups and return new value
+ pub(crate) fn increment_done(&self) -> u64 {
+ self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1
+ }
+
+ /// Return the number of total backup groups
+ pub(crate) fn total_groups(&self) -> u64 {
+ self.total as u64
+ }
+}
--
2.47.3
* [PATCH proxmox-backup v8 06/10] server: pull: prefix log messages and add error context
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (4 preceding siblings ...)
2026-04-22 13:18 ` [PATCH proxmox-backup v8 05/10] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox-backup v8 07/10] server: sync: allow pushing groups concurrently Christian Ebner
` (4 subsequent siblings)
10 siblings, 0 replies; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
Pulling groups and therefore also snapshots in parallel leads to
unordered log output, making it mostly impossible to relate a log
message to a backup snapshot/group.
Therefore, prefix pull job log messages with the corresponding group
or snapshot and set the error context accordingly.
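The idea of attaching the snapshot or archive as error context can be
sketched without external crates as follows. The patch itself relies on
anyhow's `with_context`; `PrefixedError` and `open_with_prefix` below are
hypothetical names that only illustrate the principle.

```rust
use std::fmt;

/// Error carrying the snapshot or archive it belongs to (hypothetical type).
#[derive(Debug)]
struct PrefixedError {
    prefix: String,
    source: std::io::Error,
}

impl fmt::Display for PrefixedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same shape as the log lines: "<prefix>: <message>".
        write!(f, "{}: {}", self.prefix, self.source)
    }
}

impl std::error::Error for PrefixedError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Keep the original error reachable for callers that want it.
        Some(&self.source)
    }
}

/// Open a file and attach `prefix` to any failure.
fn open_with_prefix(prefix: &str, path: &str) -> Result<std::fs::File, PrefixedError> {
    std::fs::File::open(path).map_err(|source| PrefixedError {
        prefix: prefix.to_string(),
        source,
    })
}
```

With several groups failing at the same time, each error message then
names the snapshot or archive it originated from instead of only the
low-level cause.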
Also, reword some messages, inline variables in format strings and
start log lines with capital letters to get consistent output.
By using the buffered logger implementation and buffering up to 5
lines with a timeout of 1 second, subsequent log lines arriving in
fast succession are kept together, reducing the mixing of lines.
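The buffering rule (flush after a number of lines or after a timeout) can
be sketched as below. This is not the `BufferedLogger` API added earlier
in the series: `LineBuffer` and its methods are hypothetical, and the
exact threshold semantics (flush once `max_lines` is reached, with 0
meaning unbuffered) are an assumption made for the example.

```rust
use std::time::{Duration, Instant};

/// Collects lines for one label and hands them out as a batch
/// (hypothetical type, not the logger from this series).
struct LineBuffer {
    label: String,
    max_lines: usize,
    max_age: Duration,
    lines: Vec<String>,
    oldest: Option<Instant>,
}

impl LineBuffer {
    fn new(label: &str, max_lines: usize, max_age: Duration) -> Self {
        Self {
            label: label.to_string(),
            max_lines,
            max_age,
            lines: Vec::new(),
            oldest: None,
        }
    }

    /// Queue a line; returns a batch once the buffer is full or too old.
    /// `now` is passed in to keep the logic deterministic and testable.
    fn push(&mut self, line: &str, now: Instant) -> Option<Vec<String>> {
        self.lines.push(format!("[{}]: {line}", self.label));
        // Remember when the oldest pending line was queued.
        let first = *self.oldest.get_or_insert(now);
        let full = self.lines.len() >= self.max_lines;
        let expired = now.duration_since(first) >= self.max_age;
        if full || expired {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Hand out all pending lines and reset the age tracking.
    fn flush(&mut self) -> Vec<String> {
        self.oldest = None;
        std::mem::take(&mut self.lines)
    }
}
```

With `LineBuffer::new("ct/100", 5, Duration::from_secs(1))`, lines of one
group are emitted in blocks of up to five, while `(0, Duration::ZERO)`
degenerates to unbuffered output.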
Example output for a sequential pull job:
```
...
[ct/100]: 2025-11-17T10:11:42Z: start sync
[ct/100]: 2025-11-17T10:11:42Z/pct.conf.blob: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.ppxar.didx: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.ppxar.didx: downloaded 16.785 MiB (373.063 MiB/s)
[ct/100]: 2025-11-17T10:11:42Z/root.mpxar.didx: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.mpxar.didx: downloaded 65.703 KiB (27.536 MiB/s)
[ct/100]: 2025-11-17T10:11:42Z: sync done
[ct/100]: percentage done: 9.09% (1/11 groups)
[ct/101]: 2026-03-31T12:20:16Z: start sync
[ct/101]: 2026-03-31T12:20:16Z/pct.conf.blob: sync archive
[ct/101]: 2026-03-31T12:20:16Z/root.pxar.didx: sync archive
[ct/101]: 2026-03-31T12:20:16Z/root.pxar.didx: downloaded 199.806 MiB (346.31 MiB/s)
[ct/101]: 2026-03-31T12:20:16Z/catalog.pcat1.didx: sync archive
[ct/101]: 2026-03-31T12:20:16Z/catalog.pcat1.didx: downloaded 180.379 KiB (26.354 MiB/s)
[ct/101]: 2026-03-31T12:20:16Z: sync done
...
```
Example output for a parallel pull job:
```
...
[ct/100]: 2025-11-17T10:11:42Z: start sync
[ct/101]: 2026-03-31T12:20:16Z: start sync
[ct/107]: 2025-07-16T09:14:01Z: start sync
[ct/100]: 2025-11-17T10:11:42Z/pct.conf.blob: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.ppxar.didx: sync archive
[ct/101]: 2026-03-31T12:20:16Z/pct.conf.blob: sync archive
[ct/101]: 2026-03-31T12:20:16Z/root.pxar.didx: sync archive
[ct/106]: 2025-11-17T10:20:32Z: start sync
[ct/106]: 2025-11-17T10:20:32Z/pct.conf.blob: sync archive
[ct/106]: 2025-11-17T10:20:32Z/root.pxar.didx: sync archive
[ct/107]: 2025-07-16T09:14:01Z/pct.conf.blob: sync archive
[ct/107]: 2025-07-16T09:14:01Z/root.ppxar.didx: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.ppxar.didx: downloaded 16.785 MiB (12.032 MiB/s)
[ct/100]: 2025-11-17T10:11:42Z/root.mpxar.didx: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.mpxar.didx: downloaded 65.703 KiB (1021.071 KiB/s)
[ct/100]: 2025-11-17T10:11:42Z: sync done
[ct/100]: snapshot 1/1 within ct/100 is done, 0/11 groups done
[ct/100]: group sync done: percentage done: 9.09% (1/11 groups)
...
```
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 345 ++++++++++++++++++++++++++++++++++-----------
src/server/sync.rs | 7 +-
2 files changed, 267 insertions(+), 85 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 8d75af168..97def85a5 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,11 +5,11 @@ use std::collections::{HashMap, HashSet};
use std::io::Seek;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
-use std::time::SystemTime;
+use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Context, Error};
use proxmox_human_byte::HumanByte;
-use tracing::{info, warn};
+use tracing::{info, Level};
use pbs_api_types::{
print_store_and_ns, ArchiveType, Authid, BackupArchiveName, BackupDir, BackupGroup,
@@ -27,6 +27,7 @@ use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
use pbs_tools::bounded_join_set::BoundedJoinSet;
+use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
use pbs_tools::sha::sha256;
use super::sync::{
@@ -153,6 +154,8 @@ async fn pull_index_chunks<I: IndexFile>(
index: I,
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
backend: &DatastoreBackend,
+ archive_prefix: &str,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -247,11 +250,16 @@ async fn pull_index_chunks<I: IndexFile>(
let bytes = bytes.load(Ordering::SeqCst);
let chunk_count = chunk_count.load(Ordering::SeqCst);
- info!(
- "downloaded {} ({}/s)",
- HumanByte::from(bytes),
- HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: downloaded {} ({}/s)",
+ HumanByte::from(bytes),
+ HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
+ ),
+ )
+ .await?;
Ok(SyncStats {
chunk_count,
@@ -292,6 +300,7 @@ async fn pull_single_archive<'a>(
archive_info: &'a FileInfo,
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
backend: &DatastoreBackend,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
@@ -302,72 +311,104 @@ async fn pull_single_archive<'a>(
let mut sync_stats = SyncStats::default();
- info!("sync archive {archive_name}");
+ let archive_prefix = format!("{}/{archive_name}", snapshot.backup_time_string());
+
+ log_sender
+ .log(Level::INFO, format!("{archive_prefix}: sync archive"))
+ .await?;
- reader.load_file_into(archive_name, &tmp_path).await?;
+ reader
+ .load_file_into(archive_name, &tmp_path)
+ .await
+ .with_context(|| archive_prefix.clone())?;
- let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
+ let mut tmpfile = std::fs::OpenOptions::new()
+ .read(true)
+ .open(&tmp_path)
+ .with_context(|| archive_prefix.clone())?;
match ArchiveType::from_path(archive_name)? {
ArchiveType::DynamicIndex => {
let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
- format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
+ format_err!("{archive_prefix}: unable to read dynamic index {tmp_path:?} - {err}")
})?;
let (csum, size) = index.compute_csum();
- verify_archive(archive_info, &csum, size)?;
+ verify_archive(archive_info, &csum, size).with_context(|| archive_prefix.clone())?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{archive_prefix}: skipping chunk sync for same datastore"),
+ )
+ .await?;
} else {
let stats = pull_index_chunks(
reader
.chunk_reader(archive_info.crypt_mode)
- .context("failed to get chunk reader")?,
+ .context("failed to get chunk reader")
+ .with_context(|| archive_prefix.clone())?,
snapshot.datastore().clone(),
index,
encountered_chunks,
backend,
+ &archive_prefix,
+ Arc::clone(&log_sender),
)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
sync_stats.add(stats);
}
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile).map_err(|err| {
- format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
+ format_err!("{archive_name}: unable to read fixed index '{tmp_path:?}' - {err}")
})?;
let (csum, size) = index.compute_csum();
- verify_archive(archive_info, &csum, size)?;
+ verify_archive(archive_info, &csum, size).with_context(|| archive_prefix.clone())?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{archive_prefix}: skipping chunk sync for same datastore"),
+ )
+ .await?;
} else {
let stats = pull_index_chunks(
reader
.chunk_reader(archive_info.crypt_mode)
- .context("failed to get chunk reader")?,
+ .context("failed to get chunk reader")
+ .with_context(|| archive_prefix.clone())?,
snapshot.datastore().clone(),
index,
encountered_chunks,
backend,
+ &archive_prefix,
+ Arc::clone(&log_sender),
)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
sync_stats.add(stats);
}
}
ArchiveType::Blob => {
- tmpfile.rewind()?;
- let (csum, size) = sha256(&mut tmpfile)?;
- verify_archive(archive_info, &csum, size)?;
+ proxmox_lang::try_block!({
+ tmpfile.rewind()?;
+ let (csum, size) = sha256(&mut tmpfile)?;
+ verify_archive(archive_info, &csum, size)
+ })
+ .with_context(|| archive_prefix.clone())?;
}
}
if let Err(err) = std::fs::rename(&tmp_path, &path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
+ bail!("{archive_prefix}: Atomic rename file {path:?} failed - {err}");
}
backend
.upload_index_to_backend(snapshot, archive_name)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
Ok(sync_stats)
}
@@ -388,13 +429,24 @@ async fn pull_snapshot<'a>(
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
corrupt: bool,
is_new: bool,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
+ let prefix = snapshot.backup_time_string().to_owned();
if is_new {
- info!("sync snapshot {}", snapshot.dir());
+ log_sender
+ .log(Level::INFO, format!("{prefix}: start sync"))
+ .await?;
} else if corrupt {
- info!("re-sync snapshot {} due to corruption", snapshot.dir());
+ log_sender
+ .log(
+ Level::INFO,
+ format!("re-sync snapshot {prefix} due to corruption"),
+ )
+ .await?;
} else {
- info!("re-sync snapshot {}", snapshot.dir());
+ log_sender
+ .log(Level::INFO, format!("re-sync snapshot {prefix}"))
+ .await?;
}
let mut sync_stats = SyncStats::default();
@@ -409,7 +461,8 @@ async fn pull_snapshot<'a>(
let tmp_manifest_blob;
if let Some(data) = reader
.load_file_into(MANIFEST_BLOB_NAME.as_ref(), &tmp_manifest_name)
- .await?
+ .await
+ .with_context(|| prefix.clone())?
{
tmp_manifest_blob = data;
} else {
@@ -419,28 +472,34 @@ async fn pull_snapshot<'a>(
if manifest_name.exists() && !corrupt {
let manifest_blob = proxmox_lang::try_block!({
let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
- format_err!("unable to open local manifest {manifest_name:?} - {err}")
+ format_err!("{prefix}: unable to open local manifest {manifest_name:?} - {err}")
})?;
- let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
+ let manifest_blob =
+ DataBlob::load_from_reader(&mut manifest_file).with_context(|| prefix.clone())?;
Ok(manifest_blob)
})
.map_err(|err: Error| {
- format_err!("unable to read local manifest {manifest_name:?} - {err}")
+ format_err!("{prefix}: unable to read local manifest {manifest_name:?} - {err}")
})?;
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
- reader.try_download_client_log(&client_log_name).await?;
+ reader
+ .try_download_client_log(&client_log_name)
+ .await
+ .with_context(|| prefix.clone())?;
};
- info!("no data changes");
+ log_sender
+ .log(Level::INFO, format!("{prefix}: no data changes"))
+ .await?;
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(sync_stats); // nothing changed
}
}
let manifest_data = tmp_manifest_blob.raw_data().to_vec();
- let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
+ let manifest = BackupManifest::try_from(tmp_manifest_blob).with_context(|| prefix.clone())?;
if ignore_not_verified_or_encrypted(
&manifest,
@@ -464,35 +523,54 @@ async fn pull_snapshot<'a>(
path.push(&item.filename);
if !corrupt && path.exists() {
- let filename: BackupArchiveName = item.filename.as_str().try_into()?;
+ let filename: BackupArchiveName = item
+ .filename
+ .as_str()
+ .try_into()
+ .with_context(|| prefix.clone())?;
match filename.archive_type() {
ArchiveType::DynamicIndex => {
- let index = DynamicIndexReader::open(&path)?;
+ let index = DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
let (csum, size) = index.compute_csum();
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: detected changed file {path:?} - {err}"),
+ )
+ .await?;
}
}
}
ArchiveType::FixedIndex => {
- let index = FixedIndexReader::open(&path)?;
+ let index = FixedIndexReader::open(&path).with_context(|| prefix.clone())?;
let (csum, size) = index.compute_csum();
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: detected changed file {path:?} - {err}"),
+ )
+ .await?;
}
}
}
ArchiveType::Blob => {
- let mut tmpfile = std::fs::File::open(&path)?;
- let (csum, size) = sha256(&mut tmpfile)?;
+ let mut tmpfile = std::fs::File::open(&path).with_context(|| prefix.clone())?;
+ let (csum, size) = sha256(&mut tmpfile).with_context(|| prefix.clone())?;
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: detected changed file {path:?} - {err}"),
+ )
+ .await?;
}
}
}
@@ -505,13 +583,14 @@ async fn pull_snapshot<'a>(
item,
encountered_chunks.clone(),
backend,
+ Arc::clone(&log_sender),
)
.await?;
sync_stats.add(stats);
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
- bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
+ bail!("{prefix}: Atomic rename file {manifest_name:?} failed - {err}");
}
if let DatastoreBackend::S3(s3_client) = backend {
let object_key = pbs_datastore::s3::object_key_from_path(
@@ -524,33 +603,40 @@ async fn pull_snapshot<'a>(
let _is_duplicate = s3_client
.upload_replace_with_retry(object_key, data)
.await
- .context("failed to upload manifest to s3 backend")?;
+ .context("failed to upload manifest to s3 backend")
+ .with_context(|| prefix.clone())?;
}
if !client_log_name.exists() {
- reader.try_download_client_log(&client_log_name).await?;
+ reader
+ .try_download_client_log(&client_log_name)
+ .await
+ .with_context(|| prefix.clone())?;
if client_log_name.exists() {
if let DatastoreBackend::S3(s3_client) = backend {
let object_key = pbs_datastore::s3::object_key_from_path(
&snapshot.relative_path(),
CLIENT_LOG_BLOB_NAME.as_ref(),
)
- .context("invalid archive object key")?;
+ .context("invalid archive object key")
+ .with_context(|| prefix.clone())?;
let data = tokio::fs::read(&client_log_name)
.await
- .context("failed to read log file contents")?;
+ .context("failed to read log file contents")
+ .with_context(|| prefix.clone())?;
let contents = hyper::body::Bytes::from(data);
let _is_duplicate = s3_client
.upload_replace_with_retry(object_key, contents)
.await
- .context("failed to upload client log to s3 backend")?;
+ .context("failed to upload client log to s3 backend")
+ .with_context(|| prefix.clone())?;
}
}
};
snapshot
.cleanup_unreferenced_files(&manifest)
- .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
+ .map_err(|err| format_err!("{prefix}: failed to cleanup unreferenced files - {err}"))?;
Ok(sync_stats)
}
@@ -565,10 +651,14 @@ async fn pull_snapshot_from<'a>(
snapshot: &'a pbs_datastore::BackupDir,
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
corrupt: bool,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
+ let prefix = snapshot.backup_time_string().to_string();
+
let (_path, is_new, _snap_lock) = snapshot
.datastore()
- .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
+ .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())
+ .context(prefix.clone())?;
let result = pull_snapshot(
params,
@@ -577,6 +667,7 @@ async fn pull_snapshot_from<'a>(
encountered_chunks,
corrupt,
is_new,
+ Arc::clone(&log_sender),
)
.await;
@@ -589,11 +680,20 @@ async fn pull_snapshot_from<'a>(
snapshot.as_ref(),
true,
) {
- info!("cleanup error - {cleanup_err}");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: cleanup error - {cleanup_err}"),
+ )
+ .await?;
}
return Err(err);
}
- Ok(_) => info!("sync snapshot {} done", snapshot.dir()),
+ Ok(_) => {
+ log_sender
+ .log(Level::INFO, format!("{prefix}: sync done"))
+ .await?
+ }
}
}
@@ -622,7 +722,9 @@ async fn pull_group(
source_namespace: &BackupNamespace,
group: &BackupGroup,
shared_group_progress: Arc<SharedGroupProgress>,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
+ let prefix = format!("{group}");
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -714,11 +816,15 @@ async fn pull_group(
.collect();
if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ log_sender
+ .log(Level::INFO, format!("{prefix}: {already_synced_skip_info}"))
+ .await?;
already_synced_skip_info.reset();
}
if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ log_sender
+ .log(Level::INFO, format!("{prefix}: {transfer_last_skip_info}"))
+ .await?;
transfer_last_skip_info.reset();
}
@@ -730,8 +836,8 @@ async fn pull_group(
.store
.backup_group(target_ns.clone(), group.clone());
if let Some(info) = backup_group.last_backup(true).unwrap_or(None) {
- let mut reusable_chunks = encountered_chunks.lock().unwrap();
if let Err(err) = proxmox_lang::try_block!({
+ let mut reusable_chunks = encountered_chunks.lock().unwrap();
let _snapshot_guard = info
.backup_dir
.lock_shared()
@@ -780,7 +886,12 @@ async fn pull_group(
}
Ok::<(), Error>(())
}) {
- warn!("Failed to collect reusable chunk from last backup: {err:#?}");
+ log_sender
+ .log(
+ Level::WARN,
+ format!("Failed to collect reusable chunk from last backup: {err:#?}"),
+ )
+ .await?;
}
}
@@ -805,13 +916,31 @@ async fn pull_group(
&to_snapshot,
encountered_chunks.clone(),
corrupt,
+ Arc::clone(&log_sender),
)
.await;
// Update done groups progress by other parallel running pulls
local_progress.done_groups = shared_group_progress.load_done();
local_progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: group {group}: {local_progress}");
+ if params.worker_threads.unwrap_or(1) == 1 {
+ log_sender
+ .log(Level::INFO, format!("percentage done: {local_progress}"))
+ .await?;
+ } else {
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "snapshot {}/{} within {group} is done, {}/{} groups done",
+ local_progress.done_snapshots,
+ local_progress.group_snapshots,
+ local_progress.done_groups,
+ local_progress.total_groups,
+ ),
+ )
+ .await?;
+ }
let stats = result?; // stop on error
sync_stats.add(stats);
@@ -829,13 +958,23 @@ async fn pull_group(
continue;
}
if snapshot.is_protected() {
- info!(
- "don't delete vanished snapshot {} (protected)",
- snapshot.dir()
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{prefix}: don't delete vanished snapshot {} (protected)",
+ snapshot.dir(),
+ ),
+ )
+ .await?;
continue;
}
- info!("delete vanished snapshot {}", snapshot.dir());
+ log_sender
+ .log(
+ Level::INFO,
+ format!("delete vanished snapshot {}", snapshot.dir()),
+ )
+ .await?;
params
.target
.store
@@ -848,7 +987,14 @@ async fn pull_group(
}
}
- shared_group_progress.increment_done();
+ if params.worker_threads.unwrap_or(1) > 1 {
+ log_sender
+ .log(
+ Level::INFO,
+ format!("group sync done: percentage done: {local_progress}"),
+ )
+ .await?;
+ }
Ok(sync_stats)
}
@@ -1037,10 +1183,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
}
Err(err) => {
errors = true;
- info!(
- "Encountered errors while syncing namespace {} - {err}",
- &namespace,
- );
+ info!("Encountered errors while syncing namespace {namespace} - {err}");
}
};
}
@@ -1066,6 +1209,7 @@ async fn lock_and_pull_group(
namespace: &BackupNamespace,
target_namespace: &BackupNamespace,
shared_group_progress: Arc<SharedGroupProgress>,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
let (owner, _lock_guard) =
match params
@@ -1075,25 +1219,47 @@ async fn lock_and_pull_group(
{
Ok(res) => res,
Err(err) => {
- info!("sync group {group} failed - group lock failed: {err}");
- info!("create_locked_backup_group failed");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("sync group {group} failed - group lock failed: {err}"),
+ )
+ .await?;
+ log_sender
+ .log(Level::INFO, "create_locked_backup_group failed".to_string())
+ .await?;
return Err(err);
}
};
if params.owner != owner {
// only the owner is allowed to create additional snapshots
- info!(
- "sync group {group} failed - owner check failed ({} != {owner})",
- params.owner
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "sync group {group} failed - owner check failed ({} != {owner})",
+ params.owner,
+ ),
+ )
+ .await?;
return Err(format_err!("owner check failed"));
}
- match pull_group(params, namespace, group, shared_group_progress).await {
+ match pull_group(
+ params,
+ namespace,
+ group,
+ shared_group_progress,
+ Arc::clone(&log_sender),
+ )
+ .await
+ {
Ok(stats) => Ok(stats),
Err(err) => {
- info!("sync group {group} failed - {err:#}");
+ log_sender
+ .log(Level::INFO, format!("sync group {group} failed - {err:#}"))
+ .await?;
Err(err)
}
}
@@ -1126,7 +1292,7 @@ async fn pull_ns(
list.sort_unstable();
info!(
- "found {} groups to sync (out of {unfiltered_count} total)",
+ "Found {} groups to sync (out of {unfiltered_count} total)",
list.len()
);
@@ -1145,6 +1311,13 @@ async fn pull_ns(
let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
+ let (buffered_lines, max_duration) = if params.worker_threads.unwrap_or(1) > 1 {
+ (5, Duration::from_secs(1))
+ } else {
+ (0, Duration::ZERO)
+ };
+ let sender_builder = BufferedLogger::new(buffered_lines, max_duration);
+
let mut process_results = |results| {
for result in results {
progress.done_groups = shared_group_progress.increment_done();
@@ -1162,16 +1335,21 @@ async fn pull_ns(
let target_ns = target_ns.clone();
let params = Arc::clone(&params);
let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let log_sender: Arc<LogLineSender> =
+ Arc::new(sender_builder.sender_with_label(group.to_string()));
let results = group_workers
.spawn_task(async move {
- lock_and_pull_group(
+ let result = lock_and_pull_group(
Arc::clone(&params),
&group,
&namespace,
&target_ns,
group_progress_cloned,
+ Arc::clone(&log_sender),
)
- .await
+ .await;
+ let _ = log_sender.flush().await;
+ result
})
.await
.map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
@@ -1183,6 +1361,9 @@ async fn pull_ns(
process_results(vec![result]);
}
+ // Force flush of pending messages
+ sender_builder.close().await?;
+
if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
@@ -1198,7 +1379,7 @@ async fn pull_ns(
if !local_group.apply_filters(&params.group_filter) {
continue;
}
- info!("delete vanished group '{local_group}'");
+ info!("Delete vanished group '{local_group}'");
let delete_stats_result = params
.target
.store
@@ -1207,7 +1388,7 @@ async fn pull_ns(
match delete_stats_result {
Ok(stats) => {
if !stats.all_removed() {
- info!("kept some protected snapshots of group '{local_group}'");
+ info!("Kept some protected snapshots of group '{local_group}'");
sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 0,
@@ -1230,7 +1411,7 @@ async fn pull_ns(
Ok(())
});
if let Err(err) = result {
- info!("error during cleanup: {err}");
+ info!("Error during cleanup: {err}");
errors = true;
};
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 78c232bf9..17ed4839f 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -135,13 +135,13 @@ impl SyncSourceReader for RemoteSourceReader {
Some(HttpError { code, message }) => match *code {
StatusCode::NOT_FOUND => {
info!(
- "skipping snapshot {} - vanished since start of sync",
+ "Snapshot {}: skipped because vanished since start of sync",
&self.dir
);
return Ok(None);
}
_ => {
- bail!("HTTP error {code} - {message}");
+ bail!("Snapshot {}: HTTP error {code} - {message}", &self.dir);
}
},
None => {
@@ -175,7 +175,8 @@ impl SyncSourceReader for RemoteSourceReader {
bail!("Atomic rename file {to_path:?} failed - {err}");
}
info!(
- "got backup log file {client_log_name}",
+ "Snapshot {snapshot}: got backup log file {client_log_name}",
+ snapshot = &self.dir,
client_log_name = client_log_name.deref()
);
}
--
2.47.3
* [PATCH proxmox-backup v8 07/10] server: sync: allow pushing groups concurrently
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (5 preceding siblings ...)
2026-04-22 13:18 ` [PATCH proxmox-backup v8 06/10] server: pull: prefix log messages and add error context Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox-backup v8 08/10] server: push: prefix log messages and add additional logging Christian Ebner
` (3 subsequent siblings)
10 siblings, 0 replies; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
Improve the throughput over high latency connections for sync jobs in
push direction by allowing to push up to a configured number of
backup groups concurrently. Just like for pull sync jobs, use a
bounded join set to run up to the configured number of group worker
tokio tasks in parallel, each connecting and pushing a group to
the remote target.
The store progress and sync group housekeeping are placed behind an
atomic reference counted mutex to allow for concurrent access of
status updates.
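
As an illustration of the pattern described above, here is a minimal
sketch that uses plain std threads in place of tokio tasks and the
BoundedJoinSet. The function name sync_groups_bounded and the use of
strings for groups are assumptions made for this example only; the
actual per-group push is reduced to a placeholder.

```rust
use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: handle `groups` with at most `workers` threads at once.
/// Returns the set of handled groups and the number of finished groups.
fn sync_groups_bounded(groups: Vec<String>, workers: usize) -> (HashSet<String>, usize) {
    let queue = Arc::new(Mutex::new(VecDeque::from(groups)));
    // Shared housekeeping state, mirroring the Arc<Mutex<HashSet<..>>> of the patch.
    let synced = Arc::new(Mutex::new(HashSet::new()));
    // Shared progress counter, updated by all workers.
    let done = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers.max(1))
        .map(|_| {
            let queue = Arc::clone(&queue);
            let synced = Arc::clone(&synced);
            let done = Arc::clone(&done);
            thread::spawn(move || loop {
                // Take the next group; the queue lock is released after this statement.
                let next = queue.lock().unwrap().pop_front();
                let Some(group) = next else { break };
                // Placeholder for the actual per-group push.
                synced.lock().unwrap().insert(group);
                done.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    let synced = synced.lock().unwrap().clone();
    (synced, done.load(Ordering::SeqCst))
}
```

The number of spawned threads is the concurrency bound here, whereas
the patch bounds the number of in-flight tasks on the tokio runtime.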
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/push.rs | 102 ++++++++++++++++++++++++++++++++++-----------
1 file changed, 78 insertions(+), 24 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index 1e4651d78..e69f44e85 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -27,6 +27,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{DataStore, StoreProgress};
+use pbs_tools::bounded_join_set::BoundedJoinSet;
use super::sync::{
check_namespace_depth_limit, exclude_not_verified_or_encrypted,
@@ -34,6 +35,7 @@ use super::sync::{
SyncSource, SyncStats,
};
use crate::api2::config::remote;
+use crate::server::sync::SharedGroupProgress;
/// Target for backups to be pushed to
pub(crate) struct PushTarget {
@@ -551,41 +553,62 @@ pub(crate) async fn push_namespace(
let mut errors = false;
// Remember synced groups, remove others when the remove vanished flag is set
- let mut synced_groups = HashSet::new();
+ let synced_groups = Arc::new(Mutex::new(HashSet::new()));
let mut progress = StoreProgress::new(list.len() as u64);
let mut stats = SyncStats::default();
let (owned_target_groups, not_owned_target_groups) =
fetch_target_groups(&params, &target_namespace).await?;
+ let not_owned_target_groups = Arc::new(not_owned_target_groups);
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
+ let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
+ let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
- if not_owned_target_groups.contains(&group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
- continue;
- }
- synced_groups.insert(group.clone());
+ let mut process_results = |results| {
+ for result in results {
+ progress.done_groups = shared_group_progress.increment_done();
- match push_group(Arc::clone(&params), namespace, &group, &mut progress).await {
- Ok(sync_stats) => stats.add(sync_stats),
- Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- errors = true;
+ match result {
+ Ok(sync_stats) => {
+ stats.add(sync_stats);
+ }
+ Err(()) => errors = true,
}
}
+ };
+
+ for group in list.into_iter() {
+ let namespace = namespace.clone();
+ let params = Arc::clone(&params);
+ let not_owned_target_groups = Arc::clone(&not_owned_target_groups);
+ let synced_groups = Arc::clone(&synced_groups);
+ let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let results = group_workers
+ .spawn_task(async move {
+ push_group_do(
+ params,
+ &namespace,
+ &group,
+ group_progress_cloned,
+ synced_groups,
+ not_owned_target_groups,
+ )
+ .await
+ })
+ .await
+ .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+ process_results(results);
+ }
+
+ while let Some(result) = group_workers.join_next().await {
+ let result = result.map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+ process_results(vec![result]);
}
if params.remove_vanished {
// only ever allow to prune owned groups on target
for target_group in owned_target_groups {
- if synced_groups.contains(&target_group) {
+ if synced_groups.lock().unwrap().contains(&target_group) {
continue;
}
if !target_group.apply_filters(&params.group_filter) {
@@ -664,6 +687,32 @@ async fn forget_target_snapshot(
Ok(())
}
+async fn push_group_do(
+ params: Arc<PushParameters>,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ shared_group_progress: Arc<SharedGroupProgress>,
+ synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+ not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+ if not_owned_target_groups.contains(group) {
+ warn!(
+ "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+ params.target.remote_user(),
+ );
+ shared_group_progress.increment_done();
+ return Ok(SyncStats::default());
+ }
+
+ synced_groups.lock().unwrap().insert(group.clone());
+ push_group(params, namespace, group, Arc::clone(&shared_group_progress))
+ .await
+ .map_err(|err| {
+ warn!("Group {group}: Encountered errors: {err:#}");
+ warn!("Failed to push group {group} to remote!");
+ })
+}
+
/// Push group including all snaphshots to target
///
/// Iterate over all snapshots in the group and push them to the target.
@@ -677,7 +726,7 @@ pub(crate) async fn push_group(
params: Arc<PushParameters>,
namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -745,7 +794,8 @@ pub(crate) async fn push_group(
transfer_last_skip_info.reset();
}
- progress.group_snapshots = snapshots.len() as u64;
+ let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+ local_progress.group_snapshots = snapshots.len() as u64;
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -759,8 +809,10 @@ pub(crate) async fn push_group(
.await;
fetch_previous_manifest = true;
- progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: {progress}");
+ // Update done groups progress by other parallel running pushes
+ local_progress.done_groups = shared_group_progress.load_done();
+ local_progress.done_snapshots = pos as u64 + 1;
+ info!("Percentage done: group {group}: {local_progress}");
// stop on error
let sync_stats = result?;
@@ -802,6 +854,8 @@ pub(crate) async fn push_group(
}
}
+ shared_group_progress.increment_done();
+
Ok(stats)
}
--
2.47.3
* [PATCH proxmox-backup v8 08/10] server: push: prefix log messages and add additional logging
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (6 preceding siblings ...)
2026-04-22 13:18 ` [PATCH proxmox-backup v8 07/10] server: sync: allow pushing groups concurrently Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox-backup v8 09/10] sync: move in-progress snapshot filter to helper and use log line sender Christian Ebner
` (2 subsequent siblings)
10 siblings, 0 replies; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
Pushing groups and therefore also snapshots in parallel leads to
unordered log outputs, making it mostly impossible to relate a log
message to a backup snapshot/group.
Therefore, prefix push job log messages by the corresponding group or
snapshot and use the buffered logger implementation to buffer up to 5
subsequent lines with a timeout of 1 second. This reduces interwoven
log messages stemming from different groups.
Also, be more verbose for push syncs, adding additional log output
for the groups, snapshots and archives being pushed.
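
To illustrate the buffering idea, here is a minimal synchronous
sketch. It is not the pbs_tools::buffered_logger API: the type
LabeledBuffer, the Sink alias and all method names are assumptions
for this example. Lines are prefixed with a label and written to the
shared sink as one block once the line limit or the maximum age is
reached.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Shared sink standing in for the task log.
type Sink = Arc<Mutex<Vec<String>>>;

/// Hypothetical per-group sender: buffers lines and writes them out as one block.
struct LabeledBuffer {
    label: String,
    max_lines: usize,
    max_age: Duration,
    oldest: Option<Instant>,
    pending: Vec<String>,
    sink: Sink,
}

impl LabeledBuffer {
    fn new(label: &str, max_lines: usize, max_age: Duration, sink: Sink) -> Self {
        Self {
            label: label.to_string(),
            max_lines,
            max_age,
            oldest: None,
            pending: Vec::new(),
            sink,
        }
    }

    /// Queue a line; flush if the buffer is full or its oldest line is too old.
    fn log(&mut self, line: &str) {
        let oldest = *self.oldest.get_or_insert_with(Instant::now);
        self.pending.push(format!("{}: {line}", self.label));
        if self.pending.len() >= self.max_lines || oldest.elapsed() >= self.max_age {
            self.flush();
        }
    }

    /// Write all pending lines to the sink in one go.
    fn flush(&mut self) {
        // Holding the lock while appending keeps the block contiguous in the sink.
        self.sink.lock().unwrap().append(&mut self.pending);
        self.oldest = None;
    }
}
```

With a limit of 0 lines and a zero duration, every line is written
out immediately, which corresponds to the settings the patch picks
for the single worker case.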
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/push.rs | 274 +++++++++++++++++++++++++++++++++++++--------
1 file changed, 226 insertions(+), 48 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index e69f44e85..2ff46211c 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -2,12 +2,13 @@
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
+use std::time::Duration;
use anyhow::{bail, format_err, Context, Error};
use futures::stream::{self, StreamExt, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use tracing::{info, warn};
+use tracing::{info, warn, Level};
use pbs_api_types::{
print_store_and_ns, ApiVersion, ApiVersionInfo, ArchiveType, Authid, BackupArchiveName,
@@ -28,6 +29,9 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{DataStore, StoreProgress};
use pbs_tools::bounded_join_set::BoundedJoinSet;
+use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
+
+use proxmox_human_byte::HumanByte;
use super::sync::{
check_namespace_depth_limit, exclude_not_verified_or_encrypted,
@@ -564,6 +568,13 @@ pub(crate) async fn push_namespace(
let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
+ let (buffered_lines, max_duration) = if params.worker_threads.unwrap_or(1) > 1 {
+ (5, Duration::from_secs(1))
+ } else {
+ (0, Duration::ZERO)
+ };
+ let sender_builder = BufferedLogger::new(buffered_lines, max_duration);
+
let mut process_results = |results| {
for result in results {
progress.done_groups = shared_group_progress.increment_done();
@@ -572,7 +583,7 @@ pub(crate) async fn push_namespace(
Ok(sync_stats) => {
stats.add(sync_stats);
}
- Err(()) => errors = true,
+ Err(_err) => errors = true,
}
}
};
@@ -583,17 +594,22 @@ pub(crate) async fn push_namespace(
let not_owned_target_groups = Arc::clone(&not_owned_target_groups);
let synced_groups = Arc::clone(&synced_groups);
let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let log_sender: Arc<LogLineSender> =
+ Arc::new(sender_builder.sender_with_label(group.to_string()));
let results = group_workers
.spawn_task(async move {
- push_group_do(
+ let result = push_group_do(
params,
&namespace,
&group,
group_progress_cloned,
synced_groups,
not_owned_target_groups,
+ Arc::clone(&log_sender),
)
- .await
+ .await;
+ let _ = log_sender.flush().await;
+ result
})
.await
.map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
@@ -605,6 +621,9 @@ pub(crate) async fn push_namespace(
process_results(vec![result]);
}
+ // Force flush of pending messages
+ sender_builder.close().await?;
+
if params.remove_vanished {
// only ever allow to prune owned groups on target
for target_group in owned_target_groups {
@@ -694,23 +713,45 @@ async fn push_group_do(
shared_group_progress: Arc<SharedGroupProgress>,
synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
not_owned_target_groups: Arc<HashSet<BackupGroup>>,
-) -> Result<SyncStats, ()> {
+ log_sender: Arc<LogLineSender>,
+) -> Result<SyncStats, Error> {
if not_owned_target_groups.contains(group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
- shared_group_progress.increment_done();
+ log_sender
+ .log(
+ Level::WARN,
+ format!(
+ "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+ params.target.remote_user(),
+ ),
+ )
+ .await?;
return Ok(SyncStats::default());
}
synced_groups.lock().unwrap().insert(group.clone());
- push_group(params, namespace, group, Arc::clone(&shared_group_progress))
- .await
- .map_err(|err| {
- warn!("Group {group}: Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- })
+ match push_group(
+ params,
+ namespace,
+ group,
+ Arc::clone(&shared_group_progress),
+ Arc::clone(&log_sender),
+ )
+ .await
+ {
+ Ok(res) => Ok(res),
+ Err(err) => {
+ log_sender
+ .log(Level::WARN, format!("Encountered errors: {err:#}"))
+ .await?;
+ log_sender
+ .log(
+ Level::WARN,
+ format!("Failed to push group {group} to remote!"),
+ )
+ .await?;
+ Err(err)
+ }
+ }
}
/// Push group including all snaphshots to target
@@ -727,6 +768,7 @@ pub(crate) async fn push_group(
namespace: &BackupNamespace,
group: &BackupGroup,
shared_group_progress: Arc<SharedGroupProgress>,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -738,7 +780,12 @@ pub(crate) async fn push_group(
snapshots.sort_unstable_by_key(|a| a.backup.time);
if snapshots.is_empty() {
- info!("Group '{group}' contains no snapshots to sync to remote");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("Group '{group}' contains no snapshots to sync to remote"),
+ )
+ .await?;
}
let target_namespace = params.map_to_target(namespace)?;
@@ -786,11 +833,15 @@ pub(crate) async fn push_group(
.collect();
if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ log_sender
+ .log(Level::INFO, already_synced_skip_info.to_string())
+ .await?;
already_synced_skip_info.reset();
}
if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ log_sender
+ .log(Level::INFO, transfer_last_skip_info.to_string())
+ .await?;
transfer_last_skip_info.reset();
}
@@ -800,11 +851,18 @@ pub(crate) async fn push_group(
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+ let prefix = proxmox_time::epoch_to_rfc3339_utc(source_snapshot.time)
+ .context("invalid timestamp")?;
+ log_sender
+ .log(Level::INFO, format!("{prefix}: start sync"))
+ .await?;
let result = push_snapshot(
&params,
namespace,
&source_snapshot,
fetch_previous_manifest,
+ Arc::clone(&log_sender),
+ &prefix,
)
.await;
fetch_previous_manifest = true;
@@ -812,10 +870,30 @@ pub(crate) async fn push_group(
// Update done groups progress by other parallel running pushes
local_progress.done_groups = shared_group_progress.load_done();
local_progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: group {group}: {local_progress}");
// stop on error
let sync_stats = result?;
+ log_sender
+ .log(Level::INFO, format!("{prefix}: sync done"))
+ .await?;
+ if params.worker_threads.unwrap_or(1) == 1 {
+ log_sender
+ .log(Level::INFO, format!("percentage done: {local_progress}"))
+ .await?;
+ } else {
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "snapshot {}/{} within {group} is done, {}/{} groups done",
+ local_progress.done_snapshots,
+ local_progress.group_snapshots,
+ local_progress.done_groups,
+ local_progress.total_groups,
+ ),
+ )
+ .await?;
+ }
stats.add(sync_stats);
}
@@ -825,25 +903,42 @@ pub(crate) async fn push_group(
continue;
}
if snapshot.protected {
- info!(
- "Kept protected snapshot {name} on remote",
- name = snapshot.backup
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "Kept protected snapshot {name} on remote",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
continue;
}
match forget_target_snapshot(&params, &target_namespace, &snapshot.backup).await {
Ok(()) => {
- info!(
- "Removed vanished snapshot {name} from remote",
- name = snapshot.backup
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "Removed vanished snapshot {name} from remote",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
}
Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!(
- "Failed to remove vanished snapshot {name} from remote!",
- name = snapshot.backup
- );
+ log_sender
+ .log(Level::WARN, format!("Encountered errors: {err:#}"))
+ .await?;
+ log_sender
+ .log(
+ Level::WARN,
+ format!(
+ "Failed to remove vanished snapshot {name} from remote!",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
}
}
stats.add(SyncStats::from(RemovedVanishedStats {
@@ -854,7 +949,14 @@ pub(crate) async fn push_group(
}
}
- shared_group_progress.increment_done();
+ if params.worker_threads.unwrap_or(1) > 1 {
+ log_sender
+ .log(
+ Level::INFO,
+ format!("group sync done: percentage done: {local_progress}"),
+ )
+ .await?;
+ }
Ok(stats)
}
@@ -870,24 +972,40 @@ pub(crate) async fn push_snapshot(
namespace: &BackupNamespace,
snapshot: &BackupDir,
fetch_previous_manifest: bool,
+ log_sender: Arc<LogLineSender>,
+ prefix: &String,
) -> Result<SyncStats, Error> {
let mut stats = SyncStats::default();
- let target_ns = params.map_to_target(namespace)?;
+ let target_ns = params
+ .map_to_target(namespace)
+ .with_context(|| prefix.clone())?;
let backup_dir = params
.source
.store
- .backup_dir(namespace.clone(), snapshot.clone())?;
+ .backup_dir(namespace.clone(), snapshot.clone())
+ .with_context(|| prefix.clone())?;
// Reader locks the snapshot
- let reader = params.source.reader(namespace, snapshot).await?;
+ let reader = params
+ .source
+ .reader(namespace, snapshot)
+ .await
+ .with_context(|| prefix.clone())?;
// Does not lock the manifest, but the reader already assures a locked snapshot
let source_manifest = match backup_dir.load_manifest() {
Ok((manifest, _raw_size)) => manifest,
Err(err) => {
// No manifest in snapshot or failed to read, warn and skip
- log::warn!("Encountered errors: {err:#}");
- log::warn!("Failed to load manifest for '{snapshot}'!");
+ log_sender
+ .log(
+ Level::WARN,
+ format!("{prefix}: Encountered errors: {err:#}"),
+ )
+ .await?;
+ log_sender
+ .log(Level::WARN, format!("{prefix}: Failed to load manifest!"))
+ .await?;
return Ok(stats);
}
};
@@ -914,14 +1032,22 @@ pub(crate) async fn push_snapshot(
no_cache: false,
},
)
- .await?;
+ .await
+ .with_context(|| prefix.clone())?;
let mut previous_manifest = None;
// Use manifest of previous snapshots in group on target for chunk upload deduplication
if fetch_previous_manifest {
match backup_writer.download_previous_manifest().await {
Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
- Err(err) => log::info!("Could not download previous manifest - {err}"),
+ Err(err) => {
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: Could not download previous manifest - {err}"),
+ )
+ .await?
+ }
}
};
@@ -950,12 +1076,32 @@ pub(crate) async fn push_snapshot(
path.push(&entry.filename);
if path.try_exists()? {
let archive_name = BackupArchiveName::from_path(&entry.filename)?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: sync archive {archive_name}"),
+ )
+ .await?;
+ let archive_prefix = format!("{prefix}/{archive_name}");
match archive_name.archive_type() {
ArchiveType::Blob => {
let file = std::fs::File::open(&path)?;
let backup_stats = backup_writer
.upload_blob(file, archive_name.as_ref())
.await?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(backup_stats.size),
+ HumanByte::new_binary(
+ backup_stats.size as f64 / backup_stats.duration.as_secs_f64()
+ ),
+ ),
+ )
+ .await
+ .with_context(|| archive_prefix.clone())?;
stats.add(SyncStats {
chunk_count: backup_stats.chunk_count as usize,
bytes: backup_stats.size as usize,
@@ -974,7 +1120,7 @@ pub(crate) async fn push_snapshot(
)
.await;
}
- let index = DynamicIndexReader::open(&path)?;
+ let index = DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
let chunk_reader = reader
.chunk_reader(entry.chunk_crypt_mode())
.context("failed to get chunk reader")?;
@@ -986,7 +1132,20 @@ pub(crate) async fn push_snapshot(
IndexType::Dynamic,
known_chunks.clone(),
)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ ),
+ )
+ .await?;
stats.add(sync_stats);
}
ArchiveType::FixedIndex => {
@@ -1003,7 +1162,8 @@ pub(crate) async fn push_snapshot(
let index = FixedIndexReader::open(&path)?;
let chunk_reader = reader
.chunk_reader(entry.chunk_crypt_mode())
- .context("failed to get chunk reader")?;
+ .context("failed to get chunk reader")
+ .with_context(|| archive_prefix.clone())?;
let size = index.index_bytes();
let sync_stats = push_index(
&archive_name,
@@ -1013,7 +1173,20 @@ pub(crate) async fn push_snapshot(
IndexType::Fixed(Some(size)),
known_chunks.clone(),
)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ ),
+ )
+ .await?;
stats.add(sync_stats);
}
}
@@ -1034,7 +1207,8 @@ pub(crate) async fn push_snapshot(
client_log_name.as_ref(),
upload_options.clone(),
)
- .await?;
+ .await
+ .with_context(|| prefix.clone())?;
}
// Rewrite manifest for pushed snapshot, recreating manifest from source on target
@@ -1046,8 +1220,12 @@ pub(crate) async fn push_snapshot(
MANIFEST_BLOB_NAME.as_ref(),
upload_options,
)
- .await?;
- backup_writer.finish().await?;
+ .await
+ .with_context(|| prefix.clone())?;
+ backup_writer
+ .finish()
+ .await
+ .with_context(|| prefix.clone())?;
stats.add(SyncStats {
chunk_count: backup_stats.chunk_count as usize,
--
2.47.3
* [PATCH proxmox-backup v8 09/10] sync: move in-progress snapshot filter to helper and use log line sender
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (7 preceding siblings ...)
2026-04-22 13:18 ` [PATCH proxmox-backup v8 08/10] server: push: prefix log messages and add additional logging Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 13:18 ` [PATCH proxmox-backup v8 10/10] ui: expose group worker setting in sync job edit window Christian Ebner
2026-04-22 19:23 ` applied: [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Thomas Lamprecht
10 siblings, 0 replies; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
Currently, in-progress snapshots are being filtered out from the list
of source snapshots by pre-filtering and logging skipped snapshots
after gathering the list.
For parallel sync jobs, however, logging now has to go through the
BufferedLogger by sending the log lines via the LogLineSender. This
requires awaiting inside an async context, which cannot happen within
the filter_map() closure.
Therefore, factor out the filtering to a dedicated helper in order to
avoid pollution of the SyncSource trait with a completely unrelated
parameter and refactor the filtering within that helper so the
logging can happen in async context.
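
The restriction can be shown with a small std-only sketch. Item and
Sender are hypothetical stand-ins for SnapshotListItem and
LogLineSender, and block_on is a toy executor that is only suitable
for futures which never actually suspend. The point is that a plain
for loop inside an async fn can await the log call, which a
synchronous filter_map() closure cannot.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for `SnapshotListItem`: `size` is `None` while a backup is in progress.
#[derive(Debug, Clone, PartialEq)]
struct Item {
    name: String,
    size: Option<u64>,
}

/// Stand-in for `LogLineSender`, collecting lines in memory.
struct Sender {
    lines: Mutex<Vec<String>>,
}

impl Sender {
    async fn log(&self, line: String) -> Result<(), String> {
        self.lines.lock().unwrap().push(line);
        Ok(())
    }
}

/// Drop in-progress items, logging each skipped one via the async sender.
async fn filter_out_in_progress(
    items: Vec<Item>,
    sender: Arc<Sender>,
) -> Result<Vec<Item>, String> {
    let mut filtered = Vec::with_capacity(items.len());
    for item in items {
        if item.size.is_none() {
            // Awaiting is possible here because this is an async fn body, not a closure.
            sender
                .log(format!("skipping snapshot {} - in-progress backup", item.name))
                .await?;
        } else {
            filtered.push(item);
        }
    }
    Ok(filtered)
}

/// Waker that does nothing; sufficient for futures that are ready on first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls in a loop until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

In the patch itself the helper runs on the tokio runtime; the toy
executor only exists to keep this sketch free of dependencies.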
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 3 ++-
src/server/push.rs | 3 ++-
src/server/sync.rs | 39 ++++++++++++++++++++++++++-------------
3 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 97def85a5..47c568376 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -31,7 +31,7 @@ use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
use pbs_tools::sha::sha256;
use super::sync::{
- check_namespace_depth_limit, exclude_not_verified_or_encrypted,
+ check_namespace_depth_limit, exclude_not_verified_or_encrypted, filter_out_in_progress,
ignore_not_verified_or_encrypted, LocalSource, RemoteSource, RemovedVanishedStats, SkipInfo,
SkipReason, SyncSource, SyncSourceReader, SyncStats,
};
@@ -732,6 +732,7 @@ async fn pull_group(
.source
.list_backup_snapshots(source_namespace, group)
.await?;
+ raw_list = filter_out_in_progress(raw_list, Arc::clone(&log_sender)).await?;
raw_list.sort_unstable_by_key(|a| a.backup.time);
let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
diff --git a/src/server/push.rs b/src/server/push.rs
index 2ff46211c..1fbb82ebe 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -34,7 +34,7 @@ use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
use proxmox_human_byte::HumanByte;
use super::sync::{
- check_namespace_depth_limit, exclude_not_verified_or_encrypted,
+ check_namespace_depth_limit, exclude_not_verified_or_encrypted, filter_out_in_progress,
ignore_not_verified_or_encrypted, LocalSource, RemovedVanishedStats, SkipInfo, SkipReason,
SyncSource, SyncStats,
};
@@ -777,6 +777,7 @@ pub(crate) async fn push_group(
.source
.list_backup_snapshots(namespace, group)
.await?;
+ snapshots = filter_out_in_progress(snapshots, Arc::clone(&log_sender)).await?;
snapshots.sort_unstable_by_key(|a| a.backup.time);
if snapshots.is_empty() {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 17ed4839f..4827dc3f2 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -13,7 +13,7 @@ use futures::{future::FutureExt, select};
use hyper::http::StatusCode;
use pbs_config::BackupLockGuard;
use serde_json::json;
-use tracing::{info, warn};
+use tracing::{info, warn, Level};
use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
@@ -28,6 +28,7 @@ use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{BackupManifest, DataStore, ListNamespacesRecursive, LocalChunkReader};
+use pbs_tools::buffered_logger::LogLineSender;
use crate::backup::ListAccessibleBackupGroups;
use crate::server::jobstate::Job;
@@ -375,18 +376,7 @@ impl SyncSource for RemoteSource {
let mut result = self.client.get(&path, Some(args)).await?;
let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
- Ok(snapshot_list
- .into_iter()
- .filter_map(|item: SnapshotListItem| {
- // in-progress backups can't be synced
- if item.size.is_none() {
- info!("skipping snapshot {} - in-progress backup", item.backup);
- return None;
- }
-
- Some(item)
- })
- .collect::<Vec<SnapshotListItem>>())
+ Ok(snapshot_list)
}
fn get_ns(&self) -> BackupNamespace {
@@ -736,6 +726,29 @@ pub fn do_sync_job(
Ok(upid_str)
}
+pub(super) async fn filter_out_in_progress(
+ snapshots: Vec<SnapshotListItem>,
+ log_sender: Arc<LogLineSender>,
+) -> Result<Vec<SnapshotListItem>, Error> {
+ let mut filtered = Vec::with_capacity(snapshots.len());
+
+ for item in snapshots {
+ // in-progress backups can't be synced
+ if item.size.is_none() {
+ log_sender
+ .log(
+ Level::INFO,
+ format!("skipping snapshot {} - in-progress backup", item.backup),
+ )
+ .await?;
+ } else {
+ filtered.push(item);
+ }
+ }
+
+ Ok(filtered)
+}
+
pub(super) fn ignore_not_verified_or_encrypted(
manifest: &BackupManifest,
snapshot: &BackupDir,
--
2.47.3
* [PATCH proxmox-backup v8 10/10] ui: expose group worker setting in sync job edit window
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (8 preceding siblings ...)
2026-04-22 13:18 ` [PATCH proxmox-backup v8 09/10] sync: move in-progress snapshot filter to helper and use log line sender Christian Ebner
@ 2026-04-22 13:18 ` Christian Ebner
2026-04-22 19:23 ` applied: [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Thomas Lamprecht
10 siblings, 0 replies; 15+ messages in thread
From: Christian Ebner @ 2026-04-22 13:18 UTC (permalink / raw)
To: pbs-devel
Allow configuring the number of parallel group workers via the web
interface.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
www/window/SyncJobEdit.js | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index 074c7855a..26c82bc71 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -448,6 +448,17 @@ Ext.define('PBS.window.SyncJobEdit', {
deleteEmpty: '{!isCreate}',
},
},
+ {
+ xtype: 'proxmoxintegerfield',
+ name: 'worker-threads',
+ fieldLabel: gettext('# of Group Workers'),
+ emptyText: '1',
+ minValue: 1,
+ maxValue: 32,
+ cbind: {
+ deleteEmpty: '{!isCreate}',
+ },
+ },
{
xtype: 'proxmoxcheckbox',
fieldLabel: gettext('Re-sync Corrupt'),
--
2.47.3
* applied: [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs
2026-04-22 13:18 [PATCH proxmox{,-backup} v8 00/10] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (9 preceding siblings ...)
2026-04-22 13:18 ` [PATCH proxmox-backup v8 10/10] ui: expose group worker setting in sync job edit window Christian Ebner
@ 2026-04-22 19:23 ` Thomas Lamprecht
10 siblings, 0 replies; 15+ messages in thread
From: Thomas Lamprecht @ 2026-04-22 19:23 UTC (permalink / raw)
To: pbs-devel, Christian Ebner
On Wed, 22 Apr 2026 15:18:10 +0200, Christian Ebner wrote:
> Syncing contents from/to a remote source via a sync job suffers from
> low throughput on high latency networks because of limitations by the
> HTTP/2 connection, as described in [0]. To improve, syncing multiple
> groups in parallel by establishing multiple reader instances has been
> suggested.
>
> This patch series implements the functionality by adding the sync job
> configuration property `worker-threads`, allowing to define the
> number of groups pull/push tokio tasks to be executed in parallel on
> the runtime during each job.
>
> [...]
Applied, with some minor follow-ups, thanks!
[1/9] tools: implement buffered logger for concurrent log messages
commit: e842d62d9cf169058b19197f08dac72aebfbedeb
[2/9] tools: add bounded join set to run concurrent tasks bound by limit
commit: 75cd11b19e2041f10957405fec1372082926226d
[3/9] api: config/sync: add optional `worker-threads` property
commit: 8711d4552961d8b70c9b592a79461e8dabad9f28
[4/9] fix #4182: server: sync: allow pulling backup groups in parallel
commit: fbff37959849ff79780622d68ac86ee6c74cb7b0
[5/9] server: pull: prefix log messages and add error context
commit: 44e682c7892424e64797fb0a958e63fb29e3d52f
[6/9] server: sync: allow pushing groups concurrently
commit: 3b4cf51e727e38a99eb0277b44eb7efd73422aab
[7/9] server: push: prefix log messages and add additional logging
commit: 3be2b6f6073b1a066ddb4736fb05bc118151d6c6
[8/9] sync: move in-progress snapshot filter to helper and use log line sender
commit: 38caf283dc69eef15625304a234a21ce4c3ef97e
[9/9] ui: expose group worker setting in sync job edit window
commit: cf09063fc5ebd8134941c98eb6ebd69af84da393