public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>,
	pbs-devel@lists.proxmox.com
Subject: Re: [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages
Date: Mon, 20 Apr 2026 19:15:13 +0200	[thread overview]
Message-ID: <bda178d7-f666-49e7-9859-29f972bae36b@proxmox.com> (raw)
In-Reply-To: <1776677069.eizi09d274.astroid@yuna.none>

On 4/20/26 12:56 PM, Fabian Grünbichler wrote:
> On April 17, 2026 11:26 am, Christian Ebner wrote:
>> Implements a buffered logger instance which collects messages send
>> from different sender instances via an async tokio channel and
>> buffers them. Sender identify by label and provide a log level for
>> each log line to be buffered and flushed.
>>
>> On collection, log lines are grouped by label and buffered in
>> sequence of arrival per label, up to the configured maximum number of
>> per group lines or periodically with the configured interval. The
>> interval timeout is reset when contents are flushed. In addition,
>> senders can request flushing at any given point.
>>
>> When the timeout set based on the interval is reached, all labels
>> log buffers are flushed. There is no guarantee on the order of labels
>> when flushing.
>>
>> Log output is written based on provided log line level and prefixed
>> by the label.
>>
>> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
>> ---
>> changes since version 5:
>> - not present in previous version
>>
>>   pbs-tools/Cargo.toml             |   2 +
>>   pbs-tools/src/buffered_logger.rs | 216 +++++++++++++++++++++++++++++++
>>   pbs-tools/src/lib.rs             |   1 +
>>   3 files changed, 219 insertions(+)
>>   create mode 100644 pbs-tools/src/buffered_logger.rs
>>
>> diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
>> index 998e3077e..6b1d92fa6 100644
>> --- a/pbs-tools/Cargo.toml
>> +++ b/pbs-tools/Cargo.toml
>> @@ -17,10 +17,12 @@ openssl.workspace = true
>>   serde_json.workspace = true
>>   # rt-multi-thread is required for block_in_place
>>   tokio = { workspace = true, features = [ "fs", "io-util", "rt", "rt-multi-thread", "sync" ] }
>> +tracing.workspace = true
>>   
>>   proxmox-async.workspace = true
>>   proxmox-io = { workspace = true, features = [ "tokio" ] }
>>   proxmox-human-byte.workspace = true
>> +proxmox-log.workspace = true
>>   proxmox-sys.workspace = true
>>   proxmox-time.workspace = true
>>   
>> diff --git a/pbs-tools/src/buffered_logger.rs b/pbs-tools/src/buffered_logger.rs
>> new file mode 100644
>> index 000000000..39cf068cd
>> --- /dev/null
>> +++ b/pbs-tools/src/buffered_logger.rs
>> @@ -0,0 +1,216 @@
>> +//! Log aggregator to collect and group messages send from concurrent tasks via
> 
> nit: sen*t*
> 
>> +//! a tokio channel.
>> +
>> +use std::collections::hash_map::Entry;
>> +use std::collections::HashMap;
>> +use std::time::Duration;
>> +
>> +use anyhow::Error;
>> +use tokio::sync::mpsc;
>> +use tokio::time::{self, Instant};
>> +use tracing::{debug, error, info, trace, warn, Level};
>> +
>> +use proxmox_log::LogContext;
>> +
>> +/// Label to be used to group currently buffered messages when flushing.
> 
> I'd drop the currently here
> 
>> +pub type SenderLabel = String;
>> +
>> +/// Requested action for the log collection task
>> +enum SenderRequest {
>> +    // new log line to be buffered
>> +    Message(LogLine),
> 
> see below, I think this should have the label split out
> 
>> +    // flush currently buffered log lines associated by sender label
>> +    Flush(SenderLabel),
> 
> this is actually used at the moment to finish a particular label, maybe
> that should be made explicit (see below)
> 
>> +}
>> +
>> +/// Logger instance to buffer and group log output to keep concurrent logs readable
>> +///
>> +/// Receives the logs from an async input channel, buffers them grouped by input
>> +/// channel and flushes them after either reaching a timeout or capacity limit.
> 
> /// Receives log lines tagged with a label, and buffers them grouped
> /// by the label value. Buffered messages are flushed either after
> /// reaching a certain timeout or capacity limit, or when explicitly
> /// requested.
> 
>> +pub struct BufferedLogger {
>> +    // buffer to aggregate log lines based on sender label
>> +    buffer_map: HashMap<SenderLabel, Vec<LogLine>>,
> 
> do we expect this to be always used with the tiny limits we currently
> employ? if so, we might want to consider a different structure here that
> is more efficient/optimized for that use case?

Yes, at the moment I do not foresee any users of this that would 
require larger limits. What data structure would you suggest, though? A 
Vec suffers from issues when deleting items that are no longer needed, 
so a linked list?
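For illustration, a minimal std-only sketch (function name hypothetical, 
not from the patch) of why a plain Vec may already be fine for tiny 
limits: clearing a Vec in place keeps its allocation, so flushing does 
not cause a reallocation on the next aggregation round.

```rust
/// Clear the buffer in place after emitting its lines; the Vec keeps its
/// capacity, so with tiny per-label limits no reallocation happens on the
/// next aggregation round.
fn flush_in_place(buffer: &mut Vec<String>) -> usize {
    for line in buffer.iter() {
        // a real implementation would write the line out here
        let _ = line;
    }
    buffer.clear(); // length drops to 0, allocation is retained
    buffer.capacity()
}
```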

> also note that this effectively duplicates the label once per line..

Yes, it is better to factor the `SenderLabel` out of the `LogLine` and 
send new messages via `Message(SenderLabel, LogLine)` instead. Adapted 
for v7.
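As a rough sketch of that adapted shape (simplified to std-only types, 
with a plain u8 standing in for tracing::Level and the helper being 
hypothetical), the label is then carried exactly once per request 
instead of inside every LogLine:

```rust
type SenderLabel = String;

/// Simplified log line: the label is no longer stored per line.
struct LogLine {
    level: u8, // stand-in for tracing::Level to keep this std-only
    message: String,
}

/// Request variants with the label split out of the message payload.
enum SenderRequest {
    // new log line to be buffered under the given label
    Message(SenderLabel, LogLine),
    // flush buffered log lines for the given label
    Flush(SenderLabel),
}

/// Every request carries its label exactly once.
fn label_of(request: &SenderRequest) -> &str {
    match request {
        SenderRequest::Message(label, _) | SenderRequest::Flush(label) => label,
    }
}
```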

> 
>> +    // maximum number of received lines for an individual sender instance before
>> +    // flushing
>> +    max_buffered_lines: usize,
>> +    // maximum aggregation duration of received lines for an individual sender
>> +    // instance before flushing
>> +    max_aggregation_time: Duration,
>> +    // channel to receive log messages
>> +    receiver: mpsc::Receiver<SenderRequest>,
>> +}
>> +
>> +/// Instance to create new sender instances by cloning the channel sender
>> +pub struct LogLineSenderBuilder {
>> +    // to clone new senders if requested
>> +    _sender: mpsc::Sender<SenderRequest>,
> 
> nit: this should be called `sender`, it is used below even if just for
> cloning?

Acked, adapting this for v7.

> 
>> +}
>> +
>> +impl LogLineSenderBuilder {
>> +    /// Create new sender instance to send log messages, to be grouped by given label
>> +    ///
>> +    /// Label is not checked to be unique (no other instance with same label exists),
>> +    /// it is the callers responsibility to check so if required.
>> +    pub fn sender_with_label(&self, label: SenderLabel) -> LogLineSender {
>> +        LogLineSender {
>> +            label,
>> +            sender: self._sender.clone(),
>> +        }
>> +    }
>> +}
>> +
>> +/// Sender to publish new log messages to buffered log aggregator
> 
> this sender doesn't publish anything
> 
>> +pub struct LogLineSender {
>> +    // label used to group log lines
>> +    label: SenderLabel,
>> +    // sender to publish new log lines to buffered log aggregator task
>> +    sender: mpsc::Sender<SenderRequest>,
>> +}
>> +
>> +impl LogLineSender {
>> +    /// Send a new log message with given level to the buffered logger task
>> +    pub async fn log(&self, level: Level, message: String) -> Result<(), Error> {
>> +        let line = LogLine {
>> +            label: self.label.clone(),
>> +            level,
>> +            message,
>> +        };
>> +        self.sender.send(SenderRequest::Message(line)).await?;
>> +        Ok(())
>> +    }
>> +
>> +    /// Flush all messages with sender's label
> 
> Flush all *buffered* messages with *this* sender's label
> 
> ?
> 
>> +    pub async fn flush(&self) -> Result<(), Error> {
>> +        self.sender
>> +            .send(SenderRequest::Flush(self.label.clone()))
>> +            .await?;
>> +        Ok(())
>> +    }
>> +}
>> +
>> +/// Log message entity
>> +struct LogLine {
>> +    /// label indentifiying the sender
> 
> nit: typo, capitalization inconsistent
> 
>> +    label: SenderLabel,
>> +    /// Log level to use during flushing
> 
> Log level of the message
> 
> ?
> 
>> +    level: Level,
>> +    /// log line to be buffered and flushed
> 
> Log message
> 
> ?
> 
> buffering and flushing happens elsewhere..
> 
>> +    message: String,
>> +}
>> +
>> +impl BufferedLogger {
>> +    /// New instance of a buffered logger
>> +    pub fn new(
>> +        max_buffered_lines: usize,
>> +        max_aggregation_time: Duration,
>> +    ) -> (Self, LogLineSenderBuilder) {
>> +        let (_sender, receiver) = mpsc::channel(100);
> 
> nit: this should be called `sender`
> 
>> +
>> +        (
>> +            Self {
>> +                buffer_map: HashMap::new(),
>> +                max_buffered_lines,
>> +                max_aggregation_time,
>> +                receiver,
>> +            },
>> +            LogLineSenderBuilder { _sender },
>> +        )
>> +    }
>> +
>> +    /// Starts the collection loop spawned on a new tokio task
>> +    /// Finishes when all sender belonging to the channel have been dropped.
>> +    pub fn run_log_collection(mut self) {
>> +        let future = async move {
>> +            loop {
>> +                let deadline = Instant::now() + self.max_aggregation_time;
>> +                match time::timeout_at(deadline, self.receive_log_line()).await {
> 
> why manually calculate the deadline, wouldn't using `time::timeout` work
> as well? the only difference from a quick glance is that that one does a
> checked_add for now + delay..

No specific reason for using timeout_at() here; I was primed by having 
based this on the s3 client timeout.

> 
> but also, isn't this kind of broken in any case? let's say I have two
> labels A and B:
> 
>   0.99 A1
>   1.98 A2
>   2.97 A3
>   3.96 A4
>   4.95 A5 (now A is at capacity)
>   5.94 B1
>   9.90 B5 (now B is at capacity as well)
> 
> either
> 
> 10.90 timeout elapses, everything is flushed
> 
> or
> 
> 10.89 A6 (A gets flushed and can start over - but B hasn't been flushed)
> 11.88 A7
> 12.87 A8
> 13.86 A9
> 14.85 A10 (A has 5 buffered messages again)
> ..
> 
> this means that any label that doesn't log a 6th message can stall for
> quite a long time, as long as other labels make progress (and it isn't
> flushed explicitly)?

Yes, this is true, but that is not really avoidable unless there is a 
timeout per label. Or would you suggest simply flushing all buffered 
lines at periodic intervals, without resetting the interval at all?
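If a per-label timeout were chosen, one possible std-only sketch 
(hypothetical helper, not from the patch) would track the arrival time 
of the oldest buffered line per label and flush only the labels whose 
interval has elapsed:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Return the labels whose oldest buffered line is at least
/// `max_aggregation_time` old; only those would be flushed, so quiet
/// labels cannot be stalled by busy ones.
fn labels_due(
    oldest_line_at: &HashMap<String, Instant>,
    now: Instant,
    max_aggregation_time: Duration,
) -> Vec<String> {
    let mut due: Vec<String> = oldest_line_at
        .iter()
        .filter(|(_, first)| now.duration_since(**first) >= max_aggregation_time)
        .map(|(label, _)| label.clone())
        .collect();
    due.sort(); // deterministic order for the caller
    due
}
```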


> 
>> +                    Ok(finished) => {
>> +                        if finished {
>> +                            break;
>> +                        }
>> +                    }
>> +                    Err(_timeout) => self.flush_all_buffered(),
>> +                }
>> +            }
>> +        };
>> +        match LogContext::current() {
>> +            None => tokio::spawn(future),
>> +            Some(context) => tokio::spawn(context.scope(future)),
>> +        };
>> +    }
>> +
>> +    /// Collects new log lines, buffers and flushes them if max lines limit exceeded.
>> +    ///
>> +    /// Returns `true` if all the senders have been dropped and the task should no
>> +    /// longer wait for new messages and finish.
>> +    async fn receive_log_line(&mut self) -> bool {
>> +        if let Some(request) = self.receiver.recv().await {
>> +            match request {
>> +                SenderRequest::Flush(label) => {
>> +                    if let Some(log_lines) = self.buffer_map.get_mut(&label) {
>> +                        Self::log_with_label(&label, log_lines);
>> +                        log_lines.clear();
>> +                    }
>> +                }
>> +                SenderRequest::Message(log_line) => {
> 
> if this would be Message((label, level, line)) or Message((label,
> level_and_line)) the label would not need to be stored in the buffer
> keys and values..

Yes, adapted based on above mention already

> 
>> +                    if self.max_buffered_lines == 0
>> +                        || self.max_aggregation_time < Duration::from_secs(0)
> 
> the timeout can never be below zero, as that is the minimum duration
> (duration is unsigned)?

This is a typo: the intention was to check for durations below 1 
second, but since the granularity is seconds, this should check for 0 
instead.
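The corrected guard could look like this minimal sketch (function name 
hypothetical); `Duration::is_zero()` expresses the check directly, since 
a Duration is unsigned and `< Duration::from_secs(0)` can never be true:

```rust
use std::time::Duration;

/// Buffering is bypassed when either the line limit or the aggregation
/// interval is zero.
fn should_bypass_buffering(max_buffered_lines: usize, max_aggregation_time: Duration) -> bool {
    max_buffered_lines == 0 || max_aggregation_time.is_zero()
}
```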

> 
>> +                    {
>> +                        // shortcut if no buffering should happen
>> +                        Self::log_by_level(&log_line.label, &log_line);
> 
> shouldn't we rather handle this by not using the buffered logger in the
> first place? e.g., have this and a simple not-buffering logger implement
> a shared logging trait, or something similar?

Hmm, that might be better, yes. I will add a trait with two 
implementations, selecting one based on which logger is required.
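A rough std-only sketch of such a trait with two implementations (all 
names hypothetical; a Vec of strings stands in for the tracing output) 
could look like:

```rust
use std::collections::HashMap;

/// Common interface for a direct and a buffering logger.
trait GroupLogger {
    fn log(&mut self, label: &str, message: &str);
    fn flush(&mut self, label: &str);
}

/// Writes every line immediately; flushing is a no-op.
struct DirectLogger {
    output: Vec<String>, // stand-in for the tracing macros
}

impl GroupLogger for DirectLogger {
    fn log(&mut self, label: &str, message: &str) {
        self.output.push(format!("[{label}]: {message}"));
    }
    fn flush(&mut self, _label: &str) {}
}

/// Buffers lines per label and writes them only on flush.
struct BufferingLogger {
    buffer: HashMap<String, Vec<String>>,
    output: Vec<String>, // stand-in for the tracing macros
}

impl GroupLogger for BufferingLogger {
    fn log(&mut self, label: &str, message: &str) {
        self.buffer
            .entry(label.to_string())
            .or_default()
            .push(message.to_string());
    }
    fn flush(&mut self, label: &str) {
        if let Some(lines) = self.buffer.get_mut(label) {
            for line in lines.drain(..) {
                self.output.push(format!("[{label}]: {line}"));
            }
        }
    }
}
```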

> 
> one simple approach would be to just make the LogLineSender log directly
> in this case, and not send anything at all?
> 
> because if we don't want buffering, sending all log messages through a
> channel and setting up the timeout machinery can be avoided completely..
> 
>> +                    }
>> +
>> +                    match self.buffer_map.entry(log_line.label.clone()) {
>> +                        Entry::Occupied(mut occupied) => {
>> +                            let log_lines = occupied.get_mut();
>> +                            if log_lines.len() + 1 > self.max_buffered_lines {
>> +                                // reached limit for this label,
>> +                                // flush all buffered and new log line
>> +                                Self::log_with_label(&log_line.label, log_lines);
>> +                                log_lines.clear();
>> +                                Self::log_by_level(&log_line.label, &log_line);
>> +                            } else {
>> +                                // below limit, push to buffer to flush later
>> +                                log_lines.push(log_line);
>> +                            }
>> +                        }
>> +                        Entry::Vacant(vacant) => {
>> +                            vacant.insert(vec![log_line]);
>> +                        }
>> +                    }
>> +                }
>> +            }
>> +            return false;
>> +        }
>> +
>> +        // no more senders, all LogLineSender's and LogLineSenderBuilder have been dropped
> 
> nit: typo `'s`
> 
>> +        self.flush_all_buffered();
>> +        true
>> +    }
>> +
>> +    /// Flush all currently buffered contents without ordering, but grouped by label
>> +    fn flush_all_buffered(&mut self) {
>> +        for (label, log_lines) in self.buffer_map.iter() {
>> +            Self::log_with_label(label, log_lines);
>> +        }
>> +        self.buffer_map.clear();
> 
> wouldn't it be better performance wise to
> - clear each label's log lines (like in SendRequest::Flush)
> - remove the hashmap entry in SendRequest::Flush, or rename that one to
>    finish, since that is what it actually does?
> 
> granted, this only triggers when the timeout elapses or there are no
> more senders, but for the timeout case it might still be beneficial? it
> should remove a lot of allocation churn at least..

Yes, true. Fixed that as well.
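As a sketch of the adapted behavior (hypothetical helpers, simplified 
to plain std types): the periodic flush drains each label's Vec in 
place so its allocation is kept, while an explicit finish removes the 
entry entirely:

```rust
use std::collections::HashMap;

/// Periodic flush: drain each label's Vec in place; the entry and its
/// allocation are kept for the next aggregation round.
fn flush_all(buffer_map: &mut HashMap<String, Vec<String>>, out: &mut Vec<String>) {
    for (label, lines) in buffer_map.iter_mut() {
        for line in lines.drain(..) {
            out.push(format!("[{label}]: {line}"));
        }
    }
}

/// Explicit finish for one label: flush its lines and drop the entry.
fn finish_label(buffer_map: &mut HashMap<String, Vec<String>>, label: &str, out: &mut Vec<String>) {
    if let Some(lines) = buffer_map.remove(label) {
        for line in lines {
            out.push(format!("[{label}]: {line}"));
        }
    }
}
```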

> 
>> +    }
>> +
>> +    /// Log given log lines prefixed by label
>> +    fn log_with_label(label: &str, log_lines: &[LogLine]) {
> 
> currently each LogLine contains the label anyway, but see above, I do
> think this split makes sense but it should be done completely ;)

yes, makes more sense, split off as suggested

> 
>> +        for log_line in log_lines {
>> +            Self::log_by_level(label, log_line);
>> +        }
>> +    }
>> +
>> +    /// Write the given log line prefixed by label
>> +    fn log_by_level(label: &str, log_line: &LogLine) {
> 
> this also logs with label, IMHO the naming is confusing..
> 
>> +        match log_line.level {
>> +            Level::ERROR => error!("[{label}]: {}", log_line.message),
>> +            Level::WARN => warn!("[{label}]: {}", log_line.message),
>> +            Level::INFO => info!("[{label}]: {}", log_line.message),
>> +            Level::DEBUG => debug!("[{label}]: {}", log_line.message),
>> +            Level::TRACE => trace!("[{label}]: {}", log_line.message),
>> +        }
>> +    }
>> +}
>> diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
>> index f41aef6df..1e3972c92 100644
>> --- a/pbs-tools/src/lib.rs
>> +++ b/pbs-tools/src/lib.rs
>> @@ -1,4 +1,5 @@
>>   pub mod async_lru_cache;
>> +pub mod buffered_logger;
>>   pub mod cert;
>>   pub mod crypt_config;
>>   pub mod format;
>> -- 
>> 2.47.3
>>
Thread overview: 29+ messages
2026-04-17  9:26 [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox v6 01/15] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 02/15] tools: group and sort module imports Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-20 10:57   ` Fabian Grünbichler
2026-04-20 17:15     ` Christian Ebner [this message]
2026-04-21  6:49       ` Fabian Grünbichler
2026-04-17  9:26 ` [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-20 11:15   ` Fabian Grünbichler
2026-04-17  9:26 ` [PATCH proxmox-backup v6 05/15] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-04-20 12:29   ` Fabian Grünbichler
2026-04-17  9:26 ` [PATCH proxmox-backup v6 06/15] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 07/15] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 08/15] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 09/15] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context Christian Ebner
2026-04-20 11:56   ` Fabian Grünbichler
2026-04-21  7:21     ` Christian Ebner
2026-04-21  7:42       ` Christian Ebner
2026-04-21  8:00         ` Fabian Grünbichler
2026-04-21  8:04           ` Christian Ebner
2026-04-21 12:57     ` Thomas Lamprecht
2026-04-17  9:26 ` [PATCH proxmox-backup v6 12/15] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 15/15] ui: expose group worker setting in sync job edit window Christian Ebner
2026-04-20 12:33 ` [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Fabian Grünbichler
2026-04-21 10:28 ` superseded: " Christian Ebner
