From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v7 2/9] tools: implement buffered logger for concurrent log messages
Date: Tue, 21 Apr 2026 12:26:47 +0200
Message-ID: <20260421102654.610007-3-c.ebner@proxmox.com>
In-Reply-To: <20260421102654.610007-1-c.ebner@proxmox.com>

Add a LogSender trait with two implementations: the shortcut
UnbufferedLogLineSender, to be used when no buffering must occur, and
the LogLineSender, which interacts with the BufferedLogger in order
to buffer and flush logged contents.

The BufferedLogger instance collects messages sent from different
sender instances via an async tokio channel and buffers them. Senders
identify themselves by label and provide a log level for each log
line to be buffered and flushed.

On collection, log lines are grouped by label and buffered in
order of arrival per label, up to the configured maximum number of
lines per group. In addition, the timestamp of the last received
line is updated per label, which is used to periodically flush log
lines per label at the configured interval. There is no guarantee on
the order of labels when flushing.

In addition, senders can request flushing at any given point.

Log output is written at the log level provided with each line and
prefixed by the label.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
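The buffering rules described in the commit message can be sketched
without tokio or tracing. The following is a simplified, synchronous
model and not the code added by this patch: `BufferModel` and its
methods are hypothetical names, time is passed in by the caller as a
`Duration` instead of being read from `tokio::time::Instant`, and
"written" lines are returned to the caller instead of being logged.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical, synchronous stand-in for the per-label buffering policy.
struct BufferModel {
    /// Per label: time of the last received line and the buffered lines.
    buffers: HashMap<String, (Duration, Vec<String>)>,
    max_lines: usize,
    max_age: Duration,
}

impl BufferModel {
    fn new(max_lines: usize, max_age: Duration) -> Self {
        Self { buffers: HashMap::new(), max_lines, max_age }
    }

    /// Handle a line received at time `now`; returns the lines written out.
    fn message(&mut self, now: Duration, label: &str, line: &str) -> Vec<String> {
        let formatted = format!("[{label}]: {line}");
        if self.max_lines == 0 || self.max_age.is_zero() {
            // Buffering disabled: write through immediately, keep nothing.
            return vec![formatted];
        }
        let (last, lines) = self
            .buffers
            .entry(label.to_string())
            .or_insert_with(|| (now, Vec::new()));
        *last = now;
        if lines.len() + 1 > self.max_lines {
            // Limit reached: emit the buffered lines followed by the new one.
            let mut out = std::mem::take(lines);
            out.push(formatted);
            out
        } else {
            // Below the limit: keep the line for a later flush.
            lines.push(formatted);
            Vec::new()
        }
    }

    /// Explicit flush request for a single label.
    fn flush(&mut self, label: &str) -> Vec<String> {
        self.buffers
            .get_mut(label)
            .map(|(_, lines)| std::mem::take(lines))
            .unwrap_or_default()
    }

    /// Periodic check: flush labels whose last line is older than `max_age`.
    /// Labels are visited in HashMap order, so their order is unspecified.
    fn flush_expired(&mut self, now: Duration) -> Vec<String> {
        let mut out = Vec::new();
        for (last, lines) in self.buffers.values_mut() {
            if !lines.is_empty() && now.saturating_sub(*last) > self.max_age {
                out.append(lines);
            }
        }
        out
    }
}
```

With a limit of two lines, a third line for the same label returns all
three lines in order of arrival, while lines for other labels stay
buffered until their own limit, an explicit flush or the age check.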
 pbs-tools/Cargo.toml             |   3 +
 pbs-tools/src/buffered_logger.rs | 253 +++++++++++++++++++++++++++++++
 pbs-tools/src/lib.rs             |   1 +
 3 files changed, 257 insertions(+)
 create mode 100644 pbs-tools/src/buffered_logger.rs

diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
index 998e3077e..8bc01e85d 100644
--- a/pbs-tools/Cargo.toml
+++ b/pbs-tools/Cargo.toml
@@ -8,6 +8,7 @@ description = "common tools used throughout pbs"
 # This must not depend on any subcrates more closely related to pbs itself.
 [dependencies]
 anyhow.workspace = true
+async-trait.workspace = true
 bytes.workspace = true
 foreign-types.workspace = true
 hex.workspace = true
@@ -17,10 +18,12 @@ openssl.workspace = true
 serde_json.workspace = true
 # rt-multi-thread is required for block_in_place
 tokio = { workspace = true, features = [ "fs", "io-util", "rt", "rt-multi-thread", "sync" ] }
+tracing.workspace = true
 
 proxmox-async.workspace = true
 proxmox-io = { workspace = true, features = [ "tokio" ] }
 proxmox-human-byte.workspace = true
+proxmox-log.workspace = true
 proxmox-sys.workspace = true
 proxmox-time.workspace = true
 
diff --git a/pbs-tools/src/buffered_logger.rs b/pbs-tools/src/buffered_logger.rs
new file mode 100644
index 000000000..d0217850f
--- /dev/null
+++ b/pbs-tools/src/buffered_logger.rs
@@ -0,0 +1,253 @@
+//! Log aggregator to collect and group messages sent from concurrent tasks via
+//! a tokio channel.
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::time::Duration;
+
+use anyhow::Error;
+use tokio::sync::mpsc;
+use tokio::time::Instant;
+use tracing::{debug, error, info, trace, warn, Level};
+
+use proxmox_log::LogContext;
+
+#[async_trait::async_trait]
+pub trait LogSender: Send + Sync {
+    async fn log(&self, level: Level, message: String) -> Result<(), Error>;
+    async fn flush(&self) -> Result<(), Error>;
+}
+
+pub struct UnbufferedLogLineSender {
+    label: SenderLabel,
+}
+
+impl UnbufferedLogLineSender {
+    pub fn new(label: String) -> Self {
+        Self { label }
+    }
+}
+
+#[async_trait::async_trait]
+impl LogSender for UnbufferedLogLineSender {
+    async fn log(&self, level: Level, message: String) -> Result<(), Error> {
+        let label = &self.label;
+        match level {
+            Level::ERROR => error!("[{label}]: {message}"),
+            Level::WARN => warn!("[{label}]: {message}"),
+            Level::INFO => info!("[{label}]: {message}"),
+            Level::DEBUG => debug!("[{label}]: {message}"),
+            Level::TRACE => trace!("[{label}]: {message}"),
+        }
+        Ok(())
+    }
+
+    async fn flush(&self) -> Result<(), Error> {
+        Ok(())
+    }
+}
+
+/// Label to be used to group buffered messages when flushing.
+pub type SenderLabel = String;
+
+/// Requested action for the log collection task
+enum SenderRequest {
+    /// New log line to be buffered
+    Message(SenderLabel, LogLine),
+    /// Flush currently buffered log lines associated with the given sender label
+    Flush(SenderLabel),
+}
+
+/// Receives log lines tagged with a label, and buffers them grouped
+/// by the label value. Buffered messages are flushed either after
+/// reaching a certain timeout or capacity limit, or when explicitly
+/// requested.
+pub struct BufferedLogger {
+    /// Buffer to aggregate log lines and last message received instant based on sender label
+    buffer_map: HashMap<SenderLabel, (Instant, Vec<LogLine>)>,
+    /// Maximum number of received lines for an individual sender instance before
+    /// flushing
+    max_buffered_lines: usize,
+    /// Maximum aggregation duration of received lines for an individual sender
+    /// instance before flushing
+    max_aggregation_time: Duration,
+    /// Channel to receive log messages
+    receiver: mpsc::Receiver<SenderRequest>,
+}
+
+/// Instance to create new sender instances by cloning the channel sender
+pub struct LogLineSenderBuilder {
+    // Used to clone new senders if requested
+    sender: mpsc::Sender<SenderRequest>,
+}
+
+impl LogLineSenderBuilder {
+    /// Create new sender instance to send log messages, to be grouped by given label
+    ///
+    /// The label is not checked for uniqueness (another instance with the same
+    /// label may exist); it is the caller's responsibility to ensure this if required.
+    pub fn sender_with_label(&self, label: SenderLabel) -> LogLineSender {
+        LogLineSender {
+            label,
+            sender: self.sender.clone(),
+        }
+    }
+}
+
+/// Sender to send new log messages to buffered log aggregator
+pub struct LogLineSender {
+    /// Label used to group log lines
+    label: SenderLabel,
+    /// Sender to publish new log lines to buffered log aggregator task
+    sender: mpsc::Sender<SenderRequest>,
+}
+
+#[async_trait::async_trait]
+impl LogSender for LogLineSender {
+    /// Send a new log message with given level to the buffered logger task
+    async fn log(&self, level: Level, message: String) -> Result<(), Error> {
+        let line = LogLine { level, message };
+        self.sender
+            .send(SenderRequest::Message(self.label.clone(), line))
+            .await?;
+        Ok(())
+    }
+
+    /// Request flushing of all buffered messages with this sender's label
+    async fn flush(&self) -> Result<(), Error> {
+        self.sender
+            .send(SenderRequest::Flush(self.label.clone()))
+            .await?;
+        Ok(())
+    }
+}
+
+/// Log message entity
+struct LogLine {
+    /// Log level of the log message
+    level: Level,
+    /// Log message
+    message: String,
+}
+
+impl BufferedLogger {
+    /// New instance of a buffered logger
+    pub fn new(
+        max_buffered_lines: usize,
+        max_aggregation_time: Duration,
+    ) -> (Self, LogLineSenderBuilder) {
+        let (sender, receiver) = mpsc::channel(100);
+
+        (
+            Self {
+                buffer_map: HashMap::new(),
+                max_buffered_lines,
+                max_aggregation_time,
+                receiver,
+            },
+            LogLineSenderBuilder { sender },
+        )
+    }
+
+    /// Starts the collection loop, spawned on a new tokio task.
+    /// Finishes when all senders belonging to the channel have been dropped.
+    pub fn run_log_collection(mut self) {
+        let future = async move {
+            loop {
+                for (label, (last_logged, log_lines)) in self.buffer_map.iter_mut() {
+                    if !log_lines.is_empty()
+                        && Instant::now().duration_since(*last_logged) > self.max_aggregation_time
+                    {
+                        Self::flush_by_label(label, log_lines);
+                    }
+                }
+
+                match tokio::time::timeout(self.max_aggregation_time, self.receive_log_line()).await
+                {
+                    Ok(finished) => {
+                        if finished {
+                            break;
+                        }
+                    }
+                    Err(_timeout) => self.flush_all_buffered(),
+                }
+            }
+        };
+        match LogContext::current() {
+            None => tokio::spawn(future),
+            Some(context) => tokio::spawn(context.scope(future)),
+        };
+    }
+
+    /// Collects new log lines, buffers them and flushes if the max lines limit is exceeded.
+    ///
+    /// Returns `true` if all the senders have been dropped and the task should no
+    /// longer wait for new messages and finish.
+    async fn receive_log_line(&mut self) -> bool {
+        if let Some(request) = self.receiver.recv().await {
+            match request {
+                SenderRequest::Flush(label) => {
+                    if let Some((_last_logged, log_lines)) = self.buffer_map.get_mut(&label) {
+                        Self::flush_by_label(&label, log_lines);
+                    }
+                }
+                SenderRequest::Message(label, log_line) => {
+                    if self.max_buffered_lines == 0 || self.max_aggregation_time.is_zero() {
+                        // no buffering requested: log directly, do not buffer as well
+                        Self::log_by_level(&label, &log_line);
+                        return false;
+                    }
+                    match self.buffer_map.entry(label.clone()) {
+                        Entry::Occupied(mut occupied) => {
+                            let (last_logged, log_lines) = occupied.get_mut();
+                            *last_logged = Instant::now();
+                            if log_lines.len() + 1 > self.max_buffered_lines {
+                                // reached limit for this label,
+                                // flush all buffered and new log line
+                                Self::flush_by_label(&label, log_lines);
+                                Self::log_by_level(&label, &log_line);
+                            } else {
+                                // below limit, push to buffer to flush later
+                                log_lines.push(log_line);
+                            }
+                        }
+                        Entry::Vacant(vacant) => {
+                            vacant.insert((Instant::now(), vec![log_line]));
+                        }
+                    }
+                }
+            }
+            return false;
+        }
+
+        // no more senders, all LogLineSenders and LogLineSenderBuilder have been dropped
+        self.flush_all_buffered();
+        true
+    }
+
+    /// Flush all currently buffered contents without ordering, but grouped by label
+    fn flush_all_buffered(&mut self) {
+        for (label, (_last_logged, log_lines)) in self.buffer_map.iter_mut() {
+            Self::flush_by_label(label, log_lines);
+        }
+    }
+
+    /// Flush all buffered log lines of the given label in order of arrival
+    fn flush_by_label(label: &str, log_lines: &mut Vec<LogLine>) {
+        for log_line in log_lines.iter() {
+            Self::log_by_level(label, log_line);
+        }
+        log_lines.clear();
+    }
+
+    /// Write the given log line prefixed by label
+    fn log_by_level(label: &str, log_line: &LogLine) {
+        match log_line.level {
+            Level::ERROR => error!("[{label}]: {}", log_line.message),
+            Level::WARN => warn!("[{label}]: {}", log_line.message),
+            Level::INFO => info!("[{label}]: {}", log_line.message),
+            Level::DEBUG => debug!("[{label}]: {}", log_line.message),
+            Level::TRACE => trace!("[{label}]: {}", log_line.message),
+        }
+    }
+}
diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
index f41aef6df..1e3972c92 100644
--- a/pbs-tools/src/lib.rs
+++ b/pbs-tools/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod async_lru_cache;
+pub mod buffered_logger;
 pub mod cert;
 pub mod crypt_config;
 pub mod format;
-- 
2.47.3
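
The termination condition of the collection loop (it finishes once all
senders belonging to the channel have been dropped) can be illustrated
with a small std-only sketch. This is not the patch's code: it uses
`std::sync::mpsc` and threads as a stand-in for `tokio::sync::mpsc` and
tasks, and `Request`, `collect` and `run_workers` are hypothetical
names chosen for this example.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical request type mirroring the message/flush split.
enum Request {
    Message(String, String),
    Flush(String),
}

/// Drain the channel until every sender is gone; returns what was received.
fn collect(receiver: mpsc::Receiver<Request>) -> Vec<String> {
    let mut seen = Vec::new();
    // `recv` returns `Err` once all `Sender` clones have been dropped.
    while let Ok(request) = receiver.recv() {
        match request {
            Request::Message(label, line) => seen.push(format!("[{label}]: {line}")),
            Request::Flush(label) => seen.push(format!("flush {label}")),
        }
    }
    seen
}

/// Spawn one worker per label, each holding its own clone of the sender.
fn run_workers(labels: &[&str]) -> Vec<String> {
    let (sender, receiver) = mpsc::channel();
    let collector = thread::spawn(move || collect(receiver));
    let workers: Vec<_> = labels
        .iter()
        .map(|label| {
            let label = label.to_string();
            let sender = sender.clone();
            thread::spawn(move || {
                sender.send(Request::Message(label.clone(), "done".into())).unwrap();
                sender.send(Request::Flush(label)).unwrap();
            })
        })
        .collect();
    // Drop the original sender, otherwise the collector would never finish.
    drop(sender);
    for worker in workers {
        worker.join().unwrap();
    }
    collector.join().unwrap()
}
```

Requests from one sender arrive in the order they were sent, while the
interleaving between different labels is not defined. The same holds
for the builder in this sketch's terms: as long as one clone of the
sender is kept alive, the collector keeps waiting.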





Thread overview: 11+ messages
2026-04-21 10:26 [PATCH proxmox{,-backup} v7 0/9] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox v7 1/9] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-21 22:18   ` applied: " Thomas Lamprecht
2026-04-21 10:26 ` Christian Ebner [this message]
2026-04-21 10:26 ` [PATCH proxmox-backup v7 3/9] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 4/9] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 5/9] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 6/9] server: pull: prefix log messages and add error context Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 7/9] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 8/9] server: push: prefix log messages and add additional logging Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 9/9] ui: expose group worker setting in sync job edit window Christian Ebner
