From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 18BAE1FF13B for ; Wed, 22 Apr 2026 15:18:49 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id EEAE31F45F; Wed, 22 Apr 2026 15:18:43 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup v8 02/10] tools: implement buffered logger for concurrent log messages Date: Wed, 22 Apr 2026 15:18:12 +0200 Message-ID: <20260422131820.769620-3-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260422131820.769620-1-c.ebner@proxmox.com> References: <20260422131820.769620-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1776863829852 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.071 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[lib.rs] Message-ID-Hash: QEUHNG64OZEQSQPGRTPH4D3ZKDZSWGOF X-Message-ID-Hash: QEUHNG64OZEQSQPGRTPH4D3ZKDZSWGOF X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: The BufferedLogger instance collects messages sent from different sender instances via an async tokio channel and buffers them. Senders identify themselves by label and provide a log level for each log line to be buffered and flushed. On collection, log lines are grouped by label and buffered in sequence of arrival per label, up to the configured maximum number of lines per group. In addition, the last logged timestamp is updated, which will be used to periodically flush log lines per label at the configured interval. There is no guarantee on the order of labels when flushing. In addition, senders can request flushing at any given point, and all logs are flushed when closing the sender builder. Log output is written based on the provided log line level and prefixed by the label. Co-developed-by: Fabian Grünbichler Signed-off-by: Christian Ebner --- pbs-tools/Cargo.toml | 3 + pbs-tools/src/buffered_logger.rs | 238 +++++++++++++++++++++++++++++++ pbs-tools/src/lib.rs | 1 + 3 files changed, 242 insertions(+) create mode 100644 pbs-tools/src/buffered_logger.rs diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml index 998e3077e..8bc01e85d 100644 --- a/pbs-tools/Cargo.toml +++ b/pbs-tools/Cargo.toml @@ -8,6 +8,7 @@ description = "common tools used throughout pbs" # This must not depend on any subcrates more closely related to pbs itself. 
[dependencies] anyhow.workspace = true +async-trait.workspace = true bytes.workspace = true foreign-types.workspace = true hex.workspace = true @@ -17,10 +18,12 @@ openssl.workspace = true serde_json.workspace = true # rt-multi-thread is required for block_in_place tokio = { workspace = true, features = [ "fs", "io-util", "rt", "rt-multi-thread", "sync" ] } +tracing.workspace = true proxmox-async.workspace = true proxmox-io = { workspace = true, features = [ "tokio" ] } proxmox-human-byte.workspace = true +proxmox-log.workspace = true proxmox-sys.workspace = true proxmox-time.workspace = true diff --git a/pbs-tools/src/buffered_logger.rs b/pbs-tools/src/buffered_logger.rs new file mode 100644 index 000000000..4e9bf8651 --- /dev/null +++ b/pbs-tools/src/buffered_logger.rs @@ -0,0 +1,238 @@ +//! Log aggregator to collect and group messages sent from concurrent tasks via +//! a tokio channel. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::time::Duration; + +use anyhow::Error; +use tokio::sync::mpsc; +use tokio::time::Instant; +use tracing::{debug, error, info, trace, warn, Level}; + +use proxmox_log::LogContext; + +/// Label to be used to group buffered messages when flushing. +pub type SenderLabel = String; + +/// Requested action for the log collection task +enum SenderRequest { + /// New log line to be buffered + Message(SenderLabel, LogLine), + /// Flush currently buffered log lines associated by sender label + Flush(SenderLabel), + /// Closes the channel, flushing all buffered log lines and preventing further logging + Close, +} + +/// Receives log lines tagged with a label, and buffers them grouped +/// by the label value. Buffered messages are flushed either after +/// reaching a certain timeout or capacity limit, or when explicitly +/// requested. 
+pub struct BufferedLogger { + /// Buffer to aggregate log lines and last message received instant based on sender label + buffer_map: HashMap<SenderLabel, (Instant, Vec<LogLine>)>, + /// Maximum number of received lines for an individual sender instance before + /// flushing + max_buffered_lines: usize, + /// Maximum aggregation duration of received lines for an individual sender + /// instance before flushing + max_aggregation_time: Duration, + /// Channel to receive log messages + receiver: mpsc::Receiver<SenderRequest>, +} + +/// Instance to create new sender instances by cloning the channel sender +pub struct LogLineSenderBuilder { + // Used to clone new senders if requested + sender: Option<mpsc::Sender<SenderRequest>>, +} + +impl LogLineSenderBuilder { + /// Create new sender instance to send log messages, to be grouped by given label + /// + /// Label is not checked to be unique (no other instance with same label exists), + /// it is the caller's responsibility to check so if required. + pub fn sender_with_label(&self, label: SenderLabel) -> LogLineSender { + LogLineSender { + label, + sender: self.sender.clone(), + } + } + + /// Closes the channel, flushing all pending messages and closing the receiver + pub async fn close(self) -> Result<(), Error> { + if let Some(sender) = self.sender { + sender.send(SenderRequest::Close).await?; + // wait for flushing and dropping of receiver + sender.closed().await; + } + + Ok(()) + } +} + +/// Sender to send new log messages to buffered log aggregator +pub struct LogLineSender { + /// Label used to group log lines + label: SenderLabel, + /// Sender to publish new log lines to buffered log aggregator task + sender: Option<mpsc::Sender<SenderRequest>>, +} + +impl LogLineSender { + /// Send a new log message with given level to the buffered logger task + pub async fn log(&self, level: Level, message: String) -> Result<(), Error> { + let line = LogLine { level, message }; + if let Some(sender) = &self.sender { + sender + .send(SenderRequest::Message(self.label.clone(), line)) + .await?; + } else { + 
BufferedLogger::log_by_level(&self.label, &line); + } + Ok(()) + } + + /// Request flushing of all buffered messages with this sender's label + pub async fn flush(&self) -> Result<(), Error> { + if let Some(sender) = &self.sender { + sender + .send(SenderRequest::Flush(self.label.clone())) + .await?; + } + Ok(()) + } +} + +/// Log message entity +struct LogLine { + /// Log level of the log message + level: Level, + /// Log message + message: String, +} + +impl BufferedLogger { + /// New instance of a buffered logger + pub fn new(max_buffered_lines: usize, max_aggregation_time: Duration) -> LogLineSenderBuilder { + if max_buffered_lines > 0 || !max_aggregation_time.is_zero() { + let (sender, receiver) = mpsc::channel(100); + + let logger = Self { + buffer_map: HashMap::new(), + max_buffered_lines, + max_aggregation_time, + receiver, + }; + logger.run_log_collection(); + LogLineSenderBuilder { + sender: Some(sender), + } + } else { + LogLineSenderBuilder { sender: None } + } + } + + /// Starts the collection loop spawned on a new tokio task. + /// Finishes on close request or when all senders belonging to the channel have been dropped. + fn run_log_collection(mut self) { + let future = async move { + loop { + for (label, (last_logged, log_lines)) in self.buffer_map.iter_mut() { + if !log_lines.is_empty() + && Instant::now().duration_since(*last_logged) > self.max_aggregation_time + { + Self::flush_by_label(label, log_lines); + } + } + + match tokio::time::timeout(self.max_aggregation_time, self.receive_log_line()).await + { + Ok(finished) => { + if finished { + break; + } + } + Err(_timeout) => self.flush_all_buffered(), + } + } + }; + match LogContext::current() { + None => tokio::spawn(future), + Some(context) => tokio::spawn(context.scope(future)), + }; + } + + /// Collects new log lines, buffers and flushes them if max lines limit exceeded. + /// + /// Returns `true` if the channel was closed or all the senders have been dropped and the + /// task should no longer wait for new messages and finish. 
+ async fn receive_log_line(&mut self) -> bool { + match self.receiver.recv().await { + Some(SenderRequest::Flush(label)) => { + if let Some((_last_logged, mut log_lines)) = self.buffer_map.remove(&label) { + Self::flush_by_label(&label, &mut log_lines); + } + false + } + Some(SenderRequest::Message(label, log_line)) => { + match self.buffer_map.entry(label.clone()) { + Entry::Occupied(mut occupied) => { + let (last_logged, log_lines) = occupied.get_mut(); + if log_lines.len() + 1 > self.max_buffered_lines { + // reached limit for this label, + // flush all buffered and new log line + Self::flush_by_label(&label, log_lines); + Self::log_by_level(&label, &log_line); + } else { + // below limit, push to buffer to flush later + log_lines.push(log_line); + *last_logged = Instant::now(); + } + } + Entry::Vacant(vacant) => { + vacant.insert((Instant::now(), vec![log_line])); + } + } + false + } + Some(SenderRequest::Close) => { + self.receiver.close(); + self.flush_all_buffered(); + true + } + None => { + // no more senders, all LogLineSenders and LogLineSenderBuilder have been dropped + self.flush_all_buffered(); + true + } + } + } + + /// Flush all currently buffered contents without ordering, but grouped by label + fn flush_all_buffered(&mut self) { + for (label, (_last_logged, log_lines)) in self.buffer_map.iter_mut() { + Self::flush_by_label(label, log_lines); + } + } + + /// Flush the given log lines in order of arrival, prefixed by label, clearing the buffer + fn flush_by_label(label: &str, log_lines: &mut Vec<LogLine>) { + for log_line in log_lines.iter() { + Self::log_by_level(label, log_line); + } + log_lines.clear(); + } + + /// Write the given log line prefixed by label + fn log_by_level(label: &str, log_line: &LogLine) { + match log_line.level { + Level::ERROR => error!("[{label}]: {}", log_line.message), + Level::WARN => warn!("[{label}]: {}", log_line.message), + Level::INFO => info!("[{label}]: {}", log_line.message), + Level::DEBUG => debug!("[{label}]: {}", 
log_line.message), + Level::TRACE => trace!("[{label}]: {}", log_line.message), + } + } +} diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs index f41aef6df..1e3972c92 100644 --- a/pbs-tools/src/lib.rs +++ b/pbs-tools/src/lib.rs @@ -1,4 +1,5 @@ pub mod async_lru_cache; +pub mod buffered_logger; pub mod cert; pub mod crypt_config; pub mod format; -- 2.47.3