From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Cc: Kefu Chai <tchaikov@gmail.com>
Subject: [PATCH pve-cluster v3 03/13] pmxcfs-rs: add pmxcfs-logger crate
Date: Mon, 23 Mar 2026 19:32:18 +0800
Message-ID: <20260323113239.942866-4-k.chai@proxmox.com>
In-Reply-To: <20260323113239.942866-1-k.chai@proxmox.com>
Add cluster log management crate compatible with the C logger.c:
- LogEntry: Individual log entry with automatic UID generation
- RingBuffer: Circular buffer matching C's clog_base_t layout
- ClusterLog: Main API with per-node deduplication and merging
- FNV-1a hash functions matching the C implementation
Includes binary compatibility tests with C-generated fixtures to
verify the on-wire log entry format.
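As a quick reviewer reference, the FNV-1a variant mentioned above can be
sketched in a few lines. This is plain 64-bit FNV-1a with the standard
offset basis and prime; whether the C logger additionally folds in a
trailing NUL byte (as C string hashing often does) is a detail of hash.rs
not assumed here, and `fnv_64a` is an illustrative name, not the crate's API:

```rust
/// Plain 64-bit FNV-1a over a byte slice, using the standard parameters:
/// offset basis 0xcbf29ce484222325 and prime 0x100000001b3.
fn fnv_64a(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &byte in data {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}
```

The published FNV-1a test vectors (empty input yields the offset basis;
"a" yields 0xaf63dc4c8601ec8c) make a convenient sanity check when
comparing against hash.rs.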
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
src/pmxcfs-rs/Cargo.toml | 2 +
src/pmxcfs-rs/pmxcfs-logger/Cargo.toml | 15 +
src/pmxcfs-rs/pmxcfs-logger/README.md | 58 ++
.../pmxcfs-logger/src/cluster_log.rs | 610 +++++++++++++++
src/pmxcfs-rs/pmxcfs-logger/src/entry.rs | 734 ++++++++++++++++++
src/pmxcfs-rs/pmxcfs-logger/src/hash.rs | 129 +++
src/pmxcfs-rs/pmxcfs-logger/src/lib.rs | 29 +
.../pmxcfs-logger/src/ring_buffer.rs | 542 +++++++++++++
.../tests/binary_compatibility_tests.rs | 611 +++++++++++++++
.../tests/fixtures/gen_fixtures.c | 144 ++++
.../tests/fixtures/multi_entry.bin | Bin 0 -> 131072 bytes
.../pmxcfs-logger/tests/fixtures/nonascii.bin | Bin 0 -> 131072 bytes
.../tests/fixtures/nonascii.json | 5 +
.../pmxcfs-logger/tests/fixtures/overflow.bin | Bin 0 -> 40960 bytes
.../tests/fixtures/single_entry.bin | Bin 0 -> 131072 bytes
.../tests/fixtures/single_entry.json | 5 +
.../pmxcfs-logger/tests/performance_tests.rs | 286 +++++++
17 files changed, 3170 insertions(+)
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/multi_entry.bin
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.bin
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.json
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/overflow.bin
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.bin
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.json
create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs
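Each `.bin` fixture above starts with the 8-byte clog_base_t header that the
binary-compatibility tests in this patch decode: a little-endian u32 total
size followed by a little-endian u32 cpos (offset of the newest entry). A
minimal sketch of parsing that header (`parse_clog_header` is an
illustrative name, not part of the crate):

```rust
/// Parse the 8-byte clog_base_t header into (total size, cpos),
/// both little-endian u32. Returns None if fewer than 8 bytes are given.
fn parse_clog_header(buf: &[u8]) -> Option<(u32, u32)> {
    let size = u32::from_le_bytes(buf.get(0..4)?.try_into().ok()?);
    let cpos = u32::from_le_bytes(buf.get(4..8)?.try_into().ok()?);
    Some((size, cpos))
}
```

For a well-formed buffer, `size` equals the byte length of the blob and
`8 <= cpos < size`, which is what test_get_state_binary_format asserts.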
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 99bb79266..f2ed02c6f 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -3,6 +3,7 @@
members = [
"pmxcfs-api-types", # Shared types and error definitions
"pmxcfs-config", # Configuration management
+ "pmxcfs-logger", # Cluster log with ring buffer and deduplication
]
resolver = "2"
@@ -18,6 +19,7 @@ rust-version = "1.85"
# Internal workspace dependencies
pmxcfs-api-types = { path = "pmxcfs-api-types" }
pmxcfs-config = { path = "pmxcfs-config" }
+pmxcfs-logger = { path = "pmxcfs-logger" }
# Error handling
thiserror = "2.0"
diff --git a/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml b/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
new file mode 100644
index 000000000..1af3f015c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "pmxcfs-logger"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+anyhow = "1.0"
+parking_lot = "0.12"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+tracing = "0.1"
+
+[dev-dependencies]
+tempfile = "3.0"
+
diff --git a/src/pmxcfs-rs/pmxcfs-logger/README.md b/src/pmxcfs-rs/pmxcfs-logger/README.md
new file mode 100644
index 000000000..38f102c27
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/README.md
@@ -0,0 +1,58 @@
+# pmxcfs-logger
+
+Cluster-wide log management for pmxcfs, fully compatible with the C implementation (logger.c).
+
+## Overview
+
+This crate implements a cluster log system matching Proxmox's C-based logger.c behavior. It provides:
+
+- **Ring Buffer Storage**: Circular buffer for log entries with automatic capacity management
+- **FNV-1a Hashing**: Hashing for node and identity-based deduplication
+- **Deduplication**: Per-node tracking of latest log entries to avoid duplicates
+- **Time-based Sorting**: Chronological ordering of log entries across nodes
+- **Multi-node Merging**: Combining logs from multiple cluster nodes
+- **JSON Export**: Web UI-compatible JSON output matching C format
+
+## Architecture
+
+### Key Components
+
+1. **LogEntry** (`entry.rs`): Individual log entry with automatic UID generation
+2. **RingBuffer** (`ring_buffer.rs`): Circular buffer with capacity management
+3. **ClusterLog** (`cluster_log.rs`): Main API with deduplication and merging
+4. **Hash Functions** (`hash.rs`): FNV-1a implementation matching C
+
+## C to Rust Mapping
+
+| C Function | Rust Equivalent | Location |
+|------------|-----------------|----------|
+| `fnv_64a_buf` | `hash::fnv_64a` | hash.rs |
+| `clog_pack` | `LogEntry::pack` | entry.rs |
+| `clog_copy` | `RingBuffer::add_entry` | ring_buffer.rs |
+| `clog_sort` | `RingBuffer::sort` | ring_buffer.rs |
+| `clog_dump_json` | `RingBuffer::dump_json` | ring_buffer.rs |
+| `clusterlog_insert` | `ClusterLog::insert` | cluster_log.rs |
+| `clusterlog_add` | `ClusterLog::add` | cluster_log.rs |
+| `clusterlog_merge` | `ClusterLog::merge` | cluster_log.rs |
+| `dedup_lookup` | `ClusterLog::is_not_duplicate` | cluster_log.rs |
+
+## Key Differences from C
+
+1. **No `node_digest` in DedupEntry**: C stores `node_digest` both as the GHashTable key and inside the struct. Rust uses it only as the HashMap key, saving 8 bytes per entry.
+
+2. **Mutex granularity**: Like C, Rust uses a single mutex. The buffer and dedup table live together in one Arc<Mutex<ClusterLogInner>>, matching C's single global mutex and keeping buffer+dedup updates atomic.
+
+3. **Code size**: Rust implementation is ~24% the size of C (740 lines vs 3,000+) while maintaining equivalent functionality.
+
+## Integration
+
+This crate is integrated into `pmxcfs-status` to provide cluster log functionality. The `.clusterlog` FUSE plugin uses this to provide JSON log output compatible with the Proxmox web UI.
+
+## References
+
+### C Implementation
+- `src/pmxcfs/logger.c` / `logger.h` - Cluster log implementation
+
+### Related Crates
+- **pmxcfs-status**: Integrates ClusterLog for status tracking
+- **pmxcfs**: FUSE plugin exposes cluster log via `.clusterlog`
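The per-node deduplication rule described in the README (accept an entry only
when its (time, uid) pair is strictly newer than the last pair recorded for
that node's digest) can be sketched independently of the crate. The names
here are illustrative, mirroring `ClusterLog::is_not_duplicate` in
cluster_log.rs rather than exposing the crate's API:

```rust
use std::collections::HashMap;

/// Latest (time, uid) pair recorded per node digest.
type DedupTable = HashMap<u64, (u32, u32)>;

/// Returns true when the entry is NOT a duplicate, i.e. its (time, uid)
/// pair is strictly newer than the recorded pair for this node digest,
/// and records it as the newest pair seen.
fn accept(dedup: &mut DedupTable, node_digest: u64, time: u32, uid: u32) -> bool {
    let last = dedup.entry(node_digest).or_insert((0, 0));
    if time > last.0 || (time == last.0 && uid > last.1) {
        *last = (time, uid);
        true
    } else {
        false
    }
}
```

Note that dedup keys on (time, uid) per node, not on message content: two
entries with identical text but distinct UIDs are both accepted, which is
exactly what test_deduplication in cluster_log.rs demonstrates.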
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs b/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
new file mode 100644
index 000000000..1f4b5307f
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
@@ -0,0 +1,610 @@
+/// Cluster Log Implementation
+///
+/// This module implements the cluster-wide log system with deduplication
+/// and merging support, matching C's clusterlog_t.
+use crate::entry::LogEntry;
+use crate::ring_buffer::{RingBuffer, CLOG_DEFAULT_SIZE};
+use anyhow::Result;
+use parking_lot::Mutex;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
+
+/// Deduplication entry - tracks the latest UID and time for each node
+///
+/// Note: C's `dedup_entry_t` includes node_digest field because GHashTable stores
+/// the struct pointer both as key and value. In Rust, we use HashMap<u64, DedupEntry>
+/// where node_digest is the key, so we don't need to duplicate it in the value.
+/// This is functionally equivalent but more efficient.
+#[derive(Debug, Clone)]
+pub(crate) struct DedupEntry {
+ /// Latest UID seen from this node
+ pub uid: u32,
+ /// Latest timestamp seen from this node
+ pub time: u32,
+}
+
+/// Internal state protected by a single mutex
+/// Matches C's clusterlog_t which uses a single mutex for both base and dedup
+struct ClusterLogInner {
+ /// Ring buffer for log storage (matches C's cl->base)
+ buffer: RingBuffer,
+ /// Deduplication tracker (matches C's cl->dedup)
+ dedup: HashMap<u64, DedupEntry>,
+}
+
+/// Cluster-wide log with deduplication and merging support
+/// Matches C's `clusterlog_t`
+///
+/// Note: Unlike the initial implementation with separate mutexes, we use a single
+/// mutex to match C's semantics and ensure atomic updates of buffer+dedup.
+pub struct ClusterLog {
+ /// Inner state protected by a single mutex
+ /// Matches C's single g_mutex_t protecting both cl->base and cl->dedup
+ inner: Arc<Mutex<ClusterLogInner>>,
+}
+
+impl ClusterLog {
+ /// Create a new cluster log with default size
+ pub fn new() -> Self {
+ Self::with_capacity(CLOG_DEFAULT_SIZE)
+ }
+
+ /// Create a new cluster log with specified capacity
+ pub fn with_capacity(capacity: usize) -> Self {
+ Self {
+ inner: Arc::new(Mutex::new(ClusterLogInner {
+ buffer: RingBuffer::new(capacity),
+ dedup: HashMap::new(),
+ })),
+ }
+ }
+
+ /// Matches C's `clusterlog_add` function
+ #[must_use = "log insertion errors should be handled"]
+ #[allow(clippy::too_many_arguments)]
+ pub fn add(
+ &self,
+ node: &str,
+ ident: &str,
+ tag: &str,
+ pid: u32,
+ priority: u8,
+ time: u32,
+ message: &str,
+ ) -> Result<()> {
+ let entry = LogEntry::pack(node, ident, tag, pid, time, priority, message)?;
+ self.insert(&entry)
+ }
+
+ /// Insert a log entry (with deduplication)
+ ///
+ /// Matches C's `clusterlog_insert` function
+ #[must_use = "log insertion errors should be handled"]
+ pub fn insert(&self, entry: &LogEntry) -> Result<()> {
+ let mut inner = self.inner.lock();
+
+ // Check deduplication
+ if Self::is_not_duplicate(&mut inner.dedup, entry) {
+ // Entry is not a duplicate, add it
+ inner.buffer.add_entry(entry)?;
+ } else {
+ tracing::debug!("Ignoring duplicate cluster log entry");
+ }
+
+ Ok(())
+ }
+
+ /// Check if entry is a duplicate (returns true if NOT a duplicate)
+ ///
+ /// Matches C's `dedup_lookup` function
+ ///
+ /// ## Hash Collision Risk
+ ///
+ /// Uses FNV-1a hash (`node_digest`) as deduplication key. Hash collisions
+ /// are theoretically possible but extremely rare in practice:
+ ///
+ /// - FNV-1a produces 64-bit hashes (2^64 possible values)
+ /// - Collision probability with N entries: ~N²/(2 × 2^64)
+ /// - For 10,000 log entries: collision probability < 10^-11
+ ///
+ /// If a collision occurs, two different log entries (from different nodes
+ /// or with different content) will be treated as duplicates, causing one
+ /// to be silently dropped.
+ ///
+ /// This design is inherited from the C implementation for compatibility.
+ /// The risk is acceptable because:
+ /// 1. Collisions are astronomically rare
+ /// 2. Only affects log deduplication, not critical data integrity
+ /// 3. Lost log entries don't compromise cluster operation
+ ///
+ /// Changing this would break wire format compatibility with C nodes.
+ fn is_not_duplicate(dedup: &mut HashMap<u64, DedupEntry>, entry: &LogEntry) -> bool {
+ let dd = dedup
+ .entry(entry.node_digest)
+ .or_insert(DedupEntry { time: 0, uid: 0 });
+
+ if entry.time > dd.time || (entry.time == dd.time && entry.uid > dd.uid) {
+ dd.time = entry.time;
+ dd.uid = entry.uid;
+ true
+ } else {
+ false
+ }
+ }
+
+ pub fn get_entries(&self, max: usize) -> Vec<LogEntry> {
+ let inner = self.inner.lock();
+ inner.buffer.iter().take(max).collect()
+ }
+
+ /// Get the current buffer (for testing)
+ pub fn get_buffer(&self) -> RingBuffer {
+ let inner = self.inner.lock();
+ inner.buffer.clone()
+ }
+
+ /// Get buffer length (for testing)
+ pub fn len(&self) -> usize {
+ let inner = self.inner.lock();
+ inner.buffer.len()
+ }
+
+ /// Get buffer capacity (for testing)
+ pub fn capacity(&self) -> usize {
+ let inner = self.inner.lock();
+ inner.buffer.capacity()
+ }
+
+ /// Check if buffer is empty (for testing)
+ pub fn is_empty(&self) -> bool {
+ let inner = self.inner.lock();
+ inner.buffer.is_empty()
+ }
+
+ /// Clear all log entries (for testing)
+ pub fn clear(&self) {
+ let mut inner = self.inner.lock();
+ let capacity = inner.buffer.capacity();
+ inner.buffer = RingBuffer::new(capacity);
+ inner.dedup.clear();
+ }
+
+ /// Sort the log entries by time, returning a new `RingBuffer` with entries in sorted order.
+ ///
+ /// Matches C's `clog_sort` function.
+ pub fn sort(&self) -> RingBuffer {
+ let inner = self.inner.lock();
+ let sorted_refs = inner.buffer.sort_entries();
+ let mut result = RingBuffer::new(inner.buffer.capacity());
+ // sort_entries() returns newest-first; add_entry pushes to front (newest-first),
+ // so iterate in reverse (oldest-first) to preserve the correct order.
+ for entry in sorted_refs.iter().rev() {
+ let _ = result.add_entry(entry);
+ }
+ result
+ }
+
+ /// Merge logs from multiple nodes
+ ///
+ /// Matches C's `clusterlog_merge` function
+ ///
+ /// This method atomically updates both the buffer and dedup state under a single
+ /// mutex lock, matching C's behavior where both cl->base and cl->dedup are
+ /// updated under cl->mutex.
+ #[must_use = "merge errors should be handled"]
+ pub fn merge(&self, remote_logs: Vec<RingBuffer>, include_local: bool) -> Result<()> {
+ let mut sorted_entries: BTreeMap<(u32, u64, u32), LogEntry> = BTreeMap::new();
+ let mut merge_dedup: HashMap<u64, DedupEntry> = HashMap::new();
+
+ // Lock once for the entire operation (matching C's single mutex)
+ let mut inner = self.inner.lock();
+
+ // Calculate maximum capacity across all buffers
+ let local_cap = if include_local {
+ Some(inner.buffer.capacity())
+ } else {
+ None
+ };
+ let max_size = local_cap
+ .into_iter()
+ .chain(remote_logs.iter().map(|b| b.capacity()))
+ .max()
+ .unwrap_or(CLOG_DEFAULT_SIZE);
+
+ // Helper: insert entry into sorted map with keep-first dedup (matching C's g_tree_lookup guard)
+ let mut insert_entry = |entry: &LogEntry| {
+ let key = (entry.time, entry.node_digest, entry.uid);
+ if let std::collections::btree_map::Entry::Vacant(e) = sorted_entries.entry(key) {
+ e.insert(entry.clone());
+ let _ = Self::is_not_duplicate(&mut merge_dedup, entry);
+ }
+ };
+
+ // Add local entries if requested
+ if include_local {
+ for entry in inner.buffer.iter() {
+ insert_entry(&entry);
+ }
+ }
+
+ // Add remote entries
+ for remote_buffer in &remote_logs {
+ for entry in remote_buffer.iter() {
+ insert_entry(&entry);
+ }
+ }
+
+ let mut result = RingBuffer::new(max_size);
+
+ // BTreeMap iterates oldest->newest. We add each as new head (push_front),
+ // so result ends with newest at head, matching C's behavior.
+ // Fill to 100% capacity (matching C's behavior), not just 90%
+ for (_key, entry) in sorted_entries.iter() {
+ // add_entry will automatically evict old entries if needed to stay within capacity
+ result.add_entry(entry)?;
+ }
+
+ // Atomically update both buffer and dedup (matches C lines 503-507)
+ inner.buffer = result;
+ inner.dedup = merge_dedup;
+
+ Ok(())
+ }
+
+ /// Export log to JSON format
+ ///
+ /// Matches C's `clog_dump_json` function
+ pub fn dump_json(&self, ident_filter: Option<&str>, max_entries: usize) -> String {
+ let inner = self.inner.lock();
+ inner.buffer.dump_json(ident_filter, max_entries)
+ }
+
+ /// Export log to JSON format with sorted entries
+ pub fn dump_json_sorted(&self, ident_filter: Option<&str>, max_entries: usize) -> String {
+ let inner = self.inner.lock();
+ let sorted = inner.buffer.sort_entries();
+ let ident_digest = ident_filter.map(crate::hash::fnv_64a_str);
+ let data: Vec<serde_json::Value> = sorted
+ .iter()
+ .filter(|entry| ident_digest.is_none_or(|digest| digest == entry.ident_digest))
+ .take(max_entries)
+ .map(|entry| entry.to_json_object())
+ .collect();
+ let result = serde_json::json!({ "data": data });
+ serde_json::to_string_pretty(&result).unwrap_or_else(|_| "{}".to_string())
+ }
+
+ /// Matches C's `clusterlog_get_state` function
+ ///
+ /// Returns binary-serialized clog_base_t structure for network transmission.
+ /// This format is compatible with C nodes for mixed-cluster operation.
+ #[must_use = "serialized state should be used or stored"]
+ pub fn get_state(&self) -> Result<Vec<u8>> {
+ let sorted = self.sort();
+ Ok(sorted.serialize_binary())
+ }
+
+ pub fn deserialize_state(data: &[u8]) -> Result<RingBuffer> {
+ RingBuffer::deserialize_binary(data)
+ }
+}
+
+impl Default for ClusterLog {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_cluster_log_creation() {
+ let log = ClusterLog::new();
+ assert!(log.inner.lock().buffer.is_empty());
+ }
+
+ #[test]
+ fn test_add_entry() {
+ let log = ClusterLog::new();
+
+ let result = log.add(
+ "node1",
+ "root",
+ "cluster",
+ 12345,
+ 6, // Info priority
+ 1234567890,
+ "Test message",
+ );
+
+ assert!(result.is_ok());
+ assert!(!log.inner.lock().buffer.is_empty());
+ }
+
+ #[test]
+ fn test_deduplication() {
+ let log = ClusterLog::new();
+
+ // Add same entry twice (but with different UIDs since each add creates a new entry)
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Message 1");
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Message 1");
+
+ // Both entries are added because they have different UIDs
+ // Deduplication tracks the latest (time, UID) per node, not content
+ let inner = log.inner.lock();
+ assert_eq!(inner.buffer.len(), 2);
+ }
+
+ #[test]
+ fn test_newer_entry_replaces() {
+ let log = ClusterLog::new();
+
+ // Add older entry
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Old message");
+
+ // Add newer entry from same node
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1001, "New message");
+
+ // Should have both entries (newer doesn't remove older, just updates dedup tracker)
+ let inner = log.inner.lock();
+ assert_eq!(inner.buffer.len(), 2);
+ }
+
+ #[test]
+ fn test_json_export() {
+ let log = ClusterLog::new();
+
+ let _ = log.add(
+ "node1",
+ "root",
+ "cluster",
+ 123,
+ 6,
+ 1234567890,
+ "Test message",
+ );
+
+ let json = log.dump_json(None, 50);
+
+ // Should be valid JSON
+ assert!(serde_json::from_str::<serde_json::Value>(&json).is_ok());
+
+ // Should contain "data" field
+ let value: serde_json::Value = serde_json::from_str(&json).unwrap();
+ assert!(value.get("data").is_some());
+ }
+
+ #[test]
+ fn test_merge_logs() {
+ let log1 = ClusterLog::new();
+ let log2 = ClusterLog::new();
+
+ // Add entries to first log
+ let _ = log1.add(
+ "node1",
+ "root",
+ "cluster",
+ 123,
+ 6,
+ 1000,
+ "Message from node1",
+ );
+
+ // Add entries to second log
+ let _ = log2.add(
+ "node2",
+ "root",
+ "cluster",
+ 456,
+ 6,
+ 1001,
+ "Message from node2",
+ );
+
+ // Get log2's buffer for merging
+ let log2_buffer = log2.inner.lock().buffer.clone();
+
+ // Merge into log1 (updates log1's buffer atomically)
+ log1.merge(vec![log2_buffer], true).unwrap();
+
+ // Check log1's buffer now contains entries from both logs
+ let inner = log1.inner.lock();
+ assert!(inner.buffer.len() >= 2);
+ }
+
+ // ========================================================================
+ // HIGH PRIORITY TESTS - Merge Edge Cases
+ // ========================================================================
+
+ #[test]
+ fn test_merge_empty_logs() {
+ let log = ClusterLog::new();
+
+ // Add some entries to local log
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Local entry");
+
+ // Merge with empty remote logs (updates buffer atomically)
+ log.merge(vec![], true).unwrap();
+
+ // Check buffer has 1 entry (from local log)
+ let inner = log.inner.lock();
+ assert_eq!(inner.buffer.len(), 1);
+ let entry = inner.buffer.iter().next().unwrap();
+ assert_eq!(entry.node, "node1");
+ }
+
+ #[test]
+ fn test_merge_single_node_only() {
+ let log = ClusterLog::new();
+
+ // Add entries only from single node
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
+ let _ = log.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
+ let _ = log.add("node1", "root", "cluster", 125, 6, 1002, "Entry 3");
+
+ // Merge with no remote logs (just sort local)
+ log.merge(vec![], true).unwrap();
+
+ // Check buffer has all 3 entries
+ let inner = log.inner.lock();
+ assert_eq!(inner.buffer.len(), 3);
+
+ // Entries should be stored newest first in the buffer
+ let times: Vec<u32> = inner.buffer.iter().map(|e| e.time).collect();
+ assert_eq!(times, vec![1002, 1001, 1000]);
+ }
+
+ #[test]
+ fn test_merge_all_duplicates() {
+ let log1 = ClusterLog::new();
+ let log2 = ClusterLog::new();
+
+ // Add same entries to both logs (same node, time, but different UIDs)
+ let _ = log1.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
+ let _ = log1.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
+
+ let _ = log2.add("node1", "root", "cluster", 125, 6, 1000, "Entry 1");
+ let _ = log2.add("node1", "root", "cluster", 126, 6, 1001, "Entry 2");
+
+ let log2_buffer = log2.inner.lock().buffer.clone();
+
+ // Merge - should handle entries from same node at same times
+ log1.merge(vec![log2_buffer], true).unwrap();
+
+ // Check merged buffer has 4 entries (all are unique by UID despite same time/node)
+ let inner = log1.inner.lock();
+ assert_eq!(inner.buffer.len(), 4);
+ }
+
+ #[test]
+ fn test_merge_exceeding_capacity() {
+ // Create small buffer to test capacity enforcement
+ let log = ClusterLog::with_capacity(50_000); // Small buffer
+
+ // Add many entries to fill beyond capacity
+ for i in 0..100 {
+ let _ = log.add(
+ "node1",
+ "root",
+ "cluster",
+ 100 + i,
+ 6,
+ 1000 + i,
+ &format!("Entry {}", i),
+ );
+ }
+
+ // Create remote log with many entries
+ let remote = ClusterLog::with_capacity(50_000);
+ for i in 0..100 {
+ let _ = remote.add(
+ "node2",
+ "root",
+ "cluster",
+ 200 + i,
+ 6,
+ 1000 + i,
+ &format!("Remote {}", i),
+ );
+ }
+
+ let remote_buffer = remote.inner.lock().buffer.clone();
+
+ // Merge - capacity enforcement evicts oldest entries as needed
+ log.merge(vec![remote_buffer], true).unwrap();
+
+ // Buffer should be limited by capacity, not necessarily < 200
+ // The actual limit depends on entry sizes and capacity
+ // Just verify we got some reasonable number of entries
+ let inner = log.inner.lock();
+ assert!(!inner.buffer.is_empty(), "Should have some entries");
+ assert!(
+ inner.buffer.len() <= 200,
+ "Should not exceed total available entries"
+ );
+ }
+
+ #[test]
+ fn test_merge_preserves_dedup_state() {
+ let log = ClusterLog::new();
+
+ // Add entries from node1
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
+ let _ = log.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
+
+ // Create remote log with later entries from node1
+ let remote = ClusterLog::new();
+ let _ = remote.add("node1", "root", "cluster", 125, 6, 1002, "Entry 3");
+
+ let remote_buffer = remote.inner.lock().buffer.clone();
+
+ // Merge
+ log.merge(vec![remote_buffer], true).unwrap();
+
+ // Check that dedup state was updated
+ let inner = log.inner.lock();
+ let node1_digest = crate::hash::fnv_64a_str("node1");
+ let dedup_entry = inner.dedup.get(&node1_digest).unwrap();
+
+ // Should track the latest time from node1
+ assert_eq!(dedup_entry.time, 1002);
+ // UID is auto-generated, so just verify it exists and is reasonable
+ assert!(dedup_entry.uid > 0);
+ }
+
+ #[test]
+ fn test_get_state_binary_format() {
+ let log = ClusterLog::new();
+
+ // Add some entries
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
+ let _ = log.add("node2", "admin", "system", 456, 6, 1001, "Entry 2");
+
+ // Get state
+ let state = log.get_state().unwrap();
+
+ // Should be binary format, not JSON
+ assert!(state.len() >= 8); // At least header
+
+ // Check header format (clog_base_t)
+ let size = u32::from_le_bytes(state[0..4].try_into().unwrap()) as usize;
+ let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap());
+
+ assert_eq!(size, state.len());
+ // cpos points to the newest entry. With N entries stored oldest-first,
+ // the newest entry is at offset > 8 (not necessarily 8).
+ assert!(cpos >= 8, "cpos must point into the data section");
+ assert!((cpos as usize) < size, "cpos must be within buffer bounds");
+
+ // Should be able to deserialize back
+ let deserialized = ClusterLog::deserialize_state(&state).unwrap();
+ assert_eq!(deserialized.len(), 2);
+ }
+
+ #[test]
+ fn test_state_roundtrip() {
+ let log = ClusterLog::new();
+
+ // Add entries
+ let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Test 1");
+ let _ = log.add("node2", "admin", "system", 456, 6, 1001, "Test 2");
+
+ // Serialize
+ let state = log.get_state().unwrap();
+
+ // Deserialize
+ let deserialized = ClusterLog::deserialize_state(&state).unwrap();
+
+ // Check entries preserved
+ assert_eq!(deserialized.len(), 2);
+
+ // Buffer is stored newest-first after sorting and serialization
+ let entries: Vec<_> = deserialized.iter().collect();
+ assert_eq!(entries[0].node, "node2"); // Newest (time 1001)
+ assert_eq!(entries[0].message, "Test 2");
+ assert_eq!(entries[1].node, "node1"); // Oldest (time 1000)
+ assert_eq!(entries[1].message, "Test 1");
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs b/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
new file mode 100644
index 000000000..3f8bd936c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
@@ -0,0 +1,734 @@
+/// Log Entry Implementation
+///
+/// This module implements the cluster log entry structure, matching the C
+/// implementation's clog_entry_t (logger.c).
+use super::hash::fnv_64a_str;
+use anyhow::{bail, Result};
+use serde::Serialize;
+use std::fmt::Write;
+use std::sync::atomic::{AtomicU32, Ordering};
+
+// Import constant from ring_buffer to avoid duplication
+use crate::ring_buffer::CLOG_MAX_ENTRY_SIZE;
+
+/// Fixed header size of a clog_entry_t in bytes:
+/// prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) +
+/// pid(4) + priority(1) + node_len(1) + ident_len(1) + tag_len(1) + msg_len(4) = 44
+pub const CLOG_ENTRY_HEADER_SIZE: usize = 44;
+
+/// Global UID counter (matches C's `uid_counter` global variable)
+///
+/// # UID Wraparound Behavior
+///
+/// The UID counter is a 32-bit unsigned integer that wraps around after 2^32 entries.
+/// This matches the C implementation's behavior (logger.c:62).
+///
+/// **Wraparound implications:**
+/// - At 1000 entries/second: wraparound after ~49 days
+/// - At 100 entries/second: wraparound after ~497 days
+/// - After wraparound, UIDs continue from 0 (the wrapping call itself yields UID 0)
+///
+/// **Impact on deduplication:**
+/// The deduplication logic compares (time, UID) tuples. After wraparound, an entry
+/// with UID=1 might be incorrectly considered older than an entry with UID=4294967295,
+/// even if they have the same timestamp. This is a known limitation inherited from
+/// the C implementation.
+///
+/// **Mitigation:**
+/// - Entries with different timestamps are correctly ordered (time is primary sort key)
+/// - Wraparound only affects entries with identical timestamps from the same node
+/// - A warning is logged when wraparound occurs (see fetch_add below)
+static UID_COUNTER: AtomicU32 = AtomicU32::new(0);
+
+/// Allocate the next unique ID from the global log entry UID counter.
+///
+/// Shared by all log entry producers so they draw from a single sequence,
+/// preventing (time, nodeid, uid) collisions between the database DFSM and
+/// the status DFSM on the same node.
+pub fn next_uid() -> u32 {
+ let old = UID_COUNTER.fetch_add(1, Ordering::SeqCst);
+ if old == u32::MAX {
+ tracing::warn!(
+ "UID counter wrapped around (2^32 entries reached). \
+ Deduplication may be affected for entries with identical timestamps."
+ );
+ }
+ old.wrapping_add(1)
+}
+
+/// Log entry structure
+///
+/// Matches C's `clog_entry_t` from logger.c:
+/// ```c
+/// typedef struct {
+/// uint32_t prev; // Previous entry offset
+/// uint32_t next; // Next entry offset
+/// uint32_t uid; // Unique ID
+/// uint32_t time; // Timestamp
+/// uint64_t node_digest; // FNV-1a hash of node name
+/// uint64_t ident_digest; // FNV-1a hash of ident
+/// uint32_t pid; // Process ID
+/// uint8_t priority; // Syslog priority (0-7)
+/// uint8_t node_len; // Length of node name (including null)
+/// uint8_t ident_len; // Length of ident (including null)
+/// uint8_t tag_len; // Length of tag (including null)
+/// uint32_t msg_len; // Length of message (including null)
+/// char data[]; // Variable length data: node + ident + tag + msg
+/// } clog_entry_t;
+/// ```
+#[derive(Debug, Clone, Serialize)]
+pub struct LogEntry {
+ /// Unique ID for this entry (auto-incrementing)
+ pub uid: u32,
+
+ /// Unix timestamp
+ pub time: u32,
+
+ /// FNV-1a hash of node name.
+ ///
+ /// Stored separately from `node` for C binary wire format compatibility
+ /// (these fields appear in the clog_entry_t struct). Not re-derived on
+ /// deserialization — the stored value must round-trip exactly.
+ pub node_digest: u64,
+
+ /// FNV-1a hash of ident (user).
+ ///
+ /// Stored separately from `ident` for C binary wire format compatibility.
+ /// Not re-derived on deserialization.
+ pub ident_digest: u64,
+
+ /// Process ID
+ pub pid: u32,
+
+ /// Syslog priority (0-7)
+ pub priority: u8,
+
+ /// Node name
+ pub node: String,
+
+ /// Identity/user
+ pub ident: String,
+
+ /// Tag (e.g., "cluster", "pmxcfs")
+ pub tag: String,
+
+ /// Log message
+ pub message: String,
+}
+
+impl LogEntry {
+ /// Matches C's `clog_pack` function
+ #[must_use = "packed entry should be used"]
+ pub fn pack(
+ node: &str,
+ ident: &str,
+ tag: &str,
+ pid: u32,
+ time: u32,
+ priority: u8,
+ message: &str,
+ ) -> Result<Self> {
+ if priority >= 8 {
+ bail!("Invalid priority: {priority} (must be 0-7)");
+ }
+
+ // Truncate to 254 bytes to leave room for null terminator (C uses MIN(strlen+1, 255))
+ let node = Self::truncate_string(node, 254);
+ let ident = Self::truncate_string(ident, 254);
+ let tag = Self::truncate_string(tag, 254);
+ let message = Self::utf8_to_ascii(message);
+
+ let node_len = node.len() + 1;
+ let ident_len = ident.len() + 1;
+ let tag_len = tag.len() + 1;
+ let mut msg_len = message.len() + 1;
+
+ // Use checked arithmetic to prevent integer overflow
+ let total_size = CLOG_ENTRY_HEADER_SIZE
+ .checked_add(node_len)
+ .and_then(|s| s.checked_add(ident_len))
+ .and_then(|s| s.checked_add(tag_len))
+ .and_then(|s| s.checked_add(msg_len))
+ .ok_or_else(|| anyhow::anyhow!("Entry size calculation overflow"))?;
+
+ if total_size > CLOG_MAX_ENTRY_SIZE {
+ let diff = total_size - CLOG_MAX_ENTRY_SIZE;
+ msg_len = msg_len.saturating_sub(diff);
+ }
+
+ let node_digest = fnv_64a_str(&node);
+ let ident_digest = fnv_64a_str(&ident);
+
+ let uid = next_uid();
+
+ Ok(Self {
+ uid,
+ time,
+ node_digest,
+ ident_digest,
+ pid,
+ priority,
+ node,
+ ident,
+ tag,
+ message: message[..msg_len.saturating_sub(1)].to_string(),
+ })
+ }
+
+ /// Truncate string to max length (safe for multi-byte UTF-8)
+ fn truncate_string(s: &str, max_len: usize) -> String {
+ if s.len() <= max_len {
+ return s.to_string();
+ }
+
+ // `str::floor_char_boundary` is nightly-only, so walk back to the
+ // nearest char boundary at or below max_len on stable Rust.
+ let mut end = max_len;
+ while !s.is_char_boundary(end) {
+ end -= 1;
+ }
+ s[..end].to_string()
+ }
+
+ /// Convert UTF-8 to ASCII with proper escaping
+ ///
+ /// Matches C's `utf8_to_ascii` function behavior:
+ /// - Control characters (0x00-0x1F, 0x7F): Escaped as #XXX (e.g., #007 for BEL)
+ /// - Unicode (U+0080 to U+FFFF): Escaped as \uXXXX (e.g., \u4e16 for 世)
+ /// - Quotes: Escaped as \" (matches C's quotequote=TRUE behavior)
+ /// - Characters > U+FFFF: Silently dropped
+ /// - ASCII printable (0x20-0x7E except quotes): Passed through unchanged
+ fn utf8_to_ascii(s: &str) -> String {
+ let mut result = String::with_capacity(s.len());
+
+ for c in s.chars() {
+ match c {
+ // Control characters: #XXX format (3 decimal digits)
+ '\x00'..='\x1F' | '\x7F' => {
+ let _ = write!(result, "#{:03}", c as u32);
+ }
+ // Quote escaping: matches C's quotequote=TRUE behavior (logger.c:245)
+ '"' => result.push_str("\\\""),
+ // ASCII printable characters: pass through
+ c if c.is_ascii() => result.push(c),
+ // Unicode U+0080 to U+FFFF: \uXXXX format
+ c if (c as u32) < 0x10000 => {
+ let _ = write!(result, "\\u{:04x}", c as u32);
+ }
+ // Characters > U+FFFF: silently drop (matches C behavior)
+ _ => {}
+ }
+ }
+
+ result
+ }
+
+ /// Matches C's `clog_entry_size` function
+ pub fn size(&self) -> usize {
+ CLOG_ENTRY_HEADER_SIZE
+ + self.node.len()
+ + 1
+ + self.ident.len()
+ + 1
+ + self.tag.len()
+ + 1
+ + self.message.len()
+ + 1
+ }
+
+ /// C implementation: `uint32_t realsize = ((size + 7) & 0xfffffff8);`
+ pub fn aligned_size(&self) -> usize {
+ let size = self.size();
+ (size + 7) & !7
+ }
+
+ pub fn to_json_object(&self) -> serde_json::Value {
+ // Decode utf8_to_ascii backslash escapes before JSON serialization so that
+ // serde_json re-encodes them correctly. Without this, serde_json double-escapes:
+ // \u4e16 → \\u4e16 (JSON literal string \u4e16, not the Unicode char 世)
+ // \" → \\\" (JSON literal string \", not a quote)
+ // C's clog_dump_json embeds message bytes directly via printf "%s", so \u4e16
+ // becomes a real JSON Unicode escape (decoded as 世). We match that here.
+ serde_json::json!({
+ "uid": self.uid,
+ "time": self.time,
+ "pri": self.priority,
+ "tag": self.tag,
+ "pid": self.pid,
+ "node": self.node,
+ "user": self.ident,
+ "msg": Self::decode_for_json(&self.message),
+ })
+ }
+
+ /// Reverse the backslash escapes that `utf8_to_ascii` introduced so that
+ /// serde_json can serialize the message without double-escaping them.
+ ///
+ /// Only two sequences need reversing:
+ /// - `\"` → `"` (quote escape from utf8_to_ascii quotequote mode)
+ /// - `\uXXXX` → actual Unicode char (e.g. `\u4e16` → '世')
+ ///
+ /// `#XXX` control-char escapes are plain ASCII and need no adjustment —
+ /// serde_json will round-trip them as literal `#XXX` strings, matching C.
+ fn decode_for_json(s: &str) -> String {
+ let mut result = String::with_capacity(s.len());
+ let mut chars = s.chars().peekable();
+ while let Some(c) = chars.next() {
+ if c != '\\' {
+ result.push(c);
+ continue;
+ }
+ match chars.peek().copied() {
+ Some('"') => {
+ chars.next();
+ result.push('"');
+ }
+ Some('u') => {
+ chars.next(); // consume 'u'
+ let hex: String = chars.by_ref().take(4).collect();
+ if hex.len() == 4 {
+ if let Ok(n) = u32::from_str_radix(&hex, 16) {
+ if let Some(ch) = char::from_u32(n) {
+ result.push(ch);
+ continue;
+ }
+ }
+ }
+ // Unrecognised sequence — keep as-is
+ result.push('\\');
+ result.push('u');
+ result.push_str(&hex);
+ }
+ _ => result.push('\\'),
+ }
+ }
+ result
+ }
+
+ /// Write entry body (bytes 8+) into a pre-allocated ring buffer slot.
+ ///
+ /// Mirrors C's `clog_copy` behavior:
+ /// `memcpy((char*)new + 8, (char*)entry + 8, size - 8)`
+ /// The first 8 bytes (prev/next) are set by `alloc_slot`; this writes
+ /// the remaining `size() - 8` bytes directly into the provided slice.
+ ///
+ /// The `slot` must be at least `self.size() - 8` bytes long.
+ pub(crate) fn write_body_to(&self, slot: &mut [u8]) {
+ let mut off = 0usize;
+
+ macro_rules! write_le {
+ ($val:expr) => {{
+ let b = $val;
+ slot[off..off + b.len()].copy_from_slice(&b);
+ off += b.len();
+ }};
+ }
+
+ write_le!(self.uid.to_le_bytes());
+ write_le!(self.time.to_le_bytes());
+ write_le!(self.node_digest.to_le_bytes());
+ write_le!(self.ident_digest.to_le_bytes());
+ write_le!(self.pid.to_le_bytes());
+ slot[off] = self.priority;
+ off += 1;
+
+ let node_len = (self.node.len() + 1).min(255) as u8;
+ let ident_len = (self.ident.len() + 1).min(255) as u8;
+ let tag_len = (self.tag.len() + 1).min(255) as u8;
+ slot[off] = node_len;
+ off += 1;
+ slot[off] = ident_len;
+ off += 1;
+ slot[off] = tag_len;
+ off += 1;
+ write_le!((self.message.len() as u32 + 1).to_le_bytes());
+
+ slot[off..off + self.node.len()].copy_from_slice(self.node.as_bytes());
+ off += self.node.len();
+ slot[off] = 0;
+ off += 1;
+ slot[off..off + self.ident.len()].copy_from_slice(self.ident.as_bytes());
+ off += self.ident.len();
+ slot[off] = 0;
+ off += 1;
+ slot[off..off + self.tag.len()].copy_from_slice(self.tag.as_bytes());
+ off += self.tag.len();
+ slot[off] = 0;
+ off += 1;
+ slot[off..off + self.message.len()].copy_from_slice(self.message.as_bytes());
+ off += self.message.len();
+ slot[off] = 0;
+ }
+
+ /// Serialize to C binary format (clog_entry_t)
+ ///
+ /// Binary layout matches C structure:
+ /// ```c
+ /// struct {
+ /// uint32_t prev; // Will be filled by ring buffer
+ /// uint32_t next; // Will be filled by ring buffer
+ /// uint32_t uid;
+ /// uint32_t time;
+ /// uint64_t node_digest;
+ /// uint64_t ident_digest;
+ /// uint32_t pid;
+ /// uint8_t priority;
+ /// uint8_t node_len;
+ /// uint8_t ident_len;
+ /// uint8_t tag_len;
+ /// uint32_t msg_len;
+ /// char data[]; // node + ident + tag + msg (null-terminated)
+ /// }
+ /// ```
+ pub fn serialize_binary(&self, prev: u32, next: u32) -> Vec<u8> {
+ let mut buf = vec![0u8; self.size()];
+ buf[0..4].copy_from_slice(&prev.to_le_bytes());
+ buf[4..8].copy_from_slice(&next.to_le_bytes());
+ self.write_body_to(&mut buf[8..]);
+ buf
+ }
+
+ pub(crate) fn deserialize_binary(data: &[u8]) -> Result<(Self, u32, u32)> {
+ if data.len() < CLOG_ENTRY_HEADER_SIZE {
+ bail!(
+ "Entry too small: {} bytes (need at least {} for header)",
+ data.len(),
+ CLOG_ENTRY_HEADER_SIZE,
+ );
+ }
+
+ // Read fixed header fields directly from byte offsets, matching the
+ // clog_entry_t layout documented above CLOG_ENTRY_HEADER_SIZE.
+ let prev = u32::from_le_bytes(data[ 0.. 4].try_into()?);
+ let next = u32::from_le_bytes(data[ 4.. 8].try_into()?);
+ let uid = u32::from_le_bytes(data[ 8..12].try_into()?);
+ let time = u32::from_le_bytes(data[12..16].try_into()?);
+ let node_digest = u64::from_le_bytes(data[16..24].try_into()?);
+ let ident_digest = u64::from_le_bytes(data[24..32].try_into()?);
+ let pid = u32::from_le_bytes(data[32..36].try_into()?);
+ let priority = data[36];
+ let node_len = data[37] as usize;
+ let ident_len = data[38] as usize;
+ let tag_len = data[39] as usize;
+ let msg_len = u32::from_le_bytes(data[40..44].try_into()?) as usize;
+
+ let offset = CLOG_ENTRY_HEADER_SIZE;
+ if offset + node_len + ident_len + tag_len + msg_len > data.len() {
+ bail!("Entry data exceeds buffer size");
+ }
+
+ let node = read_null_terminated(&data[offset..offset + node_len])?;
+ let ident = read_null_terminated(&data[offset + node_len..offset + node_len + ident_len])?;
+ let tag_start = offset + node_len + ident_len;
+ let tag = read_null_terminated(&data[tag_start..tag_start + tag_len])?;
+ let msg_start = tag_start + tag_len;
+ let message = read_null_terminated(&data[msg_start..msg_start + msg_len])?;
+
+ Ok((
+ Self {
+ uid,
+ time,
+ node_digest,
+ ident_digest,
+ pid,
+ priority,
+ node,
+ ident,
+ tag,
+ message,
+ },
+ prev,
+ next,
+ ))
+ }
+}
+
+fn read_null_terminated(data: &[u8]) -> Result<String> {
+ let len = data.iter().position(|&b| b == 0).unwrap_or(data.len());
+ Ok(String::from_utf8_lossy(&data[..len]).into_owned())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_pack_entry() {
+ let entry = LogEntry::pack(
+ "node1",
+ "root",
+ "cluster",
+ 12345,
+ 1234567890,
+ 6, // Info priority
+ "Test message",
+ )
+ .unwrap();
+
+ assert!(entry.uid > 0, "uid must be non-zero");
+ assert_eq!(entry.time, 1234567890);
+ assert_eq!(entry.node, "node1");
+ assert_eq!(entry.ident, "root");
+ assert_eq!(entry.tag, "cluster");
+ assert_eq!(entry.pid, 12345);
+ assert_eq!(entry.priority, 6);
+ assert_eq!(entry.message, "Test message");
+ }
+
+ #[test]
+ fn test_uid_increment() {
+ let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg1").unwrap();
+ let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg2").unwrap();
+
+ // UIDs must be consecutive — the global counter increments by 1 per entry
+ assert_eq!(entry2.uid, entry1.uid + 1, "UIDs must be consecutive");
+ }
+
+ #[test]
+ fn test_invalid_priority() {
+ let result = LogEntry::pack("node1", "root", "tag", 0, 1000, 8, "message");
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_node_digest() {
+ let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
+ let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg").unwrap();
+ let entry3 = LogEntry::pack("node2", "root", "tag", 0, 1000, 6, "msg").unwrap();
+
+ // Same node should have same digest
+ assert_eq!(entry1.node_digest, entry2.node_digest);
+
+ // Different node should have different digest
+ assert_ne!(entry1.node_digest, entry3.node_digest);
+ }
+
+ #[test]
+ fn test_ident_digest() {
+ let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
+ let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg").unwrap();
+ let entry3 = LogEntry::pack("node1", "admin", "tag", 0, 1000, 6, "msg").unwrap();
+
+ // Same ident should have same digest
+ assert_eq!(entry1.ident_digest, entry2.ident_digest);
+
+ // Different ident should have different digest
+ assert_ne!(entry1.ident_digest, entry3.ident_digest);
+ }
+
+ #[test]
+ fn test_utf8_to_ascii() {
+ let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "Hello 世界").unwrap();
+ assert!(entry.message.is_ascii());
+ // Unicode chars escaped as \uXXXX format (matches C implementation)
+ assert!(entry.message.contains("\\u4e16")); // 世 = U+4E16
+ assert!(entry.message.contains("\\u754c")); // 界 = U+754C
+ }
+
+ #[test]
+ fn test_utf8_control_chars() {
+ // Test control character escaping
+ let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "Hello\x07World").unwrap();
+ assert!(entry.message.is_ascii());
+ // BEL (0x07) should be escaped as #007 (matches C implementation)
+ assert!(entry.message.contains("#007"));
+ }
+
+ #[test]
+ fn test_utf8_mixed_content() {
+ // Test mix of ASCII, Unicode, and control chars
+ let entry = LogEntry::pack(
+ "node1",
+ "root",
+ "tag",
+ 0,
+ 1000,
+ 6,
+ "Test\x01\nUnicode世\ttab",
+ )
+ .unwrap();
+ assert!(entry.message.is_ascii());
+ // SOH (0x01) -> #001
+ assert!(entry.message.contains("#001"));
+ // Newline (0x0A) -> #010
+ assert!(entry.message.contains("#010"));
+ // Unicode 世 (U+4E16) -> \u4e16
+ assert!(entry.message.contains("\\u4e16"));
+ // Tab (0x09) -> #009
+ assert!(entry.message.contains("#009"));
+ }
+
+ #[test]
+ fn test_string_truncation() {
+ let long_node = "a".repeat(300);
+ let entry = LogEntry::pack(&long_node, "root", "tag", 0, 1000, 6, "msg").unwrap();
+ assert!(entry.node.len() <= 255);
+ }
+
+ #[test]
+ fn test_truncate_multibyte_utf8() {
+ // Test that truncate_string doesn't panic on multi-byte UTF-8 boundaries
+ // "世" is 3 bytes in UTF-8 (0xE4 0xB8 0x96)
+ let s = "x".repeat(253) + "世";
+
+ // This should not panic, even though 254 falls in the middle of "世"
+ let entry = LogEntry::pack(&s, "root", "tag", 0, 1000, 6, "msg").unwrap();
+
+ // Should truncate to 253 bytes (before the multi-byte char)
+ assert_eq!(entry.node.len(), 253);
+ assert_eq!(entry.node, "x".repeat(253));
+ }
+
+ #[test]
+ fn test_message_truncation() {
+ let long_message = "a".repeat(CLOG_MAX_ENTRY_SIZE);
+ let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, &long_message).unwrap();
+ // Entry should fit within max size
+ assert!(entry.size() <= CLOG_MAX_ENTRY_SIZE);
+ }
+
+ #[test]
+ fn test_aligned_size() {
+ let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
+ let aligned = entry.aligned_size();
+
+ // Aligned size should be multiple of 8
+ assert_eq!(aligned % 8, 0);
+
+ // Aligned size should be >= actual size
+ assert!(aligned >= entry.size());
+
+ // Aligned size should be within 7 bytes of actual size
+ assert!(aligned - entry.size() < 8);
+ }
+
+ #[test]
+ fn test_json_export() {
+ let entry = LogEntry::pack("node1", "root", "cluster", 123, 1234567890, 6, "Test").unwrap();
+ let json = entry.to_json_object();
+
+ assert_eq!(json["node"], "node1");
+ assert_eq!(json["user"], "root");
+ assert_eq!(json["tag"], "cluster");
+ assert_eq!(json["pid"], 123);
+ assert_eq!(json["time"], 1234567890);
+ assert_eq!(json["pri"], 6);
+ assert_eq!(json["msg"], "Test");
+ }
+
+ #[test]
+ fn test_binary_serialization_roundtrip() {
+ let entry = LogEntry::pack(
+ "node1",
+ "root",
+ "cluster",
+ 12345,
+ 1234567890,
+ 6,
+ "Test message",
+ )
+ .unwrap();
+
+ // Serialize with prev/next pointers
+ let binary = entry.serialize_binary(100, 200);
+
+ // Deserialize
+ let (deserialized, prev, next) = LogEntry::deserialize_binary(&binary).unwrap();
+
+ // Check prev/next pointers
+ assert_eq!(prev, 100);
+ assert_eq!(next, 200);
+
+ // Check entry fields
+ assert_eq!(deserialized.uid, entry.uid);
+ assert_eq!(deserialized.time, entry.time);
+ assert_eq!(deserialized.node_digest, entry.node_digest);
+ assert_eq!(deserialized.ident_digest, entry.ident_digest);
+ assert_eq!(deserialized.pid, entry.pid);
+ assert_eq!(deserialized.priority, entry.priority);
+ assert_eq!(deserialized.node, entry.node);
+ assert_eq!(deserialized.ident, entry.ident);
+ assert_eq!(deserialized.tag, entry.tag);
+ assert_eq!(deserialized.message, entry.message);
+ }
+
+ #[test]
+ fn test_binary_format_header_size() {
+ let entry = LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap();
+ let binary = entry.serialize_binary(0, 0);
+
+ assert!(binary.len() >= CLOG_ENTRY_HEADER_SIZE);
+
+ // First 44 bytes are the fixed header (CLOG_ENTRY_HEADER_SIZE)
+ assert_eq!(&binary[0..4], &0u32.to_le_bytes()); // prev
+ assert_eq!(&binary[4..8], &0u32.to_le_bytes()); // next
+ }
+
+ #[test]
+ fn test_binary_deserialize_invalid_size() {
+ let too_small = vec![0u8; CLOG_ENTRY_HEADER_SIZE - 1];
+ let result = LogEntry::deserialize_binary(&too_small);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_binary_null_terminators() {
+ let entry = LogEntry::pack("node1", "root", "tag", 123, 1000, 6, "message").unwrap();
+ let binary = entry.serialize_binary(0, 0);
+
+ // Check that strings are null-terminated
+ // Find null bytes in data section (after 44-byte header = CLOG_ENTRY_HEADER_SIZE)
+ let data_section = &binary[CLOG_ENTRY_HEADER_SIZE..];
+ let null_count = data_section.iter().filter(|&&b| b == 0).count();
+ assert_eq!(null_count, 4); // 4 null terminators (node, ident, tag, msg)
+ }
+
+ #[test]
+ fn test_length_field_overflow_prevention() {
+ // Test that 255-byte strings are handled correctly (prevent u8 overflow)
+ // C does: MIN(strlen(s) + 1, 255) to cap at 255
+ let long_string = "a".repeat(255);
+
+ let entry = LogEntry::pack(
+ &long_string,
+ &long_string,
+ &long_string,
+ 123,
+ 1000,
+ 6,
+ "msg",
+ )
+ .unwrap();
+
+ // Strings should be truncated to 254 bytes (leaving room for null)
+ assert_eq!(entry.node.len(), 254);
+ assert_eq!(entry.ident.len(), 254);
+ assert_eq!(entry.tag.len(), 254);
+
+ // Serialize and check length fields are capped at 255 (254 bytes + null)
+ let binary = entry.serialize_binary(0, 0);
+
+ // Extract length fields from header
+ // Layout: prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) +
+ // pid(4) + priority(1) + node_len(1) + ident_len(1) + tag_len(1) + msg_len(4)
+ // Offsets: node_len=37, ident_len=38, tag_len=39
+ let node_len = binary[37];
+ let ident_len = binary[38];
+ let tag_len = binary[39];
+
+ assert_eq!(node_len, 255); // 254 bytes + 1 null = 255
+ assert_eq!(ident_len, 255);
+ assert_eq!(tag_len, 255);
+ }
+
+ #[test]
+ fn test_length_field_no_wraparound() {
+ // Even if somehow a 255+ byte string gets through, serialize should cap at 255
+ // This tests the defensive .min(255) in serialize_binary
+ let mut entry = LogEntry::pack("node", "ident", "tag", 123, 1000, 6, "msg").unwrap();
+
+ // Artificially create an edge case (though pack() already prevents this)
+ entry.node = "x".repeat(254); // Max valid size
+
+ let binary = entry.serialize_binary(0, 0);
+ let node_len = binary[37]; // Offset 37 for node_len
+
+ // Should be 255 (254 + 1 for null), not wrap to 0
+ assert_eq!(node_len, 255);
+ assert_ne!(node_len, 0); // Ensure no wraparound
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs b/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
new file mode 100644
index 000000000..f87d9bc9b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
@@ -0,0 +1,129 @@
+//! FNV-1a (Fowler-Noll-Vo) 64-bit hash function
+//!
+//! This matches the C implementation's `fnv_64a_buf` function.
+//! Used for generating node and ident digests for deduplication.
+
+/// FNV-1a 64-bit non-zero initial basis
+pub const FNV1A_64_INIT: u64 = 0xcbf29ce484222325;
+
+/// Compute 64-bit FNV-1a hash
+///
+/// Faithful port of C's `fnv_64a_buf` function. The multiplication by the
+/// FNV 64-bit prime (0x100000001b3) is done via shifts and adds.
+#[inline]
+pub fn fnv_64a(data: &[u8], init: u64) -> u64 {
+ let mut hval = init;
+
+ for &byte in data {
+ hval ^= byte as u64;
+ hval = hval.wrapping_add(
+ (hval << 1)
+ .wrapping_add(hval << 4)
+ .wrapping_add(hval << 5)
+ .wrapping_add(hval << 7)
+ .wrapping_add(hval << 8)
+ .wrapping_add(hval << 40),
+ );
+ }
+
+ hval
+}
+
+/// Hash a null-terminated string (includes the null byte)
+///
+/// The C implementation includes the null terminator in the hash:
+/// `fnv_64a_buf(node, node_len, FNV1A_64_INIT)` where node_len includes the '\0'.
+/// We append a null byte to match that behavior.
+#[inline]
+pub fn fnv_64a_str(s: &str) -> u64 {
+ // Hash the string bytes, then the null terminator to match C behavior.
+ // XOR with 0 is a no-op for the null byte, but the FNV multiplication
+ // step still applies, so we feed it through fnv_64a rather than
+ // duplicating the multiplication logic here.
+ fnv_64a(&[0], fnv_64a(s.as_bytes(), FNV1A_64_INIT))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_fnv1a_init() {
+ // Test that init constant matches C implementation
+ assert_eq!(FNV1A_64_INIT, 0xcbf29ce484222325);
+ }
+
+ #[test]
+ fn test_fnv1a_empty() {
+ // Empty string with null terminator
+ let hash = fnv_64a(&[0], FNV1A_64_INIT);
+ assert_ne!(hash, FNV1A_64_INIT); // Should be different from init
+ }
+
+ #[test]
+ fn test_fnv1a_consistency() {
+ // Same input should produce same output
+ let data = b"test";
+ let hash1 = fnv_64a(data, FNV1A_64_INIT);
+ let hash2 = fnv_64a(data, FNV1A_64_INIT);
+ assert_eq!(hash1, hash2);
+ }
+
+ #[test]
+ fn test_fnv1a_different_data() {
+ // Different input should (usually) produce different output
+ let hash1 = fnv_64a(b"test1", FNV1A_64_INIT);
+ let hash2 = fnv_64a(b"test2", FNV1A_64_INIT);
+ assert_ne!(hash1, hash2);
+ }
+
+ #[test]
+ fn test_fnv1a_str() {
+ // Test string hashing with null terminator
+ let hash1 = fnv_64a_str("node1");
+ let hash2 = fnv_64a_str("node1");
+ let hash3 = fnv_64a_str("node2");
+
+ assert_eq!(hash1, hash2); // Same string should hash the same
+ assert_ne!(hash1, hash3); // Different strings should hash differently
+ }
+
+ #[test]
+ fn test_fnv1a_node_names() {
+ // Test with typical Proxmox node names
+ let nodes = vec!["pve1", "pve2", "pve3"];
+ let mut hashes = Vec::new();
+
+ for node in &nodes {
+ let hash = fnv_64a_str(node);
+ hashes.push(hash);
+ }
+
+ // All hashes should be unique
+ for i in 0..hashes.len() {
+ for j in (i + 1)..hashes.len() {
+ assert_ne!(
+ hashes[i], hashes[j],
+ "Hashes for {} and {} should differ",
+ nodes[i], nodes[j]
+ );
+ }
+ }
+ }
+
+ #[test]
+ fn test_fnv1a_chaining() {
+ // Test that we can chain hashes
+ let data1 = b"first";
+ let data2 = b"second";
+
+ let hash1 = fnv_64a(data1, FNV1A_64_INIT);
+ let hash2 = fnv_64a(data2, hash1); // Use previous hash as init
+
+ // Should produce a deterministic result
+ let hash1_again = fnv_64a(data1, FNV1A_64_INIT);
+ let hash2_again = fnv_64a(data2, hash1_again);
+
+ assert_eq!(hash2, hash2_again);
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs b/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
new file mode 100644
index 000000000..1de0cd830
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
@@ -0,0 +1,29 @@
+//! Cluster Log Implementation
+//!
+//! This crate provides a cluster-wide log system compatible with the C implementation.
+//! It maintains a ring buffer of log entries that can be merged from multiple nodes,
+//! deduplicated, and exported to JSON.
+//!
+//! Key features:
+//! - Ring buffer storage for efficient memory usage
+//! - FNV-1a hashing for node and ident tracking
+//! - Deduplication across nodes
+//! - Time-based sorting
+//! - Multi-node log merging
+//! - JSON export for web UI
+// Internal modules (not exposed)
+mod cluster_log;
+mod entry;
+mod hash;
+mod ring_buffer;
+
+// Public API - only expose what's needed externally
+pub use cluster_log::ClusterLog;
+pub use entry::{next_uid, CLOG_ENTRY_HEADER_SIZE};
+pub use hash::{fnv_64a, fnv_64a_str, FNV1A_64_INIT};
+
+// Re-export types only for testing or internal crate use
+#[doc(hidden)]
+pub use entry::LogEntry;
+#[doc(hidden)]
+pub use ring_buffer::RingBuffer;
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs b/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
new file mode 100644
index 000000000..2d9f79fcf
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
@@ -0,0 +1,542 @@
+//! Ring Buffer Implementation for Cluster Log
+//!
+//! Directly mirrors C's `clog_base_t` byte layout so that the in-memory
+//! representation IS the wire format. `serialize_binary()` is a simple
+//! `Vec::clone()` — no entry iteration or struct reconstruction needed.
+//!
+//! Key design:
+//! - `data` is the complete byte slab: header(8 bytes) + ring data.
+//! - `alloc_slot()` ports C's `clog_alloc_entry` (wrap-around ring allocation).
+//! - `add_entry()` writes entry body directly into the slab via `write_body_to`,
+//!   matching C's `clog_copy`'s `memcpy((char*)new + 8, (char*)entry + 8, size-8)`.
+//! - `iter()` walks the prev-chain from `cpos`, parsing `LogEntry` on the fly.
+use super::entry::LogEntry;
+use super::hash::fnv_64a_str;
+use anyhow::{bail, Result};
+
+/// Matches C's CLOG_DEFAULT_SIZE constant
+pub(crate) const CLOG_DEFAULT_SIZE: usize = 8192 * 16; // 131,072 bytes (128 KB)
+
+/// Matches C's CLOG_MAX_ENTRY_SIZE constant
+pub(crate) const CLOG_MAX_ENTRY_SIZE: usize = 4096; // 4,096 bytes (4 KB)
+
+/// Ring buffer for cluster log entries.
+///
+/// Directly mirrors C's `clog_base_t` byte layout:
+/// ```c
+/// struct clog_base {
+/// uint32_t size; // Total buffer capacity (bytes)
+/// uint32_t cpos; // Offset to newest entry (0 = empty)
+/// char data[]; // Ring data (entries at various offsets from byte 8)
+/// };
+/// ```
+///
+/// `data.len() == size` at all times. `serialize_binary()` returns a clone of
+/// `data` — no entry iteration or struct reconstruction needed.
+///
+/// Entry layout within the buffer (matches `clog_entry_t`):
+/// - Each entry occupies an 8-byte-aligned region.
+/// - `entry.prev` → offset to the previous (older) entry (0 = oldest in chain).
+/// - `entry.next` → end-of-entry offset (this offset + aligned_size).
+/// - `cpos` points to the **newest** entry.
+/// - Traversal: newest → cpos → prev → ... → 0.
+#[derive(Debug, Clone)]
+pub struct RingBuffer {
+ /// Complete buffer: header(8 bytes) + ring data.
+ /// `data[0..4]` = total capacity as u32 LE.
+ /// `data[4..8]` = cpos as u32 LE (offset of newest entry, 0 if empty).
+ data: Vec<u8>,
+}
+
+impl RingBuffer {
+ /// Create a new empty ring buffer with the given capacity.
+ ///
+ /// Mirrors C's `clog_new(size)`: allocates zero-filled memory and sets
+ /// `clog->size = size; clog->cpos = 0;`.
+ pub fn new(capacity: usize) -> Self {
+ let capacity = if capacity < CLOG_MAX_ENTRY_SIZE * 10 {
+ CLOG_DEFAULT_SIZE
+ } else {
+ capacity
+ };
+ let mut data = vec![0u8; capacity];
+ data[0..4].copy_from_slice(&(capacity as u32).to_le_bytes());
+ // data[4..8] = 0 (cpos = 0, empty)
+ Self { data }
+ }
+
+ /// Total buffer capacity in bytes.
+ pub fn capacity(&self) -> usize {
+ u32::from_le_bytes(self.data[0..4].try_into().unwrap()) as usize
+ }
+
+ /// Offset of the newest entry, or 0 if the buffer is empty.
+ fn cpos(&self) -> usize {
+ u32::from_le_bytes(self.data[4..8].try_into().unwrap()) as usize
+ }
+
+ /// Update the cpos header field.
+ fn set_cpos(&mut self, cpos: usize) {
+ self.data[4..8].copy_from_slice(&(cpos as u32).to_le_bytes());
+ }
+
+ /// Check if buffer is empty.
+ pub fn is_empty(&self) -> bool {
+ self.cpos() == 0
+ }
+
+ /// Count entries by walking the prev-chain (O(N)).
+ pub fn len(&self) -> usize {
+ self.iter().count()
+ }
+
+ /// Allocate a slot for an entry of the given byte size.
+ ///
+ /// Mirrors C's `clog_alloc_entry`:
+ /// - Empty buffer: place at offset 8.
+ /// - Otherwise: place at `cur->next`; wrap to 8 if the new entry won't fit.
+ /// - Sets `entry->prev = old_cpos` and `entry->next = newpos + aligned_size`,
+ /// updates `clog->cpos = newpos`.
+ ///
+ /// Returns the byte offset where the new entry begins.
+ fn alloc_slot(&mut self, size: usize) -> usize {
+ let realsize = (size + 7) & !7usize;
+ let capacity = self.capacity();
+ let old_cpos = self.cpos();
+
+ let newpos = if old_cpos == 0 {
+ 8
+ } else {
+ // cur->next is at old_cpos + 4
+ let cur_next =
+ u32::from_le_bytes(self.data[old_cpos + 4..old_cpos + 8].try_into().unwrap())
+ as usize;
+ if cur_next + realsize >= capacity {
+ 8 // wrap around
+ } else {
+ cur_next
+ }
+ };
+
+ // entry->prev = old_cpos
+ self.data[newpos..newpos + 4].copy_from_slice(&(old_cpos as u32).to_le_bytes());
+ // entry->next = newpos + realsize
+ self.data[newpos + 4..newpos + 8]
+ .copy_from_slice(&((newpos + realsize) as u32).to_le_bytes());
+ // clog->cpos = newpos
+ self.set_cpos(newpos);
+
+ newpos
+ }
+
+ /// Add a log entry to the ring buffer.
+ ///
+ /// Mirrors C's `clog_copy`: calls `clog_alloc_entry` to get a slot, then
+ /// writes the entry body (bytes 8+) directly into the raw buffer — matching
+ /// `memcpy((char*)new + 8, (char*)entry + 8, size - 8)`.
+ pub fn add_entry(&mut self, entry: &LogEntry) -> Result<()> {
+ let slot_offset = self.alloc_slot(entry.size());
+ let body_end = slot_offset + entry.aligned_size();
+ entry.write_body_to(&mut self.data[slot_offset + 8..body_end]);
+ Ok(())
+ }
+
+ /// Iterate over entries from newest to oldest.
+ ///
+ /// Walks the prev-chain from `cpos` backwards, applying C's wrap-around
+ /// guard to stop at the correct oldest entry. Each `LogEntry` is parsed
+ /// on the fly from the raw byte slab.
+ pub fn iter(&self) -> impl Iterator<Item = LogEntry> + '_ {
+ RingBufferIter::new(self)
+ }
+
+ /// Sort entries by (time, node_digest, uid), returning newest-first.
+ ///
+ /// Mirrors C's `clog_sort` comparison function.
+ pub fn sort_entries(&self) -> Vec<LogEntry> {
+ let mut entries: Vec<LogEntry> = self.iter().collect();
+ entries.sort_unstable_by(|a, b| {
+ (b.time, b.node_digest, b.uid).cmp(&(a.time, a.node_digest, a.uid))
+ });
+ entries
+ }
+
+ /// Dump buffer to JSON format.
+ ///
+ /// Matches C's `clog_dump_json`.
+ pub fn dump_json(&self, ident_filter: Option<&str>, max_entries: usize) -> String {
+ let ident_digest = ident_filter.map(fnv_64a_str);
+
+ let data: Vec<serde_json::Value> = self
+ .iter()
+ .filter(|entry| ident_digest.is_none_or(|digest| digest == entry.ident_digest))
+ .take(max_entries)
+ .map(|entry| entry.to_json_object())
+ .collect();
+
+ let result = serde_json::json!({ "data": data });
+ serde_json::to_string_pretty(&result).unwrap_or_else(|_| "{}".to_string())
+ }
+
+ /// Print all entries to stdout (debugging only).
+ ///
+ /// Matches C's `clog_dump` function. Not called in normal operation but
+ /// kept for debugging parity with the C implementation.
+ #[allow(dead_code)]
+ pub fn dump(&self) {
+ for (idx, entry) in self.iter().enumerate() {
+ println!(
+ "[{}] uid={:08x} time={} node={}{{{:016X}}} tag={}[{}{{{:016X}}}]: {}",
+ idx,
+ entry.uid,
+ entry.time,
+ entry.node,
+ entry.node_digest,
+ entry.tag,
+ entry.ident,
+ entry.ident_digest,
+ entry.message
+ );
+ }
+ }
+
+ /// Serialize to C binary format: returns a clone of the raw byte slab.
+ ///
+ /// C's `clusterlog_get_state()` returns `g_memdup2(cl->base, clog->size)` —
+ /// a raw memory copy of the entire buffer. Since `data` IS that buffer,
+ /// this is an O(capacity) clone with no entry iteration.
+ pub fn serialize_binary(&self) -> Vec<u8> {
+ self.data.clone()
+ }
+
+ /// Deserialize from C binary format.
+ ///
+ /// Validates the header and stores the raw bytes. Entry parsing is deferred
+ /// to `iter()`, which applies C's wrap-around traversal guards.
+ pub fn deserialize_binary(data: &[u8]) -> Result<Self> {
+ if data.len() < 8 {
+ bail!(
+ "Buffer too small: {} bytes (need at least 8 for header)",
+ data.len()
+ );
+ }
+
+ let size = u32::from_le_bytes(data[0..4].try_into()?) as usize;
+ let cpos = u32::from_le_bytes(data[4..8].try_into()?) as usize;
+
+ if size != data.len() {
+ bail!(
+ "Size mismatch: header says {}, got {} bytes",
+ size,
+ data.len()
+ );
+ }
+
+ if cpos != 0 && (cpos < 8 || cpos >= size) {
+ bail!("Invalid cpos: {cpos} (size: {size})");
+ }
+
+ Ok(Self {
+ data: data.to_vec(),
+ })
+ }
+}
+
+impl Default for RingBuffer {
+ fn default() -> Self {
+ Self::new(CLOG_DEFAULT_SIZE)
+ }
+}
+
+/// Iterator that walks the prev-chain from newest to oldest entry.
+///
+/// Applies C's wrap-around guard from `clog_dump`/`clog_dump_json`:
+/// stops when following `prev` would jump forward past `initial_cpos`,
+/// which signals that older entries have been overwritten by the ring.
+///
+/// A `HashSet` of visited offsets guards against cycles through stale
+/// prev pointers in overwritten regions that the C guard alone cannot detect.
+struct RingBufferIter<'a> {
+ data: &'a [u8],
+ current_cpos: usize,
+ initial_cpos: usize,
+ visited: std::collections::HashSet<usize>,
+ done: bool,
+}
+
+impl<'a> RingBufferIter<'a> {
+ fn new(buf: &'a RingBuffer) -> Self {
+ let initial_cpos = buf.cpos();
+ Self {
+ data: &buf.data,
+ current_cpos: initial_cpos,
+ initial_cpos,
+ visited: std::collections::HashSet::new(),
+ done: false,
+ }
+ }
+}
+
+impl Iterator for RingBufferIter<'_> {
+ type Item = LogEntry;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.done || self.current_cpos == 0 {
+ return None;
+ }
+
+ // Guard against cycles through stale prev pointers in overwritten regions.
+ if !self.visited.insert(self.current_cpos) {
+ return None;
+ }
+
+ if self.current_cpos >= self.data.len() {
+ return None;
+ }
+
+ let entry_data = &self.data[self.current_cpos..];
+ match LogEntry::deserialize_binary(entry_data) {
+ Err(_) => None,
+ Ok((entry, prev, _next)) => {
+ // C wrap-around guard: if (cpos < cur->prev && cur->prev <= clog->cpos) break
+ // Detects when following prev would jump forward past initial_cpos,
+ // meaning the entry it points to was overwritten by the ring.
+ if self.current_cpos < prev as usize && (prev as usize) <= self.initial_cpos {
+ self.done = true;
+ return Some(entry);
+ }
+ self.current_cpos = prev as usize;
+ Some(entry)
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_ring_buffer_creation() {
+ let buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+ assert_eq!(buffer.capacity(), CLOG_DEFAULT_SIZE);
+ assert_eq!(buffer.len(), 0);
+ assert!(buffer.is_empty());
+ }
+
+ #[test]
+ fn test_add_entry() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+ let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "message").unwrap();
+
+ let result = buffer.add_entry(&entry);
+ assert!(result.is_ok());
+ assert_eq!(buffer.len(), 1);
+ assert!(!buffer.is_empty());
+ }
+
+ #[test]
+ fn test_ring_buffer_wraparound() {
+ // Create a buffer with minimum required size (CLOG_MAX_ENTRY_SIZE * 10)
+ // but fill it beyond capacity to trigger wraparound
+ let mut buffer = RingBuffer::new(CLOG_MAX_ENTRY_SIZE * 10);
+
+ let initial_count = 50_usize;
+ for i in 0..initial_count {
+ let entry =
+ LogEntry::pack("node1", "root", "tag", 0, 1000 + i as u32, 6, "msg").unwrap();
+ let _ = buffer.add_entry(&entry);
+ }
+
+ let count_before = buffer.len();
+ assert_eq!(count_before, initial_count);
+
+ // Add entries with large messages to trigger ring eviction
+ let large_msg = "x".repeat(7000);
+ let large_entries_count = 20_usize;
+ for i in 0..large_entries_count {
+ let entry =
+ LogEntry::pack("node1", "root", "tag", 0, 2000 + i as u32, 6, &large_msg).unwrap();
+ let _ = buffer.add_entry(&entry);
+ }
+
+ // Should have fewer entries than total added (ring evicted old ones)
+ assert!(
+ buffer.len() < count_before + large_entries_count,
+ "Expected wraparound to evict old entries (have {} entries, expected < {})",
+ buffer.len(),
+ count_before + large_entries_count
+ );
+
+ // Newest entry (iter starts from cpos = newest) should be the last added
+ let newest = buffer.iter().next().unwrap();
+ assert_eq!(newest.time, 2000 + large_entries_count as u32 - 1);
+ }
+
+ #[test]
+ fn test_sort_by_time() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+ let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "c").unwrap());
+ let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
+ let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "b").unwrap());
+
+ let sorted = buffer.sort_entries();
+
+ // Entries are newest-first after sort
+ let times: Vec<u32> = sorted.iter().map(|e| e.time).collect();
+ assert_eq!(times, vec![1002, 1001, 1000]);
+ }
+
+ #[test]
+ fn test_sort_by_node_digest() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+ let _ = buffer.add_entry(&LogEntry::pack("node3", "root", "tag", 0, 1000, 6, "c").unwrap());
+ let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
+ let _ = buffer.add_entry(&LogEntry::pack("node2", "root", "tag", 0, 1000, 6, "b").unwrap());
+
+ let sorted = buffer.sort_entries();
+
+ // Entries with same time should be sorted by node_digest (descending)
+ for entries in sorted.windows(2) {
+ if entries[0].time == entries[1].time {
+ assert!(entries[0].node_digest >= entries[1].node_digest);
+ }
+ }
+ }
+
+ #[test]
+ fn test_json_dump() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+ let _ = buffer
+ .add_entry(&LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "msg").unwrap());
+
+ let json = buffer.dump_json(None, 50);
+
+ let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
+ assert!(parsed.get("data").is_some());
+
+ let data = parsed["data"].as_array().unwrap();
+ assert_eq!(data.len(), 1);
+
+ let entry = &data[0];
+ assert_eq!(entry["node"], "node1");
+ assert_eq!(entry["user"], "root");
+ assert_eq!(entry["tag"], "cluster");
+ }
+
+ #[test]
+ fn test_json_dump_with_filter() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+ let _ =
+ buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg1").unwrap());
+ let _ =
+ buffer.add_entry(&LogEntry::pack("node1", "admin", "tag", 0, 1001, 6, "msg2").unwrap());
+ let _ =
+ buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "msg3").unwrap());
+
+ let json = buffer.dump_json(Some("root"), 50);
+
+ let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
+ let data = parsed["data"].as_array().unwrap();
+
+ assert_eq!(data.len(), 2);
+ for entry in data {
+ assert_eq!(entry["user"], "root");
+ }
+ }
+
+ #[test]
+ fn test_json_dump_max_entries() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+ for i in 0..10 {
+ let _ = buffer
+ .add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000 + i, 6, "msg").unwrap());
+ }
+
+ let json = buffer.dump_json(None, 5);
+
+ let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
+ let data = parsed["data"].as_array().unwrap();
+
+ assert_eq!(data.len(), 5);
+ }
+
+ #[test]
+ fn test_iterator() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+ let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
+ let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "b").unwrap());
+ let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "c").unwrap());
+
+ let messages: Vec<String> = buffer.iter().map(|e| e.message).collect();
+
+ // Newest first (iter walks from cpos backwards via prev)
+ assert_eq!(messages, vec!["c", "b", "a"]);
+ }
+
+ #[test]
+ fn test_binary_serialization_roundtrip() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+ let _ = buffer.add_entry(
+ &LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "Entry 1").unwrap(),
+ );
+ let _ = buffer.add_entry(
+ &LogEntry::pack("node2", "admin", "system", 456, 1001, 5, "Entry 2").unwrap(),
+ );
+
+ let binary = buffer.serialize_binary();
+ let deserialized = RingBuffer::deserialize_binary(&binary).unwrap();
+
+ assert_eq!(deserialized.len(), buffer.len());
+
+ let orig_entries: Vec<_> = buffer.iter().collect();
+ let deser_entries: Vec<_> = deserialized.iter().collect();
+
+ for (orig, deser) in orig_entries.iter().zip(deser_entries.iter()) {
+ assert_eq!(deser.uid, orig.uid);
+ assert_eq!(deser.time, orig.time);
+ assert_eq!(deser.node, orig.node);
+ assert_eq!(deser.message, orig.message);
+ }
+ }
+
+ #[test]
+ fn test_binary_format_header() {
+ let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+ let _ = buffer.add_entry(&LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap());
+
+ let binary = buffer.serialize_binary();
+
+ assert!(binary.len() >= 8);
+
+ let size = u32::from_le_bytes(binary[0..4].try_into().unwrap()) as usize;
+ let cpos = u32::from_le_bytes(binary[4..8].try_into().unwrap());
+
+ assert_eq!(size, binary.len());
+ assert_eq!(cpos, 8); // First entry at offset 8
+ }
+
+ #[test]
+ fn test_binary_empty_buffer() {
+ let buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+ let binary = buffer.serialize_binary();
+
+ assert_eq!(binary.len(), CLOG_DEFAULT_SIZE);
+
+ let size = u32::from_le_bytes(binary[0..4].try_into().unwrap()) as usize;
+ let cpos = u32::from_le_bytes(binary[4..8].try_into().unwrap());
+
+ assert_eq!(size, CLOG_DEFAULT_SIZE);
+ assert_eq!(cpos, 0); // Empty buffer has cpos = 0
+
+ let deserialized = RingBuffer::deserialize_binary(&binary).unwrap();
+ assert_eq!(deserialized.len(), 0);
+ assert_eq!(deserialized.capacity(), CLOG_DEFAULT_SIZE);
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs b/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
new file mode 100644
index 000000000..17a11f9c4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
@@ -0,0 +1,611 @@
+//! Binary compatibility tests for pmxcfs-logger
+//!
+//! These tests verify that the Rust implementation can correctly
+//! serialize/deserialize binary data in a format compatible with
+//! the C implementation.
+//!
+//! ## Real C Fixture Tests
+//!
+//! The `test_c_fixture_*` tests use blobs generated by the C implementation
+//! (via `clog_pack` + `clog_copy` + `clog_new`). To regenerate:
+//!
+//! ```sh
+//! cd src/pmxcfs
+//! gcc -o /tmp/gen_fixtures /tmp/gen_fixtures.c \
+//! -I. $(pkg-config --cflags glib-2.0) \
+//! libpmxcfs.a $(pkg-config --libs glib-2.0) -lpthread
+//! /tmp/gen_fixtures tests/fixtures
+//! ```
+//!
+//! The generator source lives at `src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c`.
+
+use pmxcfs_logger::{ClusterLog, LogEntry, RingBuffer};
+
+/// Test deserializing a minimal C-compatible binary blob
+///
+/// This test uses a hand-crafted binary blob that matches C's clog_base_t format:
+/// - 8-byte header (size + cpos)
+/// - Single entry at offset 8
+#[test]
+fn test_deserialize_minimal_c_blob() {
+ // Create a minimal valid C binary blob
+ // Header: size=8+entry_size, cpos=8 (points to first entry)
+ // Entry: minimal valid entry with all required fields
+
+ let entry = LogEntry::pack("node1", "root", "test", 123, 1000, 6, "msg").unwrap();
+ let entry_bytes = entry.serialize_binary(0, 0); // prev=0 (end), next=0
+ let entry_size = entry_bytes.len();
+
+ // Allocate buffer with capacity for header + entry
+ let total_size = 8 + entry_size;
+ let mut blob = vec![0u8; total_size];
+
+ // Write header
+ blob[0..4].copy_from_slice(&(total_size as u32).to_le_bytes()); // size
+ blob[4..8].copy_from_slice(&8u32.to_le_bytes()); // cpos = 8
+
+ // Write entry
+ blob[8..8 + entry_size].copy_from_slice(&entry_bytes);
+
+ // Deserialize
+ let buffer = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+
+ // Verify
+ assert_eq!(buffer.len(), 1, "Should have 1 entry");
+ let entries: Vec<_> = buffer.iter().collect();
+ assert_eq!(entries[0].node, "node1");
+ assert_eq!(entries[0].message, "msg");
+}
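The 8-byte `clog_base_t` header used above (a little-endian `u32` total size followed by a little-endian `u32` cpos) can be modeled on its own. This is an illustrative standalone sketch; the function names are ours, not part of the crate's API:

```rust
// Build and parse the 8-byte header: size (u32 LE) at offset 0,
// cpos (u32 LE) at offset 4. Entry data starts at offset 8.
fn make_header(size: u32, cpos: u32) -> [u8; 8] {
    let mut h = [0u8; 8];
    h[0..4].copy_from_slice(&size.to_le_bytes());
    h[4..8].copy_from_slice(&cpos.to_le_bytes());
    h
}

fn parse_header(blob: &[u8]) -> (u32, u32) {
    (
        u32::from_le_bytes(blob[0..4].try_into().unwrap()),
        u32::from_le_bytes(blob[4..8].try_into().unwrap()),
    )
}

fn main() {
    // cpos == 8 means the first entry sits right after the header,
    // matching the single-entry case asserted in the test above.
    let h = make_header(131072, 8);
    assert_eq!(parse_header(&h), (131072, 8));
}
```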
+
+/// Test round-trip: Rust serialize -> deserialize
+///
+/// Verifies that Rust can serialize and deserialize its own format
+#[test]
+fn test_roundtrip_single_entry() {
+ let mut buffer = RingBuffer::new(8192 * 16);
+
+ let entry = LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "Test message").unwrap();
+ buffer.add_entry(&entry).unwrap();
+
+ // Serialize
+ let blob = buffer.serialize_binary();
+
+ // Verify header
+ let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
+ let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+
+ assert_eq!(size, blob.len(), "Size should match blob length");
+ assert_eq!(cpos, 8, "First entry should be at offset 8");
+
+ // Deserialize
+ let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+
+ // Verify
+ assert_eq!(deserialized.len(), 1);
+ let entries: Vec<_> = deserialized.iter().collect();
+ assert_eq!(entries[0].node, "node1");
+ assert_eq!(entries[0].ident, "root");
+ assert_eq!(entries[0].message, "Test message");
+}
+
+/// Test round-trip with multiple entries
+///
+/// Verifies linked list structure (prev/next pointers)
+#[test]
+fn test_roundtrip_multiple_entries() {
+ let mut buffer = RingBuffer::new(8192 * 16);
+
+ // Add 3 entries
+ for i in 0..3 {
+ let entry = LogEntry::pack(
+ "node1",
+ "root",
+ "test",
+ 100 + i,
+ 1000 + i,
+ 6,
+ &format!("Message {}", i),
+ )
+ .unwrap();
+ buffer.add_entry(&entry).unwrap();
+ }
+
+ // Serialize
+ let blob = buffer.serialize_binary();
+
+ // Deserialize
+ let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+
+ // Verify all entries preserved
+ assert_eq!(deserialized.len(), 3);
+
+ let entries: Vec<_> = deserialized.iter().collect();
+ // Entries are stored newest-first
+ assert_eq!(entries[0].message, "Message 2"); // Newest
+ assert_eq!(entries[1].message, "Message 1");
+ assert_eq!(entries[2].message, "Message 0"); // Oldest
+}
+
+/// Test empty buffer serialization
+///
+/// C returns a buffer with size and cpos=0 for empty buffers
+#[test]
+fn test_empty_buffer_format() {
+ let buffer = RingBuffer::new(8192 * 16);
+
+ // Serialize empty buffer
+ let blob = buffer.serialize_binary();
+
+ // Verify format
+ assert_eq!(blob.len(), 8192 * 16, "Should be full capacity");
+
+ let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
+ let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+
+ assert_eq!(size, 8192 * 16, "Size should match capacity");
+ assert_eq!(cpos, 0, "Empty buffer should have cpos=0");
+
+ // Deserialize
+ let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+ assert_eq!(deserialized.len(), 0, "Should be empty");
+}
+
+/// Test entry alignment (8-byte boundaries)
+///
+/// C uses ((size + 7) & ~7) for alignment
+#[test]
+fn test_entry_alignment() {
+ let entry = LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap();
+
+ let aligned_size = entry.aligned_size();
+
+ // Should be multiple of 8
+ assert_eq!(aligned_size % 8, 0, "Aligned size should be multiple of 8");
+
+ // Should be >= actual size
+ assert!(aligned_size >= entry.size());
+
+ // Should be within 7 bytes of actual size
+ assert!(aligned_size - entry.size() < 8);
+}
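The alignment rule the test checks is C's `((size + 7) & ~7)`, i.e. rounding up to the next multiple of 8. A minimal standalone sketch (the function name is ours, not the crate's):

```rust
// Round a byte size up to the next 8-byte boundary, mirroring
// C's ((size + 7) & ~7). Rust spells bitwise NOT as `!`.
fn align8(size: usize) -> usize {
    (size + 7) & !7
}

fn main() {
    assert_eq!(align8(1), 8);   // anything in 1..=8 rounds to 8
    assert_eq!(align8(8), 8);   // already aligned: unchanged
    assert_eq!(align8(37), 40); // within 7 bytes of the input
}
```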
+
+/// Test string length capping (prevents u8 overflow)
+///
+/// node_len, ident_len, tag_len are u8 and must cap at 255
+#[test]
+fn test_string_length_capping() {
+ // Create entry with very long strings
+ let long_node = "a".repeat(300);
+ let long_ident = "b".repeat(300);
+ let long_tag = "c".repeat(300);
+
+ let entry = LogEntry::pack(&long_node, &long_ident, &long_tag, 1, 1000, 6, "msg").unwrap();
+
+ // Serialize
+ let blob = entry.serialize_binary(0, 0);
+
+ // Check length fields at correct offsets in clog_entry_t:
+ // prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) + pid(4) + priority(1) = 37
+ // node_len=37, ident_len=38, tag_len=39
+ let node_len = blob[37];
+ let ident_len = blob[38];
+ let tag_len = blob[39];
+
+ // All should be capped at 255 (254 bytes + 1 null terminator)
+ assert_eq!(node_len, 255, "node_len should be capped at 255");
+ assert_eq!(ident_len, 255, "ident_len should be capped at 255");
+ assert_eq!(tag_len, 255, "tag_len should be capped at 255");
+}
+
+/// Test ClusterLog state serialization
+///
+/// Verifies get_state() returns C-compatible format
+#[test]
+fn test_cluster_log_state_format() {
+ let log = ClusterLog::new();
+
+ // Add some entries
+ log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1")
+ .unwrap();
+ log.add("node2", "admin", "system", 456, 6, 1001, "Entry 2")
+ .unwrap();
+
+ // Get state
+ let state = log.get_state().expect("Should serialize");
+
+ // Verify header format
+ assert!(state.len() >= 8, "Should have at least header");
+
+ let size = u32::from_le_bytes(state[0..4].try_into().unwrap()) as usize;
+ let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap()) as usize;
+
+ assert_eq!(size, state.len(), "Size should match blob length");
+ assert!(cpos >= 8, "cpos should point into data section");
+ assert!(cpos < size, "cpos should be within buffer");
+
+ // Deserialize and verify
+ let deserialized = ClusterLog::deserialize_state(&state).expect("Should deserialize");
+ assert_eq!(deserialized.len(), 2, "Should have 2 entries");
+}
+
+/// Test wrap-around detection in deserialization
+///
+/// Verifies that circular buffer wrap-around is handled correctly
+#[test]
+fn test_wraparound_detection() {
+ // Create a buffer with entries
+ let mut buffer = RingBuffer::new(8192 * 16);
+
+ for i in 0..5 {
+ let entry = LogEntry::pack("node1", "root", "test", 100 + i, 1000 + i, 6, "msg").unwrap();
+ buffer.add_entry(&entry).unwrap();
+ }
+
+ // Serialize
+ let blob = buffer.serialize_binary();
+
+ // Deserialize (should handle prev pointers correctly)
+ let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+
+ // Should get all entries
+ assert_eq!(deserialized.len(), 5);
+}
+
+/// Test invalid binary data handling
+///
+/// Verifies that malformed data is rejected
+#[test]
+fn test_invalid_binary_data() {
+ // Too small
+ let too_small = vec![0u8; 4];
+ assert!(RingBuffer::deserialize_binary(&too_small).is_err());
+
+ // Size mismatch
+ let mut size_mismatch = vec![0u8; 100];
+ size_mismatch[0..4].copy_from_slice(&200u32.to_le_bytes()); // Claims 200 bytes
+ assert!(RingBuffer::deserialize_binary(&size_mismatch).is_err());
+
+ // Invalid cpos (beyond buffer)
+ let mut invalid_cpos = vec![0u8; 100];
+ invalid_cpos[0..4].copy_from_slice(&100u32.to_le_bytes()); // size = 100
+ invalid_cpos[4..8].copy_from_slice(&200u32.to_le_bytes()); // cpos = 200 (invalid)
+ assert!(RingBuffer::deserialize_binary(&invalid_cpos).is_err());
+}
+
+/// Test FNV-1a hash consistency
+///
+/// Verifies that node_digest and ident_digest are computed correctly
+#[test]
+fn test_hash_consistency() {
+ let entry1 = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "msg1").unwrap();
+ let entry2 = LogEntry::pack("node1", "root", "test", 2, 1001, 6, "msg2").unwrap();
+ let entry3 = LogEntry::pack("node2", "admin", "test", 3, 1002, 6, "msg3").unwrap();
+
+ // Same node should have same digest
+ assert_eq!(entry1.node_digest, entry2.node_digest);
+
+ // Same ident should have same digest
+ assert_eq!(entry1.ident_digest, entry2.ident_digest);
+
+ // Different node should have different digest
+ assert_ne!(entry1.node_digest, entry3.node_digest);
+
+ // Different ident should have different digest
+ assert_ne!(entry1.ident_digest, entry3.ident_digest);
+}
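The digest properties asserted above follow from FNV-1a being a deterministic hash. A minimal sketch of 64-bit FNV-1a with the standard offset basis and prime; whether these are exactly the constants the crate's `hash.rs` matches against C must be checked there:

```rust
// Standard 64-bit FNV-1a: XOR each byte into the state, then
// multiply by the FNV prime (wrapping, as C's unsigned overflow does).
fn fnv1a_64(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV-1a offset basis
    for &b in data {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

fn main() {
    // Empty input hashes to the offset basis by definition.
    assert_eq!(fnv1a_64(b""), 0xcbf2_9ce4_8422_2325);
    // Same input, same digest; different input, different digest.
    assert_eq!(fnv1a_64(b"node1"), fnv1a_64(b"node1"));
    assert_ne!(fnv1a_64(b"node1"), fnv1a_64(b"node2"));
}
```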
+
+/// Test priority validation
+///
+/// Priority must be 0-7 (syslog priority)
+#[test]
+fn test_priority_validation() {
+ // Valid priorities (0-7)
+ for pri in 0..=7 {
+ let result = LogEntry::pack("node1", "root", "test", 1, 1000, pri, "msg");
+ assert!(result.is_ok(), "Priority {} should be valid", pri);
+ }
+
+ // Invalid priority (8+)
+ let result = LogEntry::pack("node1", "root", "test", 1, 1000, 8, "msg");
+ assert!(result.is_err(), "Priority 8 should be invalid");
+}
+
+/// Test UTF-8 to ASCII conversion
+///
+/// Verifies control character and Unicode escaping (matches C implementation)
+#[test]
+fn test_utf8_escaping() {
+ // Control characters (C format: #XXX with 3 decimal digits)
+ let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Hello\x07World").unwrap();
+ assert!(
+ entry.message.contains("#007"),
+ "BEL should be escaped as #007"
+ );
+
+ // Unicode characters
+ let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Hello 世界").unwrap();
+ assert!(
+ entry.message.contains("\\u4e16"),
+ "世 should be escaped as \\u4e16"
+ );
+ assert!(
+ entry.message.contains("\\u754c"),
+ "界 should be escaped as \\u754c"
+ );
+
+ // Mixed content
+ let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Test\x01\n世").unwrap();
+ assert!(entry.message.contains("#001"), "SOH should be escaped");
+ assert!(entry.message.contains("#010"), "LF should be escaped");
+ assert!(
+ entry.message.contains("\\u4e16"),
+ "Unicode should be escaped"
+ );
+}
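The escaping rules exercised above can be sketched as a standalone function: control characters become `#XXX` (three decimal digits), double quotes become `\"`, other printable ASCII passes through, and everything else becomes `\uXXXX` per Unicode scalar value. This illustrates the documented behavior, not the crate's actual code; edge cases (e.g. code points above U+FFFF) may be handled differently there:

```rust
// Illustrative model of the C-compatible escaping described above.
// Match arms are tried in order, so '"' wins over the printable range.
fn escape_for_log(input: &str) -> String {
    let mut out = String::new();
    for c in input.chars() {
        match c {
            '"' => out.push_str("\\\""),                       // " -> \"
            ' '..='~' => out.push(c),                          // printable ASCII
            c if c.is_ascii() => {
                out.push_str(&format!("#{:03}", c as u32));    // control -> #XXX
            }
            c => out.push_str(&format!("\\u{:04x}", c as u32)), // -> \uXXXX
        }
    }
    out
}

fn main() {
    assert_eq!(escape_for_log("Hello\x07World"), "Hello#007World");
    assert_eq!(escape_for_log("\n"), "#010");
    assert_eq!(escape_for_log("世"), "\\u4e16");
    assert_eq!(escape_for_log("\"q\""), "\\\"q\\\"");
}
```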
+
+/// Test that multi-entry serialization produces C-compatible prev pointer direction.
+///
+/// In C's format (produced by clog_sort + clog_alloc_entry):
+/// - Entries are laid out oldest-first in memory (oldest at offset 8).
+/// - `cpos` points to the newest (highest-offset) entry.
+/// - Each entry's `prev` points to the previous (older, lower-offset) entry.
+/// - The oldest entry has `prev = 0`.
+///
+/// This layout means that when C's deserialization loop starts at `cpos` and
+/// follows `prev` pointers, it sees `prev < current_pos` at every step, which
+/// keeps it inside the C guard condition. The previous (buggy) Rust layout
+/// stored newest-first so `prev > cpos`, causing C's guard to exit after one
+/// entry.
+#[test]
+fn test_multi_entry_cpos_and_prev_direction() {
+ use pmxcfs_logger::LogEntry;
+
+ let mut buffer = RingBuffer::new(8192 * 16);
+
+ // Add two entries: "old" (time=1000) then "new" (time=1001)
+ let old_entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "old").unwrap();
+ let new_entry = LogEntry::pack("node1", "root", "test", 2, 1001, 6, "new").unwrap();
+ buffer.add_entry(&old_entry).unwrap();
+ buffer.add_entry(&new_entry).unwrap();
+
+ let blob = buffer.serialize_binary();
+
+ // Parse the header
+ let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
+ let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+
+ assert_eq!(size, blob.len());
+
+ // cpos must point PAST offset 8 (i.e., the newest entry is not at offset 8
+ // when there are two entries; the oldest is at offset 8 instead).
+ assert!(
+ cpos > 8,
+ "cpos ({cpos}) should be > 8 when entries are stored oldest-first"
+ );
+ assert!(cpos < size, "cpos must be within buffer bounds");
+
+ // Parse the newest entry at cpos and check its prev pointer.
+ // The newest entry's prev must point BACK (lower offset) to the older entry.
+ let newest_at_cpos = &blob[cpos..];
+ // prev is the first u32 in the entry
+ let prev_of_newest = u32::from_le_bytes(newest_at_cpos[0..4].try_into().unwrap()) as usize;
+
+ assert_eq!(
+ prev_of_newest, 8,
+ "newest entry's prev ({prev_of_newest}) should point to oldest entry at offset 8"
+ );
+
+ // Parse the oldest entry at offset 8 and check its prev is 0 (no predecessor).
+ let oldest_at_8 = &blob[8..];
+ let prev_of_oldest = u32::from_le_bytes(oldest_at_8[0..4].try_into().unwrap()) as usize;
+
+ assert_eq!(
+ prev_of_oldest, 0,
+ "oldest entry's prev ({prev_of_oldest}) should be 0 (no predecessor)"
+ );
+
+ // Finally, round-trip deserialization must recover both entries in newest-first order.
+ let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+ assert_eq!(deserialized.len(), 2);
+ let entries: Vec<_> = deserialized.iter().collect();
+ assert_eq!(
+ entries[0].message, "new",
+ "First deserialized entry should be newest"
+ );
+ assert_eq!(
+ entries[1].message, "old",
+ "Second deserialized entry should be oldest"
+ );
+}
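The traversal direction the test pins down can be shown with a toy model: each "entry" is reduced to an offset, a `prev` pointer, and a label. Starting at `cpos` (the newest entry) and following `prev` until it reaches 0 yields newest-first order. Offsets here are illustrative; real entries carry the full `clog_entry_t` header:

```rust
// Walk a simplified oldest-first layout: entries are (offset, prev, label).
// cpos names the newest entry; prev chains strictly backwards to offset 0.
fn walk(entries: &[(usize, u32, &str)], cpos: usize) -> Vec<String> {
    let mut out = Vec::new();
    let mut pos = cpos;
    while pos != 0 {
        let (_, prev, label) = *entries
            .iter()
            .find(|(off, _, _)| *off == pos)
            .expect("offset must exist in the layout");
        out.push(label.to_string());
        pos = prev as usize;
    }
    out
}

fn main() {
    // Oldest at offset 8 with prev=0; newest at offset 72 pointing back to 8,
    // exactly the shape asserted in the test above.
    let entries = [(8, 0, "old"), (72, 8, "new")];
    assert_eq!(walk(&entries, 72), vec!["new", "old"]);
}
```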
+
+// ============================================================================
+// Real C Fixture Tests
+//
+// Blobs generated by gen_fixtures.c using the real C logger (clog_pack +
+// clog_copy). These test that Rust correctly parses actual C output, not
+// just a format we happened to write the same way.
+// ============================================================================
+
+/// C fixture: single entry, cpos == 8.
+///
+/// Verifies the happy path: one entry at the expected offset with all fields
+/// matching the values supplied to clog_pack.
+#[test]
+fn test_c_fixture_single_entry() {
+ let blob = include_bytes!("fixtures/single_entry.bin");
+
+ let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
+ let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+
+ assert_eq!(size, blob.len(), "header size matches blob length");
+ assert_eq!(cpos, 8, "single entry starts at offset 8");
+
+ let buffer =
+ RingBuffer::deserialize_binary(blob).expect("C single-entry blob should deserialize");
+ assert_eq!(buffer.len(), 1);
+
+ let entries: Vec<_> = buffer.iter().collect();
+ assert_eq!(entries[0].node, "node1");
+ assert_eq!(entries[0].ident, "root");
+ assert_eq!(entries[0].tag, "cluster");
+ assert_eq!(entries[0].pid, 123);
+ assert_eq!(entries[0].time, 1000);
+ assert_eq!(entries[0].priority, 6);
+ assert_eq!(entries[0].message, "Hello from C");
+}
+
+/// C fixture: three entries, cpos > 8.
+///
+/// Verifies multi-entry linked-list traversal with a cpos that is not the
+/// first possible offset. Entries must arrive in newest-first order.
+#[test]
+fn test_c_fixture_multi_entry() {
+ let blob = include_bytes!("fixtures/multi_entry.bin");
+
+ let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+ assert!(
+ cpos > 8,
+ "multi-entry fixture must have cpos > 8 (got {cpos})"
+ );
+
+ let buffer =
+ RingBuffer::deserialize_binary(blob).expect("C multi-entry blob should deserialize");
+ assert_eq!(buffer.len(), 3, "all three C entries should be recovered");
+
+ // Entries must arrive newest-first (C sorts by time descending from cpos)
+ let entries: Vec<_> = buffer.iter().collect();
+ assert_eq!(entries[0].message, "Entry three"); // time 1002 — newest
+ assert_eq!(entries[0].node, "node1");
+ assert_eq!(entries[1].message, "Entry two"); // time 1001
+ assert_eq!(entries[1].node, "node2");
+ assert_eq!(entries[2].message, "Entry one"); // time 1000 — oldest
+ assert_eq!(entries[2].node, "node1");
+}
+
+/// C fixture: non-ASCII message content.
+///
+/// C's utf8_to_ascii encodes:
+/// BEL (0x07) → #007
+/// 世 (U+4E16) → \u4e16
+/// 界 (U+754C) → \u754c
+/// " (double-quote) → \"
+///
+/// These escape sequences are stored verbatim in the binary entry.
+/// Rust must recover them unchanged.
+#[test]
+fn test_c_fixture_nonascii() {
+ let blob = include_bytes!("fixtures/nonascii.bin");
+
+ let buffer = RingBuffer::deserialize_binary(blob).expect("C nonascii blob should deserialize");
+ assert_eq!(buffer.len(), 1);
+
+ let entries: Vec<_> = buffer.iter().collect();
+ let msg = &entries[0].message;
+
+ // Must be pure ASCII after C's escaping
+ assert!(msg.is_ascii(), "message must be ASCII after C escaping");
+
+ // Verify each escape sequence the C implementation produces
+ assert!(msg.contains("#007"), "BEL must be escaped as #007");
+ assert!(msg.contains("\\u4e16"), "世 must be escaped as \\u4e16");
+ assert!(msg.contains("\\u754c"), "界 must be escaped as \\u754c");
+ assert!(msg.contains("\\\""), "quote must be escaped as \\\"");
+}
+
+/// C fixture: ring-buffer overflow.
+///
+/// 20 large (~3 KiB) entries were added to a minimum-size (40960-byte)
+/// buffer. Only the most recent entries fit; older ones were evicted by the
+/// ring. Verifies that Rust handles the truncated ring without panic or data
+/// corruption, and that it recovers fewer entries than were originally added.
+#[test]
+fn test_c_fixture_overflow() {
+ let blob = include_bytes!("fixtures/overflow.bin");
+
+ let buffer = RingBuffer::deserialize_binary(blob).expect("C overflow blob should deserialize");
+
+ // Fewer than 20 entries must have survived the eviction
+ assert!(
+ buffer.len() > 0,
+ "at least some entries must survive ring overflow"
+ );
+ assert!(
+ buffer.len() < 20,
+ "older entries must have been evicted (got {})",
+ buffer.len()
+ );
+
+ // The surviving entries should be the most recent ones (highest time values).
+ // time runs from 3000+0 to 3000+19 = 3019. Oldest surviving entry must
+ // have a time greater than 3000 (i.e. not the very first entries added).
+ let entries: Vec<_> = buffer.iter().collect();
+ let oldest_time = entries.iter().map(|e| e.time).min().unwrap();
+ assert!(
+ oldest_time > 3000,
+ "oldest surviving entry (time={oldest_time}) should not be the very first one added"
+ );
+}
+
+/// C fixture: JSON output for single-entry log matches C's clog_dump_json.
+///
+/// Deserializes the binary fixture, calls Rust's dump_json, then compares
+/// the logical content against the JSON file produced by clog_dump_json.
+/// Field order and whitespace may differ; we compare parsed values.
+#[test]
+fn test_c_fixture_json_matches_c_output() {
+ let bin = include_bytes!("fixtures/single_entry.bin");
+ let c_json = include_str!("fixtures/single_entry.json");
+
+ let buffer = RingBuffer::deserialize_binary(bin).expect("should deserialize");
+ let rust_json = buffer.dump_json(None, 50);
+
+ let c_val: serde_json::Value = serde_json::from_str(c_json).expect("C JSON valid");
+ let rust_val: serde_json::Value = serde_json::from_str(&rust_json).expect("Rust JSON valid");
+
+ let c_entries = c_val["data"].as_array().expect("C has data array");
+ let rust_entries = rust_val["data"].as_array().expect("Rust has data array");
+
+ assert_eq!(
+ rust_entries.len(),
+ c_entries.len(),
+ "entry count must match"
+ );
+
+ // Compare every field that C emits
+ for (r, c) in rust_entries.iter().zip(c_entries.iter()) {
+ assert_eq!(r["uid"], c["uid"], "uid mismatch");
+ assert_eq!(r["time"], c["time"], "time mismatch");
+ assert_eq!(r["pri"], c["pri"], "priority mismatch");
+ assert_eq!(r["tag"], c["tag"], "tag mismatch");
+ assert_eq!(r["pid"], c["pid"], "pid mismatch");
+ assert_eq!(r["node"], c["node"], "node mismatch");
+ assert_eq!(r["user"], c["user"], "user/ident mismatch");
+ assert_eq!(r["msg"], c["msg"], "message mismatch");
+ }
+}
+
+/// C fixture: JSON output for non-ASCII content matches C's clog_dump_json.
+///
+/// The C-generated JSON contains C's exact escape sequences (#007, \uXXXX,
+/// \"). Rust must reproduce the same msg string when dumping JSON from the
+/// same binary data.
+#[test]
+fn test_c_fixture_nonascii_json_matches_c_output() {
+ let bin = include_bytes!("fixtures/nonascii.bin");
+ let c_json = include_str!("fixtures/nonascii.json");
+
+ let buffer = RingBuffer::deserialize_binary(bin).expect("should deserialize");
+ let rust_json = buffer.dump_json(None, 50);
+
+ let c_val: serde_json::Value = serde_json::from_str(c_json).expect("C JSON valid");
+ let rust_val: serde_json::Value = serde_json::from_str(&rust_json).expect("Rust JSON valid");
+
+ let c_msg = c_val["data"][0]["msg"].as_str().expect("C has msg");
+ let rust_msg = rust_val["data"][0]["msg"].as_str().expect("Rust has msg");
+
+ // The msg field must be identical — same C escape sequences, same quote escaping
+ assert_eq!(
+ rust_msg, c_msg,
+ "non-ASCII message must match C's clog_dump_json output"
+ );
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c
new file mode 100644
index 000000000..021edd2b0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c
@@ -0,0 +1,144 @@
+/*
+ * C fixture generator for pmxcfs-logger binary compatibility tests.
+ *
+ * Uses clog_pack + clog_copy for full control over node/time values.
+ *
+ * Compile from src/pmxcfs/:
+ * gcc -o /tmp/gen_fixtures /tmp/gen_fixtures.c \
+ * -I. $(pkg-config --cflags glib-2.0) \
+ * libpmxcfs.a $(pkg-config --libs glib-2.0) -lpthread
+ *
+ * Outputs (written to outdir argument, default "."):
+ * single_entry.bin — one entry; cpos == 8
+ * multi_entry.bin — three entries, cpos > 8
+ * nonascii.bin — BEL + UTF-8 CJK + quoted string
+ * overflow.bin — 40960-byte minimum-size buffer, not all 20 entries fit
+ * single_entry.json — clog_dump_json for single-entry clog
+ * nonascii.json — clog_dump_json showing C's escaping
+ */
+
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <glib.h>
+
+#include "cfs-utils.h"
+#include "logger.h"
+
+/* Stub for the global cfs_t used by cfs_log() in cfs-utils.c */
+cfs_t cfs = {
+ .nodename = "fixture-generator",
+ .ip = "127.0.0.1",
+ .gid = 0,
+ .debug = 0,
+};
+
+/* Stub for qb_log_from_external_source (from corosync libqb) */
+void qb_log_from_external_source(
+ const char *function, const char *filename,
+ const char *format, uint8_t priority, uint32_t lineno,
+ uint32_t tags, ...)
+{
+ /* suppress all logging during fixture generation */
+ (void)function; (void)filename; (void)format;
+ (void)priority; (void)lineno; (void)tags;
+}
+
+static void write_file(const char *path, const void *data, size_t len) {
+ FILE *f = fopen(path, "wb");
+ if (!f) { perror(path); exit(1); }
+ if (fwrite(data, 1, len, f) != len) { perror(path); exit(1); }
+ fclose(f);
+ fprintf(stderr, "Wrote %zu bytes -> %s\n", len, path);
+}
+
+/* Pack one entry and append it to the clog. */
+static void add_entry(
+ clog_base_t *clog,
+ const char *node, const char *ident, const char *tag,
+ uint32_t pid, time_t logtime, uint8_t priority, const char *msg)
+{
+ char buf[CLOG_MAX_ENTRY_SIZE];
+ clog_pack((clog_entry_t *)buf, node, ident, tag, pid, logtime, priority, msg);
+ clog_copy(clog, (clog_entry_t *)buf);
+}
+
+int main(int argc, char **argv) {
+ const char *outdir = argc > 1 ? argv[1] : ".";
+ char path[512];
+ clog_base_t *clog, *sorted;
+ GString *json_str;
+
+ /* ------------------------------------------------------------------ */
+ /* fixture 1: single_entry — one entry, cpos == 8 */
+ /* ------------------------------------------------------------------ */
+ clog = clog_new(CLOG_DEFAULT_SIZE);
+ add_entry(clog, "node1", "root", "cluster", 123, 1000, 6, "Hello from C");
+ snprintf(path, sizeof(path), "%s/single_entry.bin", outdir);
+ write_file(path, clog, clog_size(clog));
+ /* JSON */
+ sorted = clog_sort(clog);
+ json_str = g_string_new(NULL);
+ clog_dump_json(sorted, json_str, NULL, 50);
+ snprintf(path, sizeof(path), "%s/single_entry.json", outdir);
+ write_file(path, json_str->str, json_str->len);
+ g_string_free(json_str, TRUE);
+ g_free(sorted);
+ g_free(clog);
+
+ /* ------------------------------------------------------------------ */
+ /* fixture 2: multi_entry — three entries, cpos > 8 */
+ /* ------------------------------------------------------------------ */
+ clog = clog_new(CLOG_DEFAULT_SIZE);
+ add_entry(clog, "node1", "root", "cluster", 101, 1000, 6, "Entry one");
+ add_entry(clog, "node2", "admin", "cluster", 102, 1001, 6, "Entry two");
+ add_entry(clog, "node1", "root", "cluster", 103, 1002, 6, "Entry three");
+ snprintf(path, sizeof(path), "%s/multi_entry.bin", outdir);
+ write_file(path, clog, clog_size(clog));
+ g_free(clog);
+
+ /* ------------------------------------------------------------------ */
+ /* fixture 3: nonascii — BEL + UTF-8 CJK + quoted string */
+ /* C escapes: BEL->#007, 世->\u4e16, 界->\u754c, "->\" */
+ /* ------------------------------------------------------------------ */
+ clog = clog_new(CLOG_DEFAULT_SIZE);
+ add_entry(clog, "node1", "root", "cluster", 111, 2000, 6,
+ "Hello\x07World \xe4\xb8\x96\xe7\x95\x8c and \"quoted\"");
+ snprintf(path, sizeof(path), "%s/nonascii.bin", outdir);
+ write_file(path, clog, clog_size(clog));
+ /* JSON showing C's escaping */
+ sorted = clog_sort(clog);
+ json_str = g_string_new(NULL);
+ clog_dump_json(sorted, json_str, NULL, 50);
+ snprintf(path, sizeof(path), "%s/nonascii.json", outdir);
+ write_file(path, json_str->str, json_str->len);
+ g_string_free(json_str, TRUE);
+ g_free(sorted);
+ g_free(clog);
+
+ /* ------------------------------------------------------------------ */
+ /* fixture 4: overflow — minimum-size buffer, large entries */
+ /* clog_new requires size >= CLOG_MAX_ENTRY_SIZE * 10 (= 40960). */
+ /* Each entry is ~3 KiB so ~13 fit; adding 20 evicts the first 7+. */
+ /* ------------------------------------------------------------------ */
+ {
+ /* Build a large message (~3000 chars) to consume space quickly */
+ char big_msg[3001];
+ memset(big_msg, 'X', 3000);
+ big_msg[3000] = '\0';
+
+ clog = clog_new(CLOG_MAX_ENTRY_SIZE * 10);
+ for (int i = 0; i < 20; i++) {
+ big_msg[0] = '0' + (i % 10); /* vary first char for uniqueness */
+ big_msg[1] = 'A' + i;
+ add_entry(clog, "node1", "root", "cluster", 200 + i, 3000 + i, 6, big_msg);
+ }
+ snprintf(path, sizeof(path), "%s/overflow.bin", outdir);
+ write_file(path, clog, clog_size(clog));
+ g_free(clog);
+ }
+
+ return 0;
+}
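As an aside for reviewers: the cover letter mentions FNV-1a hash functions matching the C implementation. The snippet below is a minimal, self-contained sketch of the standard 64-bit FNV-1a algorithm for orientation only; the function name is illustrative and not the crate's actual API in `src/hash.rs`.

```rust
// Minimal sketch of 64-bit FNV-1a (the algorithm named in the cover
// letter). Constants are the standard FNV-1a parameters; the function
// name is illustrative, not necessarily the pmxcfs-logger API.
fn fnv1a_64(data: &[u8]) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = FNV_OFFSET_BASIS;
    for &byte in data {
        hash ^= byte as u64;                 // xor first: the "1a" variant
        hash = hash.wrapping_mul(FNV_PRIME); // then multiply by the FNV prime
    }
    hash
}

fn main() {
    // Standard published FNV-1a 64-bit test vectors.
    assert_eq!(fnv1a_64(b""), 0xcbf29ce484222325);
    assert_eq!(fnv1a_64(b"a"), 0xaf63dc4c8601ec8c);
    println!("fnv1a_64(\"node1\") = {:#018x}", fnv1a_64(b"node1"));
}
```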
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/multi_entry.bin b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/multi_entry.bin
new file mode 100644
index 0000000000000000000000000000000000000000..6beabb4dcda1ce92e84c0819c4ebb76330a04ee3
GIT binary patch
literal 131072
zcmeIuJ4ypl6b9f!G(!+uOJTO4TM+H6Y_t?JK0s$+90~5grKEEM0U<WFX)G)ieD#8b
zh<6gO2)cvs!np?ym!EULh)xV+c6L+iq<US5`1-!f$4?*6H#?)%t;flBk>**}?JcEV
zR{dfv>Z)qu;Pm3WDeBlPoBA@Z$|CZO^dh2{s?AMN@s_T^aFUN#;$`%3f4g^8Tpy-+
zmSw+r>#^TIJ1OS^n?V(`ymq(GREw$JQ{Mc3N7KA+Z#ngU_iK*pqWy?NfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk
z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs
z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ
zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U
zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7
z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N
z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+
z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
v5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+z@Gv?lPg7&
literal 0
HcmV?d00001
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.bin b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.bin
new file mode 100644
index 0000000000000000000000000000000000000000..9502f7335db8448e570793dab7ea121b43afc069
GIT binary patch
literal 131072
zcmeIuF-ikb6a~<SkeE#fLo96)h|Uhg#>&EG8p-@_$j8YHxHut@%F=8hf$YFe#1C%5
z;#_!KxbT`2aS^ll%Uh{QxzEdp@1KuqfBHJ_p5CwSkB7%x_UUvoyD7cNZCXa3^APK9
z+zzRahtzfXda<aVbKkX9Gp^F|t{Lmw)w+7wlwtll<{`Dsy!;CR0t5&UAV7cs0RjXF
z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk
z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs
z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ
zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U
zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7
z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N
z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+
z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk
r1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk|5M->_K6_A
literal 0
HcmV?d00001
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.json b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.json
new file mode 100644
index 000000000..91dec616c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.json
@@ -0,0 +1,5 @@
+{
+"data": [
+{"uid": 5, "time": 2000, "pri": 6, "tag": "cluster", "pid": 111, "node": "node1", "user": "root", "msg": "Hello#007World \u4e16\u754c and \"quoted\""}
+]
+}
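For reviewers tracing the nonascii fixture: it encodes the C logger's escaping rules noted in the generator comments (BEL becomes `#007`, CJK characters become `\uXXXX`, quotes are backslash-escaped). The sketch below reproduces those rules under stated assumptions; the function name and the handling of backslashes and non-BMP characters are guesses, not the crate's API.

```rust
// Sketch of the escaping the nonascii fixture demonstrates:
//   control bytes  -> "#NNN" (octal), e.g. BEL -> "#007"
//   non-ASCII      -> "\uXXXX" (BMP only; surrogate handling not shown)
//   '"' and '\\'   -> backslash-escaped
// Function name and backslash rule are assumptions for illustration.
fn escape_clog(msg: &str) -> String {
    let mut out = String::new();
    for c in msg.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("#{:03o}", c as u32)),
            c if c.is_ascii() => out.push(c),
            c => out.push_str(&format!("\\u{:04x}", c as u32)),
        }
    }
    out
}

fn main() {
    let escaped = escape_clog("Hello\x07World 世界 and \"quoted\"");
    // Matches the "msg" field in nonascii.json above.
    assert_eq!(escaped, "Hello#007World \\u4e16\\u754c and \\\"quoted\\\"");
    println!("{escaped}");
}
```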
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/overflow.bin b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/overflow.bin
new file mode 100644
index 0000000000000000000000000000000000000000..4eeb77352bb6d949a4d3397558cbdb734b58c13b
GIT binary patch
literal 40960
zcmeI)xoXr=6u{xr7%sT)S_l?CK;o7JtR*gSONx{MO&1Kr2tI%hkO#2HYY0L#Zcz)3
z`_d`+26iHzA>hBc{Ob#T1H%kwntQ+M#~Ef~C8A8^b2*>eBKAha)2$J||EwQf`|<1N
z>ePcHXTNVue#rUI(3WyPKfAebV{RlC7Z#S{`uy!%OLL2H;?j%<0RjXF5FkK+009Dx
z3uM)AK1#lnLlOJ(^;xC=Kjyrr|4&|S{GyIWfB*pk1PBlyK!Cu^X8aG@KX?D-Sj7H(
z|Gd)wpZfd%)D@Tn2oNAZfB*pk1PHV)(Di@r{>yO0ft;*Y`u}r(|DV3v`c0jW009C7
z2oNAZfB=E6|8w_W&O{u{$%{(=f9dc4(Q7OT5FkK+009C72oPvppzHtK{SSTzoL>XP
z%S!)$?eG7wsn&1md;|y(AV7cs0RjXFbp4;Z|1uqMI47?v{r|1M|Hr3U5+Fc;009C7
z2oNC9x<J?egZ}`Xjo6-()k^=r?(hH6iPmrGd;|y(AV7cs0RjXFbp4;Z|1ux3BPS0l
z{r{%F|Hsa;BtU=w0RjXF5FkLHb%Cz`XS4tBMC{DTqe}m8^!NYx`POghd;|y(AV7cs
z0RjXFbp4;Z|8h5CS5DR{{r|SV|A#NIBtU=w0RjXF5FkLHb%Cz`bN64CBX;NHai#y?
z_4ohC#nx}?d;|y(AV7cs0RjXFbp4;Z|8g&4Pfngx`u}}@{~w=ZNq_(W0t5&UAV7dX
P>jMA!-x~r1{y%}ghvsR-
literal 0
HcmV?d00001
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.bin b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.bin
new file mode 100644
index 0000000000000000000000000000000000000000..248c0de3108c6566fcef182ef0a843bdc0bcc472
GIT binary patch
literal 131072
zcmeIuK?*@(6b0amyi6=Xy)9USl8KRtInU_v*Tcxxlrl241xhwxCi%-M)OYH3>U2k6
zL_2!%%RE;r-?J0({#?rQ{q;D_j)U>-Iz8mQD7w9V?oC=&!)Q|4#iHJCcU2RUs;*PH
zYSOwK<qsi1fB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk
z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs
z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ
zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U
zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7
z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N
z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+
z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
K5FkL{9|gX3I2TI*
literal 0
HcmV?d00001
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.json b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.json
new file mode 100644
index 000000000..c5ac7b8d7
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.json
@@ -0,0 +1,5 @@
+{
+"data": [
+{"uid": 1, "time": 1000, "pri": 6, "tag": "cluster", "pid": 123, "node": "node1", "user": "root", "msg": "Hello from C"}
+]
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs b/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs
new file mode 100644
index 000000000..b89084d27
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs
@@ -0,0 +1,286 @@
+//! Performance tests for pmxcfs-logger
+//!
+//! These tests verify that the logger implementation scales properly
+//! and handles large log merges efficiently.
+
+use pmxcfs_logger::ClusterLog;
+
+/// Test merging large logs from multiple nodes
+///
+/// This test verifies:
+/// 1. Large log merge performance (multiple nodes with many entries)
+/// 2. Memory usage stays bounded
+/// 3. Deduplication works correctly at scale
+#[test]
+fn test_large_log_merge_performance() {
+ // Create 3 nodes with large logs
+ let node1 = ClusterLog::new();
+ let node2 = ClusterLog::new();
+ let node3 = ClusterLog::new();
+
+ // Add 1000 entries per node (3000 total)
+ for i in 0..1000 {
+ let _ = node1.add(
+ "node1",
+ "root",
+ "cluster",
+ 1000 + i,
+ 6,
+ 1000000 + i,
+ &format!("Node1 entry {}", i),
+ );
+ let _ = node2.add(
+ "node2",
+ "admin",
+ "system",
+ 2000 + i,
+ 6,
+ 1000000 + i,
+ &format!("Node2 entry {}", i),
+ );
+ let _ = node3.add(
+ "node3",
+ "user",
+ "service",
+ 3000 + i,
+ 6,
+ 1000000 + i,
+ &format!("Node3 entry {}", i),
+ );
+ }
+
+ // Get remote buffers
+ let node2_buffer = node2.get_buffer();
+ let node3_buffer = node3.get_buffer();
+
+ // Merge all logs into node1
+ let start = std::time::Instant::now();
+ node1
+ .merge(vec![node2_buffer, node3_buffer], true)
+ .expect("Merge should succeed");
+ let duration = start.elapsed();
+
+ // Verify merge completed
+ let merged_count = node1.len();
+
+    // Should have merged entries (may be fewer than 3000 due to capacity limits)
+ assert!(
+ merged_count > 0,
+ "Should have some entries after merge (got {})",
+ merged_count
+ );
+
+ // Performance check: merge should complete in reasonable time
+ // For 3000 entries, should be well under 1 second
+ assert!(
+ duration.as_millis() < 1000,
+ "Large merge took too long: {:?}",
+ duration
+ );
+
+ println!(
+ "[OK] Merged 3000 entries from 3 nodes in {:?} (result: {} entries)",
+ duration, merged_count
+ );
+}
+
+/// Test deduplication performance with high duplicate rate
+///
+/// This test verifies that deduplication works efficiently when
+/// many duplicate entries are present.
+#[test]
+fn test_deduplication_performance() {
+ let log = ClusterLog::new();
+
+ // Add 500 entries from same node with overlapping times
+ // This creates many potential duplicates
+ for i in 0..500 {
+ let _ = log.add(
+ "node1",
+ "root",
+ "cluster",
+ 1000 + i,
+ 6,
+ 1000 + (i / 10), // Reuse timestamps (50 unique times)
+ &format!("Entry {}", i),
+ );
+ }
+
+ // Create remote log with overlapping entries
+ let remote = ClusterLog::new();
+ for i in 0..500 {
+ let _ = remote.add(
+ "node1",
+ "root",
+ "cluster",
+ 2000 + i,
+ 6,
+ 1000 + (i / 10), // Same timestamp pattern
+ &format!("Remote entry {}", i),
+ );
+ }
+
+ let remote_buffer = remote.get_buffer();
+
+ // Merge with deduplication
+ let start = std::time::Instant::now();
+ log.merge(vec![remote_buffer], true)
+ .expect("Merge should succeed");
+ let duration = start.elapsed();
+
+ let final_count = log.len();
+
+ // Should have deduplicated some entries
+ assert!(final_count > 0, "Should have entries after deduplication");
+
+ // Performance check
+ assert!(
+ duration.as_millis() < 500,
+ "Deduplication took too long: {:?}",
+ duration
+ );
+
+ println!(
+ "[OK] Deduplicated 1000 entries in {:?} (result: {} entries)",
+ duration, final_count
+ );
+}
+
+/// Test memory usage stays bounded during large operations
+///
+/// This test verifies that the ring buffer properly limits memory
+/// usage even when adding many entries.
+#[test]
+fn test_memory_bounded() {
+ // Create log with default capacity
+ let log = ClusterLog::new();
+
+ // Add many entries (more than capacity)
+ for i in 0..10000 {
+ let _ = log.add(
+ "node1",
+ "root",
+ "cluster",
+ 1000 + i,
+ 6,
+ 1000000 + i,
+ &format!("Entry with some message content {}", i),
+ );
+ }
+
+ let entry_count = log.len();
+ let capacity = log.capacity();
+
+ // Buffer should not grow unbounded
+ // Entry count should be reasonable relative to capacity
+ assert!(
+ entry_count < 10000,
+ "Buffer should not store all 10000 entries (got {})",
+ entry_count
+ );
+
+ // Verify capacity is respected
+ assert!(capacity > 0, "Capacity should be set (got {})", capacity);
+
+ println!(
+ "[OK] Added 10000 entries, buffer contains {} (capacity: {} bytes)",
+ entry_count, capacity
+ );
+}
+
+/// Test JSON export performance with large logs
+///
+/// This test verifies that JSON export scales properly.
+#[test]
+fn test_json_export_performance() {
+ let log = ClusterLog::new();
+
+ // Add 1000 entries
+ for i in 0..1000 {
+ let _ = log.add(
+ "node1",
+ "root",
+ "cluster",
+ 1000 + i,
+ 6,
+ 1000000 + i,
+ &format!("Test message {}", i),
+ );
+ }
+
+ // Export to JSON
+ let start = std::time::Instant::now();
+ let json = log.dump_json(None, 1000);
+ let duration = start.elapsed();
+
+ // Verify JSON is valid
+ let parsed: serde_json::Value = serde_json::from_str(&json).expect("Should be valid JSON");
+ let data = parsed["data"].as_array().expect("Should have data array");
+
+ assert!(!data.is_empty(), "Should have entries in JSON");
+
+ // Performance check
+ assert!(
+ duration.as_millis() < 500,
+ "JSON export took too long: {:?}",
+ duration
+ );
+
+ println!(
+ "[OK] Exported {} entries to JSON in {:?}",
+ data.len(),
+ duration
+ );
+}
+
+/// Test binary serialization performance
+///
+/// This test verifies that binary serialization/deserialization
+/// is efficient for large buffers.
+#[test]
+fn test_binary_serialization_performance() {
+ let log = ClusterLog::new();
+
+ // Add 500 entries
+ for i in 0..500 {
+ let _ = log.add(
+ "node1",
+ "root",
+ "cluster",
+ 1000 + i,
+ 6,
+ 1000000 + i,
+ &format!("Entry {}", i),
+ );
+ }
+
+ // Serialize
+ let start = std::time::Instant::now();
+ let state = log.get_state().expect("Should serialize");
+ let serialize_duration = start.elapsed();
+
+ // Deserialize
+ let start = std::time::Instant::now();
+ let deserialized = ClusterLog::deserialize_state(&state).expect("Should deserialize");
+ let deserialize_duration = start.elapsed();
+
+ // Verify round-trip
+ assert_eq!(deserialized.len(), 500, "Should preserve entry count");
+
+ // Performance checks
+ assert!(
+ serialize_duration.as_millis() < 200,
+ "Serialization took too long: {:?}",
+ serialize_duration
+ );
+ assert!(
+ deserialize_duration.as_millis() < 200,
+ "Deserialization took too long: {:?}",
+ deserialize_duration
+ );
+
+ println!(
+ "[OK] Serialized 500 entries in {:?}, deserialized in {:?}",
+ serialize_duration, deserialize_duration
+ );
+}
--
2.47.3
Thread overview: 13+ messages
2026-03-23 11:32 [PATCH pve-cluster v3 00/13] Rewrite pmxcfs with Rust Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 01/13] pmxcfs-rs: add pmxcfs-api-types crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 02/13] pmxcfs-rs: add pmxcfs-config crate Kefu Chai
2026-03-23 11:32 ` Kefu Chai [this message]
2026-03-23 11:32 ` [PATCH pve-cluster v3 04/13] pmxcfs-rs: add pmxcfs-rrd crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 05/13] pmxcfs-rs: add pmxcfs-memdb crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 06/13] pmxcfs-rs: add pmxcfs-status and pmxcfs-test-utils crates Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 07/13] pmxcfs-rs: add pmxcfs-services crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 08/13] pmxcfs-rs: add pmxcfs-ipc crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 09/13] pmxcfs-rs: add pmxcfs-dfsm crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 10/13] pmxcfs-rs: vendor patched rust-corosync for CPG compatibility Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 11/13] pmxcfs-rs: add pmxcfs main daemon binary Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 13/13] pmxcfs-rs: add project documentation Kefu Chai