From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 56E871FF136 for ; Mon, 20 Apr 2026 19:15:50 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 2EFB1BBB4; Mon, 20 Apr 2026 19:15:50 +0200 (CEST) Message-ID: Date: Mon, 20 Apr 2026 19:15:13 +0200 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird Subject: Re: [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages To: =?UTF-8?Q?Fabian_Gr=C3=BCnbichler?= , pbs-devel@lists.proxmox.com References: <20260417092621.455374-1-c.ebner@proxmox.com> <20260417092621.455374-4-c.ebner@proxmox.com> <1776677069.eizi09d274.astroid@yuna.none> Content-Language: en-US, de-DE From: Christian Ebner In-Reply-To: <1776677069.eizi09d274.astroid@yuna.none> Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1776705229689 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.020 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment PROLO_LEO1 0.1 Meta Catches all Leo drug variations so far SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: EKLNUH7TYWNOGXSZWY2WI2XCZIFWLBOX X-Message-ID-Hash: EKLNUH7TYWNOGXSZWY2WI2XCZIFWLBOX X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup 
Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: On 4/20/26 12:56 PM, Fabian Grünbichler wrote: > On April 17, 2026 11:26 am, Christian Ebner wrote: >> Implements a buffered logger instance which collects messages send >> from different sender instances via an async tokio channel and >> buffers them. Sender identify by label and provide a log level for >> each log line to be buffered and flushed. >> >> On collection, log lines are grouped by label and buffered in >> sequence of arrival per label, up to the configured maximum number of >> per group lines or periodically with the configured interval. The >> interval timeout is reset when contents are flushed. In addition, >> senders can request flushing at any given point. >> >> When the timeout set based on the interval is reached, all labels >> log buffers are flushed. There is no guarantee on the order of labels >> when flushing. >> >> Log output is written based on provided log line level and prefixed >> by the label. 
>> >> Signed-off-by: Christian Ebner >> --- >> changes since version 5: >> - not present in previous version >> >> pbs-tools/Cargo.toml | 2 + >> pbs-tools/src/buffered_logger.rs | 216 +++++++++++++++++++++++++++++++ >> pbs-tools/src/lib.rs | 1 + >> 3 files changed, 219 insertions(+) >> create mode 100644 pbs-tools/src/buffered_logger.rs >> >> diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml >> index 998e3077e..6b1d92fa6 100644 >> --- a/pbs-tools/Cargo.toml >> +++ b/pbs-tools/Cargo.toml >> @@ -17,10 +17,12 @@ openssl.workspace = true >> serde_json.workspace = true >> # rt-multi-thread is required for block_in_place >> tokio = { workspace = true, features = [ "fs", "io-util", "rt", "rt-multi-thread", "sync" ] } >> +tracing.workspace = true >> >> proxmox-async.workspace = true >> proxmox-io = { workspace = true, features = [ "tokio" ] } >> proxmox-human-byte.workspace = true >> +proxmox-log.workspace = true >> proxmox-sys.workspace = true >> proxmox-time.workspace = true >> >> diff --git a/pbs-tools/src/buffered_logger.rs b/pbs-tools/src/buffered_logger.rs >> new file mode 100644 >> index 000000000..39cf068cd >> --- /dev/null >> +++ b/pbs-tools/src/buffered_logger.rs >> @@ -0,0 +1,216 @@ >> +//! Log aggregator to collect and group messages send from concurrent tasks via > > nit: sen*t* > >> +//! a tokio channel. >> + >> +use std::collections::hash_map::Entry; >> +use std::collections::HashMap; >> +use std::time::Duration; >> + >> +use anyhow::Error; >> +use tokio::sync::mpsc; >> +use tokio::time::{self, Instant}; >> +use tracing::{debug, error, info, trace, warn, Level}; >> + >> +use proxmox_log::LogContext; >> + >> +/// Label to be used to group currently buffered messages when flushing. 
> > I'd drop the currently here > >> +pub type SenderLabel = String; >> + >> +/// Requested action for the log collection task >> +enum SenderRequest { >> + // new log line to be buffered >> + Message(LogLine), > > see below, I think this should have the label split out > >> + // flush currently buffered log lines associated by sender label >> + Flush(SenderLabel), > > this is actually used at the moment to finish a particular label, maybe > that should be made explicit (see below) > >> +} >> + >> +/// Logger instance to buffer and group log output to keep concurrent logs readable >> +/// >> +/// Receives the logs from an async input channel, buffers them grouped by input >> +/// channel and flushes them after either reaching a timeout or capacity limit. > > /// Receives log lines tagged with a label, and buffers them grouped > /// by the label value. Buffered messages are flushed either after > /// reaching a certain timeout or capacity limit, or when explicitly > /// requested. > >> +pub struct BufferedLogger { >> + // buffer to aggregate log lines based on sender label >> + buffer_map: HashMap>, > > do we expect this to be always used with the tiny limits we currently > employ? if so, we might want to consider a different structure here that > is more efficient/optimized for that use case? Yes, at the moment I do not foresee any users of this which would require larger limits. What struct do you suggest though? A Vec suffers from issues when deleting no longer needed items, so a linked list? > also note that this effectively duplicates the label once per line.. Yes, it is better to factor out the `SenderLabel` from the `LogLine`, and send new messages via `Message(SenderLabel, LogLine)` instead. Adapted for v7. 
> >> + // maximum number of received lines for an individual sender instance before >> + // flushing >> + max_buffered_lines: usize, >> + // maximum aggregation duration of received lines for an individual sender >> + // instance before flushing >> + max_aggregation_time: Duration, >> + // channel to receive log messages >> + receiver: mpsc::Receiver, >> +} >> + >> +/// Instance to create new sender instances by cloning the channel sender >> +pub struct LogLineSenderBuilder { >> + // to clone new senders if requested >> + _sender: mpsc::Sender, > > nit: this should be called `sender`, it is used below even if just for > cloning? Acked, adapting this for v7. > >> +} >> + >> +impl LogLineSenderBuilder { >> + /// Create new sender instance to send log messages, to be grouped by given label >> + /// >> + /// Label is not checked to be unique (no other instance with same label exists), >> + /// it is the callers responsibility to check so if required. >> + pub fn sender_with_label(&self, label: SenderLabel) -> LogLineSender { >> + LogLineSender { >> + label, >> + sender: self._sender.clone(), >> + } >> + } >> +} >> + >> +/// Sender to publish new log messages to buffered log aggregator > > this sender doesn't publish anything > >> +pub struct LogLineSender { >> + // label used to group log lines >> + label: SenderLabel, >> + // sender to publish new log lines to buffered log aggregator task >> + sender: mpsc::Sender, >> +} >> + >> +impl LogLineSender { >> + /// Send a new log message with given level to the buffered logger task >> + pub async fn log(&self, level: Level, message: String) -> Result<(), Error> { >> + let line = LogLine { >> + label: self.label.clone(), >> + level, >> + message, >> + }; >> + self.sender.send(SenderRequest::Message(line)).await?; >> + Ok(()) >> + } >> + >> + /// Flush all messages with sender's label > > Flush all *buffered* messages with *this* sender's label > > ? 
> >> + pub async fn flush(&self) -> Result<(), Error> { >> + self.sender >> + .send(SenderRequest::Flush(self.label.clone())) >> + .await?; >> + Ok(()) >> + } >> +} >> + >> +/// Log message entity >> +struct LogLine { >> + /// label indentifiying the sender > > nit: typo, capitalization inconsistent > >> + label: SenderLabel, >> + /// Log level to use during flushing > > Log level of the message > > ? > >> + level: Level, >> + /// log line to be buffered and flushed > > Log message > > ? > > buffering and flushing happens elsewhere.. > >> + message: String, >> +} >> + >> +impl BufferedLogger { >> + /// New instance of a buffered logger >> + pub fn new( >> + max_buffered_lines: usize, >> + max_aggregation_time: Duration, >> + ) -> (Self, LogLineSenderBuilder) { >> + let (_sender, receiver) = mpsc::channel(100); > > nit: this should be called `sender` > >> + >> + ( >> + Self { >> + buffer_map: HashMap::new(), >> + max_buffered_lines, >> + max_aggregation_time, >> + receiver, >> + }, >> + LogLineSenderBuilder { _sender }, >> + ) >> + } >> + >> + /// Starts the collection loop spawned on a new tokio task >> + /// Finishes when all sender belonging to the channel have been dropped. >> + pub fn run_log_collection(mut self) { >> + let future = async move { >> + loop { >> + let deadline = Instant::now() + self.max_aggregation_time; >> + match time::timeout_at(deadline, self.receive_log_line()).await { > > why manually calculate the deadline, wouldn't using `time::timeout` work > as well? the only difference from a quick glance is that that one does a > checked_add for now + delay.. No specific reason for using timeout_at() here, was primed by having based this on the s3 client timeout > > but also, isn't this kind of broken in any case? 
let's say I have two > labels A and B: > > 0.99 A1 > 1.98 A2 > 2.97 A3 > 3.96 A4 > 4.95 A5 (now A is at capacity) > 5.94 B1 > 9.90 B5 (now B is at capacity as well) > > either > > 10.90 timeout elapses, everything is flushed > > or > > 10.89 A6 (A gets flushed and can start over - but B hasn't been flushed) > 11.88 A7 > 12.87 A8 > 13.86 A9 > 14.85 A10 (A has 5 buffered messages again) > .. > > this means that any label that doesn't log a 6th message can stall for > quite a long time, as long as other labels make progress (and it isn't > flushed explicitly)? Yes, this is true, but that is not really avoidable unless there is a timeout per label. Or would you suggest to simply flush all buffered lines at periodic intervals, without resetting at all? > >> + Ok(finished) => { >> + if finished { >> + break; >> + } >> + } >> + Err(_timeout) => self.flush_all_buffered(), >> + } >> + } >> + }; >> + match LogContext::current() { >> + None => tokio::spawn(future), >> + Some(context) => tokio::spawn(context.scope(future)), >> + }; >> + } >> + >> + /// Collects new log lines, buffers and flushes them if max lines limit exceeded. >> + /// >> + /// Returns `true` if all the senders have been dropped and the task should no >> + /// longer wait for new messages and finish. >> + async fn receive_log_line(&mut self) -> bool { >> + if let Some(request) = self.receiver.recv().await { >> + match request { >> + SenderRequest::Flush(label) => { >> + if let Some(log_lines) = self.buffer_map.get_mut(&label) { >> + Self::log_with_label(&label, log_lines); >> + log_lines.clear(); >> + } >> + } >> + SenderRequest::Message(log_line) => { > > if this would be Message((label, level, line)) or Message((label, > level_and_line)) the label would not need to be stored in the buffer > keys and values.. 
Yes, already adapted based on the mention above. > >> + if self.max_buffered_lines == 0 >> + || self.max_aggregation_time < Duration::from_secs(0) > > the timeout can never be below zero, as that is the minimum duration > (duration is unsigned)? This is a typo: the intention was to check for durations below 1 second, but since the granularity is seconds, this should check for 0 instead. > >> + { >> + // shortcut if no buffering should happen >> + Self::log_by_level(&log_line.label, &log_line); > > shouldn't we rather handle this by not using the buffered logger in the > first place? e.g., have this and a simple not-buffering logger implement > a shared logging trait, or something similar? Hmm, that might be better, yes. Will add a trait with two implementations, based on which logger is required. > > one simple approach would be to just make the LogLineSender log directly > in this case, and not send anything at all? > > because if we don't want buffering, sending all log messages through a > channel and setting up the timeout machinery can be avoided completely.. 
> >> + } >> + >> + match self.buffer_map.entry(log_line.label.clone()) { >> + Entry::Occupied(mut occupied) => { >> + let log_lines = occupied.get_mut(); >> + if log_lines.len() + 1 > self.max_buffered_lines { >> + // reached limit for this label, >> + // flush all buffered and new log line >> + Self::log_with_label(&log_line.label, log_lines); >> + log_lines.clear(); >> + Self::log_by_level(&log_line.label, &log_line); >> + } else { >> + // below limit, push to buffer to flush later >> + log_lines.push(log_line); >> + } >> + } >> + Entry::Vacant(vacant) => { >> + vacant.insert(vec![log_line]); >> + } >> + } >> + } >> + } >> + return false; >> + } >> + >> + // no more senders, all LogLineSender's and LogLineSenderBuilder have been dropped > > nit: typo `'s` > >> + self.flush_all_buffered(); >> + true >> + } >> + >> + /// Flush all currently buffered contents without ordering, but grouped by label >> + fn flush_all_buffered(&mut self) { >> + for (label, log_lines) in self.buffer_map.iter() { >> + Self::log_with_label(label, log_lines); >> + } >> + self.buffer_map.clear(); > > wouldn't it be better performance wise to > - clear each label's log lines (like in SendRequest::Flush) > - remove the hashmap entry in SendRequest::Flush, or rename that one to > finish, since that is what it actually does? > > granted, this only triggers when the timeout elapses or there are no > more senders, but for the timeout case it might still be beneficial? it > should remove a lot of allocation churn at least.. Yes, true. Fixed that as well. 
> >> + } >> + >> + /// Log given log lines prefixed by label >> + fn log_with_label(label: &str, log_lines: &[LogLine]) { > > currently each LogLine contains the label anyway, but see above, I do > think this split makes sense but it should be done completely ;) yes, makes more sense, split off as suggested > >> + for log_line in log_lines { >> + Self::log_by_level(label, log_line); >> + } >> + } >> + >> + /// Write the given log line prefixed by label >> + fn log_by_level(label: &str, log_line: &LogLine) { > > this also logs with label, IMHO the naming is confusing.. > >> + match log_line.level { >> + Level::ERROR => error!("[{label}]: {}", log_line.message), >> + Level::WARN => warn!("[{label}]: {}", log_line.message), >> + Level::INFO => info!("[{label}]: {}", log_line.message), >> + Level::DEBUG => debug!("[{label}]: {}", log_line.message), >> + Level::TRACE => trace!("[{label}]: {}", log_line.message), >> + } >> + } >> +} >> diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs >> index f41aef6df..1e3972c92 100644 >> --- a/pbs-tools/src/lib.rs >> +++ b/pbs-tools/src/lib.rs >> @@ -1,4 +1,5 @@ >> pub mod async_lru_cache; >> +pub mod buffered_logger; >> pub mod cert; >> pub mod crypt_config; >> pub mod format; >> -- >> 2.47.3 >> >> >> >> >> >>