From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: Christian Ebner <c.ebner@proxmox.com>, pbs-devel@lists.proxmox.com
Subject: Re: [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages
Date: Mon, 20 Apr 2026 12:57:53 +0200
Message-ID: <1776677069.eizi09d274.astroid@yuna.none>
In-Reply-To: <20260417092621.455374-4-c.ebner@proxmox.com>

On April 17, 2026 11:26 am, Christian Ebner wrote:
> Implements a buffered logger instance which collects messages send
> from different sender instances via an async tokio channel and
> buffers them. Sender identify by label and provide a log level for
> each log line to be buffered and flushed.
> 
> On collection, log lines are grouped by label and buffered in
> sequence of arrival per label, up to the configured maximum number of
> per group lines or periodically with the configured interval. The
> interval timeout is reset when contents are flushed. In addition,
> senders can request flushing at any given point.
> 
> When the timeout set based on the interval is reached, all labels
> log buffers are flushed. There is no guarantee on the order of labels
> when flushing.
> 
> Log output is written based on provided log line level and prefixed
> by the label.
> 
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> changes since version 5:
> - not present in previous version
> 
>  pbs-tools/Cargo.toml             |   2 +
>  pbs-tools/src/buffered_logger.rs | 216 +++++++++++++++++++++++++++++++
>  pbs-tools/src/lib.rs             |   1 +
>  3 files changed, 219 insertions(+)
>  create mode 100644 pbs-tools/src/buffered_logger.rs
> 
> diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
> index 998e3077e..6b1d92fa6 100644
> --- a/pbs-tools/Cargo.toml
> +++ b/pbs-tools/Cargo.toml
> @@ -17,10 +17,12 @@ openssl.workspace = true
>  serde_json.workspace = true
>  # rt-multi-thread is required for block_in_place
>  tokio = { workspace = true, features = [ "fs", "io-util", "rt", "rt-multi-thread", "sync" ] }
> +tracing.workspace = true
>  
>  proxmox-async.workspace = true
>  proxmox-io = { workspace = true, features = [ "tokio" ] }
>  proxmox-human-byte.workspace = true
> +proxmox-log.workspace = true
>  proxmox-sys.workspace = true
>  proxmox-time.workspace = true
>  
> diff --git a/pbs-tools/src/buffered_logger.rs b/pbs-tools/src/buffered_logger.rs
> new file mode 100644
> index 000000000..39cf068cd
> --- /dev/null
> +++ b/pbs-tools/src/buffered_logger.rs
> @@ -0,0 +1,216 @@
> +//! Log aggregator to collect and group messages send from concurrent tasks via

nit: sen*t*

> +//! a tokio channel.
> +
> +use std::collections::hash_map::Entry;
> +use std::collections::HashMap;
> +use std::time::Duration;
> +
> +use anyhow::Error;
> +use tokio::sync::mpsc;
> +use tokio::time::{self, Instant};
> +use tracing::{debug, error, info, trace, warn, Level};
> +
> +use proxmox_log::LogContext;
> +
> +/// Label to be used to group currently buffered messages when flushing.

I'd drop the "currently" here

> +pub type SenderLabel = String;
> +
> +/// Requested action for the log collection task
> +enum SenderRequest {
> +    // new log line to be buffered
> +    Message(LogLine),

see below, I think this should have the label split out

> +    // flush currently buffered log lines associated by sender label
> +    Flush(SenderLabel),

this is actually used at the moment to finish a particular label, maybe
that should be made explicit (see below)

> +}
> +
> +/// Logger instance to buffer and group log output to keep concurrent logs readable
> +///
> +/// Receives the logs from an async input channel, buffers them grouped by input
> +/// channel and flushes them after either reaching a timeout or capacity limit.

/// Receives log lines tagged with a label, and buffers them grouped
/// by the label value. Buffered messages are flushed either after
/// reaching a certain timeout or capacity limit, or when explicitly
/// requested.

> +pub struct BufferedLogger {
> +    // buffer to aggregate log lines based on sender label
> +    buffer_map: HashMap<SenderLabel, Vec<LogLine>>,

do we expect this to always be used with the tiny limits we currently
employ? if so, we might want to consider a different structure here that
is more efficient/optimized for that use case?

also note that this effectively duplicates the label once per line..

> +    // maximum number of received lines for an individual sender instance before
> +    // flushing
> +    max_buffered_lines: usize,
> +    // maximum aggregation duration of received lines for an individual sender
> +    // instance before flushing
> +    max_aggregation_time: Duration,
> +    // channel to receive log messages
> +    receiver: mpsc::Receiver<SenderRequest>,
> +}
> +
> +/// Instance to create new sender instances by cloning the channel sender
> +pub struct LogLineSenderBuilder {
> +    // to clone new senders if requested
> +    _sender: mpsc::Sender<SenderRequest>,

nit: this should be called `sender`, it is used below even if just for
cloning?

> +}
> +
> +impl LogLineSenderBuilder {
> +    /// Create new sender instance to send log messages, to be grouped by given label
> +    ///
> +    /// Label is not checked to be unique (no other instance with same label exists),
> +    /// it is the callers responsibility to check so if required.
> +    pub fn sender_with_label(&self, label: SenderLabel) -> LogLineSender {
> +        LogLineSender {
> +            label,
> +            sender: self._sender.clone(),
> +        }
> +    }
> +}
> +
> +/// Sender to publish new log messages to buffered log aggregator

this sender doesn't publish anything

> +pub struct LogLineSender {
> +    // label used to group log lines
> +    label: SenderLabel,
> +    // sender to publish new log lines to buffered log aggregator task
> +    sender: mpsc::Sender<SenderRequest>,
> +}
> +
> +impl LogLineSender {
> +    /// Send a new log message with given level to the buffered logger task
> +    pub async fn log(&self, level: Level, message: String) -> Result<(), Error> {
> +        let line = LogLine {
> +            label: self.label.clone(),
> +            level,
> +            message,
> +        };
> +        self.sender.send(SenderRequest::Message(line)).await?;
> +        Ok(())
> +    }
> +
> +    /// Flush all messages with sender's label

Flush all *buffered* messages with *this* sender's label

?

> +    pub async fn flush(&self) -> Result<(), Error> {
> +        self.sender
> +            .send(SenderRequest::Flush(self.label.clone()))
> +            .await?;
> +        Ok(())
> +    }
> +}
> +
> +/// Log message entity
> +struct LogLine {
> +    /// label indentifiying the sender

nit: typo, capitalization inconsistent

> +    label: SenderLabel,
> +    /// Log level to use during flushing

Log level of the message

?

> +    level: Level,
> +    /// log line to be buffered and flushed

Log message

?

buffering and flushing happens elsewhere..

> +    message: String,
> +}
> +
> +impl BufferedLogger {
> +    /// New instance of a buffered logger
> +    pub fn new(
> +        max_buffered_lines: usize,
> +        max_aggregation_time: Duration,
> +    ) -> (Self, LogLineSenderBuilder) {
> +        let (_sender, receiver) = mpsc::channel(100);

nit: this should be called `sender`

> +
> +        (
> +            Self {
> +                buffer_map: HashMap::new(),
> +                max_buffered_lines,
> +                max_aggregation_time,
> +                receiver,
> +            },
> +            LogLineSenderBuilder { _sender },
> +        )
> +    }
> +
> +    /// Starts the collection loop spawned on a new tokio task
> +    /// Finishes when all sender belonging to the channel have been dropped.
> +    pub fn run_log_collection(mut self) {
> +        let future = async move {
> +            loop {
> +                let deadline = Instant::now() + self.max_aggregation_time;
> +                match time::timeout_at(deadline, self.receive_log_line()).await {

why manually calculate the deadline? wouldn't using `time::timeout` work
just as well? from a quick glance, the only difference is that that one
does a checked_add for now + delay..

but also, isn't this kind of broken in any case? let's say I have two
labels A and B:

 0.99 A1
 1.98 A2
 2.97 A3
 3.96 A4
 4.95 A5 (now A is at capacity)
 5.94 B1
 9.90 B5 (now B is at capacity as well)

either

10.90 timeout elapses, everything is flushed

or

10.89 A6 (A gets flushed and can start over - but B hasn't been flushed)
11.88 A7
12.87 A8
13.86 A9
14.85 A10 (A has 5 buffered messages again)
..

this means that any label that doesn't log a 6th message can stall for
quite a long time, as long as other labels make progress (and it isn't
flushed explicitly)?

> +                    Ok(finished) => {
> +                        if finished {
> +                            break;
> +                        }
> +                    }
> +                    Err(_timeout) => self.flush_all_buffered(),
> +                }
> +            }
> +        };
> +        match LogContext::current() {
> +            None => tokio::spawn(future),
> +            Some(context) => tokio::spawn(context.scope(future)),
> +        };
> +    }
> +
> +    /// Collects new log lines, buffers and flushes them if max lines limit exceeded.
> +    ///
> +    /// Returns `true` if all the senders have been dropped and the task should no
> +    /// longer wait for new messages and finish.
> +    async fn receive_log_line(&mut self) -> bool {
> +        if let Some(request) = self.receiver.recv().await {
> +            match request {
> +                SenderRequest::Flush(label) => {
> +                    if let Some(log_lines) = self.buffer_map.get_mut(&label) {
> +                        Self::log_with_label(&label, log_lines);
> +                        log_lines.clear();
> +                    }
> +                }
> +                SenderRequest::Message(log_line) => {

if this would be Message((label, level, line)) or Message((label,
level_and_line)) the label would not need to be stored in the buffer
keys and values..
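for illustration, the shape I have in mind (with a local `Level` enum
standing in for `tracing::Level`, and all names hypothetical):

```rust
use std::collections::HashMap;

// stand-in for tracing::Level
#[derive(Debug, PartialEq)]
enum Level {
    Info,
    Error,
}

// label carried once per request; buffer values stay label-free
enum SenderRequest {
    Message { label: String, level: Level, message: String },
    Flush(String),
}

fn buffer(map: &mut HashMap<String, Vec<(Level, String)>>, req: SenderRequest) {
    match req {
        SenderRequest::Message { label, level, message } => {
            // the label is moved into the map key; the buffered values
            // only hold (level, message), so no per-line duplication
            map.entry(label).or_default().push((level, message));
        }
        SenderRequest::Flush(label) => {
            if let Some(lines) = map.get_mut(&label) {
                // real code would write the lines out before clearing
                lines.clear();
            }
        }
    }
}

fn main() {
    let mut map = HashMap::new();
    buffer(&mut map, SenderRequest::Message {
        label: "A".to_string(),
        level: Level::Info,
        message: "hello".to_string(),
    });
    assert_eq!(map["A"], vec![(Level::Info, "hello".to_string())]);
}
```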

> +                    if self.max_buffered_lines == 0
> +                        || self.max_aggregation_time < Duration::from_secs(0)

the timeout can never be below zero, as that is the minimum duration
(duration is unsigned)?
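a quick demonstration that the comparison is dead code (`Duration::ZERO`
is the minimum value, so strictly-less-than-zero can never hold; the
check presumably meant equality):

```rust
use std::time::Duration;

fn main() {
    // std::time::Duration is unsigned: ZERO is its minimum value,
    // so `d < Duration::from_secs(0)` is always false
    let d = Duration::from_secs(0);
    assert!(!(d < Duration::from_secs(0)));
    assert_eq!(d, Duration::ZERO);
    // the check the patch presumably intended:
    assert!(d.is_zero());
    println!("Duration::ZERO is the minimum; `< 0` is always false");
}
```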

> +                    {
> +                        // shortcut if no buffering should happen
> +                        Self::log_by_level(&log_line.label, &log_line);

shouldn't we rather handle this by not using the buffered logger in the
first place? e.g., have this and a simple not-buffering logger implement
a shared logging trait, or something similar?

one simple approach would be to just make the LogLineSender log directly
in this case, and not send anything at all?

because if we don't want buffering, sending all log messages through a
channel and setting up the timeout machinery can be avoided completely..
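roughly what I mean, as a std-only sketch (std `mpsc` and `println!`
stand in for the tokio channel and the tracing macros; `LineSender` and
its variants are illustrative names, not from the patch):

```rust
use std::sync::mpsc;

// a sender that either forwards to the collection task or, when
// buffering is disabled, logs directly without any channel at all
enum LineSender {
    // buffering enabled: forward (label, message) to the collector
    Buffered(mpsc::Sender<(String, String)>),
    // buffering disabled: no channel, no timeout machinery
    Direct,
}

impl LineSender {
    fn log(&self, label: &str, message: &str) -> Result<(), mpsc::SendError<(String, String)>> {
        match self {
            LineSender::Buffered(tx) => tx.send((label.to_string(), message.to_string())),
            LineSender::Direct => {
                // in the real code this would be the tracing macros
                println!("[{label}]: {message}");
                Ok(())
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let buffered = LineSender::Buffered(tx);
    buffered.log("A", "goes through the channel").unwrap();
    assert_eq!(rx.recv().unwrap().1, "goes through the channel");

    let direct = LineSender::Direct;
    direct.log("B", "logged immediately").unwrap();
}
```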

> +                    }
> +
> +                    match self.buffer_map.entry(log_line.label.clone()) {
> +                        Entry::Occupied(mut occupied) => {
> +                            let log_lines = occupied.get_mut();
> +                            if log_lines.len() + 1 > self.max_buffered_lines {
> +                                // reached limit for this label,
> +                                // flush all buffered and new log line
> +                                Self::log_with_label(&log_line.label, log_lines);
> +                                log_lines.clear();
> +                                Self::log_by_level(&log_line.label, &log_line);
> +                            } else {
> +                                // below limit, push to buffer to flush later
> +                                log_lines.push(log_line);
> +                            }
> +                        }
> +                        Entry::Vacant(vacant) => {
> +                            vacant.insert(vec![log_line]);
> +                        }
> +                    }
> +                }
> +            }
> +            return false;
> +        }
> +
> +        // no more senders, all LogLineSender's and LogLineSenderBuilder have been dropped

nit: typo `'s`

> +        self.flush_all_buffered();
> +        true
> +    }
> +
> +    /// Flush all currently buffered contents without ordering, but grouped by label
> +    fn flush_all_buffered(&mut self) {
> +        for (label, log_lines) in self.buffer_map.iter() {
> +            Self::log_with_label(label, log_lines);
> +        }
> +        self.buffer_map.clear();

wouldn't it be better performance-wise to
- clear each label's log lines (like in SenderRequest::Flush)
- remove the hashmap entry in SenderRequest::Flush, or rename that one to
  finish, since that is what it actually does?

granted, this only triggers when the timeout elapses or there are no
more senders, but for the timeout case it might still be beneficial? it
should remove a lot of allocation churn at least..
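the churn point in a nutshell: `Vec::clear` drops the elements but keeps
the allocation, so reusing per-label buffers avoids reallocating on
every timeout flush, unlike clearing the whole map and re-inserting
fresh `Vec`s:

```rust
fn main() {
    // fill a per-label buffer, then clear it as a flush would
    let mut buf: Vec<String> = Vec::with_capacity(5);
    for i in 0..5 {
        buf.push(format!("line {i}"));
    }
    let cap_before = buf.capacity();
    buf.clear();
    assert!(buf.is_empty());
    // the allocation is retained, ready for the next batch of lines
    assert_eq!(buf.capacity(), cap_before);
    println!("capacity retained after clear: {}", buf.capacity());
}
```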

> +    }
> +
> +    /// Log given log lines prefixed by label
> +    fn log_with_label(label: &str, log_lines: &[LogLine]) {

currently each LogLine contains the label anyway, but see above, I do
think this split makes sense but it should be done completely ;)

> +        for log_line in log_lines {
> +            Self::log_by_level(label, log_line);
> +        }
> +    }
> +
> +    /// Write the given log line prefixed by label
> +    fn log_by_level(label: &str, log_line: &LogLine) {

this also logs with label, IMHO the naming is confusing..

> +        match log_line.level {
> +            Level::ERROR => error!("[{label}]: {}", log_line.message),
> +            Level::WARN => warn!("[{label}]: {}", log_line.message),
> +            Level::INFO => info!("[{label}]: {}", log_line.message),
> +            Level::DEBUG => debug!("[{label}]: {}", log_line.message),
> +            Level::TRACE => trace!("[{label}]: {}", log_line.message),
> +        }
> +    }
> +}
> diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
> index f41aef6df..1e3972c92 100644
> --- a/pbs-tools/src/lib.rs
> +++ b/pbs-tools/src/lib.rs
> @@ -1,4 +1,5 @@
>  pub mod async_lru_cache;
> +pub mod buffered_logger;
>  pub mod cert;
>  pub mod crypt_config;
>  pub mod format;
> -- 
> 2.47.3
> 