From: Samuel Rufinatscha <s.rufinatscha@proxmox.com>
To: Kefu Chai <k.chai@proxmox.com>, pve-devel@lists.proxmox.com
Subject: Re: [PATCH pve-cluster 04/14 v2] pmxcfs-rs: add pmxcfs-logger crate
Date: Tue, 24 Feb 2026 17:17:42 +0100 [thread overview]
Message-ID: <edfa171a-8238-4592-9bf0-fb33f9b6ad26@proxmox.com> (raw)
In-Reply-To: <20260213094119.2379288-5-k.chai@proxmox.com>
Nice work on v2, and thanks for applying my previous suggestions, Kefu.
A few things I’d still like to suggest:
The binary compat tests should load real C-generated fixtures
(include_bytes!) and assert on their actual contents. Please generate
binary blobs via clusterlog_get_state() and clog_dump_json() and check:
* the parsed entries themselves
* the header fields, including at least one multi-entry fixture with cpos != 8
* the JSON output for non-ASCII content
* and a test that serializes a buffer where not all entries fit
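For the header checks, roughly this shape is what I have in mind. The
fixture path is hypothetical, so the sketch stands in with an inline
buffer of the same clog_base_t layout (little-endian u32 size, u32 cpos):

```rust
fn parse_header(state: &[u8]) -> (u32, u32) {
    // clog_base_t header: total size, then offset of the newest entry
    let size = u32::from_le_bytes(state[0..4].try_into().unwrap());
    let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap());
    (size, cpos)
}

fn main() {
    // In the real test this would come from a C-generated fixture:
    //   static STATE: &[u8] = include_bytes!("fixtures/clusterlog_state.bin");
    // Here: a 12-byte blob with size = 12 and cpos = 8.
    let state: [u8; 12] = [12, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0];
    let (size, cpos) = parse_header(&state);
    assert_eq!(size as usize, state.len());
    assert_eq!(cpos, 8);
    println!("size={size} cpos={cpos}");
}
```

A second fixture captured after the ring has wrapped would then exercise
the cpos != 8 case.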
Separately, I’m wondering about the trade-off between VecDeque<LogEntry>
and the C implementation’s flat byte ring. A benchmark at high log rates
would help quantify the extra serialization/allocation overhead.
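Even something as crude as the following would give a first number
(purely indicative; a proper criterion benchmark inside the crate would
be more trustworthy, and the flat Vec<u8> here is only a stand-in for
the C byte ring, not the real entry layout):

```rust
use std::collections::VecDeque;
use std::time::Instant;

// One heap allocation per entry, as with VecDeque<LogEntry>.
fn fill_deque(n: usize, msg: &str) -> VecDeque<String> {
    let mut dq = VecDeque::with_capacity(n);
    for i in 0..n {
        dq.push_front(format!("{i}: {msg}"));
    }
    dq
}

// One flat buffer, appended in place, as a proxy for the C byte ring.
fn fill_flat(n: usize, msg: &str) -> Vec<u8> {
    let mut buf = Vec::with_capacity(n * (msg.len() + 8));
    for i in 0..n {
        buf.extend_from_slice(&(i as u32).to_le_bytes());
        buf.extend_from_slice(msg.as_bytes());
    }
    buf
}

fn main() {
    const N: usize = 100_000;
    let msg = "interesting cluster event with some payload";

    let t = Instant::now();
    let dq = fill_deque(N, msg);
    let deque_time = t.elapsed();

    let t = Instant::now();
    let buf = fill_flat(N, msg);
    let flat_time = t.elapsed();

    assert_eq!(dq.len(), N);
    assert!(!buf.is_empty());
    println!("VecDeque: {deque_time:?}, flat buffer: {flat_time:?}");
}
```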
I also left some inline comments below, on the test offsets and on a
header-size comment.
On 2/13/26 10:48 AM, Kefu Chai wrote:
> Add configuration management crate for pmxcfs:
> - Config struct: Runtime configuration (node name, IP, flags)
> - Thread-safe debug level mutation via RwLock
> - Arc-wrapped for shared ownership across components
> - Comprehensive unit tests including thread safety tests
>
> This crate provides the foundational configuration structure used
> by all pmxcfs components. The Config is designed to be shared via
> Arc to allow multiple components to access the same configuration
> instance, with mutable debug level for runtime adjustments.
>
> Signed-off-by: Kefu Chai <k.chai@proxmox.com>
> ---
> src/pmxcfs-rs/Cargo.toml | 2 +
> src/pmxcfs-rs/pmxcfs-logger/Cargo.toml | 15 +
> src/pmxcfs-rs/pmxcfs-logger/README.md | 58 ++
> .../pmxcfs-logger/src/cluster_log.rs | 615 ++++++++++++++++
> src/pmxcfs-rs/pmxcfs-logger/src/entry.rs | 694 ++++++++++++++++++
> src/pmxcfs-rs/pmxcfs-logger/src/hash.rs | 176 +++++
> src/pmxcfs-rs/pmxcfs-logger/src/lib.rs | 27 +
> .../pmxcfs-logger/src/ring_buffer.rs | 628 ++++++++++++++++
> .../tests/binary_compatibility_tests.rs | 315 ++++++++
> .../pmxcfs-logger/tests/performance_tests.rs | 294 ++++++++
> 10 files changed, 2824 insertions(+)
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/README.md
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs
>
> diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
> index f190968ed..d26fac04c 100644
> --- a/src/pmxcfs-rs/Cargo.toml
> +++ b/src/pmxcfs-rs/Cargo.toml
> @@ -3,6 +3,7 @@
> members = [
> "pmxcfs-api-types", # Shared types and error definitions
> "pmxcfs-config", # Configuration management
> + "pmxcfs-logger", # Cluster log with ring buffer and deduplication
> ]
> resolver = "2"
>
> @@ -18,6 +19,7 @@ rust-version = "1.85"
> # Internal workspace dependencies
> pmxcfs-api-types = { path = "pmxcfs-api-types" }
> pmxcfs-config = { path = "pmxcfs-config" }
> +pmxcfs-logger = { path = "pmxcfs-logger" }
>
> # Error handling
> thiserror = "1.0"
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml b/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
> new file mode 100644
> index 000000000..1af3f015c
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
> @@ -0,0 +1,15 @@
> +[package]
> +name = "pmxcfs-logger"
> +version = "0.1.0"
> +edition = "2021"
> +
> +[dependencies]
> +anyhow = "1.0"
> +parking_lot = "0.12"
> +serde = { version = "1.0", features = ["derive"] }
> +serde_json = "1.0"
> +tracing = "0.1"
> +
> +[dev-dependencies]
> +tempfile = "3.0"
> +
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/README.md b/src/pmxcfs-rs/pmxcfs-logger/README.md
> new file mode 100644
> index 000000000..38f102c27
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/README.md
> @@ -0,0 +1,58 @@
> +# pmxcfs-logger
> +
> +Cluster-wide log management for pmxcfs, fully compatible with the C implementation (logger.c).
> +
> +## Overview
> +
> +This crate implements a cluster log system matching Proxmox's C-based logger.c behavior. It provides:
> +
> +- **Ring Buffer Storage**: Circular buffer for log entries with automatic capacity management
> +- **FNV-1a Hashing**: Hashing for node and identity-based deduplication
> +- **Deduplication**: Per-node tracking of latest log entries to avoid duplicates
> +- **Time-based Sorting**: Chronological ordering of log entries across nodes
> +- **Multi-node Merging**: Combining logs from multiple cluster nodes
> +- **JSON Export**: Web UI-compatible JSON output matching C format
> +
> +## Architecture
> +
> +### Key Components
> +
> +1. **LogEntry** (`entry.rs`): Individual log entry with automatic UID generation
> +2. **RingBuffer** (`ring_buffer.rs`): Circular buffer with capacity management
> +3. **ClusterLog** (`lib.rs`): Main API with deduplication and merging
> +4. **Hash Functions** (`hash.rs`): FNV-1a implementation matching C
> +
> +## C to Rust Mapping
> +
> +| C Function | Rust Equivalent | Location |
> +|------------|-----------------|----------|
> +| `fnv_64a_buf` | `hash::fnv_64a` | hash.rs |
> +| `clog_pack` | `LogEntry::pack` | entry.rs |
> +| `clog_copy` | `RingBuffer::add_entry` | ring_buffer.rs |
> +| `clog_sort` | `RingBuffer::sort` | ring_buffer.rs |
> +| `clog_dump_json` | `RingBuffer::dump_json` | ring_buffer.rs |
> +| `clusterlog_insert` | `ClusterLog::insert` | lib.rs |
> +| `clusterlog_add` | `ClusterLog::add` | lib.rs |
> +| `clusterlog_merge` | `ClusterLog::merge` | lib.rs |
> +| `dedup_lookup` | `ClusterLog::dedup_lookup` | lib.rs |
> +
> +## Key Differences from C
> +
> +1. **No `node_digest` in DedupEntry**: C stores `node_digest` both as HashMap key and in the struct. Rust only uses it as the key, saving 8 bytes per entry.
> +
> +2. **Mutex granularity**: C uses a single global mutex. Rust uses separate Arc<Mutex<>> for buffer and dedup table, allowing better concurrency.
> +
> +3. **Code size**: Rust implementation is ~24% the size of C (740 lines vs 3,000+) while maintaining equivalent functionality.
> +
> +## Integration
> +
> +This crate is integrated into `pmxcfs-status` to provide cluster log functionality. The `.clusterlog` FUSE plugin uses this to provide JSON log output compatible with the Proxmox web UI.
> +
> +## References
> +
> +### C Implementation
> +- `src/pmxcfs/logger.c` / `logger.h` - Cluster log implementation
> +
> +### Related Crates
> +- **pmxcfs-status**: Integrates ClusterLog for status tracking
> +- **pmxcfs**: FUSE plugin exposes cluster log via `.clusterlog`
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs b/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
> new file mode 100644
> index 000000000..c9d04ee47
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
> @@ -0,0 +1,615 @@
> +/// Cluster Log Implementation
> +///
> +/// This module implements the cluster-wide log system with deduplication
> +/// and merging support, matching C's clusterlog_t.
> +use crate::entry::LogEntry;
> +use crate::ring_buffer::{RingBuffer, CLOG_DEFAULT_SIZE};
> +use anyhow::Result;
> +use parking_lot::Mutex;
> +use std::collections::{BTreeMap, HashMap};
> +use std::sync::Arc;
> +
> +/// Deduplication entry - tracks the latest UID and time for each node
> +///
> +/// Note: C's `dedup_entry_t` includes node_digest field because GHashTable stores
> +/// the struct pointer both as key and value. In Rust, we use HashMap<u64, DedupEntry>
> +/// where node_digest is the key, so we don't need to duplicate it in the value.
> +/// This is functionally equivalent but more efficient.
> +#[derive(Debug, Clone)]
> +pub(crate) struct DedupEntry {
> + /// Latest UID seen from this node
> + pub uid: u32,
> + /// Latest timestamp seen from this node
> + pub time: u32,
> +}
> +
> +/// Internal state protected by a single mutex
> +/// Matches C's clusterlog_t which uses a single mutex for both base and dedup
> +struct ClusterLogInner {
> + /// Ring buffer for log storage (matches C's cl->base)
> + buffer: RingBuffer,
> + /// Deduplication tracker (matches C's cl->dedup)
> + dedup: HashMap<u64, DedupEntry>,
> +}
> +
> +/// Cluster-wide log with deduplication and merging support
> +/// Matches C's `clusterlog_t`
> +///
> +/// Note: Unlike the initial implementation with separate mutexes, we use a single
> +/// mutex to match C's semantics and ensure atomic updates of buffer+dedup.
> +pub struct ClusterLog {
> + /// Inner state protected by a single mutex
> + /// Matches C's single g_mutex_t protecting both cl->base and cl->dedup
> + inner: Arc<Mutex<ClusterLogInner>>,
> +}
> +
> +impl ClusterLog {
> + /// Create a new cluster log with default size
> + pub fn new() -> Self {
> + Self::with_capacity(CLOG_DEFAULT_SIZE)
> + }
> +
> + /// Create a new cluster log with specified capacity
> + pub fn with_capacity(capacity: usize) -> Self {
> + Self {
> + inner: Arc::new(Mutex::new(ClusterLogInner {
> + buffer: RingBuffer::new(capacity),
> + dedup: HashMap::new(),
> + })),
> + }
> + }
> +
> + /// Matches C's `clusterlog_add` function
> + #[allow(clippy::too_many_arguments)]
> + pub fn add(
> + &self,
> + node: &str,
> + ident: &str,
> + tag: &str,
> + pid: u32,
> + priority: u8,
> + time: u32,
> + message: &str,
> + ) -> Result<()> {
> + let entry = LogEntry::pack(node, ident, tag, pid, time, priority, message)?;
> + self.insert(&entry)
> + }
> +
> + /// Insert a log entry (with deduplication)
> + ///
> + /// Matches C's `clusterlog_insert` function
> + pub fn insert(&self, entry: &LogEntry) -> Result<()> {
> + let mut inner = self.inner.lock();
> +
> + // Check deduplication
> + if Self::is_not_duplicate(&mut inner.dedup, entry) {
> + // Entry is not a duplicate, add it
> + inner.buffer.add_entry(entry)?;
> + } else {
> + tracing::debug!("Ignoring duplicate cluster log entry");
> + }
> +
> + Ok(())
> + }
> +
> + /// Check if entry is a duplicate (returns true if NOT a duplicate)
> + ///
> + /// Matches C's `dedup_lookup` function
> + ///
> + /// ## Hash Collision Risk
> + ///
> + /// Uses FNV-1a hash (`node_digest`) as deduplication key. Hash collisions
> + /// are theoretically possible but extremely rare in practice:
> + ///
> + /// - FNV-1a produces 64-bit hashes (2^64 possible values)
> + /// - Collision probability with N entries: ~N²/(2 × 2^64)
> + /// - For 10,000 log entries: collision probability < 10^-11
> + ///
> + /// If a collision occurs, two different log entries (from different nodes
> + /// or with different content) will be treated as duplicates, causing one
> + /// to be silently dropped.
> + ///
> + /// This design is inherited from the C implementation for compatibility.
> + /// The risk is acceptable because:
> + /// 1. Collisions are astronomically rare
> + /// 2. Only affects log deduplication, not critical data integrity
> + /// 3. Lost log entries don't compromise cluster operation
> + ///
> + /// Changing this would break wire format compatibility with C nodes.
> + fn is_not_duplicate(dedup: &mut HashMap<u64, DedupEntry>, entry: &LogEntry) -> bool {
> + match dedup.get_mut(&entry.node_digest) {
> + None => {
> + dedup.insert(
> + entry.node_digest,
> + DedupEntry {
> + time: entry.time,
> + uid: entry.uid,
> + },
> + );
> + true
> + }
> + Some(dd) => {
> + if entry.time > dd.time || (entry.time == dd.time && entry.uid > dd.uid) {
> + dd.time = entry.time;
> + dd.uid = entry.uid;
> + true
> + } else {
> + false
> + }
> + }
> + }
> + }
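Nit, take it or leave it: the accept condition above is just a
lexicographic comparison on (time, uid), which tuple ordering expresses
directly:

```rust
// Equivalent to: entry.time > dd.time
//             || (entry.time == dd.time && entry.uid > dd.uid)
fn is_newer(entry: (u32, u32), seen: (u32, u32)) -> bool {
    entry > seen // tuples compare lexicographically: time first, then uid
}

fn main() {
    assert!(is_newer((1001, 5), (1000, 9)));  // newer time wins
    assert!(is_newer((1000, 10), (1000, 9))); // same time, higher uid wins
    assert!(!is_newer((1000, 9), (1000, 9))); // identical -> duplicate
    assert!(!is_newer((999, 99), (1000, 1))); // older time loses
    println!("ordering ok");
}
```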
> +
> + pub fn get_entries(&self, max: usize) -> Vec<LogEntry> {
> + let inner = self.inner.lock();
> + inner.buffer.iter().take(max).cloned().collect()
> + }
> +
> + /// Get the current buffer (for testing)
> + pub fn get_buffer(&self) -> RingBuffer {
> + let inner = self.inner.lock();
> + inner.buffer.clone()
> + }
> +
> + /// Get buffer length (for testing)
> + pub fn len(&self) -> usize {
> + let inner = self.inner.lock();
> + inner.buffer.len()
> + }
> +
> + /// Get buffer capacity (for testing)
> + pub fn capacity(&self) -> usize {
> + let inner = self.inner.lock();
> + inner.buffer.capacity()
> + }
> +
> + /// Check if buffer is empty (for testing)
> + pub fn is_empty(&self) -> bool {
> + let inner = self.inner.lock();
> + inner.buffer.is_empty()
> + }
> +
> + /// Clear all log entries (for testing)
> + pub fn clear(&self) {
> + let mut inner = self.inner.lock();
> + let capacity = inner.buffer.capacity();
> + inner.buffer = RingBuffer::new(capacity);
> + inner.dedup.clear();
> + }
> +
> + /// Sort the log entries by time
> + ///
> + /// Matches C's `clog_sort` function
> + pub fn sort(&self) -> Result<RingBuffer> {
> + let inner = self.inner.lock();
> + inner.buffer.sort()
> + }
> +
> + /// Merge logs from multiple nodes
> + ///
> + /// Matches C's `clusterlog_merge` function
> + ///
> + /// This method atomically updates both the buffer and dedup state under a single
> + /// mutex lock, matching C's behavior where both cl->base and cl->dedup are
> + /// updated under cl->mutex.
> + pub fn merge(&self, remote_logs: Vec<RingBuffer>, include_local: bool) -> Result<()> {
> + let mut sorted_entries: BTreeMap<(u32, u64, u32), LogEntry> = BTreeMap::new();
> + let mut merge_dedup: HashMap<u64, DedupEntry> = HashMap::new();
> +
> + // Lock once for the entire operation (matching C's single mutex)
> + let mut inner = self.inner.lock();
> +
> + // Calculate maximum capacity
> + let max_size = if include_local {
> + let local_cap = inner.buffer.capacity();
> +
> + std::iter::once(local_cap)
> + .chain(remote_logs.iter().map(|b| b.capacity()))
> + .max()
> + .unwrap_or(CLOG_DEFAULT_SIZE)
> + } else {
> + remote_logs
> + .iter()
> + .map(|b| b.capacity())
> + .max()
> + .unwrap_or(CLOG_DEFAULT_SIZE)
> + };
> +
> + // Add local entries if requested
> + if include_local {
> + for entry in inner.buffer.iter() {
> + let key = (entry.time, entry.node_digest, entry.uid);
> + // Keep-first: only insert if key doesn't exist, matching C's g_tree_lookup guard
> + if let std::collections::btree_map::Entry::Vacant(e) = sorted_entries.entry(key) {
> + e.insert(entry.clone());
> + Self::is_not_duplicate(&mut merge_dedup, entry);
> + }
> + }
> + }
> +
> + // Add remote entries
> + for remote_buffer in &remote_logs {
> + for entry in remote_buffer.iter() {
> + let key = (entry.time, entry.node_digest, entry.uid);
> + // Keep-first: only insert if key doesn't exist, matching C's g_tree_lookup guard
> + if let std::collections::btree_map::Entry::Vacant(e) = sorted_entries.entry(key) {
> + e.insert(entry.clone());
> + Self::is_not_duplicate(&mut merge_dedup, entry);
> + }
> + }
> + }
> +
> + let mut result = RingBuffer::new(max_size);
> +
> + // BTreeMap iterates oldest->newest. We add each as new head (push_front),
> + // so result ends with newest at head, matching C's behavior.
> + // Fill to 100% capacity (matching C's behavior), not just 90%
> + for (_key, entry) in sorted_entries.iter() {
> + // add_entry will automatically evict old entries if needed to stay within capacity
> + result.add_entry(entry)?;
> + }
> +
> + // Atomically update both buffer and dedup (matches C lines 503-507)
> + inner.buffer = result;
> + inner.dedup = merge_dedup;
> +
> + Ok(())
> + }
> +
> + /// Export log to JSON format
> + ///
> + /// Matches C's `clog_dump_json` function
> + pub fn dump_json(&self, ident_filter: Option<&str>, max_entries: usize) -> String {
> + let inner = self.inner.lock();
> + inner.buffer.dump_json(ident_filter, max_entries)
> + }
> +
> + /// Export log to JSON format with sorted entries
> + pub fn dump_json_sorted(
> + &self,
> + ident_filter: Option<&str>,
> + max_entries: usize,
> + ) -> Result<String> {
> + let sorted = self.sort()?;
> + Ok(sorted.dump_json(ident_filter, max_entries))
> + }
> +
> + /// Matches C's `clusterlog_get_state` function
> + ///
> + /// Returns binary-serialized clog_base_t structure for network transmission.
> + /// This format is compatible with C nodes for mixed-cluster operation.
> + pub fn get_state(&self) -> Result<Vec<u8>> {
> + let sorted = self.sort()?;
> + Ok(sorted.serialize_binary())
> + }
> +
> + pub fn deserialize_state(data: &[u8]) -> Result<RingBuffer> {
> + RingBuffer::deserialize_binary(data)
> + }
> +
> +}
> +
> +impl Default for ClusterLog {
> + fn default() -> Self {
> + Self::new()
> + }
> +}
> +
> +#[cfg(test)]
> +mod tests {
> + use super::*;
> +
> + #[test]
> + fn test_cluster_log_creation() {
> + let log = ClusterLog::new();
> + assert!(log.inner.lock().buffer.is_empty());
> + }
> +
> + #[test]
> + fn test_add_entry() {
> + let log = ClusterLog::new();
> +
> + let result = log.add(
> + "node1",
> + "root",
> + "cluster",
> + 12345,
> + 6, // Info priority
> + 1234567890,
> + "Test message",
> + );
> +
> + assert!(result.is_ok());
> + assert!(!log.inner.lock().buffer.is_empty());
> + }
> +
> + #[test]
> + fn test_deduplication() {
> + let log = ClusterLog::new();
> +
> + // Add same entry twice (but with different UIDs since each add creates a new entry)
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Message 1");
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Message 1");
> +
> + // Both entries are added because they have different UIDs
> + // Deduplication tracks the latest (time, UID) per node, not content
> + let inner = log.inner.lock();
> + assert_eq!(inner.buffer.len(), 2);
> + }
> +
> + #[test]
> + fn test_newer_entry_replaces() {
> + let log = ClusterLog::new();
> +
> + // Add older entry
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Old message");
> +
> + // Add newer entry from same node
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1001, "New message");
> +
> + // Should have both entries (newer doesn't remove older, just updates dedup tracker)
> + let inner = log.inner.lock();
> + assert_eq!(inner.buffer.len(), 2);
> + }
> +
> + #[test]
> + fn test_json_export() {
> + let log = ClusterLog::new();
> +
> + let _ = log.add(
> + "node1",
> + "root",
> + "cluster",
> + 123,
> + 6,
> + 1234567890,
> + "Test message",
> + );
> +
> + let json = log.dump_json(None, 50);
> +
> + // Should be valid JSON
> + assert!(serde_json::from_str::<serde_json::Value>(&json).is_ok());
> +
> + // Should contain "data" field
> + let value: serde_json::Value = serde_json::from_str(&json).unwrap();
> + assert!(value.get("data").is_some());
> + }
> +
> + #[test]
> + fn test_merge_logs() {
> + let log1 = ClusterLog::new();
> + let log2 = ClusterLog::new();
> +
> + // Add entries to first log
> + let _ = log1.add(
> + "node1",
> + "root",
> + "cluster",
> + 123,
> + 6,
> + 1000,
> + "Message from node1",
> + );
> +
> + // Add entries to second log
> + let _ = log2.add(
> + "node2",
> + "root",
> + "cluster",
> + 456,
> + 6,
> + 1001,
> + "Message from node2",
> + );
> +
> + // Get log2's buffer for merging
> + let log2_buffer = log2.inner.lock().buffer.clone();
> +
> + // Merge into log1 (updates log1's buffer atomically)
> + log1.merge(vec![log2_buffer], true).unwrap();
> +
> + // Check log1's buffer now contains entries from both logs
> + let inner = log1.inner.lock();
> + assert!(inner.buffer.len() >= 2);
> + }
> +
> + // ========================================================================
> + // HIGH PRIORITY TESTS - Merge Edge Cases
> + // ========================================================================
> +
> + #[test]
> + fn test_merge_empty_logs() {
> + let log = ClusterLog::new();
> +
> + // Add some entries to local log
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Local entry");
> +
> + // Merge with empty remote logs (updates buffer atomically)
> + log.merge(vec![], true).unwrap();
> +
> + // Check buffer has 1 entry (from local log)
> + let inner = log.inner.lock();
> + assert_eq!(inner.buffer.len(), 1);
> + let entry = inner.buffer.iter().next().unwrap();
> + assert_eq!(entry.node, "node1");
> + }
> +
> + #[test]
> + fn test_merge_single_node_only() {
> + let log = ClusterLog::new();
> +
> + // Add entries only from single node
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
> + let _ = log.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
> + let _ = log.add("node1", "root", "cluster", 125, 6, 1002, "Entry 3");
> +
> + // Merge with no remote logs (just sort local)
> + log.merge(vec![], true).unwrap();
> +
> + // Check buffer has all 3 entries
> + let inner = log.inner.lock();
> + assert_eq!(inner.buffer.len(), 3);
> +
> + // Entries should be sorted by time (buffer stores newest first)
> + let times: Vec<u32> = inner.buffer.iter().map(|e| e.time).collect();
> + let mut expected = vec![1002, 1001, 1000];
> + expected.sort();
> + expected.reverse(); // Newest first
> +
> + let mut actual = times.clone();
> + actual.sort();
> + actual.reverse();
> +
> + assert_eq!(actual, expected);
> + }
> +
> + #[test]
> + fn test_merge_all_duplicates() {
> + let log1 = ClusterLog::new();
> + let log2 = ClusterLog::new();
> +
> + // Add same entries to both logs (same node, time, but different UIDs)
> + let _ = log1.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
> + let _ = log1.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
> +
> + let _ = log2.add("node1", "root", "cluster", 125, 6, 1000, "Entry 1");
> + let _ = log2.add("node1", "root", "cluster", 126, 6, 1001, "Entry 2");
> +
> + let log2_buffer = log2.inner.lock().buffer.clone();
> +
> + // Merge - should handle entries from same node at same times
> + log1.merge(vec![log2_buffer], true).unwrap();
> +
> + // Check merged buffer has 4 entries (all are unique by UID despite same time/node)
> + let inner = log1.inner.lock();
> + assert_eq!(inner.buffer.len(), 4);
> + }
> +
> + #[test]
> + fn test_merge_exceeding_capacity() {
> + // Create small buffer to test capacity enforcement
> + let log = ClusterLog::with_capacity(50_000); // Small buffer
> +
> + // Add many entries to fill beyond capacity
> + for i in 0..100 {
> + let _ = log.add(
> + "node1",
> + "root",
> + "cluster",
> + 100 + i,
> + 6,
> + 1000 + i,
> + &format!("Entry {}", i),
> + );
> + }
> +
> + // Create remote log with many entries
> + let remote = ClusterLog::with_capacity(50_000);
> + for i in 0..100 {
> + let _ = remote.add(
> + "node2",
> + "root",
> + "cluster",
> + 200 + i,
> + 6,
> + 1000 + i,
> + &format!("Remote {}", i),
> + );
> + }
> +
> + let remote_buffer = remote.inner.lock().buffer.clone();
> +
> + // Merge - should stop when buffer is near full
> + log.merge(vec![remote_buffer], true).unwrap();
> +
> + // Buffer should be limited by capacity, not necessarily < 200
> + // The actual limit depends on entry sizes and capacity
> + // Just verify we got some reasonable number of entries
> + let inner = log.inner.lock();
> + assert!(!inner.buffer.is_empty(), "Should have some entries");
> + assert!(
> + inner.buffer.len() <= 200,
> + "Should not exceed total available entries"
> + );
> + }
> +
> + #[test]
> + fn test_merge_preserves_dedup_state() {
> + let log = ClusterLog::new();
> +
> + // Add entries from node1
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
> + let _ = log.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
> +
> + // Create remote log with later entries from node1
> + let remote = ClusterLog::new();
> + let _ = remote.add("node1", "root", "cluster", 125, 6, 1002, "Entry 3");
> +
> + let remote_buffer = remote.inner.lock().buffer.clone();
> +
> + // Merge
> + log.merge(vec![remote_buffer], true).unwrap();
> +
> + // Check that dedup state was updated
> + let inner = log.inner.lock();
> + let node1_digest = crate::hash::fnv_64a_str("node1");
> + let dedup_entry = inner.dedup.get(&node1_digest).unwrap();
> +
> + // Should track the latest time from node1
> + assert_eq!(dedup_entry.time, 1002);
> + // UID is auto-generated, so just verify it exists and is reasonable
> + assert!(dedup_entry.uid > 0);
> + }
> +
> + #[test]
> + fn test_get_state_binary_format() {
> + let log = ClusterLog::new();
> +
> + // Add some entries
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
> + let _ = log.add("node2", "admin", "system", 456, 6, 1001, "Entry 2");
> +
> + // Get state
> + let state = log.get_state().unwrap();
> +
> + // Should be binary format, not JSON
> + assert!(state.len() >= 8); // At least header
> +
> + // Check header format (clog_base_t)
> + let size = u32::from_le_bytes(state[0..4].try_into().unwrap()) as usize;
> + let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap());
> +
> + assert_eq!(size, state.len());
> + assert_eq!(cpos, 8); // First entry at offset 8
> +
> + // Should be able to deserialize back
> + let deserialized = ClusterLog::deserialize_state(&state).unwrap();
> + assert_eq!(deserialized.len(), 2);
> + }
> +
> + #[test]
> + fn test_state_roundtrip() {
> + let log = ClusterLog::new();
> +
> + // Add entries
> + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Test 1");
> + let _ = log.add("node2", "admin", "system", 456, 6, 1001, "Test 2");
> +
> + // Serialize
> + let state = log.get_state().unwrap();
> +
> + // Deserialize
> + let deserialized = ClusterLog::deserialize_state(&state).unwrap();
> +
> + // Check entries preserved
> + assert_eq!(deserialized.len(), 2);
> +
> + // Buffer is stored newest-first after sorting and serialization
> + let entries: Vec<_> = deserialized.iter().collect();
> + assert_eq!(entries[0].node, "node2"); // Newest (time 1001)
> + assert_eq!(entries[0].message, "Test 2");
> + assert_eq!(entries[1].node, "node1"); // Oldest (time 1000)
> + assert_eq!(entries[1].message, "Test 1");
> + }
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs b/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
> new file mode 100644
> index 000000000..81d5cecbc
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
> @@ -0,0 +1,694 @@
> +/// Log Entry Implementation
> +///
> +/// This module implements the cluster log entry structure, matching the C
> +/// implementation's clog_entry_t (logger.c).
> +use super::hash::fnv_64a_str;
> +use anyhow::{bail, Result};
> +use serde::Serialize;
> +use std::sync::atomic::{AtomicU32, Ordering};
> +
> +// Import constant from ring_buffer to avoid duplication
> +use crate::ring_buffer::CLOG_MAX_ENTRY_SIZE;
> +
> +/// Global UID counter (matches C's `uid_counter` global variable)
> +///
> +/// # UID Wraparound Behavior
> +///
> +/// The UID counter is a 32-bit unsigned integer that wraps around after 2^32 entries.
> +/// This matches the C implementation's behavior (logger.c:62).
> +///
> +/// **Wraparound implications:**
> +/// - At 1000 entries/second: wraparound after ~49 days
> +/// - At 100 entries/second: wraparound after ~497 days
> +/// - After wraparound, UIDs restart from 1
> +///
> +/// **Impact on deduplication:**
> +/// The deduplication logic compares (time, UID) tuples. After wraparound, an entry
> +/// with UID=1 might be incorrectly considered older than an entry with UID=4294967295,
> +/// even if they have the same timestamp. This is a known limitation inherited from
> +/// the C implementation.
> +///
> +/// **Mitigation:**
> +/// - Entries with different timestamps are correctly ordered (time is primary sort key)
> +/// - Wraparound only affects entries with identical timestamps from the same node
> +/// - A warning is logged when wraparound occurs (see fetch_add below)
> +static UID_COUNTER: AtomicU32 = AtomicU32::new(0);
> +
> +/// Log entry structure
> +///
> +/// Matches C's `clog_entry_t` from logger.c:
> +/// ```c
> +/// typedef struct {
> +/// uint32_t prev; // Previous entry offset
> +/// uint32_t next; // Next entry offset
> +/// uint32_t uid; // Unique ID
> +/// uint32_t time; // Timestamp
> +/// uint64_t node_digest; // FNV-1a hash of node name
> +/// uint64_t ident_digest; // FNV-1a hash of ident
> +/// uint32_t pid; // Process ID
> +/// uint8_t priority; // Syslog priority (0-7)
> +/// uint8_t node_len; // Length of node name (including null)
> +/// uint8_t ident_len; // Length of ident (including null)
> +/// uint8_t tag_len; // Length of tag (including null)
> +/// uint32_t msg_len; // Length of message (including null)
> +/// char data[]; // Variable length data: node + ident + tag + msg
> +/// } clog_entry_t;
> +/// ```
> +#[derive(Debug, Clone, Serialize)]
> +pub struct LogEntry {
> + /// Unique ID for this entry (auto-incrementing)
> + pub uid: u32,
> +
> + /// Unix timestamp
> + pub time: u32,
> +
> + /// FNV-1a hash of node name
> + pub node_digest: u64,
> +
> + /// FNV-1a hash of ident (user)
> + pub ident_digest: u64,
> +
> + /// Process ID
> + pub pid: u32,
> +
> + /// Syslog priority (0-7)
> + pub priority: u8,
> +
> + /// Node name
> + pub node: String,
> +
> + /// Identity/user
> + pub ident: String,
> +
> + /// Tag (e.g., "cluster", "pmxcfs")
> + pub tag: String,
> +
> + /// Log message
> + pub message: String,
> +}
> +
> +impl LogEntry {
> + /// Matches C's `clog_pack` function
> + pub fn pack(
> + node: &str,
> + ident: &str,
> + tag: &str,
> + pid: u32,
> + time: u32,
> + priority: u8,
> + message: &str,
> + ) -> Result<Self> {
> + if priority >= 8 {
> + bail!("Invalid priority: {priority} (must be 0-7)");
> + }
> +
> + // Truncate to 254 bytes to leave room for null terminator (C uses MIN(strlen+1, 255))
> + let node = Self::truncate_string(node, 254);
> + let ident = Self::truncate_string(ident, 254);
> + let tag = Self::truncate_string(tag, 254);
> + let message = Self::utf8_to_ascii(message);
> +
> + let node_len = node.len() + 1;
> + let ident_len = ident.len() + 1;
> + let tag_len = tag.len() + 1;
> + let mut msg_len = message.len() + 1;
> +
> + // Use checked arithmetic to prevent integer overflow
> + // Header: 48 bytes fixed (prev, next, uid, time, digests, pid, priority, lengths)
Shouldn't this be 44 bytes? 4×4 (prev, next, uid, time) + 2×8 (digests)
+ 2×4 (pid, msg_len) + 4×1 (priority and the three length bytes) = 44,
which is what the header_size expression below actually computes.
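Adding it up field by field, with the same terms as the header_size
expression:

```rust
use std::mem::size_of;

// Fixed part of clog_entry_t, field by field.
fn header_size() -> usize {
    size_of::<u32>() * 4       // prev, next, uid, time
        + size_of::<u64>() * 2 // node_digest, ident_digest
        + size_of::<u32>() * 2 // pid, msg_len
        + size_of::<u8>() * 4  // priority, node_len, ident_len, tag_len
}

fn main() {
    assert_eq!(header_size(), 16 + 16 + 8 + 4);
    assert_eq!(header_size(), 44); // not 48
    println!("{}", header_size());
}
```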
> + // Variable: node_len + ident_len + tag_len + msg_len
> + let header_size = std::mem::size_of::<u32>() * 4 // prev, next, uid, time
> + + std::mem::size_of::<u64>() * 2 // node_digest, ident_digest
> + + std::mem::size_of::<u32>() * 2 // pid, msg_len
> + + std::mem::size_of::<u8>() * 4; // priority, node_len, ident_len, tag_len
> +
> + let total_size = header_size
> + .checked_add(node_len)
> + .and_then(|s| s.checked_add(ident_len))
> + .and_then(|s| s.checked_add(tag_len))
> + .and_then(|s| s.checked_add(msg_len))
> + .ok_or_else(|| anyhow::anyhow!("Entry size calculation overflow"))?;
> +
> + if total_size > CLOG_MAX_ENTRY_SIZE {
> + let diff = total_size - CLOG_MAX_ENTRY_SIZE;
> + msg_len = msg_len.saturating_sub(diff);
> + }
> +
> + let node_digest = fnv_64a_str(&node);
> + let ident_digest = fnv_64a_str(&ident);
> +
> + // Increment UID counter with wraparound detection
> + let old_uid = UID_COUNTER.fetch_add(1, Ordering::SeqCst);
> +
> + // Warn on wraparound (when counter goes from u32::MAX to 0)
> + // This happens approximately every 49 days at 1000 entries/second
> + if old_uid == u32::MAX {
> + tracing::warn!(
> + "UID counter wrapped around (2^32 entries reached). \
> + Deduplication may be affected for entries with identical timestamps. \
> + This is expected behavior matching the C implementation."
> + );
> + }
> +
> + let uid = old_uid.wrapping_add(1);
> +
> + Ok(Self {
> + uid,
> + time,
> + node_digest,
> + ident_digest,
> + pid,
> + priority,
> + node,
> + ident,
> + tag,
> + message: message[..msg_len.saturating_sub(1)].to_string(),
> + })
> + }
> +
> + /// Truncate string to max length (safe for multi-byte UTF-8)
> + fn truncate_string(s: &str, max_len: usize) -> String {
> + if s.len() <= max_len {
> + return s.to_string();
> + }
> +
> + // Find the last valid UTF-8 character that fits within max_len
> + let truncate_at = s
> + .char_indices()
> + .take_while(|(idx, ch)| idx + ch.len_utf8() <= max_len)
> + .last()
> + .map(|(idx, ch)| idx + ch.len_utf8())
> + .unwrap_or(0);
> +
> + s[..truncate_at].to_string()
> + }
> +
> + /// Convert UTF-8 to ASCII with proper escaping
> + ///
> + /// Matches C's `utf8_to_ascii` function behavior:
> + /// - Control characters (0x00-0x1F, 0x7F): Escaped as #XXX (e.g., #007 for BEL)
> + /// - Unicode (U+0080 to U+FFFF): Escaped as \uXXXX (e.g., \u4e16 for 世)
> + /// - Quotes: Escaped as \" (matches C's quotequote=TRUE behavior)
> + /// - Characters > U+FFFF: Silently dropped
> + /// - ASCII printable (0x20-0x7E except quotes): Passed through unchanged
> + fn utf8_to_ascii(s: &str) -> String {
> + let mut result = String::with_capacity(s.len());
> +
> + for c in s.chars() {
> + match c {
> + // Control characters: #XXX format (3 decimal digits)
> + '\x00'..='\x1F' | '\x7F' => {
> + let code = c as u32;
> + result.push('#');
> + // Format as 3 decimal digits with leading zeros (e.g., #007 for BEL)
> + result.push_str(&format!("{:03}", code));
> + }
> + // Quote escaping: matches C's quotequote=TRUE behavior (logger.c:245)
> + '"' => {
> + result.push('\\');
> + result.push('"');
> + }
> + // ASCII printable characters: pass through
> + c if c.is_ascii() => {
> + result.push(c);
> + }
> + // Unicode U+0080 to U+FFFF: \uXXXX format
> + c if (c as u32) < 0x10000 => {
> + result.push('\\');
> + result.push('u');
> + result.push_str(&format!("{:04x}", c as u32));
> + }
> + // Characters > U+FFFF: silently drop (matches C behavior)
> + _ => {}
> + }
> + }
> +
> + result
> + }
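Minor: the `format!` calls above allocate a temporary String per escaped
character, and this path runs for every log message. `write!` into the
existing buffer avoids that; a small sketch (helper name is mine):

```rust
use std::fmt::Write;

// Append a \uXXXX escape directly into the output buffer instead of
// going through format!'s temporary String allocation.
fn push_unicode_escape(result: &mut String, c: char) {
    // write! to a String cannot fail, so the Result can be ignored
    let _ = write!(result, "\\u{:04x}", c as u32);
}
```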
> +
> + /// Matches C's `clog_entry_size` function
> + pub fn size(&self) -> usize {
> + std::mem::size_of::<u32>() * 4 // prev, next, uid, time
> + + std::mem::size_of::<u64>() * 2 // node_digest, ident_digest
> + + std::mem::size_of::<u32>() * 2 // pid, msg_len
> + + std::mem::size_of::<u8>() * 4 // priority, node_len, ident_len, tag_len
> + + self.node.len() + 1
> + + self.ident.len() + 1
> + + self.tag.len() + 1
> + + self.message.len() + 1
> + }
> +
> + /// C implementation: `uint32_t realsize = ((size + 7) & 0xfffffff8);`
> + pub fn aligned_size(&self) -> usize {
> + let size = self.size();
> + (size + 7) & !7
> + }
> +
> + pub fn to_json_object(&self) -> serde_json::Value {
> + serde_json::json!({
> + "uid": self.uid,
> + "time": self.time,
> + "pri": self.priority,
> + "tag": self.tag,
> + "pid": self.pid,
> + "node": self.node,
> + "user": self.ident,
> + "msg": self.message,
> + })
> + }
> +
> + /// Serialize to C binary format (clog_entry_t)
> + ///
> + /// Binary layout matches C structure:
> + /// ```c
> + /// struct {
> + /// uint32_t prev; // Will be filled by ring buffer
> + /// uint32_t next; // Will be filled by ring buffer
> + /// uint32_t uid;
> + /// uint32_t time;
> + /// uint64_t node_digest;
> + /// uint64_t ident_digest;
> + /// uint32_t pid;
> + /// uint8_t priority;
> + /// uint8_t node_len;
> + /// uint8_t ident_len;
> + /// uint8_t tag_len;
> + /// uint32_t msg_len;
> + /// char data[]; // node + ident + tag + msg (null-terminated)
> + /// }
> + /// ```
> + pub fn serialize_binary(&self, prev: u32, next: u32) -> Vec<u8> {
> + let mut buf = Vec::new();
> +
> + buf.extend_from_slice(&prev.to_le_bytes());
> + buf.extend_from_slice(&next.to_le_bytes());
> + buf.extend_from_slice(&self.uid.to_le_bytes());
> + buf.extend_from_slice(&self.time.to_le_bytes());
> + buf.extend_from_slice(&self.node_digest.to_le_bytes());
> + buf.extend_from_slice(&self.ident_digest.to_le_bytes());
> + buf.extend_from_slice(&self.pid.to_le_bytes());
> + buf.push(self.priority);
> +
> + // Cap at 255 to match C's MIN(strlen+1, 255) and prevent u8 overflow
> + let node_len = (self.node.len() + 1).min(255) as u8;
> + let ident_len = (self.ident.len() + 1).min(255) as u8;
> + let tag_len = (self.tag.len() + 1).min(255) as u8;
> + let msg_len = (self.message.len() + 1) as u32;
> +
> + buf.push(node_len);
> + buf.push(ident_len);
> + buf.push(tag_len);
> + buf.extend_from_slice(&msg_len.to_le_bytes());
> +
> + buf.extend_from_slice(self.node.as_bytes());
> + buf.push(0);
> +
> + buf.extend_from_slice(self.ident.as_bytes());
> + buf.push(0);
> +
> + buf.extend_from_slice(self.tag.as_bytes());
> + buf.push(0);
> +
> + buf.extend_from_slice(self.message.as_bytes());
> + buf.push(0);
> +
> + buf
> + }
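On serialize_binary/deserialize_binary: the header layout is spelled out at
least four times (pack's header_size, size(), the deserializer's running
offset, and the hard-coded 37/38/39 in the tests). A shared const would keep
them in sync and make the 44-vs-48 question explicit; something like this
(names are my suggestion, not from the patch):

```rust
// Serialized clog_entry header size: the strings start right after msg_len.
const CLOG_ENTRY_HEADER_SIZE: usize = 4 * 4 // prev, next, uid, time
    + 8 * 2                                 // node_digest, ident_digest
    + 4                                     // pid
    + 4                                     // priority, node_len, ident_len, tag_len
    + 4;                                    // msg_len

// Byte offsets of the u8 length fields, currently hard-coded in the tests.
const OFFSET_PRIORITY: usize = 4 * 4 + 8 * 2 + 4;
const OFFSET_NODE_LEN: usize = OFFSET_PRIORITY + 1;
const OFFSET_IDENT_LEN: usize = OFFSET_NODE_LEN + 1;
const OFFSET_TAG_LEN: usize = OFFSET_IDENT_LEN + 1;
```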
> +
> + pub(crate) fn deserialize_binary(data: &[u8]) -> Result<(Self, u32, u32)> {
> + if data.len() < 48 {
> + bail!(
> + "Entry too small: {} bytes (need at least 48 for header)",
> + data.len()
> + );
> + }
> +
> + let mut offset = 0;
> +
> + let prev = u32::from_le_bytes(data[offset..offset + 4].try_into()?);
> + offset += 4;
> +
> + let next = u32::from_le_bytes(data[offset..offset + 4].try_into()?);
> + offset += 4;
> +
> + let uid = u32::from_le_bytes(data[offset..offset + 4].try_into()?);
> + offset += 4;
> +
> + let time = u32::from_le_bytes(data[offset..offset + 4].try_into()?);
> + offset += 4;
> +
> + let node_digest = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
> + offset += 8;
> +
> + let ident_digest = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
> + offset += 8;
> +
> + let pid = u32::from_le_bytes(data[offset..offset + 4].try_into()?);
> + offset += 4;
> +
> + let priority = data[offset];
> + offset += 1;
> +
> + let node_len = data[offset] as usize;
> + offset += 1;
> +
> + let ident_len = data[offset] as usize;
> + offset += 1;
> +
> + let tag_len = data[offset] as usize;
> + offset += 1;
> +
> + let msg_len = u32::from_le_bytes(data[offset..offset + 4].try_into()?) as usize;
> + offset += 4;
> +
> + if offset + node_len + ident_len + tag_len + msg_len > data.len() {
> + bail!("Entry data exceeds buffer size");
> + }
> +
> + let node = read_null_terminated(&data[offset..offset + node_len])?;
> + offset += node_len;
> +
> + let ident = read_null_terminated(&data[offset..offset + ident_len])?;
> + offset += ident_len;
> +
> + let tag = read_null_terminated(&data[offset..offset + tag_len])?;
> + offset += tag_len;
> +
> + let message = read_null_terminated(&data[offset..offset + msg_len])?;
> +
> + Ok((
> + Self {
> + uid,
> + time,
> + node_digest,
> + ident_digest,
> + pid,
> + priority,
> + node,
> + ident,
> + tag,
> + message,
> + },
> + prev,
> + next,
> + ))
> + }
> +}
> +
> +fn read_null_terminated(data: &[u8]) -> Result<String> {
> + let len = data.iter().position(|&b| b == 0).unwrap_or(data.len());
> + Ok(String::from_utf8_lossy(&data[..len]).into_owned())
> +}
> +
> +#[cfg(test)]
> +pub fn reset_uid_counter() {
> + UID_COUNTER.store(0, Ordering::SeqCst);
> +}
> +
> +#[cfg(test)]
> +mod tests {
> + use super::*;
> +
> + #[test]
> + fn test_pack_entry() {
> + reset_uid_counter();
> +
> + let entry = LogEntry::pack(
> + "node1",
> + "root",
> + "cluster",
> + 12345,
> + 1234567890,
> + 6, // Info priority
> + "Test message",
> + )
> + .unwrap();
> +
> + assert_eq!(entry.uid, 1);
> + assert_eq!(entry.time, 1234567890);
> + assert_eq!(entry.node, "node1");
> + assert_eq!(entry.ident, "root");
> + assert_eq!(entry.tag, "cluster");
> + assert_eq!(entry.pid, 12345);
> + assert_eq!(entry.priority, 6);
> + assert_eq!(entry.message, "Test message");
> + }
> +
> + #[test]
> + fn test_uid_increment() {
> + reset_uid_counter();
> +
> + let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg1").unwrap();
> + let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg2").unwrap();
> +
> + assert_eq!(entry1.uid, 1);
> + assert_eq!(entry2.uid, 2);
> + }
> +
> + #[test]
> + fn test_invalid_priority() {
> + let result = LogEntry::pack("node1", "root", "tag", 0, 1000, 8, "message");
> + assert!(result.is_err());
> + }
> +
> + #[test]
> + fn test_node_digest() {
> + let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
> + let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg").unwrap();
> + let entry3 = LogEntry::pack("node2", "root", "tag", 0, 1000, 6, "msg").unwrap();
> +
> + // Same node should have same digest
> + assert_eq!(entry1.node_digest, entry2.node_digest);
> +
> + // Different node should have different digest
> + assert_ne!(entry1.node_digest, entry3.node_digest);
> + }
> +
> + #[test]
> + fn test_ident_digest() {
> + let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
> + let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg").unwrap();
> + let entry3 = LogEntry::pack("node1", "admin", "tag", 0, 1000, 6, "msg").unwrap();
> +
> + // Same ident should have same digest
> + assert_eq!(entry1.ident_digest, entry2.ident_digest);
> +
> + // Different ident should have different digest
> + assert_ne!(entry1.ident_digest, entry3.ident_digest);
> + }
> +
> + #[test]
> + fn test_utf8_to_ascii() {
> + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "Hello 世界").unwrap();
> + assert!(entry.message.is_ascii());
> + // Unicode chars escaped as \uXXXX format (matches C implementation)
> + assert!(entry.message.contains("\\u4e16")); // 世 = U+4E16
> + assert!(entry.message.contains("\\u754c")); // 界 = U+754C
> + }
> +
> + #[test]
> + fn test_utf8_control_chars() {
> + // Test control character escaping
> + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "Hello\x07World").unwrap();
> + assert!(entry.message.is_ascii());
> + // BEL (0x07) should be escaped as #007 (matches C implementation)
> + assert!(entry.message.contains("#007"));
> + }
> +
> + #[test]
> + fn test_utf8_mixed_content() {
> + // Test mix of ASCII, Unicode, and control chars
> + let entry = LogEntry::pack(
> + "node1",
> + "root",
> + "tag",
> + 0,
> + 1000,
> + 6,
> + "Test\x01\nUnicode世\ttab",
> + )
> + .unwrap();
> + assert!(entry.message.is_ascii());
> + // SOH (0x01) -> #001
> + assert!(entry.message.contains("#001"));
> + // Newline (0x0A) -> #010
> + assert!(entry.message.contains("#010"));
> + // Unicode 世 (U+4E16) -> \u4e16
> + assert!(entry.message.contains("\\u4e16"));
> + // Tab (0x09) -> #009
> + assert!(entry.message.contains("#009"));
> + }
> +
> + #[test]
> + fn test_string_truncation() {
> + let long_node = "a".repeat(300);
> + let entry = LogEntry::pack(&long_node, "root", "tag", 0, 1000, 6, "msg").unwrap();
> + assert!(entry.node.len() <= 255);
> + }
> +
> + #[test]
> + fn test_truncate_multibyte_utf8() {
> + // Test that truncate_string doesn't panic on multi-byte UTF-8 boundaries
> + // "世" is 3 bytes in UTF-8 (0xE4 0xB8 0x96)
> + let s = "x".repeat(253) + "世";
> +
> + // This should not panic, even though 254 falls in the middle of "世"
> + let entry = LogEntry::pack(&s, "root", "tag", 0, 1000, 6, "msg").unwrap();
> +
> + // Should truncate to 253 bytes (before the multi-byte char)
> + assert_eq!(entry.node.len(), 253);
> + assert_eq!(entry.node, "x".repeat(253));
> + }
> +
> + #[test]
> + fn test_message_truncation() {
> + let long_message = "a".repeat(CLOG_MAX_ENTRY_SIZE);
> + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, &long_message).unwrap();
> + // Entry should fit within max size
> + assert!(entry.size() <= CLOG_MAX_ENTRY_SIZE);
> + }
> +
> + #[test]
> + fn test_aligned_size() {
> + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
> + let aligned = entry.aligned_size();
> +
> + // Aligned size should be multiple of 8
> + assert_eq!(aligned % 8, 0);
> +
> + // Aligned size should be >= actual size
> + assert!(aligned >= entry.size());
> +
> + // Aligned size should be within 7 bytes of actual size
> + assert!(aligned - entry.size() < 8);
> + }
> +
> + #[test]
> + fn test_json_export() {
> + let entry = LogEntry::pack("node1", "root", "cluster", 123, 1234567890, 6, "Test").unwrap();
> + let json = entry.to_json_object();
> +
> + assert_eq!(json["node"], "node1");
> + assert_eq!(json["user"], "root");
> + assert_eq!(json["tag"], "cluster");
> + assert_eq!(json["pid"], 123);
> + assert_eq!(json["time"], 1234567890);
> + assert_eq!(json["pri"], 6);
> + assert_eq!(json["msg"], "Test");
> + }
> +
> + #[test]
> + fn test_binary_serialization_roundtrip() {
> + let entry = LogEntry::pack(
> + "node1",
> + "root",
> + "cluster",
> + 12345,
> + 1234567890,
> + 6,
> + "Test message",
> + )
> + .unwrap();
> +
> + // Serialize with prev/next pointers
> + let binary = entry.serialize_binary(100, 200);
> +
> + // Deserialize
> + let (deserialized, prev, next) = LogEntry::deserialize_binary(&binary).unwrap();
> +
> + // Check prev/next pointers
> + assert_eq!(prev, 100);
> + assert_eq!(next, 200);
> +
> + // Check entry fields
> + assert_eq!(deserialized.uid, entry.uid);
> + assert_eq!(deserialized.time, entry.time);
> + assert_eq!(deserialized.node_digest, entry.node_digest);
> + assert_eq!(deserialized.ident_digest, entry.ident_digest);
> + assert_eq!(deserialized.pid, entry.pid);
> + assert_eq!(deserialized.priority, entry.priority);
> + assert_eq!(deserialized.node, entry.node);
> + assert_eq!(deserialized.ident, entry.ident);
> + assert_eq!(deserialized.tag, entry.tag);
> + assert_eq!(deserialized.message, entry.message);
> + }
> +
> + #[test]
> + fn test_binary_format_header_size() {
> + let entry = LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap();
> + let binary = entry.serialize_binary(0, 0);
> +
> + // Header should be exactly 48 bytes
> + // prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) +
> + // pid(4) + priority(1) + node_len(1) + ident_len(1) + tag_len(1) + msg_len(4)
> + assert!(binary.len() >= 48);
> +
> + // First 48 bytes are header
> + assert_eq!(&binary[0..4], &0u32.to_le_bytes()); // prev
> + assert_eq!(&binary[4..8], &0u32.to_le_bytes()); // next
> + }
> +
> + #[test]
> + fn test_binary_deserialize_invalid_size() {
> + let too_small = vec![0u8; 40]; // Less than 48 byte header
> + let result = LogEntry::deserialize_binary(&too_small);
> + assert!(result.is_err());
> + }
> +
> + #[test]
> + fn test_binary_null_terminators() {
> + let entry = LogEntry::pack("node1", "root", "tag", 123, 1000, 6, "message").unwrap();
> + let binary = entry.serialize_binary(0, 0);
> +
> + // Check that strings are null-terminated
> + // Find null bytes in data section (after 48-byte header)
> + let data_section = &binary[48..];
> + let null_count = data_section.iter().filter(|&&b| b == 0).count();
> + assert_eq!(null_count, 4); // 4 null terminators (node, ident, tag, msg)
> + }
Second inline comment, on the test offsets: serialize_binary writes a
44-byte header with the strings immediately after msg_len, so
`&binary[48..]` here starts 4 bytes into the node string. The null count
still comes out as 4 for this fixture, which hides the off-by-4. The same
48 appears in deserialize_binary's minimum-size check and in
test_binary_format_header_size; if 44 is the intended on-wire header size,
these should all say 44.
> +
> + #[test]
> + fn test_length_field_overflow_prevention() {
> + // Test that 255-byte strings are handled correctly (prevent u8 overflow)
> + // C does: MIN(strlen(s) + 1, 255) to cap at 255
> + let long_string = "a".repeat(255);
> +
> + let entry = LogEntry::pack(&long_string, &long_string, &long_string, 123, 1000, 6, "msg").unwrap();
> +
> + // Strings should be truncated to 254 bytes (leaving room for null)
> + assert_eq!(entry.node.len(), 254);
> + assert_eq!(entry.ident.len(), 254);
> + assert_eq!(entry.tag.len(), 254);
> +
> + // Serialize and check length fields are capped at 255 (254 bytes + null)
> + let binary = entry.serialize_binary(0, 0);
> +
> + // Extract length fields from header
> + // Layout: prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) +
> + // pid(4) + priority(1) + node_len(1) + ident_len(1) + tag_len(1) + msg_len(4)
> + // Offsets: node_len=37, ident_len=38, tag_len=39
> + let node_len = binary[37];
> + let ident_len = binary[38];
> + let tag_len = binary[39];
> +
> + assert_eq!(node_len, 255); // 254 bytes + 1 null = 255
> + assert_eq!(ident_len, 255);
> + assert_eq!(tag_len, 255);
> + }
> +
> + #[test]
> + fn test_length_field_no_wraparound() {
> + // Even if somehow a 255+ byte string gets through, serialize should cap at 255
> + // This tests the defensive .min(255) in serialize_binary
> + let mut entry = LogEntry::pack("node", "ident", "tag", 123, 1000, 6, "msg").unwrap();
> +
> + // Artificially create an edge case (though pack() already prevents this)
> + entry.node = "x".repeat(254); // Max valid size
> +
> + let binary = entry.serialize_binary(0, 0);
> + let node_len = binary[37]; // Offset 37 for node_len
> +
> + // Should be 255 (254 + 1 for null), not wrap to 0
> + assert_eq!(node_len, 255);
> + assert_ne!(node_len, 0); // Ensure no wraparound
> + }
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs b/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
> new file mode 100644
> index 000000000..09dad6afd
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
> @@ -0,0 +1,176 @@
> +/// FNV-1a (Fowler-Noll-Vo) 64-bit hash function
> +///
> +/// This matches the C implementation's `fnv_64a_buf` function
> +/// Used for generating node and ident digests for deduplication.
> +/// FNV-1a 64-bit non-zero initial basis
> +pub(crate) const FNV1A_64_INIT: u64 = 0xcbf29ce484222325;
> +
> +/// Compute 64-bit FNV-1a hash
> +///
> +/// This is a faithful port of the C implementation's `fnv_64a_buf` function:
> +/// ```c
> +/// static inline uint64_t fnv_64a_buf(const void *buf, size_t len, uint64_t hval) {
> +/// unsigned char *bp = (unsigned char *)buf;
> +/// unsigned char *be = bp + len;
> +/// while (bp < be) {
> +/// hval ^= (uint64_t)*bp++;
> +/// hval += (hval << 1) + (hval << 4) + (hval << 5) + (hval << 7) + (hval << 8) + (hval << 40);
> +/// }
> +/// return hval;
> +/// }
> +/// ```
> +///
> +/// # Arguments
> +/// * `data` - The data to hash
> +/// * `init` - Initial hash value (use FNV1A_64_INIT for first hash)
> +///
> +/// # Returns
> +/// 64-bit hash value
> +///
> +/// Note: This function appears unused but is actually called via `fnv_64a_str` below,
> +/// which provides the primary API for string hashing. Both functions share the core
> +/// FNV-1a implementation logic.
> +#[inline]
> +#[allow(dead_code)] // Used via fnv_64a_str wrapper
> +pub(crate) fn fnv_64a(data: &[u8], init: u64) -> u64 {
> + let mut hval = init;
> +
> + for &byte in data {
> + hval ^= byte as u64;
> + // FNV magic prime multiplication done via shifts and adds
> + // This is equivalent to: hval *= 0x100000001b3 (FNV 64-bit prime)
> + hval = hval.wrapping_add(
> + (hval << 1)
> + .wrapping_add(hval << 4)
> + .wrapping_add(hval << 5)
> + .wrapping_add(hval << 7)
> + .wrapping_add(hval << 8)
> + .wrapping_add(hval << 40),
> + );
> + }
> +
> + hval
> +}
> +
> +/// Hash a null-terminated string (includes the null byte)
> +///
> +/// The C implementation includes the null terminator in the hash:
> +/// `fnv_64a_buf(node, node_len, FNV1A_64_INIT)` where node_len includes the '\0'
> +///
> +/// This function adds a null byte to match that behavior.
> +#[inline]
> +pub(crate) fn fnv_64a_str(s: &str) -> u64 {
> + let bytes = s.as_bytes();
> + let mut hval = FNV1A_64_INIT;
> +
> + for &byte in bytes {
> + hval ^= byte as u64;
> + hval = hval.wrapping_add(
> + (hval << 1)
> + .wrapping_add(hval << 4)
> + .wrapping_add(hval << 5)
> + .wrapping_add(hval << 7)
> + .wrapping_add(hval << 8)
> + .wrapping_add(hval << 40),
> + );
> + }
> +
> + // Hash the null terminator to match C behavior
> + // C implementation: `hval ^= (uint64_t)*bp++` where *bp is '\0'
> + // Since XOR with 0 is a no-op (hval ^ 0 == hval), we skip it and proceed
> + // directly to the multiplication step. This optimization produces identical
> + // results to the C implementation while being more explicit about the intent.
> + hval.wrapping_add(
> + (hval << 1)
> + .wrapping_add(hval << 4)
> + .wrapping_add(hval << 5)
> + .wrapping_add(hval << 7)
> + .wrapping_add(hval << 8)
> + .wrapping_add(hval << 40),
> + )
> +}
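The doc note above fnv_64a says it is "actually called via fnv_64a_str
below", but fnv_64a_str duplicates the loop instead of calling it. Since
XOR with 0 is a no-op, the trailing-NUL round is exactly fnv_64a over a
single zero byte, so the wrapper can just delegate (sketch, reusing the
patch's names):

```rust
pub(crate) const FNV1A_64_INIT: u64 = 0xcbf29ce484222325;

#[inline]
pub(crate) fn fnv_64a(data: &[u8], init: u64) -> u64 {
    let mut hval = init;
    for &byte in data {
        hval ^= byte as u64;
        // Shift-add form of hval *= 0x100000001b3 (the FNV 64-bit prime)
        hval = hval.wrapping_add(
            (hval << 1)
                .wrapping_add(hval << 4)
                .wrapping_add(hval << 5)
                .wrapping_add(hval << 7)
                .wrapping_add(hval << 8)
                .wrapping_add(hval << 40),
        );
    }
    hval
}

/// Hash a string plus its C null terminator: hashing the single byte 0
/// is exactly the extra round the current hand-rolled tail performs.
#[inline]
pub(crate) fn fnv_64a_str(s: &str) -> u64 {
    fnv_64a(&[0], fnv_64a(s.as_bytes(), FNV1A_64_INIT))
}
```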
> +
> +#[cfg(test)]
> +mod tests {
> + use super::*;
> +
> + #[test]
> + fn test_fnv1a_init() {
> + // Test that init constant matches C implementation
> + assert_eq!(FNV1A_64_INIT, 0xcbf29ce484222325);
> + }
> +
> + #[test]
> + fn test_fnv1a_empty() {
> + // Empty string with null terminator
> + let hash = fnv_64a(&[0], FNV1A_64_INIT);
> + assert_ne!(hash, FNV1A_64_INIT); // Should be different from init
> + }
> +
> + #[test]
> + fn test_fnv1a_consistency() {
> + // Same input should produce same output
> + let data = b"test";
> + let hash1 = fnv_64a(data, FNV1A_64_INIT);
> + let hash2 = fnv_64a(data, FNV1A_64_INIT);
> + assert_eq!(hash1, hash2);
> + }
> +
> + #[test]
> + fn test_fnv1a_different_data() {
> + // Different input should (usually) produce different output
> + let hash1 = fnv_64a(b"test1", FNV1A_64_INIT);
> + let hash2 = fnv_64a(b"test2", FNV1A_64_INIT);
> + assert_ne!(hash1, hash2);
> + }
> +
> + #[test]
> + fn test_fnv1a_str() {
> + // Test string hashing with null terminator
> + let hash1 = fnv_64a_str("node1");
> + let hash2 = fnv_64a_str("node1");
> + let hash3 = fnv_64a_str("node2");
> +
> + assert_eq!(hash1, hash2); // Same string should hash the same
> + assert_ne!(hash1, hash3); // Different strings should hash differently
> + }
> +
> + #[test]
> + fn test_fnv1a_node_names() {
> + // Test with typical Proxmox node names
> + let nodes = vec!["pve1", "pve2", "pve3"];
> + let mut hashes = Vec::new();
> +
> + for node in &nodes {
> + let hash = fnv_64a_str(node);
> + hashes.push(hash);
> + }
> +
> + // All hashes should be unique
> + for i in 0..hashes.len() {
> + for j in (i + 1)..hashes.len() {
> + assert_ne!(
> + hashes[i], hashes[j],
> + "Hashes for {} and {} should differ",
> + nodes[i], nodes[j]
> + );
> + }
> + }
> + }
> +
> + #[test]
> + fn test_fnv1a_chaining() {
> + // Test that we can chain hashes
> + let data1 = b"first";
> + let data2 = b"second";
> +
> + let hash1 = fnv_64a(data1, FNV1A_64_INIT);
> + let hash2 = fnv_64a(data2, hash1); // Use previous hash as init
> +
> + // Should produce a deterministic result
> + let hash1_again = fnv_64a(data1, FNV1A_64_INIT);
> + let hash2_again = fnv_64a(data2, hash1_again);
> +
> + assert_eq!(hash2, hash2_again);
> + }
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs b/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
> new file mode 100644
> index 000000000..964f0b3a6
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
> @@ -0,0 +1,27 @@
> +/// Cluster Log Implementation
> +///
> +/// This module provides a cluster-wide log system compatible with the C implementation.
> +/// It maintains a ring buffer of log entries that can be merged from multiple nodes,
> +/// deduplicated, and exported to JSON.
> +///
> +/// Key features:
> +/// - Ring buffer storage for efficient memory usage
> +/// - FNV-1a hashing for node and ident tracking
> +/// - Deduplication across nodes
> +/// - Time-based sorting
> +/// - Multi-node log merging
> +/// - JSON export for web UI
> +// Internal modules (not exposed)
> +mod cluster_log;
> +mod entry;
> +mod hash;
> +mod ring_buffer;
> +
> +// Public API - only expose what's needed externally
> +pub use cluster_log::ClusterLog;
> +
> +// Re-export types only for testing or internal crate use
> +#[doc(hidden)]
> +pub use entry::LogEntry;
> +#[doc(hidden)]
> +pub use ring_buffer::RingBuffer;
Nit: the crate-level docs at the top of lib.rs use `///`, so rustdoc
attaches them to `mod cluster_log` (a private module) rather than to the
crate; `//!` inner doc comments would put them on the crate itself.
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs b/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
> new file mode 100644
> index 000000000..2c82308c9
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
> @@ -0,0 +1,628 @@
> +/// Ring Buffer Implementation for Cluster Log
> +///
> +/// This module implements a circular buffer for storing log entries,
> +/// matching the C implementation's clog_base_t structure.
> +use super::entry::LogEntry;
> +use super::hash::fnv_64a_str;
> +use anyhow::{bail, Result};
> +use std::collections::VecDeque;
> +
> +/// Matches C's CLOG_DEFAULT_SIZE constant
> +pub(crate) const CLOG_DEFAULT_SIZE: usize = 8192 * 16; // 131,072 bytes (128 KB)
> +
> +/// Matches C's CLOG_MAX_ENTRY_SIZE constant
> +pub(crate) const CLOG_MAX_ENTRY_SIZE: usize = 4096; // 4,096 bytes (4 KB)
> +
> +/// Ring buffer for log entries
> +///
> +/// This is a simplified Rust version of the C implementation's ring buffer.
> +/// The C version uses a raw byte buffer with manual pointer arithmetic,
> +/// but we use a VecDeque for safety and simplicity while maintaining
> +/// the same conceptual behavior.
> +///
> +/// C structure (clog_base_t):
> +/// ```c
> +/// struct clog_base {
> +/// uint32_t size; // Total buffer size
> +/// uint32_t cpos; // Current position
> +/// char data[]; // Variable length data
> +/// };
> +/// ```
> +#[derive(Debug, Clone)]
> +pub struct RingBuffer {
> + /// Maximum capacity in bytes
> + capacity: usize,
> +
> + /// Current size in bytes (approximate)
> + current_size: usize,
> +
> + /// Entries stored in the buffer (newest first)
> + /// We use VecDeque for efficient push/pop at both ends
> + entries: VecDeque<LogEntry>,
> +}
> +
> +impl RingBuffer {
> + /// Create a new ring buffer with specified capacity
> + pub fn new(capacity: usize) -> Self {
> + // Ensure minimum capacity
> + let capacity = if capacity < CLOG_MAX_ENTRY_SIZE * 10 {
> + CLOG_DEFAULT_SIZE
> + } else {
> + capacity
> + };
> +
> + Self {
> + capacity,
> + current_size: 0,
> + entries: VecDeque::new(),
> + }
> + }
> +
> + /// Add an entry to the buffer
> + ///
> + /// Matches C's `clog_copy` function which calls `clog_alloc_entry`
> + /// to allocate space in the ring buffer.
> + pub fn add_entry(&mut self, entry: &LogEntry) -> Result<()> {
> + let entry_size = entry.aligned_size();
> +
> + // Make room if needed (remove oldest entries)
> + while self.current_size + entry_size > self.capacity && !self.entries.is_empty() {
> + if let Some(old_entry) = self.entries.pop_back() {
> + self.current_size = self.current_size.saturating_sub(old_entry.aligned_size());
> + }
> + }
> +
> + // Add new entry at the front (newest first)
> + self.entries.push_front(entry.clone());
> + self.current_size += entry_size;
> +
> + Ok(())
> + }
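On the VecDeque<LogEntry> vs C byte-ring question: every add_entry clones
the entry (four String allocations), where the C ring is a single memcpy
into preallocated memory. A std-only sketch to get rough numbers at high
log rates (criterion would be nicer; the entry type here is a stand-in,
not the real LogEntry):

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

#[derive(Clone)]
struct FakeEntry {
    node: String,
    message: String,
}

// Push n cloned entries through a capped VecDeque and time it; returns
// the final ring length and the elapsed wall-clock time.
fn bench_push(n: usize, cap: usize) -> (usize, Duration) {
    let proto = FakeEntry {
        node: "pve1".into(),
        message: "x".repeat(128),
    };
    let mut ring: VecDeque<FakeEntry> = VecDeque::new();
    let start = Instant::now();
    for _ in 0..n {
        // Each insert clones two Strings; a C-style byte ring is one memcpy.
        ring.push_front(proto.clone());
        if ring.len() > cap {
            ring.pop_back();
        }
    }
    (ring.len(), start.elapsed())
}
```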
> +
> + /// Check if buffer is near full (>90% capacity)
> + pub fn is_near_full(&self) -> bool {
> + self.current_size > (self.capacity * 9 / 10)
> + }
> +
> + /// Check if buffer is empty
> + pub fn is_empty(&self) -> bool {
> + self.entries.is_empty()
> + }
> +
> + /// Get number of entries
> + pub fn len(&self) -> usize {
> + self.entries.len()
> + }
> +
> + /// Get buffer capacity
> + pub fn capacity(&self) -> usize {
> + self.capacity
> + }
> +
> + /// Iterate over entries (newest first)
> + pub fn iter(&self) -> impl Iterator<Item = &LogEntry> {
> + self.entries.iter()
> + }
> +
> + /// Sort entries by time, node_digest, and uid
> + ///
> + /// Matches C's `clog_sort` function
> + ///
> + /// C uses GTree with custom comparison function `clog_entry_sort_fn`:
> + /// ```c
> + /// if (entry1->time != entry2->time) {
> + /// return entry1->time - entry2->time;
> + /// }
> + /// if (entry1->node_digest != entry2->node_digest) {
> + /// return entry1->node_digest - entry2->node_digest;
> + /// }
> + /// return entry1->uid - entry2->uid;
> + /// ```
> + pub fn sort(&self) -> Result<Self> {
> + let mut new_buffer = Self::new(self.capacity);
> +
> + // Collect and sort entries
> + let mut sorted: Vec<LogEntry> = self.entries.iter().cloned().collect();
> +
> + // Sort by time (ascending), then node_digest, then uid
> + sorted.sort_by_key(|e| (e.time, e.node_digest, e.uid));
> +
> + // Add sorted entries to new buffer
> + // Since add_entry pushes to front, we add in forward order to get newest-first
> + // sorted = [oldest...newest], add_entry pushes to front, so:
> + // - Add oldest: [oldest]
> + // - Add next: [next, oldest]
> + // - Add newest: [newest, next, oldest]
> + for entry in sorted.iter() {
> + new_buffer.add_entry(entry)?;
> + }
> +
> + Ok(new_buffer)
> + }
> +
> + /// Dump buffer to JSON format
> + ///
> + /// Matches C's `clog_dump_json` function
> + ///
> + /// # Arguments
> + /// * `ident_filter` - Optional ident filter (user filter)
> + /// * `max_entries` - Maximum number of entries to include
> + pub fn dump_json(&self, ident_filter: Option<&str>, max_entries: usize) -> String {
> + // Compute ident digest if filter is provided
> + let ident_digest = ident_filter.map(fnv_64a_str);
> +
> + let mut data = Vec::new();
> + let mut count = 0;
> +
> + // Iterate over entries (newest first, matching C's walk from cpos->prev)
> + for entry in self.iter() {
> + if count >= max_entries {
> + break;
> + }
> +
> + // Apply ident filter if specified
> + if let Some(digest) = ident_digest {
> + if digest != entry.ident_digest {
> + continue;
> + }
> + }
> +
> + data.push(entry.to_json_object());
> + count += 1;
> + }
> +
> + let result = serde_json::json!({
> + "data": data
> + });
> +
> + serde_json::to_string_pretty(&result).unwrap_or_else(|_| "{}".to_string())
> + }
> +
> + /// Dump buffer contents (for debugging)
> + ///
> + /// Matches C's `clog_dump` function
> + #[allow(dead_code)]
> + pub fn dump(&self) {
> + for (idx, entry) in self.entries.iter().enumerate() {
> + println!(
> + "[{}] uid={:08x} time={} node={}{{{:016X}}} tag={}[{}{{{:016X}}}]: {}",
> + idx,
> + entry.uid,
> + entry.time,
> + entry.node,
> + entry.node_digest,
> + entry.tag,
> + entry.ident,
> + entry.ident_digest,
> + entry.message
> + );
> + }
> + }
> +
> + /// Serialize to C binary format (clog_base_t)
> + ///
> + /// Returns a full memory dump of the ring buffer matching C's format.
> + /// C's clusterlog_get_state() returns g_memdup2(cl->base, clog->size),
> + /// which is the entire allocated buffer capacity, not just used space.
> + ///
> + /// Binary layout matches C structure:
> + /// ```c
> + /// struct clog_base {
> + /// uint32_t size; // Total allocated buffer capacity
> + /// uint32_t cpos; // Offset to newest entry (not always 8!)
> + /// char data[]; // Ring buffer data (entries at various offsets)
> + /// };
> + /// ```
> + ///
> + /// Entry offsets and linkage:
> + /// - entry.prev: offset to previous (older) entry
> + /// - entry.next: end offset of THIS entry (offset + aligned_size), NOT pointer to next entry!
> + pub fn serialize_binary(&self) -> Vec<u8> {
> + // Allocate full buffer capacity (matching C's g_malloc0(size))
> + let mut buf = vec![0u8; self.capacity];
> +
> + // Empty buffer case
> + if self.entries.is_empty() {
> + buf[0..4].copy_from_slice(&(self.capacity as u32).to_le_bytes()); // size
> + buf[4..8].copy_from_slice(&0u32.to_le_bytes()); // cpos = 0 (empty)
> + return buf;
> + }
> +
> + // Calculate all offsets first
> + let mut offsets = Vec::with_capacity(self.entries.len());
> + let mut current_offset = 8usize;
> +
> + for entry in self.iter() {
> + let aligned_size = entry.aligned_size();
> +
> + // Check if we have space
> + if current_offset + aligned_size > self.capacity {
> + break;
> + }
> +
> + offsets.push(current_offset as u32);
> + current_offset += aligned_size;
> + }
> +
> + // Track where newest entry is (first entry at offset 8)
> + let newest_offset = 8u32;
> +
> + // Write entries with correct prev/next pointers
> + // Entries are in newest-first order: [newest, second-newest, ..., oldest]
> + for (i, entry) in self.iter().enumerate() {
> + let offset = offsets[i] as usize;
> + let aligned_size = entry.aligned_size();
> +
> + // entry.prev points to the next-older entry (or 0 if this is oldest)
> + let prev = if i + 1 < offsets.len() {
> + offsets[i + 1]
> + } else {
> + 0 // Oldest entry has prev = 0
> + };
> +
> + // entry.next is the end offset of THIS entry
> + let next = offset as u32 + aligned_size as u32;
> +
> + let entry_bytes = entry.serialize_binary(prev, next);
> +
> + // Write entry data
> + buf[offset..offset + entry_bytes.len()].copy_from_slice(&entry_bytes);
> +
> + // Padding is already zeroed in vec![0u8; capacity]
> + }
> +
> + // Write header
> + buf[0..4].copy_from_slice(&(self.capacity as u32).to_le_bytes()); // size = full capacity
> + buf[4..8].copy_from_slice(&newest_offset.to_le_bytes()); // cpos = offset to newest entry
> +
> + buf
> + }
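This is where the fixture-based compat tests from my list above would bite.
Roughly the shape I have in mind; the fixture path and contents are
placeholders, to be generated on the C side via clusterlog_get_state():

```rust
// Parse the clog_base header so the fixture assertions are concrete:
// struct clog_base { uint32_t size; uint32_t cpos; char data[]; }
fn clog_base_header(blob: &[u8]) -> (u32, u32) {
    let size = u32::from_le_bytes(blob[0..4].try_into().unwrap());
    let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap());
    (size, cpos)
}

// In the real test, roughly:
//   let blob = include_bytes!("../tests/fixtures/clusterlog_state.bin");
//   let rb = RingBuffer::deserialize_binary(blob)?;
//   assert_eq!(clog_base_header(blob).0 as usize, blob.len());
//   assert_eq!(rb.iter().next().unwrap().node, "<known node name>");
// plus one fixture where cpos != 8 (the ring has wrapped) and one whose
// entries do not all fit the capacity.
```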
> +
> + /// Deserialize from C binary format
> + ///
> + /// Parses clog_base_t structure and extracts all entries.
> + /// Includes wrap-around guards matching C's logic in `clog_dump`, `clog_dump_json`,
> + /// and `clog_sort` functions.
> + pub fn deserialize_binary(data: &[u8]) -> Result<Self> {
> + if data.len() < 8 {
> + bail!(
> + "Buffer too small: {} bytes (need at least 8 for header)",
> + data.len()
> + );
> + }
> +
> + // Read header
> + let size = u32::from_le_bytes(data[0..4].try_into()?) as usize;
> + let initial_cpos = u32::from_le_bytes(data[4..8].try_into()?) as usize;
> +
> + if size != data.len() {
> + bail!(
> + "Size mismatch: header says {}, got {} bytes",
> + size,
> + data.len()
> + );
> + }
> +
> + // Empty buffer (cpos == 0)
> + if initial_cpos == 0 {
> + return Ok(Self::new(size));
> + }
> +
> + // Validate cpos range
> + if initial_cpos < 8 || initial_cpos >= size {
> + bail!("Invalid cpos: {initial_cpos} (size: {size})");
> + }
> +
> + // Parse entries starting from cpos, walking backwards via prev pointers
> + // Apply C's wrap-around guards from `clog_dump` and `clog_dump_json`
> + let mut entries = VecDeque::new();
> + let mut current_pos = initial_cpos;
> + let mut visited = std::collections::HashSet::new();
> +
> + loop {
> + // Guard against infinite loops
> + if !visited.insert(current_pos) {
> + break; // Already visited this position
> + }
> +
> + // C guard: cpos must be non-zero
> + if current_pos == 0 {
> + break;
> + }
> +
> + // Validate bounds
> + if current_pos >= size {
> + break;
> + }
> +
> + // Parse entry at current_pos
> + let entry_data = &data[current_pos..];
> + let (entry, prev, _next) = LogEntry::deserialize_binary(entry_data)?;
> +
> + // Add to back (we're walking backwards in time, newest to oldest)
> + // VecDeque should end up as [newest, ..., oldest]
> + entries.push_back(entry);
> +
> + // C wrap-around guard: if (cpos < cur->prev && cur->prev <= clog->cpos) break;
> + // Detects when prev wraps around past initial position
> + if current_pos < prev as usize && prev as usize <= initial_cpos {
> + break;
> + }
> +
> + current_pos = prev as usize;
> + }
> +
> + // Create ring buffer with entries
> + let mut buffer = Self::new(size);
> + buffer.entries = entries;
> +
> + // Recalculate current_size
> + buffer.current_size = buffer
> + .entries
> + .iter()
> + .map(|e| e.aligned_size())
> + .sum();
> +
> + Ok(buffer)
> + }
> +}
> +
> +impl Default for RingBuffer {
> + fn default() -> Self {
> + Self::new(CLOG_DEFAULT_SIZE)
> + }
> +}
> +
> +#[cfg(test)]
> +mod tests {
> + use super::*;
> +
> + #[test]
> + fn test_ring_buffer_creation() {
> + let buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> + assert_eq!(buffer.capacity, CLOG_DEFAULT_SIZE);
> + assert_eq!(buffer.len(), 0);
> + assert!(buffer.is_empty());
> + }
> +
> + #[test]
> + fn test_add_entry() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "message").unwrap();
> +
> + let result = buffer.add_entry(&entry);
> + assert!(result.is_ok());
> + assert_eq!(buffer.len(), 1);
> + assert!(!buffer.is_empty());
> + }
> +
> + #[test]
> + fn test_ring_buffer_wraparound() {
> + // Create a buffer with minimum required size (CLOG_MAX_ENTRY_SIZE * 10)
> + // but fill it beyond 90% to trigger wraparound
> + let mut buffer = RingBuffer::new(CLOG_MAX_ENTRY_SIZE * 10);
> +
> + // Add many small entries to fill the buffer
> + // Each entry is small, so we need many to fill the buffer
> + let initial_count = 50_usize;
> + for i in 0..initial_count {
> + let entry =
> + LogEntry::pack("node1", "root", "tag", 0, 1000 + i as u32, 6, "msg").unwrap();
> + let _ = buffer.add_entry(&entry);
> + }
> +
> + // All entries should fit initially
> + let count_before = buffer.len();
> + assert_eq!(count_before, initial_count);
> +
> + // Now add entries with large messages to trigger wraparound
> + // Make messages large enough to fill the buffer beyond capacity
> + let large_msg = "x".repeat(7000); // Very large message (close to max)
> + let large_entries_count = 20_usize;
> + for i in 0..large_entries_count {
> + let entry =
> + LogEntry::pack("node1", "root", "tag", 0, 2000 + i as u32, 6, &large_msg).unwrap();
> + let _ = buffer.add_entry(&entry);
> + }
> +
> + // Should have removed some old entries due to capacity limits
> + assert!(
> + buffer.len() < count_before + large_entries_count,
> + "Expected wraparound to remove old entries (have {} entries, expected < {})",
> + buffer.len(),
> + count_before + large_entries_count
> + );
> +
> + // Newest entry should be present
> + let newest = buffer.iter().next().unwrap();
> + assert_eq!(newest.time, 2000 + large_entries_count as u32 - 1); // Last added entry
> + }
> +
> + #[test]
> + fn test_sort_by_time() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> +
> + // Add entries in random time order
> + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "c").unwrap());
> + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
> + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "b").unwrap());
> +
> + let sorted = buffer.sort().unwrap();
> +
> + // Check that entries are sorted by time (oldest first after reversing)
> + let times: Vec<u32> = sorted.iter().map(|e| e.time).collect();
> + let mut times_sorted = times.clone();
> + times_sorted.sort();
> + times_sorted.reverse(); // Newest first in buffer
> + assert_eq!(times, times_sorted);
> + }
> +
> + #[test]
> + fn test_sort_by_node_digest() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> +
> + // Add entries with same time but different nodes
> + let _ = buffer.add_entry(&LogEntry::pack("node3", "root", "tag", 0, 1000, 6, "c").unwrap());
> + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
> + let _ = buffer.add_entry(&LogEntry::pack("node2", "root", "tag", 0, 1000, 6, "b").unwrap());
> +
> + let sorted = buffer.sort().unwrap();
> +
> + // Entries with same time should be sorted by node_digest
> + // Within same time, should be sorted
> + for entries in sorted.iter().collect::<Vec<_>>().windows(2) {
> + if entries[0].time == entries[1].time {
> + assert!(entries[0].node_digest >= entries[1].node_digest);
> + }
> + }
> + }
> +
> + #[test]
> + fn test_json_dump() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> + let _ = buffer
> + .add_entry(&LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "msg").unwrap());
> +
> + let json = buffer.dump_json(None, 50);
> +
> + // Should be valid JSON
> + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
> + assert!(parsed.get("data").is_some());
> +
> + let data = parsed["data"].as_array().unwrap();
> + assert_eq!(data.len(), 1);
> +
> + let entry = &data[0];
> + assert_eq!(entry["node"], "node1");
> + assert_eq!(entry["user"], "root");
> + assert_eq!(entry["tag"], "cluster");
> + }
> +
> + #[test]
> + fn test_json_dump_with_filter() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> +
> + // Add entries with different users
> + let _ =
> + buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg1").unwrap());
> + let _ =
> + buffer.add_entry(&LogEntry::pack("node1", "admin", "tag", 0, 1001, 6, "msg2").unwrap());
> + let _ =
> + buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "msg3").unwrap());
> +
> + // Filter for "root" only
> + let json = buffer.dump_json(Some("root"), 50);
> +
> + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
> + let data = parsed["data"].as_array().unwrap();
> +
> + // Should only have 2 entries (the ones from "root")
> + assert_eq!(data.len(), 2);
> +
> + for entry in data {
> + assert_eq!(entry["user"], "root");
> + }
> + }
> +
> + #[test]
> + fn test_json_dump_max_entries() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> +
> + // Add 10 entries
> + for i in 0..10 {
> + let _ = buffer
> + .add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000 + i, 6, "msg").unwrap());
> + }
> +
> + // Request only 5 entries
> + let json = buffer.dump_json(None, 5);
> +
> + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
> + let data = parsed["data"].as_array().unwrap();
> +
> + assert_eq!(data.len(), 5);
> + }
> +
> + #[test]
> + fn test_iterator() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> +
> + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
> + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "b").unwrap());
> + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "c").unwrap());
> +
> + let messages: Vec<String> = buffer.iter().map(|e| e.message.clone()).collect();
> +
> + // Should be in reverse order (newest first)
> + assert_eq!(messages, vec!["c", "b", "a"]);
> + }
> +
> + #[test]
> + fn test_binary_serialization_roundtrip() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> +
> + let _ = buffer.add_entry(
> + &LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "Entry 1").unwrap(),
> + );
> + let _ = buffer.add_entry(
> + &LogEntry::pack("node2", "admin", "system", 456, 1001, 5, "Entry 2").unwrap(),
> + );
> +
> + // Serialize
> + let binary = buffer.serialize_binary();
> +
> + // Deserialize
> + let deserialized = RingBuffer::deserialize_binary(&binary).unwrap();
> +
> + // Check entry count
> + assert_eq!(deserialized.len(), buffer.len());
> +
> + // Check entries match
> + let orig_entries: Vec<_> = buffer.iter().collect();
> + let deser_entries: Vec<_> = deserialized.iter().collect();
> +
> + for (orig, deser) in orig_entries.iter().zip(deser_entries.iter()) {
> + assert_eq!(deser.uid, orig.uid);
> + assert_eq!(deser.time, orig.time);
> + assert_eq!(deser.node, orig.node);
> + assert_eq!(deser.message, orig.message);
> + }
> + }
> +
> + #[test]
> + fn test_binary_format_header() {
> + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
> + let _ = buffer.add_entry(&LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap());
> +
> + let binary = buffer.serialize_binary();
> +
> + // Check header format
> + assert!(binary.len() >= 8);
> +
> + let size = u32::from_le_bytes(binary[0..4].try_into().unwrap()) as usize;
> + let cpos = u32::from_le_bytes(binary[4..8].try_into().unwrap());
> +
> + assert_eq!(size, binary.len());
> + assert_eq!(cpos, 8); // First entry at offset 8
> + }
> +
> + #[test]
> + fn test_binary_empty_buffer() {
> + let buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); // Use default size to avoid capacity upgrade
> + let binary = buffer.serialize_binary();
> +
> + // Empty buffer returns full capacity (matching C's g_memdup2(cl->base, clog->size))
> + assert_eq!(binary.len(), CLOG_DEFAULT_SIZE); // Full capacity, not just header!
> +
> + // Check header
> + let size = u32::from_le_bytes(binary[0..4].try_into().unwrap()) as usize;
> + let cpos = u32::from_le_bytes(binary[4..8].try_into().unwrap());
> +
> + assert_eq!(size, CLOG_DEFAULT_SIZE);
> + assert_eq!(cpos, 0); // Empty buffer has cpos = 0
> +
> + let deserialized = RingBuffer::deserialize_binary(&binary).unwrap();
> + assert_eq!(deserialized.len(), 0);
> + assert_eq!(deserialized.capacity(), CLOG_DEFAULT_SIZE);
> + }
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs b/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
> new file mode 100644
> index 000000000..5185386dc
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
> @@ -0,0 +1,315 @@
> +//! Binary compatibility tests for pmxcfs-logger
> +//!
> +//! These tests verify that the Rust implementation can correctly
> +//! serialize/deserialize binary data in a format compatible with
> +//! the C implementation.
> +
> +use pmxcfs_logger::{ClusterLog, LogEntry, RingBuffer};
> +
> +/// Test deserializing a minimal C-compatible binary blob
> +///
> +/// This test uses a hand-crafted binary blob that matches C's clog_base_t format:
> +/// - 8-byte header (size + cpos)
> +/// - Single entry at offset 8
> +#[test]
> +fn test_deserialize_minimal_c_blob() {
> + // Create a minimal valid C binary blob
> + // Header: size=8+entry_size, cpos=8 (points to first entry)
> + // Entry: minimal valid entry with all required fields
> +
> + let entry = LogEntry::pack("node1", "root", "test", 123, 1000, 6, "msg").unwrap();
> + let entry_bytes = entry.serialize_binary(0, 0); // prev=0 (end), next=0
> + let entry_size = entry_bytes.len();
> +
> + // Allocate buffer with capacity for header + entry
> + let total_size = 8 + entry_size;
> + let mut blob = vec![0u8; total_size];
> +
> + // Write header
> + blob[0..4].copy_from_slice(&(total_size as u32).to_le_bytes()); // size
> + blob[4..8].copy_from_slice(&8u32.to_le_bytes()); // cpos = 8
> +
> + // Write entry
> + blob[8..8 + entry_size].copy_from_slice(&entry_bytes);
> +
> + // Deserialize
> + let buffer = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
> +
> + // Verify
> + assert_eq!(buffer.len(), 1, "Should have 1 entry");
> + let entries: Vec<_> = buffer.iter().collect();
> + assert_eq!(entries[0].node, "node1");
> + assert_eq!(entries[0].message, "msg");
> +}
> +
> +/// Test round-trip: Rust serialize -> deserialize
> +///
> +/// Verifies that Rust can serialize and deserialize its own format
> +#[test]
> +fn test_roundtrip_single_entry() {
> + let mut buffer = RingBuffer::new(8192 * 16);
> +
> + let entry = LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "Test message").unwrap();
> + buffer.add_entry(&entry).unwrap();
> +
> + // Serialize
> + let blob = buffer.serialize_binary();
> +
> + // Verify header
> + let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
> + let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
> +
> + assert_eq!(size, blob.len(), "Size should match blob length");
> + assert_eq!(cpos, 8, "First entry should be at offset 8");
> +
> + // Deserialize
> + let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
> +
> + // Verify
> + assert_eq!(deserialized.len(), 1);
> + let entries: Vec<_> = deserialized.iter().collect();
> + assert_eq!(entries[0].node, "node1");
> + assert_eq!(entries[0].ident, "root");
> + assert_eq!(entries[0].message, "Test message");
> +}
> +
> +/// Test round-trip with multiple entries
> +///
> +/// Verifies linked list structure (prev/next pointers)
> +#[test]
> +fn test_roundtrip_multiple_entries() {
> + let mut buffer = RingBuffer::new(8192 * 16);
> +
> + // Add 3 entries
> + for i in 0..3 {
> + let entry = LogEntry::pack(
> + "node1",
> + "root",
> + "test",
> + 100 + i,
> + 1000 + i,
> + 6,
> + &format!("Message {}", i),
> + )
> + .unwrap();
> + buffer.add_entry(&entry).unwrap();
> + }
> +
> + // Serialize
> + let blob = buffer.serialize_binary();
> +
> + // Deserialize
> + let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
> +
> + // Verify all entries preserved
> + assert_eq!(deserialized.len(), 3);
> +
> + let entries: Vec<_> = deserialized.iter().collect();
> + // Entries are stored newest-first
> + assert_eq!(entries[0].message, "Message 2"); // Newest
> + assert_eq!(entries[1].message, "Message 1");
> + assert_eq!(entries[2].message, "Message 0"); // Oldest
> +}
> +
> +/// Test empty buffer serialization
> +///
> +/// C returns a buffer with size and cpos=0 for empty buffers
> +#[test]
> +fn test_empty_buffer_format() {
> + let buffer = RingBuffer::new(8192 * 16);
> +
> + // Serialize empty buffer
> + let blob = buffer.serialize_binary();
> +
> + // Verify format
> + assert_eq!(blob.len(), 8192 * 16, "Should be full capacity");
> +
> + let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
> + let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
> +
> + assert_eq!(size, 8192 * 16, "Size should match capacity");
> + assert_eq!(cpos, 0, "Empty buffer should have cpos=0");
> +
> + // Deserialize
> + let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
> + assert_eq!(deserialized.len(), 0, "Should be empty");
> +}
> +
> +/// Test entry alignment (8-byte boundaries)
> +///
> +/// C uses ((size + 7) & ~7) for alignment
> +#[test]
> +fn test_entry_alignment() {
> + let entry = LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap();
> +
> + let aligned_size = entry.aligned_size();
> +
> + // Should be multiple of 8
> + assert_eq!(aligned_size % 8, 0, "Aligned size should be multiple of 8");
> +
> + // Should be >= actual size
> + assert!(aligned_size >= entry.size());
> +
> + // Should be within 7 bytes of actual size
> + assert!(aligned_size - entry.size() < 8);
> +}
> +
> +/// Test string length capping (prevents u8 overflow)
> +///
> +/// node_len, ident_len, tag_len are u8 and must cap at 255
> +#[test]
> +fn test_string_length_capping() {
> + // Create entry with very long strings
> + let long_node = "a".repeat(300);
> + let long_ident = "b".repeat(300);
> + let long_tag = "c".repeat(300);
> +
> + let entry = LogEntry::pack(&long_node, &long_ident, &long_tag, 1, 1000, 6, "msg").unwrap();
> +
> + // Serialize
> + let blob = entry.serialize_binary(0, 0);
> +
> + // Check length fields (at offsets 32, 33, 34 after header)
> + let node_len = blob[32];
> + let ident_len = blob[33];
> + let tag_len = blob[34];
Shouldn't this be 37/38/39?
> +
> + // All should be capped at 255 (including null terminator)
> + assert!(node_len <= 255, "node_len should be capped at 255");
> + assert!(ident_len <= 255, "ident_len should be capped at 255");
> + assert!(tag_len <= 255, "tag_len should be capped at 255");
These asserts are tautological, since a u8 can never exceed 255. Assert the expected capped value (255) instead.
> +}
> +
> +/// Test ClusterLog state serialization
> +///
> +/// Verifies get_state() returns C-compatible format
> +#[test]
> +fn test_cluster_log_state_format() {
> + let log = ClusterLog::new();
> +
> + // Add some entries
> + log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1")
> + .unwrap();
> + log.add("node2", "admin", "system", 456, 6, 1001, "Entry 2")
> + .unwrap();
> +
> + // Get state
> + let state = log.get_state().expect("Should serialize");
> +
> + // Verify header format
> + assert!(state.len() >= 8, "Should have at least header");
> +
> + let size = u32::from_le_bytes(state[0..4].try_into().unwrap()) as usize;
> + let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap()) as usize;
> +
> + assert_eq!(size, state.len(), "Size should match blob length");
> + assert!(cpos >= 8, "cpos should point into data section");
> + assert!(cpos < size, "cpos should be within buffer");
> +
> + // Deserialize and verify
> + let deserialized = ClusterLog::deserialize_state(&state).expect("Should deserialize");
> + assert_eq!(deserialized.len(), 2, "Should have 2 entries");
> +}
> +
> +/// Test wrap-around detection in deserialization
> +///
> +/// Verifies that circular buffer wrap-around is handled correctly
> +#[test]
> +fn test_wraparound_detection() {
> + // Create a buffer with entries
> + let mut buffer = RingBuffer::new(8192 * 16);
> +
> + for i in 0..5 {
> + let entry = LogEntry::pack("node1", "root", "test", 100 + i, 1000 + i, 6, "msg").unwrap();
> + buffer.add_entry(&entry).unwrap();
> + }
> +
> + // Serialize
> + let blob = buffer.serialize_binary();
> +
> + // Deserialize (should handle prev pointers correctly)
> + let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
> +
> + // Should get all entries
> + assert_eq!(deserialized.len(), 5);
> +}
> +
> +/// Test invalid binary data handling
> +///
> +/// Verifies that malformed data is rejected
> +#[test]
> +fn test_invalid_binary_data() {
> + // Too small
> + let too_small = vec![0u8; 4];
> + assert!(RingBuffer::deserialize_binary(&too_small).is_err());
> +
> + // Size mismatch
> + let mut size_mismatch = vec![0u8; 100];
> + size_mismatch[0..4].copy_from_slice(&200u32.to_le_bytes()); // Claims 200 bytes
> + assert!(RingBuffer::deserialize_binary(&size_mismatch).is_err());
> +
> + // Invalid cpos (beyond buffer)
> + let mut invalid_cpos = vec![0u8; 100];
> + invalid_cpos[0..4].copy_from_slice(&100u32.to_le_bytes()); // size = 100
> + invalid_cpos[4..8].copy_from_slice(&200u32.to_le_bytes()); // cpos = 200 (invalid)
> + assert!(RingBuffer::deserialize_binary(&invalid_cpos).is_err());
> +}
> +
> +/// Test FNV-1a hash consistency
> +///
> +/// Verifies that node_digest and ident_digest are computed correctly
> +#[test]
> +fn test_hash_consistency() {
> + let entry1 = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "msg1").unwrap();
> + let entry2 = LogEntry::pack("node1", "root", "test", 2, 1001, 6, "msg2").unwrap();
> + let entry3 = LogEntry::pack("node2", "admin", "test", 3, 1002, 6, "msg3").unwrap();
> +
> + // Same node should have same digest
> + assert_eq!(entry1.node_digest, entry2.node_digest);
> +
> + // Same ident should have same digest
> + assert_eq!(entry1.ident_digest, entry2.ident_digest);
> +
> + // Different node should have different digest
> + assert_ne!(entry1.node_digest, entry3.node_digest);
> +
> + // Different ident should have different digest
> + assert_ne!(entry1.ident_digest, entry3.ident_digest);
> +}
> +
> +/// Test priority validation
> +///
> +/// Priority must be 0-7 (syslog priority)
> +#[test]
> +fn test_priority_validation() {
> + // Valid priorities (0-7)
> + for pri in 0..=7 {
> + let result = LogEntry::pack("node1", "root", "test", 1, 1000, pri, "msg");
> + assert!(result.is_ok(), "Priority {} should be valid", pri);
> + }
> +
> + // Invalid priority (8+)
> + let result = LogEntry::pack("node1", "root", "test", 1, 1000, 8, "msg");
> + assert!(result.is_err(), "Priority 8 should be invalid");
> +}
> +
> +/// Test UTF-8 to ASCII conversion
> +///
> +/// Verifies control character and Unicode escaping (matches C implementation)
> +#[test]
> +fn test_utf8_escaping() {
> + // Control characters (C format: #XXX with 3 decimal digits)
> + let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Hello\x07World").unwrap();
> + assert!(entry.message.contains("#007"), "BEL should be escaped as #007");
> +
> + // Unicode characters
> + let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Hello 世界").unwrap();
> + assert!(entry.message.contains("\\u4e16"), "世 should be escaped as \\u4e16");
> + assert!(entry.message.contains("\\u754c"), "界 should be escaped as \\u754c");
> +
> + // Mixed content
> + let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Test\x01\n世").unwrap();
> + assert!(entry.message.contains("#001"), "SOH should be escaped");
> + assert!(entry.message.contains("#010"), "LF should be escaped");
> + assert!(entry.message.contains("\\u4e16"), "Unicode should be escaped");
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs b/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs
> new file mode 100644
> index 000000000..eec7470d3
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs
> @@ -0,0 +1,294 @@
> +//! Performance tests for pmxcfs-logger
> +//!
> +//! These tests verify that the logger implementation scales properly
> +//! and handles large log merges efficiently.
> +
> +use pmxcfs_logger::ClusterLog;
> +
> +/// Test merging large logs from multiple nodes
> +///
> +/// This test verifies:
> +/// 1. Large log merge performance (multiple nodes with many entries)
> +/// 2. Memory usage stays bounded
> +/// 3. Deduplication works correctly at scale
> +#[test]
> +fn test_large_log_merge_performance() {
> + // Create 3 nodes with large logs
> + let node1 = ClusterLog::new();
> + let node2 = ClusterLog::new();
> + let node3 = ClusterLog::new();
> +
> + // Add 1000 entries per node (3000 total)
> + for i in 0..1000 {
> + let _ = node1.add(
> + "node1",
> + "root",
> + "cluster",
> + 1000 + i,
> + 6,
> + 1000000 + i,
> + &format!("Node1 entry {}", i),
> + );
> + let _ = node2.add(
> + "node2",
> + "admin",
> + "system",
> + 2000 + i,
> + 6,
> + 1000000 + i,
> + &format!("Node2 entry {}", i),
> + );
> + let _ = node3.add(
> + "node3",
> + "user",
> + "service",
> + 3000 + i,
> + 6,
> + 1000000 + i,
> + &format!("Node3 entry {}", i),
> + );
> + }
> +
> + // Get remote buffers
> + let node2_buffer = node2.get_buffer();
> + let node3_buffer = node3.get_buffer();
> +
> + // Merge all logs into node1
> + let start = std::time::Instant::now();
> + node1
> + .merge(vec![node2_buffer, node3_buffer], true)
> + .expect("Merge should succeed");
> + let duration = start.elapsed();
> +
> + // Verify merge completed
> + let merged_count = node1.len();
> +
> + // Should have merged entries (may be less than 3000 due to capacity limits)
> + assert!(
> + merged_count > 0,
> + "Should have some entries after merge (got {})",
> + merged_count
> + );
> +
> + // Performance check: merge should complete in reasonable time
> + // For 3000 entries, should be well under 1 second
> + assert!(
> + duration.as_millis() < 1000,
> + "Large merge took too long: {:?}",
> + duration
> + );
> +
> + println!(
> + "[OK] Merged 3000 entries from 3 nodes in {:?} (result: {} entries)",
> + duration, merged_count
> + );
> +}
> +
> +/// Test deduplication performance with high duplicate rate
> +///
> +/// This test verifies that deduplication works efficiently when
> +/// many duplicate entries are present.
> +#[test]
> +fn test_deduplication_performance() {
> + let log = ClusterLog::new();
> +
> + // Add 500 entries from same node with overlapping times
> + // This creates many potential duplicates
> + for i in 0..500 {
> + let _ = log.add(
> + "node1",
> + "root",
> + "cluster",
> + 1000 + i,
> + 6,
> + 1000 + (i / 10), // Reuse timestamps (50 unique times)
> + &format!("Entry {}", i),
> + );
> + }
> +
> + // Create remote log with overlapping entries
> + let remote = ClusterLog::new();
> + for i in 0..500 {
> + let _ = remote.add(
> + "node1",
> + "root",
> + "cluster",
> + 2000 + i,
> + 6,
> + 1000 + (i / 10), // Same timestamp pattern
> + &format!("Remote entry {}", i),
> + );
> + }
> +
> + let remote_buffer = remote.get_buffer();
> +
> + // Merge with deduplication
> + let start = std::time::Instant::now();
> + log.merge(vec![remote_buffer], true)
> + .expect("Merge should succeed");
> + let duration = start.elapsed();
> +
> + let final_count = log.len();
> +
> + // Should have deduplicated some entries
> + assert!(
> + final_count > 0,
> + "Should have entries after deduplication"
> + );
> +
> + // Performance check
> + assert!(
> + duration.as_millis() < 500,
> + "Deduplication took too long: {:?}",
> + duration
> + );
> +
> + println!(
> + "[OK] Deduplicated 1000 entries in {:?} (result: {} entries)",
> + duration, final_count
> + );
> +}
> +
> +/// Test memory usage stays bounded during large operations
> +///
> +/// This test verifies that the ring buffer properly limits memory
> +/// usage even when adding many entries.
> +#[test]
> +fn test_memory_bounded() {
> + // Create log with default capacity
> + let log = ClusterLog::new();
> +
> + // Add many entries (more than capacity)
> + for i in 0..10000 {
> + let _ = log.add(
> + "node1",
> + "root",
> + "cluster",
> + 1000 + i,
> + 6,
> + 1000000 + i,
> + &format!("Entry with some message content {}", i),
> + );
> + }
> +
> + let entry_count = log.len();
> + let capacity = log.capacity();
> +
> + // Buffer should not grow unbounded
> + // Entry count should be reasonable relative to capacity
> + assert!(
> + entry_count < 10000,
> + "Buffer should not store all 10000 entries (got {})",
> + entry_count
> + );
> +
> + // Verify capacity is respected
> + assert!(
> + capacity > 0,
> + "Capacity should be set (got {})",
> + capacity
> + );
> +
> + println!(
> + "[OK] Added 10000 entries, buffer contains {} (capacity: {} bytes)",
> + entry_count, capacity
> + );
> +}
> +
> +/// Test JSON export performance with large logs
> +///
> +/// This test verifies that JSON export scales properly.
> +#[test]
> +fn test_json_export_performance() {
> + let log = ClusterLog::new();
> +
> + // Add 1000 entries
> + for i in 0..1000 {
> + let _ = log.add(
> + "node1",
> + "root",
> + "cluster",
> + 1000 + i,
> + 6,
> + 1000000 + i,
> + &format!("Test message {}", i),
> + );
> + }
> +
> + // Export to JSON
> + let start = std::time::Instant::now();
> + let json = log.dump_json(None, 1000);
> + let duration = start.elapsed();
> +
> + // Verify JSON is valid
> + let parsed: serde_json::Value =
> + serde_json::from_str(&json).expect("Should be valid JSON");
> + let data = parsed["data"].as_array().expect("Should have data array");
> +
> + assert!(data.len() > 0, "Should have entries in JSON");
> +
> + // Performance check
> + assert!(
> + duration.as_millis() < 500,
> + "JSON export took too long: {:?}",
> + duration
> + );
> +
> + println!(
> + "[OK] Exported {} entries to JSON in {:?}",
> + data.len(),
> + duration
> + );
> +}
> +
> +/// Test binary serialization performance
> +///
> +/// This test verifies that binary serialization/deserialization
> +/// is efficient for large buffers.
> +#[test]
> +fn test_binary_serialization_performance() {
> + let log = ClusterLog::new();
> +
> + // Add 500 entries
> + for i in 0..500 {
> + let _ = log.add(
> + "node1",
> + "root",
> + "cluster",
> + 1000 + i,
> + 6,
> + 1000000 + i,
> + &format!("Entry {}", i),
> + );
> + }
> +
> + // Serialize
> + let start = std::time::Instant::now();
> + let state = log.get_state().expect("Should serialize");
> + let serialize_duration = start.elapsed();
> +
> + // Deserialize
> + let start = std::time::Instant::now();
> + let deserialized = ClusterLog::deserialize_state(&state).expect("Should deserialize");
> + let deserialize_duration = start.elapsed();
> +
> + // Verify round-trip
> + assert_eq!(deserialized.len(), 500, "Should preserve entry count");
> +
> + // Performance checks
> + assert!(
> + serialize_duration.as_millis() < 200,
> + "Serialization took too long: {:?}",
> + serialize_duration
> + );
> + assert!(
> + deserialize_duration.as_millis() < 200,
> + "Deserialization took too long: {:?}",
> + deserialize_duration
> + );
> +
> + println!(
> + "[OK] Serialized 500 entries in {:?}, deserialized in {:?}",
> + serialize_duration, deserialize_duration
> + );
> +}