public inbox for pve-devel@lists.proxmox.com
From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Cc: Kefu Chai <tchaikov@gmail.com>
Subject: [PATCH pve-cluster v3 03/13] pmxcfs-rs: add pmxcfs-logger crate
Date: Mon, 23 Mar 2026 19:32:18 +0800	[thread overview]
Message-ID: <20260323113239.942866-4-k.chai@proxmox.com> (raw)
In-Reply-To: <20260323113239.942866-1-k.chai@proxmox.com>

Add a cluster log management crate compatible with the C implementation (logger.c):
- LogEntry: Individual log entry with automatic UID generation
- RingBuffer: Circular buffer matching C's clog_base_t layout
- ClusterLog: Main API with per-node deduplication and merging
- FNV-1a hash functions matching the C implementation

Includes binary compatibility tests with C-generated fixtures to
verify the on-wire log entry format.
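
For reference, the deduplication key is a 64-bit FNV-1a hash of the node
name. A minimal standalone sketch using the standard FNV-1a offset basis
and prime (the crate's exact variant lives in src/hash.rs and may differ):

```rust
// 64-bit FNV-1a over a byte slice (standard parameters).
const FNV_OFFSET: u64 = 0xcbf29ce484222325;
const FNV_PRIME: u64 = 0x100000001b3;

fn fnv_64a(data: &[u8]) -> u64 {
    let mut hash = FNV_OFFSET;
    for &byte in data {
        hash ^= u64::from(byte); // XOR the byte in first ("1a" order),
        hash = hash.wrapping_mul(FNV_PRIME); // then multiply by the prime
    }
    hash
}

fn main() {
    // Each node name collapses to a fixed-width u64 dedup key.
    println!("{:#018x}", fnv_64a(b"node1"));
}
```

On the Rust side this digest doubles as the HashMap key in the dedup table.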

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
 src/pmxcfs-rs/Cargo.toml                      |   2 +
 src/pmxcfs-rs/pmxcfs-logger/Cargo.toml        |  15 +
 src/pmxcfs-rs/pmxcfs-logger/README.md         |  58 ++
 .../pmxcfs-logger/src/cluster_log.rs          | 610 +++++++++++++++
 src/pmxcfs-rs/pmxcfs-logger/src/entry.rs      | 734 ++++++++++++++++++
 src/pmxcfs-rs/pmxcfs-logger/src/hash.rs       | 129 +++
 src/pmxcfs-rs/pmxcfs-logger/src/lib.rs        |  29 +
 .../pmxcfs-logger/src/ring_buffer.rs          | 542 +++++++++++++
 .../tests/binary_compatibility_tests.rs       | 611 +++++++++++++++
 .../tests/fixtures/gen_fixtures.c             | 144 ++++
 .../tests/fixtures/multi_entry.bin            | Bin 0 -> 131072 bytes
 .../pmxcfs-logger/tests/fixtures/nonascii.bin | Bin 0 -> 131072 bytes
 .../tests/fixtures/nonascii.json              |   5 +
 .../pmxcfs-logger/tests/fixtures/overflow.bin | Bin 0 -> 40960 bytes
 .../tests/fixtures/single_entry.bin           | Bin 0 -> 131072 bytes
 .../tests/fixtures/single_entry.json          |   5 +
 .../pmxcfs-logger/tests/performance_tests.rs  | 286 +++++++
 17 files changed, 3170 insertions(+)
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/README.md
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/multi_entry.bin
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.bin
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.json
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/overflow.bin
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.bin
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.json
 create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 99bb79266..f2ed02c6f 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -3,6 +3,7 @@
 members = [
     "pmxcfs-api-types",  # Shared types and error definitions
     "pmxcfs-config",     # Configuration management
+    "pmxcfs-logger",     # Cluster log with ring buffer and deduplication
 ]
 resolver = "2"
 
@@ -18,6 +19,7 @@ rust-version = "1.85"
 # Internal workspace dependencies
 pmxcfs-api-types = { path = "pmxcfs-api-types" }
 pmxcfs-config = { path = "pmxcfs-config" }
+pmxcfs-logger = { path = "pmxcfs-logger" }
 
 # Error handling
 thiserror = "2.0"
diff --git a/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml b/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
new file mode 100644
index 000000000..1af3f015c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "pmxcfs-logger"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+anyhow = "1.0"
+parking_lot = "0.12"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+tracing = "0.1"
+
+[dev-dependencies]
+tempfile = "3.0"
+
diff --git a/src/pmxcfs-rs/pmxcfs-logger/README.md b/src/pmxcfs-rs/pmxcfs-logger/README.md
new file mode 100644
index 000000000..38f102c27
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/README.md
@@ -0,0 +1,58 @@
+# pmxcfs-logger
+
+Cluster-wide log management for pmxcfs, fully compatible with the C implementation (logger.c).
+
+## Overview
+
+This crate implements a cluster log system matching Proxmox's C-based logger.c behavior. It provides:
+
+- **Ring Buffer Storage**: Circular buffer for log entries with automatic capacity management
+- **FNV-1a Hashing**: 64-bit digests of node and identity names, used as deduplication keys
+- **Deduplication**: Per-node tracking of latest log entries to avoid duplicates
+- **Time-based Sorting**: Chronological ordering of log entries across nodes
+- **Multi-node Merging**: Combining logs from multiple cluster nodes
+- **JSON Export**: Web UI-compatible JSON output matching C format
+
+## Architecture
+
+### Key Components
+
+1. **LogEntry** (`entry.rs`): Individual log entry with automatic UID generation
+2. **RingBuffer** (`ring_buffer.rs`): Circular buffer with capacity management
+3. **ClusterLog** (`cluster_log.rs`): Main API with deduplication and merging
+4. **Hash Functions** (`hash.rs`): FNV-1a implementation matching C
+
+## C to Rust Mapping
+
+| C Function | Rust Equivalent | Location |
+|------------|-----------------|----------|
+| `fnv_64a_buf` | `hash::fnv_64a` | hash.rs |
+| `clog_pack` | `LogEntry::pack` | entry.rs |
+| `clog_copy` | `RingBuffer::add_entry` | ring_buffer.rs |
+| `clog_sort` | `RingBuffer::sort` | ring_buffer.rs |
+| `clog_dump_json` | `RingBuffer::dump_json` | ring_buffer.rs |
+| `clusterlog_insert` | `ClusterLog::insert` | cluster_log.rs |
+| `clusterlog_add` | `ClusterLog::add` | cluster_log.rs |
+| `clusterlog_merge` | `ClusterLog::merge` | cluster_log.rs |
+| `dedup_lookup` | `ClusterLog::is_not_duplicate` | cluster_log.rs |
+
+## Key Differences from C
+
+1. **No `node_digest` in DedupEntry**: C stores `node_digest` both as HashMap key and in the struct. Rust only uses it as the key, saving 8 bytes per entry.
+
+2. **Mutex granularity**: Both implementations use a single mutex: one `Arc<Mutex<_>>` protects the ring buffer and the dedup table together, matching C's global mutex and keeping their updates atomic.
+
+3. **Code size**: Rust implementation is ~24% the size of C (740 lines vs 3,000+) while maintaining equivalent functionality.
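+
+## Usage
+
+A minimal sketch, assuming `ClusterLog` is re-exported at the crate root
+(API as added in this patch; priority 6 is syslog "info"):
+
+```rust
+use pmxcfs_logger::ClusterLog;
+
+let log = ClusterLog::new();
+log.add("node1", "root", "cluster", 1234, 6, 1_700_000_000, "worker started")
+    .expect("insert log entry");
+
+// JSON output compatible with the Proxmox web UI, capped at 50 entries.
+let json = log.dump_json(None, 50);
+```
+
+Entries carry an auto-generated UID; per-node `(time, uid)` tracking drops
+stale duplicates on `insert`/`merge`.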
+
+## Integration
+
+This crate is integrated into `pmxcfs-status` to provide cluster log functionality. The `.clusterlog` FUSE plugin uses this to provide JSON log output compatible with the Proxmox web UI.
+
+## References
+
+### C Implementation
+- `src/pmxcfs/logger.c` / `logger.h` - Cluster log implementation
+
+### Related Crates
+- **pmxcfs-status**: Integrates ClusterLog for status tracking
+- **pmxcfs**: FUSE plugin exposes cluster log via `.clusterlog`
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs b/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
new file mode 100644
index 000000000..1f4b5307f
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs
@@ -0,0 +1,610 @@
+//! Cluster Log Implementation
+//!
+//! This module implements the cluster-wide log system with deduplication
+//! and merging support, matching C's clusterlog_t.
+use crate::entry::LogEntry;
+use crate::ring_buffer::{RingBuffer, CLOG_DEFAULT_SIZE};
+use anyhow::Result;
+use parking_lot::Mutex;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
+
+/// Deduplication entry - tracks the latest UID and time for each node
+///
+/// Note: C's `dedup_entry_t` includes node_digest field because GHashTable stores
+/// the struct pointer both as key and value. In Rust, we use HashMap<u64, DedupEntry>
+/// where node_digest is the key, so we don't need to duplicate it in the value.
+/// This is functionally equivalent but more efficient.
+#[derive(Debug, Clone)]
+pub(crate) struct DedupEntry {
+    /// Latest UID seen from this node
+    pub uid: u32,
+    /// Latest timestamp seen from this node
+    pub time: u32,
+}
+
+/// Internal state protected by a single mutex
+/// Matches C's clusterlog_t which uses a single mutex for both base and dedup
+struct ClusterLogInner {
+    /// Ring buffer for log storage (matches C's cl->base)
+    buffer: RingBuffer,
+    /// Deduplication tracker (matches C's cl->dedup)
+    dedup: HashMap<u64, DedupEntry>,
+}
+
+/// Cluster-wide log with deduplication and merging support
+/// Matches C's `clusterlog_t`
+///
+/// Note: A single mutex protects both the buffer and the dedup table to match
+/// C's semantics and guarantee that the two are updated atomically.
+pub struct ClusterLog {
+    /// Inner state protected by a single mutex
+    /// Matches C's single g_mutex_t protecting both cl->base and cl->dedup
+    inner: Arc<Mutex<ClusterLogInner>>,
+}
+
+impl ClusterLog {
+    /// Create a new cluster log with default size
+    pub fn new() -> Self {
+        Self::with_capacity(CLOG_DEFAULT_SIZE)
+    }
+
+    /// Create a new cluster log with specified capacity
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(ClusterLogInner {
+                buffer: RingBuffer::new(capacity),
+                dedup: HashMap::new(),
+            })),
+        }
+    }
+
+    /// Matches C's `clusterlog_add` function
+    #[must_use = "log insertion errors should be handled"]
+    #[allow(clippy::too_many_arguments)]
+    pub fn add(
+        &self,
+        node: &str,
+        ident: &str,
+        tag: &str,
+        pid: u32,
+        priority: u8,
+        time: u32,
+        message: &str,
+    ) -> Result<()> {
+        let entry = LogEntry::pack(node, ident, tag, pid, time, priority, message)?;
+        self.insert(&entry)
+    }
+
+    /// Insert a log entry (with deduplication)
+    ///
+    /// Matches C's `clusterlog_insert` function
+    #[must_use = "log insertion errors should be handled"]
+    pub fn insert(&self, entry: &LogEntry) -> Result<()> {
+        let mut inner = self.inner.lock();
+
+        // Check deduplication
+        if Self::is_not_duplicate(&mut inner.dedup, entry) {
+            // Entry is not a duplicate, add it
+            inner.buffer.add_entry(entry)?;
+        } else {
+            tracing::debug!("Ignoring duplicate cluster log entry");
+        }
+
+        Ok(())
+    }
+
+    /// Update the per-node dedup state, returning `true` if the entry is new (NOT a duplicate)
+    ///
+    /// Matches C's `dedup_lookup` function
+    ///
+    /// ## Hash Collision Risk
+    ///
+    /// Uses FNV-1a hash (`node_digest`) as deduplication key. Hash collisions
+    /// are theoretically possible but extremely rare in practice:
+    ///
+    /// - FNV-1a produces 64-bit hashes (2^64 possible values)
+    /// - Collision probability with N entries: ~N²/(2 × 2^64)
+    /// - For 10,000 log entries: collision probability < 10^-11
+    ///
+    /// If a collision occurs, two different log entries (from different nodes
+    /// or with different content) will be treated as duplicates, causing one
+    /// to be silently dropped.
+    ///
+    /// This design is inherited from the C implementation for compatibility.
+    /// The risk is acceptable because:
+    /// 1. Collisions are astronomically rare
+    /// 2. Only affects log deduplication, not critical data integrity
+    /// 3. Lost log entries don't compromise cluster operation
+    ///
+    /// Changing this would break wire format compatibility with C nodes.
+    fn is_not_duplicate(dedup: &mut HashMap<u64, DedupEntry>, entry: &LogEntry) -> bool {
+        let dd = dedup
+            .entry(entry.node_digest)
+            .or_insert(DedupEntry { time: 0, uid: 0 });
+
+        if entry.time > dd.time || (entry.time == dd.time && entry.uid > dd.uid) {
+            dd.time = entry.time;
+            dd.uid = entry.uid;
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Return up to `max` entries, newest first
+    pub fn get_entries(&self, max: usize) -> Vec<LogEntry> {
+        let inner = self.inner.lock();
+        inner.buffer.iter().take(max).collect()
+    }
+
+    /// Get the current buffer (for testing)
+    pub fn get_buffer(&self) -> RingBuffer {
+        let inner = self.inner.lock();
+        inner.buffer.clone()
+    }
+
+    /// Get buffer length (for testing)
+    pub fn len(&self) -> usize {
+        let inner = self.inner.lock();
+        inner.buffer.len()
+    }
+
+    /// Get buffer capacity (for testing)
+    pub fn capacity(&self) -> usize {
+        let inner = self.inner.lock();
+        inner.buffer.capacity()
+    }
+
+    /// Check if buffer is empty (for testing)
+    pub fn is_empty(&self) -> bool {
+        let inner = self.inner.lock();
+        inner.buffer.is_empty()
+    }
+
+    /// Clear all log entries (for testing)
+    pub fn clear(&self) {
+        let mut inner = self.inner.lock();
+        let capacity = inner.buffer.capacity();
+        inner.buffer = RingBuffer::new(capacity);
+        inner.dedup.clear();
+    }
+
+    /// Sort the log entries by time, returning a new `RingBuffer` with entries in sorted order.
+    ///
+    /// Matches C's `clog_sort` function.
+    pub fn sort(&self) -> RingBuffer {
+        let inner = self.inner.lock();
+        let sorted_refs = inner.buffer.sort_entries();
+        let mut result = RingBuffer::new(inner.buffer.capacity());
+        // sort_entries() returns newest-first; add_entry pushes to front (newest-first),
+        // so iterate in reverse (oldest-first) to preserve the correct order.
+        for entry in sorted_refs.iter().rev() {
+            let _ = result.add_entry(entry);
+        }
+        result
+    }
+
+    /// Merge logs from multiple nodes
+    ///
+    /// Matches C's `clusterlog_merge` function
+    ///
+    /// This method atomically updates both the buffer and dedup state under a single
+    /// mutex lock, matching C's behavior where both cl->base and cl->dedup are
+    /// updated under cl->mutex.
+    #[must_use = "merge errors should be handled"]
+    pub fn merge(&self, remote_logs: Vec<RingBuffer>, include_local: bool) -> Result<()> {
+        let mut sorted_entries: BTreeMap<(u32, u64, u32), LogEntry> = BTreeMap::new();
+        let mut merge_dedup: HashMap<u64, DedupEntry> = HashMap::new();
+
+        // Lock once for the entire operation (matching C's single mutex)
+        let mut inner = self.inner.lock();
+
+        // Calculate maximum capacity across all buffers
+        let local_cap = if include_local {
+            Some(inner.buffer.capacity())
+        } else {
+            None
+        };
+        let max_size = local_cap
+            .into_iter()
+            .chain(remote_logs.iter().map(|b| b.capacity()))
+            .max()
+            .unwrap_or(CLOG_DEFAULT_SIZE);
+
+        // Helper: insert entry into sorted map with keep-first dedup (matching C's g_tree_lookup guard)
+        let mut insert_entry = |entry: &LogEntry| {
+            let key = (entry.time, entry.node_digest, entry.uid);
+            if let std::collections::btree_map::Entry::Vacant(e) = sorted_entries.entry(key) {
+                e.insert(entry.clone());
+                let _ = Self::is_not_duplicate(&mut merge_dedup, entry);
+            }
+        };
+
+        // Add local entries if requested
+        if include_local {
+            for entry in inner.buffer.iter() {
+                insert_entry(&entry);
+            }
+        }
+
+        // Add remote entries
+        for remote_buffer in &remote_logs {
+            for entry in remote_buffer.iter() {
+                insert_entry(&entry);
+            }
+        }
+
+        let mut result = RingBuffer::new(max_size);
+
+        // BTreeMap iterates oldest->newest. We add each as new head (push_front),
+        // so the result ends with the newest entry at the head, matching C.
+        // Fill to 100% of capacity (as C does), not just 90%.
+        for (_key, entry) in sorted_entries.iter() {
+            // add_entry will automatically evict old entries if needed to stay within capacity
+            result.add_entry(entry)?;
+        }
+
+        // Atomically update both buffer and dedup (matches C lines 503-507)
+        inner.buffer = result;
+        inner.dedup = merge_dedup;
+
+        Ok(())
+    }
+
+    /// Export log to JSON format
+    ///
+    /// Matches C's `clog_dump_json` function
+    pub fn dump_json(&self, ident_filter: Option<&str>, max_entries: usize) -> String {
+        let inner = self.inner.lock();
+        inner.buffer.dump_json(ident_filter, max_entries)
+    }
+
+    /// Export log to JSON format with sorted entries
+    pub fn dump_json_sorted(&self, ident_filter: Option<&str>, max_entries: usize) -> String {
+        let inner = self.inner.lock();
+        let sorted = inner.buffer.sort_entries();
+        let ident_digest = ident_filter.map(crate::hash::fnv_64a_str);
+        let data: Vec<serde_json::Value> = sorted
+            .iter()
+            .filter(|entry| ident_digest.is_none_or(|digest| digest == entry.ident_digest))
+            .take(max_entries)
+            .map(|entry| entry.to_json_object())
+            .collect();
+        let result = serde_json::json!({ "data": data });
+        serde_json::to_string_pretty(&result).unwrap_or_else(|_| "{}".to_string())
+    }
+
+    /// Matches C's `clusterlog_get_state` function
+    ///
+    /// Returns binary-serialized clog_base_t structure for network transmission.
+    /// This format is compatible with C nodes for mixed-cluster operation.
+    #[must_use = "serialized state should be used or stored"]
+    pub fn get_state(&self) -> Result<Vec<u8>> {
+        let sorted = self.sort();
+        Ok(sorted.serialize_binary())
+    }
+
+    pub fn deserialize_state(data: &[u8]) -> Result<RingBuffer> {
+        RingBuffer::deserialize_binary(data)
+    }
+}
+
+impl Default for ClusterLog {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_cluster_log_creation() {
+        let log = ClusterLog::new();
+        assert!(log.inner.lock().buffer.is_empty());
+    }
+
+    #[test]
+    fn test_add_entry() {
+        let log = ClusterLog::new();
+
+        let result = log.add(
+            "node1",
+            "root",
+            "cluster",
+            12345,
+            6, // Info priority
+            1234567890,
+            "Test message",
+        );
+
+        assert!(result.is_ok());
+        assert!(!log.inner.lock().buffer.is_empty());
+    }
+
+    #[test]
+    fn test_deduplication() {
+        let log = ClusterLog::new();
+
+        // Add same entry twice (but with different UIDs since each add creates a new entry)
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Message 1");
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Message 1");
+
+        // Both entries are added because they have different UIDs
+        // Deduplication tracks the latest (time, UID) per node, not content
+        let inner = log.inner.lock();
+        assert_eq!(inner.buffer.len(), 2);
+    }
+
+    #[test]
+    fn test_newer_entry_replaces() {
+        let log = ClusterLog::new();
+
+        // Add older entry
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Old message");
+
+        // Add newer entry from same node
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1001, "New message");
+
+        // Should have both entries (newer doesn't remove older, just updates dedup tracker)
+        let inner = log.inner.lock();
+        assert_eq!(inner.buffer.len(), 2);
+    }
+
+    #[test]
+    fn test_json_export() {
+        let log = ClusterLog::new();
+
+        let _ = log.add(
+            "node1",
+            "root",
+            "cluster",
+            123,
+            6,
+            1234567890,
+            "Test message",
+        );
+
+        let json = log.dump_json(None, 50);
+
+        // Should be valid JSON
+        assert!(serde_json::from_str::<serde_json::Value>(&json).is_ok());
+
+        // Should contain "data" field
+        let value: serde_json::Value = serde_json::from_str(&json).unwrap();
+        assert!(value.get("data").is_some());
+    }
+
+    #[test]
+    fn test_merge_logs() {
+        let log1 = ClusterLog::new();
+        let log2 = ClusterLog::new();
+
+        // Add entries to first log
+        let _ = log1.add(
+            "node1",
+            "root",
+            "cluster",
+            123,
+            6,
+            1000,
+            "Message from node1",
+        );
+
+        // Add entries to second log
+        let _ = log2.add(
+            "node2",
+            "root",
+            "cluster",
+            456,
+            6,
+            1001,
+            "Message from node2",
+        );
+
+        // Get log2's buffer for merging
+        let log2_buffer = log2.inner.lock().buffer.clone();
+
+        // Merge into log1 (updates log1's buffer atomically)
+        log1.merge(vec![log2_buffer], true).unwrap();
+
+        // Check log1's buffer now contains entries from both logs
+        let inner = log1.inner.lock();
+        assert!(inner.buffer.len() >= 2);
+    }
+
+    // ========================================================================
+    // HIGH PRIORITY TESTS - Merge Edge Cases
+    // ========================================================================
+
+    #[test]
+    fn test_merge_empty_logs() {
+        let log = ClusterLog::new();
+
+        // Add some entries to local log
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Local entry");
+
+        // Merge with empty remote logs (updates buffer atomically)
+        log.merge(vec![], true).unwrap();
+
+        // Check buffer has 1 entry (from local log)
+        let inner = log.inner.lock();
+        assert_eq!(inner.buffer.len(), 1);
+        let entry = inner.buffer.iter().next().unwrap();
+        assert_eq!(entry.node, "node1");
+    }
+
+    #[test]
+    fn test_merge_single_node_only() {
+        let log = ClusterLog::new();
+
+        // Add entries only from single node
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
+        let _ = log.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
+        let _ = log.add("node1", "root", "cluster", 125, 6, 1002, "Entry 3");
+
+        // Merge with no remote logs (just sort local)
+        log.merge(vec![], true).unwrap();
+
+        // Check buffer has all 3 entries
+        let inner = log.inner.lock();
+        assert_eq!(inner.buffer.len(), 3);
+
+        // Entries should be stored newest first in the buffer
+        let times: Vec<u32> = inner.buffer.iter().map(|e| e.time).collect();
+        assert_eq!(times, vec![1002, 1001, 1000]);
+    }
+
+    #[test]
+    fn test_merge_all_duplicates() {
+        let log1 = ClusterLog::new();
+        let log2 = ClusterLog::new();
+
+        // Add same entries to both logs (same node, time, but different UIDs)
+        let _ = log1.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
+        let _ = log1.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
+
+        let _ = log2.add("node1", "root", "cluster", 125, 6, 1000, "Entry 1");
+        let _ = log2.add("node1", "root", "cluster", 126, 6, 1001, "Entry 2");
+
+        let log2_buffer = log2.inner.lock().buffer.clone();
+
+        // Merge - should handle entries from same node at same times
+        log1.merge(vec![log2_buffer], true).unwrap();
+
+        // Check merged buffer has 4 entries (all are unique by UID despite same time/node)
+        let inner = log1.inner.lock();
+        assert_eq!(inner.buffer.len(), 4);
+    }
+
+    #[test]
+    fn test_merge_exceeding_capacity() {
+        // Create small buffer to test capacity enforcement
+        let log = ClusterLog::with_capacity(50_000); // Small buffer
+
+        // Add many entries to fill beyond capacity
+        for i in 0..100 {
+            let _ = log.add(
+                "node1",
+                "root",
+                "cluster",
+                100 + i,
+                6,
+                1000 + i,
+                &format!("Entry {}", i),
+            );
+        }
+
+        // Create remote log with many entries
+        let remote = ClusterLog::with_capacity(50_000);
+        for i in 0..100 {
+            let _ = remote.add(
+                "node2",
+                "root",
+                "cluster",
+                200 + i,
+                6,
+                1000 + i,
+                &format!("Remote {}", i),
+            );
+        }
+
+        let remote_buffer = remote.inner.lock().buffer.clone();
+
+        // Merge - should stop when buffer is near full
+        log.merge(vec![remote_buffer], true).unwrap();
+
+        // Buffer should be limited by capacity, not necessarily < 200
+        // The actual limit depends on entry sizes and capacity
+        // Just verify we got some reasonable number of entries
+        let inner = log.inner.lock();
+        assert!(!inner.buffer.is_empty(), "Should have some entries");
+        assert!(
+            inner.buffer.len() <= 200,
+            "Should not exceed total available entries"
+        );
+    }
+
+    #[test]
+    fn test_merge_preserves_dedup_state() {
+        let log = ClusterLog::new();
+
+        // Add entries from node1
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
+        let _ = log.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2");
+
+        // Create remote log with later entries from node1
+        let remote = ClusterLog::new();
+        let _ = remote.add("node1", "root", "cluster", 125, 6, 1002, "Entry 3");
+
+        let remote_buffer = remote.inner.lock().buffer.clone();
+
+        // Merge
+        log.merge(vec![remote_buffer], true).unwrap();
+
+        // Check that dedup state was updated
+        let inner = log.inner.lock();
+        let node1_digest = crate::hash::fnv_64a_str("node1");
+        let dedup_entry = inner.dedup.get(&node1_digest).unwrap();
+
+        // Should track the latest time from node1
+        assert_eq!(dedup_entry.time, 1002);
+        // UID is auto-generated, so just verify it exists and is reasonable
+        assert!(dedup_entry.uid > 0);
+    }
+
+    #[test]
+    fn test_get_state_binary_format() {
+        let log = ClusterLog::new();
+
+        // Add some entries
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1");
+        let _ = log.add("node2", "admin", "system", 456, 6, 1001, "Entry 2");
+
+        // Get state
+        let state = log.get_state().unwrap();
+
+        // Should be binary format, not JSON
+        assert!(state.len() >= 8); // At least header
+
+        // Check header format (clog_base_t)
+        let size = u32::from_le_bytes(state[0..4].try_into().unwrap()) as usize;
+        let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap());
+
+        assert_eq!(size, state.len());
+        // cpos points to the newest entry. With N entries stored oldest-first,
+        // the newest entry is at offset > 8 (not necessarily 8).
+        assert!(cpos >= 8, "cpos must point into the data section");
+        assert!((cpos as usize) < size, "cpos must be within buffer bounds");
+
+        // Should be able to deserialize back
+        let deserialized = ClusterLog::deserialize_state(&state).unwrap();
+        assert_eq!(deserialized.len(), 2);
+    }
+
+    #[test]
+    fn test_state_roundtrip() {
+        let log = ClusterLog::new();
+
+        // Add entries
+        let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Test 1");
+        let _ = log.add("node2", "admin", "system", 456, 6, 1001, "Test 2");
+
+        // Serialize
+        let state = log.get_state().unwrap();
+
+        // Deserialize
+        let deserialized = ClusterLog::deserialize_state(&state).unwrap();
+
+        // Check entries preserved
+        assert_eq!(deserialized.len(), 2);
+
+        // Buffer is stored newest-first after sorting and serialization
+        let entries: Vec<_> = deserialized.iter().collect();
+        assert_eq!(entries[0].node, "node2"); // Newest (time 1001)
+        assert_eq!(entries[0].message, "Test 2");
+        assert_eq!(entries[1].node, "node1"); // Oldest (time 1000)
+        assert_eq!(entries[1].message, "Test 1");
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs b/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
new file mode 100644
index 000000000..3f8bd936c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs
@@ -0,0 +1,734 @@
+//! Log Entry Implementation
+//!
+//! This module implements the cluster log entry structure, matching the C
+//! implementation's clog_entry_t (logger.c).
+use super::hash::fnv_64a_str;
+use anyhow::{bail, Result};
+use serde::Serialize;
+use std::fmt::Write;
+use std::sync::atomic::{AtomicU32, Ordering};
+
+// Import constant from ring_buffer to avoid duplication
+use crate::ring_buffer::CLOG_MAX_ENTRY_SIZE;
+
+/// Fixed header size of a clog_entry_t in bytes:
+/// prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) +
+/// pid(4) + priority(1) + node_len(1) + ident_len(1) + tag_len(1) + msg_len(4) = 44
+pub const CLOG_ENTRY_HEADER_SIZE: usize = 44;
+
+/// Global UID counter (matches C's `uid_counter` global variable)
+///
+/// # UID Wraparound Behavior
+///
+/// The UID counter is a 32-bit unsigned integer that wraps around after 2^32 entries.
+/// This matches the C implementation's behavior (logger.c:62).
+///
+/// **Wraparound implications:**
+/// - At 1000 entries/second: wraparound after ~49 days
+/// - At 100 entries/second: wraparound after ~497 days
+/// - After wraparound, UIDs restart from 1
+///
+/// **Impact on deduplication:**
+/// The deduplication logic compares (time, UID) tuples. After wraparound, an entry
+/// with UID=1 might be incorrectly considered older than an entry with UID=4294967295,
+/// even if they have the same timestamp. This is a known limitation inherited from
+/// the C implementation.
+///
+/// **Mitigation:**
+/// - Entries with different timestamps are correctly ordered (time is primary sort key)
+/// - Wraparound only affects entries with identical timestamps from the same node
+/// - A warning is logged when wraparound occurs (see fetch_add below)
+static UID_COUNTER: AtomicU32 = AtomicU32::new(0);
+
+/// Allocate the next unique ID from the global log entry UID counter.
+///
+/// Shared by all log entry producers so they draw from a single sequence,
+/// preventing (time, nodeid, uid) collisions between the database DFSM and
+/// the status DFSM on the same node.
+pub fn next_uid() -> u32 {
+    let old = UID_COUNTER.fetch_add(1, Ordering::SeqCst);
+    if old == u32::MAX {
+        tracing::warn!(
+            "UID counter wrapped around (2^32 entries reached). \
+             Deduplication may be affected for entries with identical timestamps."
+        );
+    }
+    old.wrapping_add(1)
+}
+
+/// Log entry structure
+///
+/// Matches C's `clog_entry_t` from logger.c:
+/// ```c
+/// typedef struct {
+///     uint32_t prev;          // Previous entry offset
+///     uint32_t next;          // Next entry offset
+///     uint32_t uid;           // Unique ID
+///     uint32_t time;          // Timestamp
+///     uint64_t node_digest;   // FNV-1a hash of node name
+///     uint64_t ident_digest;  // FNV-1a hash of ident
+///     uint32_t pid;           // Process ID
+///     uint8_t priority;       // Syslog priority (0-7)
+///     uint8_t node_len;       // Length of node name (including null)
+///     uint8_t ident_len;      // Length of ident (including null)
+///     uint8_t tag_len;        // Length of tag (including null)
+///     uint32_t msg_len;       // Length of message (including null)
+///     char data[];            // Variable length data: node + ident + tag + msg
+/// } clog_entry_t;
+/// ```
+#[derive(Debug, Clone, Serialize)]
+pub struct LogEntry {
+    /// Unique ID for this entry (auto-incrementing)
+    pub uid: u32,
+
+    /// Unix timestamp
+    pub time: u32,
+
+    /// FNV-1a hash of node name.
+    ///
+    /// Stored separately from `node` for C binary wire format compatibility
+    /// (these fields appear in the clog_entry_t struct). Not re-derived on
+    /// deserialization — the stored value must round-trip exactly.
+    pub node_digest: u64,
+
+    /// FNV-1a hash of ident (user).
+    ///
+    /// Stored separately from `ident` for C binary wire format compatibility.
+    /// Not re-derived on deserialization.
+    pub ident_digest: u64,
+
+    /// Process ID
+    pub pid: u32,
+
+    /// Syslog priority (0-7)
+    pub priority: u8,
+
+    /// Node name
+    pub node: String,
+
+    /// Identity/user
+    pub ident: String,
+
+    /// Tag (e.g., "cluster", "pmxcfs")
+    pub tag: String,
+
+    /// Log message
+    pub message: String,
+}
+
+impl LogEntry {
+    /// Matches C's `clog_pack` function
+    #[must_use = "packed entry should be used"]
+    pub fn pack(
+        node: &str,
+        ident: &str,
+        tag: &str,
+        pid: u32,
+        time: u32,
+        priority: u8,
+        message: &str,
+    ) -> Result<Self> {
+        if priority >= 8 {
+            bail!("Invalid priority: {priority} (must be 0-7)");
+        }
+
+        // Truncate to 254 bytes to leave room for null terminator (C uses MIN(strlen+1, 255))
+        let node = Self::truncate_string(node, 254);
+        let ident = Self::truncate_string(ident, 254);
+        let tag = Self::truncate_string(tag, 254);
+        let message = Self::utf8_to_ascii(message);
+
+        let node_len = node.len() + 1;
+        let ident_len = ident.len() + 1;
+        let tag_len = tag.len() + 1;
+        let mut msg_len = message.len() + 1;
+
+        // Use checked arithmetic to prevent integer overflow
+        let total_size = CLOG_ENTRY_HEADER_SIZE
+            .checked_add(node_len)
+            .and_then(|s| s.checked_add(ident_len))
+            .and_then(|s| s.checked_add(tag_len))
+            .and_then(|s| s.checked_add(msg_len))
+            .ok_or_else(|| anyhow::anyhow!("Entry size calculation overflow"))?;
+
+        if total_size > CLOG_MAX_ENTRY_SIZE {
+            let diff = total_size - CLOG_MAX_ENTRY_SIZE;
+            msg_len = msg_len.saturating_sub(diff);
+        }
+
+        let node_digest = fnv_64a_str(&node);
+        let ident_digest = fnv_64a_str(&ident);
+
+        let uid = next_uid();
+
+        Ok(Self {
+            uid,
+            time,
+            node_digest,
+            ident_digest,
+            pid,
+            priority,
+            node,
+            ident,
+            tag,
+            message: message[..msg_len.saturating_sub(1)].to_string(),
+        })
+    }
+
+    /// Truncate string to max length (safe for multi-byte UTF-8)
+    fn truncate_string(s: &str, max_len: usize) -> String {
+        if s.len() <= max_len {
+            return s.to_string();
+        }
+
+        // str::floor_char_boundary is still nightly-only; back off to the
+        // nearest char boundary at or below max_len by hand instead.
+        let mut end = max_len;
+        while !s.is_char_boundary(end) {
+            end -= 1;
+        }
+        s[..end].to_string()
+    }
+
+    /// Convert UTF-8 to ASCII with proper escaping
+    ///
+    /// Matches C's `utf8_to_ascii` function behavior:
+    /// - Control characters (0x00-0x1F, 0x7F): Escaped as #XXX (e.g., #007 for BEL)
+    /// - Unicode (U+0080 to U+FFFF): Escaped as \uXXXX (e.g., \u4e16 for 世)
+    /// - Quotes: Escaped as \" (matches C's quotequote=TRUE behavior)
+    /// - Characters > U+FFFF: Silently dropped
+    /// - ASCII printable (0x20-0x7E except quotes): Passed through unchanged
+    fn utf8_to_ascii(s: &str) -> String {
+        let mut result = String::with_capacity(s.len());
+
+        for c in s.chars() {
+            match c {
+                // Control characters: #XXX format (3 decimal digits)
+                '\x00'..='\x1F' | '\x7F' => {
+                    let _ = write!(result, "#{:03}", c as u32);
+                }
+                // Quote escaping: matches C's quotequote=TRUE behavior (logger.c:245)
+                '"' => result.push_str("\\\""),
+                // ASCII printable characters: pass through
+                c if c.is_ascii() => result.push(c),
+                // Unicode U+0080 to U+FFFF: \uXXXX format
+                c if (c as u32) < 0x10000 => {
+                    let _ = write!(result, "\\u{:04x}", c as u32);
+                }
+                // Characters > U+FFFF: silently drop (matches C behavior)
+                _ => {}
+            }
+        }
+
+        result
+    }
+
+    /// Matches C's `clog_entry_size` function
+    pub fn size(&self) -> usize {
+        CLOG_ENTRY_HEADER_SIZE
+            + self.node.len()
+            + 1
+            + self.ident.len()
+            + 1
+            + self.tag.len()
+            + 1
+            + self.message.len()
+            + 1
+    }
+
+    /// C implementation: `uint32_t realsize = ((size + 7) & 0xfffffff8);`
+    pub fn aligned_size(&self) -> usize {
+        let size = self.size();
+        (size + 7) & !7
+    }
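The alignment rule is easy to sanity-check on its own: `(size + 7) & !7` rounds up to the next multiple of 8. A free-standing sketch:

```rust
fn align8(size: usize) -> usize {
    // Same expression as aligned_size(): round up to a multiple of 8.
    (size + 7) & !7
}

fn main() {
    assert_eq!(align8(44), 48); // the 44-byte fixed header rounds to 48
    assert_eq!(align8(48), 48); // already aligned: unchanged
    assert_eq!(align8(49), 56); // one byte over: next multiple of 8
}
```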
+
+    pub fn to_json_object(&self) -> serde_json::Value {
+        // Decode utf8_to_ascii backslash escapes before JSON serialization so that
+        // serde_json re-encodes them correctly. Without this, serde_json double-escapes:
+        //   \u4e16  →  \\u4e16  (JSON literal string \u4e16, not the Unicode char 世)
+        //   \"      →  \\\"    (JSON literal string \", not a quote)
+        // C's clog_dump_json embeds message bytes directly via printf "%s", so \u4e16
+        // becomes a real JSON Unicode escape (decoded as 世). We match that here.
+        serde_json::json!({
+            "uid": self.uid,
+            "time": self.time,
+            "pri": self.priority,
+            "tag": self.tag,
+            "pid": self.pid,
+            "node": self.node,
+            "user": self.ident,
+            "msg": Self::decode_for_json(&self.message),
+        })
+    }
+
+    /// Reverse the backslash escapes that `utf8_to_ascii` introduced so that
+    /// serde_json can serialize the message without double-escaping them.
+    ///
+    /// Only two sequences need reversing:
+    /// - `\"` → `"` (quote escape from utf8_to_ascii quotequote mode)
+    /// - `\uXXXX` → actual Unicode char (e.g. `\u4e16` → '世')
+    ///
+    /// `#XXX` control-char escapes are plain ASCII and need no adjustment —
+    /// serde_json will round-trip them as literal `#XXX` strings, matching C.
+    fn decode_for_json(s: &str) -> String {
+        let mut result = String::with_capacity(s.len());
+        let mut chars = s.chars().peekable();
+        while let Some(c) = chars.next() {
+            if c != '\\' {
+                result.push(c);
+                continue;
+            }
+            match chars.peek().copied() {
+                Some('"') => {
+                    chars.next();
+                    result.push('"');
+                }
+                Some('u') => {
+                    chars.next(); // consume 'u'
+                    let hex: String = chars.by_ref().take(4).collect();
+                    if hex.len() == 4 {
+                        if let Ok(n) = u32::from_str_radix(&hex, 16) {
+                            if let Some(ch) = char::from_u32(n) {
+                                result.push(ch);
+                                continue;
+                            }
+                        }
+                    }
+                    // Unrecognised sequence — keep as-is
+                    result.push('\\');
+                    result.push('u');
+                    result.push_str(&hex);
+                }
+                _ => result.push('\\'),
+            }
+        }
+        result
+    }
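The `\uXXXX` decode path above boils down to four hex digits through `u32::from_str_radix` plus `char::from_u32`. A minimal sketch of that kernel (the helper name is illustrative):

```rust
fn decode_u_escape(hex: &str) -> Option<char> {
    // Exactly four hex digits, as emitted by utf8_to_ascii's \uXXXX form.
    if hex.len() != 4 {
        return None;
    }
    u32::from_str_radix(hex, 16).ok().and_then(char::from_u32)
}

fn main() {
    assert_eq!(decode_u_escape("4e16"), Some('世'));
    assert_eq!(decode_u_escape("0041"), Some('A'));
    // Surrogate code points are not valid chars, so from_u32 returns None
    // and the caller keeps the escape sequence as-is.
    assert_eq!(decode_u_escape("d800"), None);
}
```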
+
+    /// Write entry body (bytes 8+) into a pre-allocated ring buffer slot.
+    ///
+    /// Mirrors C's `clog_copy` behavior:
+    /// `memcpy((char*)new + 8, (char*)entry + 8, size - 8)`
+    /// The first 8 bytes (prev/next) are set by `alloc_slot`; this writes
+    /// the remaining `size() - 8` bytes directly into the provided slice.
+    ///
+    /// The `slot` must be at least `self.size() - 8` bytes long.
+    pub(crate) fn write_body_to(&self, slot: &mut [u8]) {
+        let mut off = 0usize;
+
+        macro_rules! write_le {
+            ($val:expr) => {{
+                let b = $val;
+                slot[off..off + b.len()].copy_from_slice(&b);
+                off += b.len();
+            }};
+        }
+
+        write_le!(self.uid.to_le_bytes());
+        write_le!(self.time.to_le_bytes());
+        write_le!(self.node_digest.to_le_bytes());
+        write_le!(self.ident_digest.to_le_bytes());
+        write_le!(self.pid.to_le_bytes());
+        slot[off] = self.priority;
+        off += 1;
+
+        let node_len = (self.node.len() + 1).min(255) as u8;
+        let ident_len = (self.ident.len() + 1).min(255) as u8;
+        let tag_len = (self.tag.len() + 1).min(255) as u8;
+        slot[off] = node_len;
+        off += 1;
+        slot[off] = ident_len;
+        off += 1;
+        slot[off] = tag_len;
+        off += 1;
+        write_le!((self.message.len() as u32 + 1).to_le_bytes());
+
+        slot[off..off + self.node.len()].copy_from_slice(self.node.as_bytes());
+        off += self.node.len();
+        slot[off] = 0;
+        off += 1;
+        slot[off..off + self.ident.len()].copy_from_slice(self.ident.as_bytes());
+        off += self.ident.len();
+        slot[off] = 0;
+        off += 1;
+        slot[off..off + self.tag.len()].copy_from_slice(self.tag.as_bytes());
+        off += self.tag.len();
+        slot[off] = 0;
+        off += 1;
+        slot[off..off + self.message.len()].copy_from_slice(self.message.as_bytes());
+        off += self.message.len();
+        slot[off] = 0;
+    }
+
+    /// Serialize to C binary format (clog_entry_t)
+    ///
+    /// Binary layout matches C structure:
+    /// ```c
+    /// struct {
+    ///     uint32_t prev;          // Will be filled by ring buffer
+    ///     uint32_t next;          // Will be filled by ring buffer
+    ///     uint32_t uid;
+    ///     uint32_t time;
+    ///     uint64_t node_digest;
+    ///     uint64_t ident_digest;
+    ///     uint32_t pid;
+    ///     uint8_t priority;
+    ///     uint8_t node_len;
+    ///     uint8_t ident_len;
+    ///     uint8_t tag_len;
+    ///     uint32_t msg_len;
+    ///     char data[];  // node + ident + tag + msg (null-terminated)
+    /// }
+    /// ```
+    pub fn serialize_binary(&self, prev: u32, next: u32) -> Vec<u8> {
+        let mut buf = vec![0u8; self.size()];
+        buf[0..4].copy_from_slice(&prev.to_le_bytes());
+        buf[4..8].copy_from_slice(&next.to_le_bytes());
+        self.write_body_to(&mut buf[8..]);
+        buf
+    }
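As a cross-check, the 44-byte fixed header that the serializer's offsets rely on can be derived directly from the field widths:

```rust
fn main() {
    // clog_entry_t fixed fields, in declaration order:
    let header = 4  // prev
        + 4         // next
        + 4         // uid
        + 4         // time
        + 8         // node_digest
        + 8         // ident_digest
        + 4         // pid
        + 1         // priority
        + 1 + 1 + 1 // node_len, ident_len, tag_len
        + 4;        // msg_len
    assert_eq!(header, 44); // == CLOG_ENTRY_HEADER_SIZE
    // No padding needed: priority plus the three u8 length bytes bring
    // the u32 msg_len back onto a 4-byte boundary.
}
```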
+
+    pub(crate) fn deserialize_binary(data: &[u8]) -> Result<(Self, u32, u32)> {
+        if data.len() < CLOG_ENTRY_HEADER_SIZE {
+            bail!(
+                "Entry too small: {} bytes (need at least {} for header)",
+                data.len(),
+                CLOG_ENTRY_HEADER_SIZE,
+            );
+        }
+
+        // Read fixed header fields directly from byte offsets, matching the
+        // clog_entry_t layout documented above CLOG_ENTRY_HEADER_SIZE.
+        let prev         = u32::from_le_bytes(data[ 0.. 4].try_into()?);
+        let next         = u32::from_le_bytes(data[ 4.. 8].try_into()?);
+        let uid          = u32::from_le_bytes(data[ 8..12].try_into()?);
+        let time         = u32::from_le_bytes(data[12..16].try_into()?);
+        let node_digest  = u64::from_le_bytes(data[16..24].try_into()?);
+        let ident_digest = u64::from_le_bytes(data[24..32].try_into()?);
+        let pid          = u32::from_le_bytes(data[32..36].try_into()?);
+        let priority     = data[36];
+        let node_len     = data[37] as usize;
+        let ident_len    = data[38] as usize;
+        let tag_len      = data[39] as usize;
+        let msg_len      = u32::from_le_bytes(data[40..44].try_into()?) as usize;
+
+        let offset = CLOG_ENTRY_HEADER_SIZE;
+        if offset + node_len + ident_len + tag_len + msg_len > data.len() {
+            bail!("Entry data exceeds buffer size");
+        }
+
+        let node  = read_null_terminated(&data[offset..offset + node_len])?;
+        let ident = read_null_terminated(&data[offset + node_len..offset + node_len + ident_len])?;
+        let tag_start = offset + node_len + ident_len;
+        let tag   = read_null_terminated(&data[tag_start..tag_start + tag_len])?;
+        let msg_start = tag_start + tag_len;
+        let message = read_null_terminated(&data[msg_start..msg_start + msg_len])?;
+
+        Ok((
+            Self {
+                uid,
+                time,
+                node_digest,
+                ident_digest,
+                pid,
+                priority,
+                node,
+                ident,
+                tag,
+                message,
+            },
+            prev,
+            next,
+        ))
+    }
+}
+
+fn read_null_terminated(data: &[u8]) -> Result<String> {
+    let len = data.iter().position(|&b| b == 0).unwrap_or(data.len());
+    Ok(String::from_utf8_lossy(&data[..len]).into_owned())
+}
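`read_null_terminated` tolerates both a missing terminator and trailing bytes after it, and replaces rather than rejects invalid UTF-8. A quick illustration (standalone copy of the same logic):

```rust
fn read_nul(data: &[u8]) -> String {
    // Stop at the first NUL, or take the whole slice if none is present.
    let len = data.iter().position(|&b| b == 0).unwrap_or(data.len());
    String::from_utf8_lossy(&data[..len]).into_owned()
}

fn main() {
    assert_eq!(read_nul(b"node1\0garbage"), "node1");
    assert_eq!(read_nul(b"no-terminator"), "no-terminator");
    // Invalid UTF-8 becomes U+FFFD instead of an error.
    assert_eq!(read_nul(&[0xff, b'x', 0]), "\u{fffd}x");
}
```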
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_pack_entry() {
+        let entry = LogEntry::pack(
+            "node1",
+            "root",
+            "cluster",
+            12345,
+            1234567890,
+            6, // Info priority
+            "Test message",
+        )
+        .unwrap();
+
+        assert!(entry.uid > 0, "uid must be non-zero");
+        assert_eq!(entry.time, 1234567890);
+        assert_eq!(entry.node, "node1");
+        assert_eq!(entry.ident, "root");
+        assert_eq!(entry.tag, "cluster");
+        assert_eq!(entry.pid, 12345);
+        assert_eq!(entry.priority, 6);
+        assert_eq!(entry.message, "Test message");
+    }
+
+    #[test]
+    fn test_uid_increment() {
+        let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg1").unwrap();
+        let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg2").unwrap();
+
+        // The global counter increments by 1 per entry, but other tests may
+        // allocate UIDs concurrently (tests run in parallel by default), so
+        // only monotonicity is guaranteed here, not strict adjacency.
+        assert!(entry2.uid > entry1.uid, "UIDs must be increasing");
+    }
+
+    #[test]
+    fn test_invalid_priority() {
+        let result = LogEntry::pack("node1", "root", "tag", 0, 1000, 8, "message");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_node_digest() {
+        let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
+        let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg").unwrap();
+        let entry3 = LogEntry::pack("node2", "root", "tag", 0, 1000, 6, "msg").unwrap();
+
+        // Same node should have same digest
+        assert_eq!(entry1.node_digest, entry2.node_digest);
+
+        // Different node should have different digest
+        assert_ne!(entry1.node_digest, entry3.node_digest);
+    }
+
+    #[test]
+    fn test_ident_digest() {
+        let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
+        let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg").unwrap();
+        let entry3 = LogEntry::pack("node1", "admin", "tag", 0, 1000, 6, "msg").unwrap();
+
+        // Same ident should have same digest
+        assert_eq!(entry1.ident_digest, entry2.ident_digest);
+
+        // Different ident should have different digest
+        assert_ne!(entry1.ident_digest, entry3.ident_digest);
+    }
+
+    #[test]
+    fn test_utf8_to_ascii() {
+        let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "Hello 世界").unwrap();
+        assert!(entry.message.is_ascii());
+        // Unicode chars escaped as \uXXXX format (matches C implementation)
+        assert!(entry.message.contains("\\u4e16")); // 世 = U+4E16
+        assert!(entry.message.contains("\\u754c")); // 界 = U+754C
+    }
+
+    #[test]
+    fn test_utf8_control_chars() {
+        // Test control character escaping
+        let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "Hello\x07World").unwrap();
+        assert!(entry.message.is_ascii());
+        // BEL (0x07) should be escaped as #007 (matches C implementation)
+        assert!(entry.message.contains("#007"));
+    }
+
+    #[test]
+    fn test_utf8_mixed_content() {
+        // Test mix of ASCII, Unicode, and control chars
+        let entry = LogEntry::pack(
+            "node1",
+            "root",
+            "tag",
+            0,
+            1000,
+            6,
+            "Test\x01\nUnicode世\ttab",
+        )
+        .unwrap();
+        assert!(entry.message.is_ascii());
+        // SOH (0x01) -> #001
+        assert!(entry.message.contains("#001"));
+        // Newline (0x0A) -> #010
+        assert!(entry.message.contains("#010"));
+        // Unicode 世 (U+4E16) -> \u4e16
+        assert!(entry.message.contains("\\u4e16"));
+        // Tab (0x09) -> #009
+        assert!(entry.message.contains("#009"));
+    }
+
+    #[test]
+    fn test_string_truncation() {
+        let long_node = "a".repeat(300);
+        let entry = LogEntry::pack(&long_node, "root", "tag", 0, 1000, 6, "msg").unwrap();
+        // Truncated to exactly 254 bytes, leaving room for the null terminator
+        assert_eq!(entry.node.len(), 254);
+    }
+
+    #[test]
+    fn test_truncate_multibyte_utf8() {
+        // Test that truncate_string doesn't panic on multi-byte UTF-8 boundaries
+        // "世" is 3 bytes in UTF-8 (0xE4 0xB8 0x96)
+        let s = "x".repeat(253) + "世";
+
+        // This should not panic, even though 254 falls in the middle of "世"
+        let entry = LogEntry::pack(&s, "root", "tag", 0, 1000, 6, "msg").unwrap();
+
+        // Should truncate to 253 bytes (before the multi-byte char)
+        assert_eq!(entry.node.len(), 253);
+        assert_eq!(entry.node, "x".repeat(253));
+    }
+
+    #[test]
+    fn test_message_truncation() {
+        let long_message = "a".repeat(CLOG_MAX_ENTRY_SIZE);
+        let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, &long_message).unwrap();
+        // Entry should fit within max size
+        assert!(entry.size() <= CLOG_MAX_ENTRY_SIZE);
+    }
+
+    #[test]
+    fn test_aligned_size() {
+        let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap();
+        let aligned = entry.aligned_size();
+
+        // Aligned size should be multiple of 8
+        assert_eq!(aligned % 8, 0);
+
+        // Aligned size should be >= actual size
+        assert!(aligned >= entry.size());
+
+        // Aligned size should be within 7 bytes of actual size
+        assert!(aligned - entry.size() < 8);
+    }
+
+    #[test]
+    fn test_json_export() {
+        let entry = LogEntry::pack("node1", "root", "cluster", 123, 1234567890, 6, "Test").unwrap();
+        let json = entry.to_json_object();
+
+        assert_eq!(json["node"], "node1");
+        assert_eq!(json["user"], "root");
+        assert_eq!(json["tag"], "cluster");
+        assert_eq!(json["pid"], 123);
+        assert_eq!(json["time"], 1234567890);
+        assert_eq!(json["pri"], 6);
+        assert_eq!(json["msg"], "Test");
+    }
+
+    #[test]
+    fn test_binary_serialization_roundtrip() {
+        let entry = LogEntry::pack(
+            "node1",
+            "root",
+            "cluster",
+            12345,
+            1234567890,
+            6,
+            "Test message",
+        )
+        .unwrap();
+
+        // Serialize with prev/next pointers
+        let binary = entry.serialize_binary(100, 200);
+
+        // Deserialize
+        let (deserialized, prev, next) = LogEntry::deserialize_binary(&binary).unwrap();
+
+        // Check prev/next pointers
+        assert_eq!(prev, 100);
+        assert_eq!(next, 200);
+
+        // Check entry fields
+        assert_eq!(deserialized.uid, entry.uid);
+        assert_eq!(deserialized.time, entry.time);
+        assert_eq!(deserialized.node_digest, entry.node_digest);
+        assert_eq!(deserialized.ident_digest, entry.ident_digest);
+        assert_eq!(deserialized.pid, entry.pid);
+        assert_eq!(deserialized.priority, entry.priority);
+        assert_eq!(deserialized.node, entry.node);
+        assert_eq!(deserialized.ident, entry.ident);
+        assert_eq!(deserialized.tag, entry.tag);
+        assert_eq!(deserialized.message, entry.message);
+    }
+
+    #[test]
+    fn test_binary_format_header_size() {
+        let entry = LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap();
+        let binary = entry.serialize_binary(0, 0);
+
+        assert!(binary.len() >= CLOG_ENTRY_HEADER_SIZE);
+
+        // First 44 bytes are the fixed header (CLOG_ENTRY_HEADER_SIZE)
+        assert_eq!(&binary[0..4], &0u32.to_le_bytes()); // prev
+        assert_eq!(&binary[4..8], &0u32.to_le_bytes()); // next
+    }
+
+    #[test]
+    fn test_binary_deserialize_invalid_size() {
+        let too_small = vec![0u8; CLOG_ENTRY_HEADER_SIZE - 1];
+        let result = LogEntry::deserialize_binary(&too_small);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_binary_null_terminators() {
+        let entry = LogEntry::pack("node1", "root", "tag", 123, 1000, 6, "message").unwrap();
+        let binary = entry.serialize_binary(0, 0);
+
+        // Check that strings are null-terminated
+        // Find null bytes in data section (after 44-byte header = CLOG_ENTRY_HEADER_SIZE)
+        let data_section = &binary[CLOG_ENTRY_HEADER_SIZE..];
+        let null_count = data_section.iter().filter(|&&b| b == 0).count();
+        assert_eq!(null_count, 4); // 4 null terminators (node, ident, tag, msg)
+    }
+
+    #[test]
+    fn test_length_field_overflow_prevention() {
+        // Test that 255-byte strings are handled correctly (prevent u8 overflow)
+        // C does: MIN(strlen(s) + 1, 255) to cap at 255
+        let long_string = "a".repeat(255);
+
+        let entry = LogEntry::pack(
+            &long_string,
+            &long_string,
+            &long_string,
+            123,
+            1000,
+            6,
+            "msg",
+        )
+        .unwrap();
+
+        // Strings should be truncated to 254 bytes (leaving room for null)
+        assert_eq!(entry.node.len(), 254);
+        assert_eq!(entry.ident.len(), 254);
+        assert_eq!(entry.tag.len(), 254);
+
+        // Serialize and check length fields are capped at 255 (254 bytes + null)
+        let binary = entry.serialize_binary(0, 0);
+
+        // Extract length fields from header
+        // Layout: prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) +
+        //         pid(4) + priority(1) + node_len(1) + ident_len(1) + tag_len(1) + msg_len(4)
+        // Offsets: node_len=37, ident_len=38, tag_len=39
+        let node_len = binary[37];
+        let ident_len = binary[38];
+        let tag_len = binary[39];
+
+        assert_eq!(node_len, 255); // 254 bytes + 1 null = 255
+        assert_eq!(ident_len, 255);
+        assert_eq!(tag_len, 255);
+    }
+
+    #[test]
+    fn test_length_field_no_wraparound() {
+        // Even if somehow a 255+ byte string gets through, serialize should cap at 255
+        // This tests the defensive .min(255) in serialize_binary
+        let mut entry = LogEntry::pack("node", "ident", "tag", 123, 1000, 6, "msg").unwrap();
+
+        // Artificially create an edge case (though pack() already prevents this)
+        entry.node = "x".repeat(254); // Max valid size
+
+        let binary = entry.serialize_binary(0, 0);
+        let node_len = binary[37]; // Offset 37 for node_len
+
+        // Should be 255 (254 + 1 for null), not wrap to 0
+        assert_eq!(node_len, 255);
+        assert_ne!(node_len, 0); // Ensure no wraparound
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs b/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
new file mode 100644
index 000000000..f87d9bc9b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs
@@ -0,0 +1,129 @@
+//! FNV-1a (Fowler-Noll-Vo) 64-bit hash function
+//!
+//! This matches the C implementation's `fnv_64a_buf` function.
+//! Used for generating node and ident digests for deduplication.
+
+/// FNV-1a 64-bit non-zero initial basis
+pub const FNV1A_64_INIT: u64 = 0xcbf29ce484222325;
+
+/// Compute 64-bit FNV-1a hash
+///
+/// Faithful port of C's `fnv_64a_buf` function. The multiplication by the
+/// FNV 64-bit prime (0x100000001b3) is done via shifts and adds.
+#[inline]
+pub fn fnv_64a(data: &[u8], init: u64) -> u64 {
+    let mut hval = init;
+
+    for &byte in data {
+        hval ^= byte as u64;
+        hval = hval.wrapping_add(
+            (hval << 1)
+                .wrapping_add(hval << 4)
+                .wrapping_add(hval << 5)
+                .wrapping_add(hval << 7)
+                .wrapping_add(hval << 8)
+                .wrapping_add(hval << 40),
+        );
+    }
+
+    hval
+}
+
+/// Hash a null-terminated string (includes the null byte)
+///
+/// The C implementation includes the null terminator in the hash:
+/// `fnv_64a_buf(node, node_len, FNV1A_64_INIT)` where node_len includes the '\0'.
+/// We append a null byte to match that behavior.
+#[inline]
+pub fn fnv_64a_str(s: &str) -> u64 {
+    // Hash the string bytes, then the null terminator to match C behavior.
+    // XOR with 0 is a no-op for the null byte, but the FNV multiplication
+    // step still applies, so we feed it through fnv_64a rather than
+    // duplicating the multiplication logic here.
+    fnv_64a(&[0], fnv_64a(s.as_bytes(), FNV1A_64_INIT))
+}
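The shift-and-add form in `fnv_64a` is exactly multiplication by the FNV 64-bit prime; since 0x100000001b3 = 2^40 + 2^8 + 2^7 + 2^5 + 2^4 + 2^1 + 2^0, the two are equal modulo 2^64. The equivalence can be checked directly (standalone sketch):

```rust
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

fn mul_prime(h: u64) -> u64 {
    h.wrapping_mul(FNV_PRIME)
}

fn shift_add(h: u64) -> u64 {
    // h * prime = h * (1 + 2 + 16 + 32 + 128 + 256 + 2^40)
    h.wrapping_add(
        (h << 1)
            .wrapping_add(h << 4)
            .wrapping_add(h << 5)
            .wrapping_add(h << 7)
            .wrapping_add(h << 8)
            .wrapping_add(h << 40),
    )
}

fn main() {
    for h in [0xcbf2_9ce4_8422_2325u64, 1, u64::MAX, 0xdead_beef_cafe_f00d] {
        assert_eq!(mul_prime(h), shift_add(h));
    }
}
```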
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_fnv1a_init() {
+        // Test that init constant matches C implementation
+        assert_eq!(FNV1A_64_INIT, 0xcbf29ce484222325);
+    }
+
+    #[test]
+    fn test_fnv1a_empty() {
+        // Empty string with null terminator
+        let hash = fnv_64a(&[0], FNV1A_64_INIT);
+        assert_ne!(hash, FNV1A_64_INIT); // Should be different from init
+    }
+
+    #[test]
+    fn test_fnv1a_consistency() {
+        // Same input should produce same output
+        let data = b"test";
+        let hash1 = fnv_64a(data, FNV1A_64_INIT);
+        let hash2 = fnv_64a(data, FNV1A_64_INIT);
+        assert_eq!(hash1, hash2);
+    }
+
+    #[test]
+    fn test_fnv1a_different_data() {
+        // Different input should (usually) produce different output
+        let hash1 = fnv_64a(b"test1", FNV1A_64_INIT);
+        let hash2 = fnv_64a(b"test2", FNV1A_64_INIT);
+        assert_ne!(hash1, hash2);
+    }
+
+    #[test]
+    fn test_fnv1a_str() {
+        // Test string hashing with null terminator
+        let hash1 = fnv_64a_str("node1");
+        let hash2 = fnv_64a_str("node1");
+        let hash3 = fnv_64a_str("node2");
+
+        assert_eq!(hash1, hash2); // Same string should hash the same
+        assert_ne!(hash1, hash3); // Different strings should hash differently
+    }
+
+    #[test]
+    fn test_fnv1a_node_names() {
+        // Test with typical Proxmox node names
+        let nodes = vec!["pve1", "pve2", "pve3"];
+        let mut hashes = Vec::new();
+
+        for node in &nodes {
+            let hash = fnv_64a_str(node);
+            hashes.push(hash);
+        }
+
+        // All hashes should be unique
+        for i in 0..hashes.len() {
+            for j in (i + 1)..hashes.len() {
+                assert_ne!(
+                    hashes[i], hashes[j],
+                    "Hashes for {} and {} should differ",
+                    nodes[i], nodes[j]
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn test_fnv1a_chaining() {
+        // Test that we can chain hashes
+        let data1 = b"first";
+        let data2 = b"second";
+
+        let hash1 = fnv_64a(data1, FNV1A_64_INIT);
+        let hash2 = fnv_64a(data2, hash1); // Use previous hash as init
+
+        // Should produce a deterministic result
+        let hash1_again = fnv_64a(data1, FNV1A_64_INIT);
+        let hash2_again = fnv_64a(data2, hash1_again);
+
+        assert_eq!(hash2, hash2_again);
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs b/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
new file mode 100644
index 000000000..1de0cd830
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs
@@ -0,0 +1,29 @@
+//! Cluster Log Implementation
+//!
+//! This crate provides a cluster-wide log system compatible with the C implementation.
+//! It maintains a ring buffer of log entries that can be merged from multiple nodes,
+//! deduplicated, and exported to JSON.
+//!
+//! Key features:
+//! - Ring buffer storage for efficient memory usage
+//! - FNV-1a hashing for node and ident tracking
+//! - Deduplication across nodes
+//! - Time-based sorting
+//! - Multi-node log merging
+//! - JSON export for web UI
+// Internal modules (not exposed)
+mod cluster_log;
+mod entry;
+mod hash;
+mod ring_buffer;
+
+// Public API - only expose what's needed externally
+pub use cluster_log::ClusterLog;
+pub use entry::{next_uid, CLOG_ENTRY_HEADER_SIZE};
+pub use hash::{fnv_64a, fnv_64a_str, FNV1A_64_INIT};
+
+// Re-export types only for testing or internal crate use
+#[doc(hidden)]
+pub use entry::LogEntry;
+#[doc(hidden)]
+pub use ring_buffer::RingBuffer;
diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs b/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
new file mode 100644
index 000000000..2d9f79fcf
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs
@@ -0,0 +1,542 @@
+//! Ring Buffer Implementation for Cluster Log
+//!
+//! Directly mirrors C's `clog_base_t` byte layout so that the in-memory
+//! representation IS the wire format. `serialize_binary()` is a simple
+//! `Vec::clone()` — no entry iteration or struct reconstruction needed.
+//!
+//! Key design:
+//! - `data` is the complete byte slab: header(8 bytes) + ring data.
+//! - `alloc_slot()` ports C's `clog_alloc_entry` (wrap-around ring allocation).
+//! - `add_entry()` writes the entry body directly into the slab via `write_body_to`,
+//!   matching C's `clog_copy`: `memcpy((char*)new + 8, (char*)entry + 8, size - 8)`.
+//! - `iter()` walks the prev-chain from `cpos`, parsing `LogEntry` on the fly.
+use super::entry::LogEntry;
+use super::hash::fnv_64a_str;
+use anyhow::{bail, Result};
+
+/// Matches C's CLOG_DEFAULT_SIZE constant
+pub(crate) const CLOG_DEFAULT_SIZE: usize = 8192 * 16; // 131,072 bytes (128 KB)
+
+/// Matches C's CLOG_MAX_ENTRY_SIZE constant
+pub(crate) const CLOG_MAX_ENTRY_SIZE: usize = 4096; // 4,096 bytes (4 KB)
+
+/// Ring buffer for cluster log entries.
+///
+/// Directly mirrors C's `clog_base_t` byte layout:
+/// ```c
+/// struct clog_base {
+///     uint32_t size;    // Total buffer capacity (bytes)
+///     uint32_t cpos;    // Offset to newest entry (0 = empty)
+///     char data[];      // Ring data (entries at various offsets from byte 8)
+/// };
+/// ```
+///
+/// `data.len() == size` at all times. `serialize_binary()` returns a clone of
+/// `data` — no entry iteration or struct reconstruction needed.
+///
+/// Entry layout within the buffer (matches `clog_entry_t`):
+/// - Each entry occupies an 8-byte-aligned region.
+/// - `entry.prev` → offset to the previous (older) entry (0 = oldest in chain).
+/// - `entry.next` → end-of-entry offset (this offset + aligned_size).
+/// - `cpos` points to the **newest** entry.
+/// - Traversal: newest → cpos → prev → ... → 0.
+#[derive(Debug, Clone)]
+pub struct RingBuffer {
+    /// Complete buffer: header(8 bytes) + ring data.
+    /// `data[0..4]` = total capacity as u32 LE.
+    /// `data[4..8]` = cpos as u32 LE (offset of newest entry, 0 if empty).
+    data: Vec<u8>,
+}
+
+impl RingBuffer {
+    /// Create a new empty ring buffer with the given capacity.
+    ///
+    /// Mirrors C's `clog_new(size)`: allocates zero-filled memory and sets
+    /// `clog->size = size; clog->cpos = 0;`. Capacities below
+    /// `CLOG_MAX_ENTRY_SIZE * 10` fall back to `CLOG_DEFAULT_SIZE`.
+    pub fn new(capacity: usize) -> Self {
+        let capacity = if capacity < CLOG_MAX_ENTRY_SIZE * 10 {
+            CLOG_DEFAULT_SIZE
+        } else {
+            capacity
+        };
+        let mut data = vec![0u8; capacity];
+        data[0..4].copy_from_slice(&(capacity as u32).to_le_bytes());
+        // data[4..8] = 0 (cpos = 0, empty)
+        Self { data }
+    }
+
+    /// Total buffer capacity in bytes.
+    pub fn capacity(&self) -> usize {
+        u32::from_le_bytes(self.data[0..4].try_into().unwrap()) as usize
+    }
+
+    /// Offset of the newest entry, or 0 if the buffer is empty.
+    fn cpos(&self) -> usize {
+        u32::from_le_bytes(self.data[4..8].try_into().unwrap()) as usize
+    }
+
+    /// Update the cpos header field.
+    fn set_cpos(&mut self, cpos: usize) {
+        self.data[4..8].copy_from_slice(&(cpos as u32).to_le_bytes());
+    }
+
+    /// Check if buffer is empty.
+    pub fn is_empty(&self) -> bool {
+        self.cpos() == 0
+    }
+
+    /// Count entries by walking the prev-chain (O(N)).
+    pub fn len(&self) -> usize {
+        self.iter().count()
+    }
+
+    /// Allocate a slot for an entry of the given byte size.
+    ///
+    /// Mirrors C's `clog_alloc_entry`:
+    /// - Empty buffer: place at offset 8.
+    /// - Otherwise: place at `cur->next`; wrap to 8 if the new entry won't fit.
+    /// - Sets `entry->prev = old_cpos` and `entry->next = newpos + aligned_size`,
+    ///   updates `clog->cpos = newpos`.
+    ///
+    /// Returns the byte offset where the new entry begins.
+    fn alloc_slot(&mut self, size: usize) -> usize {
+        let realsize = (size + 7) & !7usize;
+        let capacity = self.capacity();
+        let old_cpos = self.cpos();
+
+        let newpos = if old_cpos == 0 {
+            8
+        } else {
+            // cur->next is at old_cpos + 4
+            let cur_next =
+                u32::from_le_bytes(self.data[old_cpos + 4..old_cpos + 8].try_into().unwrap())
+                    as usize;
+            if cur_next + realsize >= capacity {
+                8 // wrap around
+            } else {
+                cur_next
+            }
+        };
+
+        // entry->prev = old_cpos
+        self.data[newpos..newpos + 4].copy_from_slice(&(old_cpos as u32).to_le_bytes());
+        // entry->next = newpos + realsize
+        self.data[newpos + 4..newpos + 8]
+            .copy_from_slice(&((newpos + realsize) as u32).to_le_bytes());
+        // clog->cpos = newpos
+        self.set_cpos(newpos);
+
+        newpos
+    }
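The placement decision can be exercised in isolation. A hedged sketch of the three cases (empty buffer, sequential append, wrap-around), using an illustrative `next_slot` helper that is not part of this crate:

```rust
// Decide where the next entry goes, mirroring the cases in
// clog_alloc_entry: empty buffer -> offset 8; otherwise append at
// cur->next, wrapping back to 8 when the aligned entry won't fit.
fn next_slot(cpos: usize, cur_next: usize, size: usize, capacity: usize) -> usize {
    let realsize = (size + 7) & !7usize; // 8-byte alignment
    if cpos == 0 {
        8
    } else if cur_next + realsize >= capacity {
        8 // wrap around
    } else {
        cur_next
    }
}

fn main() {
    assert_eq!(next_slot(0, 0, 100, 1024), 8); // empty buffer
    assert_eq!(next_slot(8, 112, 100, 1024), 112); // sequential append
    assert_eq!(next_slot(8, 1000, 100, 1024), 8); // 1000 + 104 >= 1024, wraps
    println!("ok");
}
```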
+
+    /// Add a log entry to the ring buffer.
+    ///
+    /// Mirrors C's `clog_copy`: calls `clog_alloc_entry` to get a slot, then
+    /// writes the entry body (bytes 8+) directly into the raw buffer — matching
+    /// `memcpy((char*)new + 8, (char*)entry + 8, size - 8)`.
+    pub fn add_entry(&mut self, entry: &LogEntry) -> Result<()> {
+        let slot_offset = self.alloc_slot(entry.size());
+        let body_end = slot_offset + entry.aligned_size();
+        entry.write_body_to(&mut self.data[slot_offset + 8..body_end]);
+        Ok(())
+    }
+
+    /// Iterate over entries from newest to oldest.
+    ///
+    /// Walks the prev-chain from `cpos` backwards, applying C's wrap-around
+    /// guard to stop at the correct oldest entry. Each `LogEntry` is parsed
+    /// on the fly from the raw byte slab.
+    pub fn iter(&self) -> impl Iterator<Item = LogEntry> + '_ {
+        RingBufferIter::new(self)
+    }
+
+    /// Sort entries by (time, node_digest, uid), returning newest-first.
+    ///
+    /// Mirrors C's `clog_sort` comparison function.
+    pub fn sort_entries(&self) -> Vec<LogEntry> {
+        let mut entries: Vec<LogEntry> = self.iter().collect();
+        entries.sort_unstable_by(|a, b| {
+            (b.time, b.node_digest, b.uid).cmp(&(a.time, a.node_digest, a.uid))
+        });
+        entries
+    }
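Reversing the comparison operands (`b` against `a`) yields descending order on the tuple key, i.e. newest first. A minimal standalone illustration with plain tuples:

```rust
// Sorting with the operands swapped (b.cmp(a)) sorts the
// (time, node_digest, uid) key in descending order.
fn main() {
    let mut entries: Vec<(u32, u64, u32)> = vec![
        (1000, 7, 1),
        (1002, 7, 3),
        (1001, 7, 2),
    ];
    entries.sort_unstable_by(|a, b| b.cmp(a));
    let times: Vec<u32> = entries.iter().map(|e| e.0).collect();
    assert_eq!(times, vec![1002, 1001, 1000]); // newest-first
    println!("ok");
}
```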
+
+    /// Dump buffer to JSON format.
+    ///
+    /// Matches C's `clog_dump_json`.
+    pub fn dump_json(&self, ident_filter: Option<&str>, max_entries: usize) -> String {
+        let ident_digest = ident_filter.map(fnv_64a_str);
+
+        let data: Vec<serde_json::Value> = self
+            .iter()
+            .filter(|entry| ident_digest.is_none_or(|digest| digest == entry.ident_digest))
+            .take(max_entries)
+            .map(|entry| entry.to_json_object())
+            .collect();
+
+        let result = serde_json::json!({ "data": data });
+        serde_json::to_string_pretty(&result).unwrap_or_else(|_| "{}".to_string())
+    }
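The filter semantics (`None` matches every entry, `Some(digest)` matches only that ident) can be shown with a tiny standalone predicate; `map_or(true, ..)` is used here as a pre-1.82 equivalent of the `Option::is_none_or` call above:

```rust
// None means "no filter": every entry matches. Some(d) matches
// only entries whose ident digest equals d.
fn matches(filter: Option<u64>, digest: u64) -> bool {
    filter.map_or(true, |d| d == digest)
}

fn main() {
    assert!(matches(None, 123)); // unfiltered
    assert!(matches(Some(123), 123)); // digest match
    assert!(!matches(Some(123), 456)); // digest mismatch
    println!("ok");
}
```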
+
+    /// Print all entries to stdout (debugging only).
+    ///
+    /// Matches C's `clog_dump` function. Not called in normal operation but
+    /// kept for debugging parity with the C implementation.
+    #[allow(dead_code)]
+    pub fn dump(&self) {
+        for (idx, entry) in self.iter().enumerate() {
+            println!(
+                "[{}] uid={:08x} time={} node={}{{{:016X}}} tag={}[{}{{{:016X}}}]: {}",
+                idx,
+                entry.uid,
+                entry.time,
+                entry.node,
+                entry.node_digest,
+                entry.tag,
+                entry.ident,
+                entry.ident_digest,
+                entry.message
+            );
+        }
+    }
+
+    /// Serialize to C binary format: returns a clone of the raw byte slab.
+    ///
+    /// C's `clusterlog_get_state()` returns `g_memdup2(cl->base, clog->size)` —
+    /// a raw memory copy of the entire buffer. Since `data` IS that buffer,
+    /// this is an O(capacity) clone with no entry iteration.
+    pub fn serialize_binary(&self) -> Vec<u8> {
+        self.data.clone()
+    }
+
+    /// Deserialize from C binary format.
+    ///
+    /// Validates the header and stores the raw bytes. Entry parsing is deferred
+    /// to `iter()`, which applies C's wrap-around traversal guards.
+    pub fn deserialize_binary(data: &[u8]) -> Result<Self> {
+        if data.len() < 8 {
+            bail!(
+                "Buffer too small: {} bytes (need at least 8 for header)",
+                data.len()
+            );
+        }
+
+        let size = u32::from_le_bytes(data[0..4].try_into()?) as usize;
+        let cpos = u32::from_le_bytes(data[4..8].try_into()?) as usize;
+
+        if size != data.len() {
+            bail!(
+                "Size mismatch: header says {}, got {} bytes",
+                size,
+                data.len()
+            );
+        }
+
+        if cpos != 0 && (cpos < 8 || cpos >= size) {
+            bail!("Invalid cpos: {cpos} (size: {size})");
+        }
+
+        Ok(Self {
+            data: data.to_vec(),
+        })
+    }
+}
+
+impl Default for RingBuffer {
+    fn default() -> Self {
+        Self::new(CLOG_DEFAULT_SIZE)
+    }
+}
+
+/// Iterator that walks the prev-chain from newest to oldest entry.
+///
+/// Applies C's wrap-around guard from `clog_dump`/`clog_dump_json`:
+/// stops when following `prev` would jump forward past `initial_cpos`,
+/// which signals that older entries have been overwritten by the ring.
+///
+/// A `HashSet` of visited offsets guards against cycles through stale
+/// prev pointers in overwritten regions that the C guard alone cannot detect.
+struct RingBufferIter<'a> {
+    data: &'a [u8],
+    current_cpos: usize,
+    initial_cpos: usize,
+    visited: std::collections::HashSet<usize>,
+    done: bool,
+}
+
+impl<'a> RingBufferIter<'a> {
+    fn new(buf: &'a RingBuffer) -> Self {
+        let initial_cpos = buf.cpos();
+        Self {
+            data: &buf.data,
+            current_cpos: initial_cpos,
+            initial_cpos,
+            visited: std::collections::HashSet::new(),
+            done: false,
+        }
+    }
+}
+
+impl Iterator for RingBufferIter<'_> {
+    type Item = LogEntry;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.done || self.current_cpos == 0 {
+            return None;
+        }
+
+        // Guard against cycles through stale prev pointers in overwritten regions.
+        if !self.visited.insert(self.current_cpos) {
+            return None;
+        }
+
+        if self.current_cpos >= self.data.len() {
+            return None;
+        }
+
+        let entry_data = &self.data[self.current_cpos..];
+        match LogEntry::deserialize_binary(entry_data) {
+            Err(_) => None,
+            Ok((entry, prev, _next)) => {
+                // C wrap-around guard: if (cpos < cur->prev && cur->prev <= clog->cpos) break
+                // Detects when following prev would jump forward past initial_cpos,
+                // meaning the entry it points to was overwritten by the ring.
+                if self.current_cpos < prev as usize && (prev as usize) <= self.initial_cpos {
+                    self.done = true;
+                    return Some(entry);
+                }
+                self.current_cpos = prev as usize;
+                Some(entry)
+            }
+        }
+    }
+}
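To see the guard in action without parsing real entries, here is a hypothetical model where a map from entry offsets to their prev pointers stands in for the slab:

```rust
use std::collections::{HashMap, HashSet};

// Walk a prev-chain from cpos, applying C's wrap-around guard:
// stop after an entry whose prev jumps forward past the starting
// cpos, since that target was overwritten by the ring.
fn walk(prev_of: &HashMap<usize, usize>, cpos: usize) -> Vec<usize> {
    let mut out = Vec::new();
    let mut cur = cpos;
    let mut seen = HashSet::new();
    while cur != 0 && seen.insert(cur) {
        let Some(&prev) = prev_of.get(&cur) else { break };
        out.push(cur);
        if cur < prev && prev <= cpos {
            break; // guard: prev points into the overwritten region
        }
        cur = prev;
    }
    out
}

fn main() {
    // Healthy chain: 40 -> 24 -> 8, and the oldest entry has prev = 0.
    let chain = HashMap::from([(8, 0), (24, 8), (40, 24)]);
    assert_eq!(walk(&chain, 40), vec![40, 24, 8]);

    // Wrapped ring: the oldest surviving entry (offset 8) carries a
    // stale prev = 32 pointing forward into overwritten space; the
    // guard stops the walk right after yielding it.
    let wrapped = HashMap::from([(8, 32), (24, 8), (40, 24)]);
    assert_eq!(walk(&wrapped, 40), vec![40, 24, 8]);
    println!("ok");
}
```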
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_ring_buffer_creation() {
+        let buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+        assert_eq!(buffer.capacity(), CLOG_DEFAULT_SIZE);
+        assert_eq!(buffer.len(), 0);
+        assert!(buffer.is_empty());
+    }
+
+    #[test]
+    fn test_add_entry() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+        let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "message").unwrap();
+
+        let result = buffer.add_entry(&entry);
+        assert!(result.is_ok());
+        assert_eq!(buffer.len(), 1);
+        assert!(!buffer.is_empty());
+    }
+
+    #[test]
+    fn test_ring_buffer_wraparound() {
+        // Create a buffer with minimum required size (CLOG_MAX_ENTRY_SIZE * 10)
+        // but fill it beyond capacity to trigger wraparound
+        let mut buffer = RingBuffer::new(CLOG_MAX_ENTRY_SIZE * 10);
+
+        let initial_count = 50_usize;
+        for i in 0..initial_count {
+            let entry =
+                LogEntry::pack("node1", "root", "tag", 0, 1000 + i as u32, 6, "msg").unwrap();
+            let _ = buffer.add_entry(&entry);
+        }
+
+        let count_before = buffer.len();
+        assert_eq!(count_before, initial_count);
+
+        // Add entries with large messages to trigger ring eviction
+        let large_msg = "x".repeat(7000);
+        let large_entries_count = 20_usize;
+        for i in 0..large_entries_count {
+            let entry =
+                LogEntry::pack("node1", "root", "tag", 0, 2000 + i as u32, 6, &large_msg).unwrap();
+            let _ = buffer.add_entry(&entry);
+        }
+
+        // Should have fewer entries than total added (ring evicted old ones)
+        assert!(
+            buffer.len() < count_before + large_entries_count,
+            "Expected wraparound to evict old entries (have {} entries, expected < {})",
+            buffer.len(),
+            count_before + large_entries_count
+        );
+
+        // Newest entry (iter starts from cpos = newest) should be the last added
+        let newest = buffer.iter().next().unwrap();
+        assert_eq!(newest.time, 2000 + large_entries_count as u32 - 1);
+    }
+
+    #[test]
+    fn test_sort_by_time() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+        let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "c").unwrap());
+        let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
+        let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "b").unwrap());
+
+        let sorted = buffer.sort_entries();
+
+        // Entries are newest-first after sort
+        let times: Vec<u32> = sorted.iter().map(|e| e.time).collect();
+        assert_eq!(times, vec![1002, 1001, 1000]);
+    }
+
+    #[test]
+    fn test_sort_by_node_digest() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+        let _ = buffer.add_entry(&LogEntry::pack("node3", "root", "tag", 0, 1000, 6, "c").unwrap());
+        let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
+        let _ = buffer.add_entry(&LogEntry::pack("node2", "root", "tag", 0, 1000, 6, "b").unwrap());
+
+        let sorted = buffer.sort_entries();
+
+        // Entries with same time should be sorted by node_digest (descending)
+        for entries in sorted.windows(2) {
+            if entries[0].time == entries[1].time {
+                assert!(entries[0].node_digest >= entries[1].node_digest);
+            }
+        }
+    }
+
+    #[test]
+    fn test_json_dump() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+        let _ = buffer
+            .add_entry(&LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "msg").unwrap());
+
+        let json = buffer.dump_json(None, 50);
+
+        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
+        assert!(parsed.get("data").is_some());
+
+        let data = parsed["data"].as_array().unwrap();
+        assert_eq!(data.len(), 1);
+
+        let entry = &data[0];
+        assert_eq!(entry["node"], "node1");
+        assert_eq!(entry["user"], "root");
+        assert_eq!(entry["tag"], "cluster");
+    }
+
+    #[test]
+    fn test_json_dump_with_filter() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+        let _ =
+            buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg1").unwrap());
+        let _ =
+            buffer.add_entry(&LogEntry::pack("node1", "admin", "tag", 0, 1001, 6, "msg2").unwrap());
+        let _ =
+            buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "msg3").unwrap());
+
+        let json = buffer.dump_json(Some("root"), 50);
+
+        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
+        let data = parsed["data"].as_array().unwrap();
+
+        assert_eq!(data.len(), 2);
+        for entry in data {
+            assert_eq!(entry["user"], "root");
+        }
+    }
+
+    #[test]
+    fn test_json_dump_max_entries() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+        for i in 0..10 {
+            let _ = buffer
+                .add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000 + i, 6, "msg").unwrap());
+        }
+
+        let json = buffer.dump_json(None, 5);
+
+        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
+        let data = parsed["data"].as_array().unwrap();
+
+        assert_eq!(data.len(), 5);
+    }
+
+    #[test]
+    fn test_iterator() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+        let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap());
+        let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "b").unwrap());
+        let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "c").unwrap());
+
+        let messages: Vec<String> = buffer.iter().map(|e| e.message).collect();
+
+        // Newest first (iter walks from cpos backwards via prev)
+        assert_eq!(messages, vec!["c", "b", "a"]);
+    }
+
+    #[test]
+    fn test_binary_serialization_roundtrip() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+
+        let _ = buffer.add_entry(
+            &LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "Entry 1").unwrap(),
+        );
+        let _ = buffer.add_entry(
+            &LogEntry::pack("node2", "admin", "system", 456, 1001, 5, "Entry 2").unwrap(),
+        );
+
+        let binary = buffer.serialize_binary();
+        let deserialized = RingBuffer::deserialize_binary(&binary).unwrap();
+
+        assert_eq!(deserialized.len(), buffer.len());
+
+        let orig_entries: Vec<_> = buffer.iter().collect();
+        let deser_entries: Vec<_> = deserialized.iter().collect();
+
+        for (orig, deser) in orig_entries.iter().zip(deser_entries.iter()) {
+            assert_eq!(deser.uid, orig.uid);
+            assert_eq!(deser.time, orig.time);
+            assert_eq!(deser.node, orig.node);
+            assert_eq!(deser.message, orig.message);
+        }
+    }
+
+    #[test]
+    fn test_binary_format_header() {
+        let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+        let _ = buffer.add_entry(&LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap());
+
+        let binary = buffer.serialize_binary();
+
+        assert!(binary.len() >= 8);
+
+        let size = u32::from_le_bytes(binary[0..4].try_into().unwrap()) as usize;
+        let cpos = u32::from_le_bytes(binary[4..8].try_into().unwrap());
+
+        assert_eq!(size, binary.len());
+        assert_eq!(cpos, 8); // First entry at offset 8
+    }
+
+    #[test]
+    fn test_binary_empty_buffer() {
+        let buffer = RingBuffer::new(CLOG_DEFAULT_SIZE);
+        let binary = buffer.serialize_binary();
+
+        assert_eq!(binary.len(), CLOG_DEFAULT_SIZE);
+
+        let size = u32::from_le_bytes(binary[0..4].try_into().unwrap()) as usize;
+        let cpos = u32::from_le_bytes(binary[4..8].try_into().unwrap());
+
+        assert_eq!(size, CLOG_DEFAULT_SIZE);
+        assert_eq!(cpos, 0); // Empty buffer has cpos = 0
+
+        let deserialized = RingBuffer::deserialize_binary(&binary).unwrap();
+        assert_eq!(deserialized.len(), 0);
+        assert_eq!(deserialized.capacity(), CLOG_DEFAULT_SIZE);
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs b/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
new file mode 100644
index 000000000..17a11f9c4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs
@@ -0,0 +1,611 @@
+//! Binary compatibility tests for pmxcfs-logger
+//!
+//! These tests verify that the Rust implementation can correctly
+//! serialize/deserialize binary data in a format compatible with
+//! the C implementation.
+//!
+//! ## Real C Fixture Tests
+//!
+//! The `test_c_fixture_*` tests use blobs generated by the C implementation
+//! (via `clog_pack` + `clog_copy` + `clog_new`).  To regenerate:
+//!
+//! ```sh
+//! cd src/pmxcfs
+//! gcc -o /tmp/gen_fixtures /tmp/gen_fixtures.c \
+//!     -I. $(pkg-config --cflags glib-2.0) \
+//!     libpmxcfs.a $(pkg-config --libs glib-2.0) -lpthread
+//! /tmp/gen_fixtures tests/fixtures
+//! ```
+//!
+//! The generator source lives at `src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c`.
+
+use pmxcfs_logger::{ClusterLog, LogEntry, RingBuffer};
+
+/// Test deserializing a minimal C-compatible binary blob
+///
+/// This test uses a hand-crafted binary blob that matches C's clog_base_t format:
+/// - 8-byte header (size + cpos)
+/// - Single entry at offset 8
+#[test]
+fn test_deserialize_minimal_c_blob() {
+    // Create a minimal valid C binary blob
+    // Header: size=8+entry_size, cpos=8 (points to first entry)
+    // Entry: minimal valid entry with all required fields
+
+    let entry = LogEntry::pack("node1", "root", "test", 123, 1000, 6, "msg").unwrap();
+    let entry_bytes = entry.serialize_binary(0, 0); // prev=0 (end), next=0
+    let entry_size = entry_bytes.len();
+
+    // Allocate buffer with capacity for header + entry
+    let total_size = 8 + entry_size;
+    let mut blob = vec![0u8; total_size];
+
+    // Write header
+    blob[0..4].copy_from_slice(&(total_size as u32).to_le_bytes()); // size
+    blob[4..8].copy_from_slice(&8u32.to_le_bytes()); // cpos = 8
+
+    // Write entry
+    blob[8..8 + entry_size].copy_from_slice(&entry_bytes);
+
+    // Deserialize
+    let buffer = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+
+    // Verify
+    assert_eq!(buffer.len(), 1, "Should have 1 entry");
+    let entries: Vec<_> = buffer.iter().collect();
+    assert_eq!(entries[0].node, "node1");
+    assert_eq!(entries[0].message, "msg");
+}
+
+/// Test round-trip: Rust serialize -> deserialize
+///
+/// Verifies that Rust can serialize and deserialize its own format
+#[test]
+fn test_roundtrip_single_entry() {
+    let mut buffer = RingBuffer::new(8192 * 16);
+
+    let entry = LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "Test message").unwrap();
+    buffer.add_entry(&entry).unwrap();
+
+    // Serialize
+    let blob = buffer.serialize_binary();
+
+    // Verify header
+    let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
+    let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+
+    assert_eq!(size, blob.len(), "Size should match blob length");
+    assert_eq!(cpos, 8, "First entry should be at offset 8");
+
+    // Deserialize
+    let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+
+    // Verify
+    assert_eq!(deserialized.len(), 1);
+    let entries: Vec<_> = deserialized.iter().collect();
+    assert_eq!(entries[0].node, "node1");
+    assert_eq!(entries[0].ident, "root");
+    assert_eq!(entries[0].message, "Test message");
+}
+
+/// Test round-trip with multiple entries
+///
+/// Verifies linked list structure (prev/next pointers)
+#[test]
+fn test_roundtrip_multiple_entries() {
+    let mut buffer = RingBuffer::new(8192 * 16);
+
+    // Add 3 entries
+    for i in 0..3 {
+        let entry = LogEntry::pack(
+            "node1",
+            "root",
+            "test",
+            100 + i,
+            1000 + i,
+            6,
+            &format!("Message {}", i),
+        )
+        .unwrap();
+        buffer.add_entry(&entry).unwrap();
+    }
+
+    // Serialize
+    let blob = buffer.serialize_binary();
+
+    // Deserialize
+    let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+
+    // Verify all entries preserved
+    assert_eq!(deserialized.len(), 3);
+
+    let entries: Vec<_> = deserialized.iter().collect();
+    // Entries are stored newest-first
+    assert_eq!(entries[0].message, "Message 2"); // Newest
+    assert_eq!(entries[1].message, "Message 1");
+    assert_eq!(entries[2].message, "Message 0"); // Oldest
+}
+
+/// Test empty buffer serialization
+///
+/// C returns a buffer with size and cpos=0 for empty buffers
+#[test]
+fn test_empty_buffer_format() {
+    let buffer = RingBuffer::new(8192 * 16);
+
+    // Serialize empty buffer
+    let blob = buffer.serialize_binary();
+
+    // Verify format
+    assert_eq!(blob.len(), 8192 * 16, "Should be full capacity");
+
+    let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
+    let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+
+    assert_eq!(size, 8192 * 16, "Size should match capacity");
+    assert_eq!(cpos, 0, "Empty buffer should have cpos=0");
+
+    // Deserialize
+    let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+    assert_eq!(deserialized.len(), 0, "Should be empty");
+}
+
+/// Test entry alignment (8-byte boundaries)
+///
+/// C uses ((size + 7) & ~7) for alignment
+#[test]
+fn test_entry_alignment() {
+    let entry = LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap();
+
+    let aligned_size = entry.aligned_size();
+
+    // Should be multiple of 8
+    assert_eq!(aligned_size % 8, 0, "Aligned size should be multiple of 8");
+
+    // Should be >= actual size
+    assert!(aligned_size >= entry.size());
+
+    // Should be within 7 bytes of actual size
+    assert!(aligned_size - entry.size() < 8);
+}
+
+/// Test string length capping (prevents u8 overflow)
+///
+/// node_len, ident_len, tag_len are u8 and must cap at 255
+#[test]
+fn test_string_length_capping() {
+    // Create entry with very long strings
+    let long_node = "a".repeat(300);
+    let long_ident = "b".repeat(300);
+    let long_tag = "c".repeat(300);
+
+    let entry = LogEntry::pack(&long_node, &long_ident, &long_tag, 1, 1000, 6, "msg").unwrap();
+
+    // Serialize
+    let blob = entry.serialize_binary(0, 0);
+
+    // Check the length fields at their fixed offsets in clog_entry_t:
+    // prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) + pid(4) + priority(1) = 37,
+    // so node_len sits at offset 37, ident_len at 38, tag_len at 39
+    let node_len = blob[37];
+    let ident_len = blob[38];
+    let tag_len = blob[39];
+
+    // All should be capped at 255 (254 bytes + 1 null terminator)
+    assert_eq!(node_len, 255, "node_len should be capped at 255");
+    assert_eq!(ident_len, 255, "ident_len should be capped at 255");
+    assert_eq!(tag_len, 255, "tag_len should be capped at 255");
+}
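The expected offsets follow from summing the fixed-width fields that precede the length bytes; a quick arithmetic check:

```rust
// Sum the fixed-width clog_entry_t fields preceding the length
// bytes: prev(4) next(4) uid(4) time(4) node_digest(8)
// ident_digest(8) pid(4) priority(1) = 37, so the three u8
// length fields land at offsets 37, 38, 39.
fn main() {
    let widths = [4, 4, 4, 4, 8, 8, 4, 1];
    let fixed: usize = widths.iter().sum();
    assert_eq!(fixed, 37);
    assert_eq!((fixed, fixed + 1, fixed + 2), (37, 38, 39));
    println!("ok");
}
```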
+
+/// Test ClusterLog state serialization
+///
+/// Verifies get_state() returns C-compatible format
+#[test]
+fn test_cluster_log_state_format() {
+    let log = ClusterLog::new();
+
+    // Add some entries
+    log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1")
+        .unwrap();
+    log.add("node2", "admin", "system", 456, 6, 1001, "Entry 2")
+        .unwrap();
+
+    // Get state
+    let state = log.get_state().expect("Should serialize");
+
+    // Verify header format
+    assert!(state.len() >= 8, "Should have at least header");
+
+    let size = u32::from_le_bytes(state[0..4].try_into().unwrap()) as usize;
+    let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap()) as usize;
+
+    assert_eq!(size, state.len(), "Size should match blob length");
+    assert!(cpos >= 8, "cpos should point into data section");
+    assert!(cpos < size, "cpos should be within buffer");
+
+    // Deserialize and verify
+    let deserialized = ClusterLog::deserialize_state(&state).expect("Should deserialize");
+    assert_eq!(deserialized.len(), 2, "Should have 2 entries");
+}
+
+/// Test wrap-around detection in deserialization
+///
+/// Verifies that circular buffer wrap-around is handled correctly
+#[test]
+fn test_wraparound_detection() {
+    // Create a buffer with entries
+    let mut buffer = RingBuffer::new(8192 * 16);
+
+    for i in 0..5 {
+        let entry = LogEntry::pack("node1", "root", "test", 100 + i, 1000 + i, 6, "msg").unwrap();
+        buffer.add_entry(&entry).unwrap();
+    }
+
+    // Serialize
+    let blob = buffer.serialize_binary();
+
+    // Deserialize (should handle prev pointers correctly)
+    let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+
+    // Should get all entries
+    assert_eq!(deserialized.len(), 5);
+}
+
+/// Test invalid binary data handling
+///
+/// Verifies that malformed data is rejected
+#[test]
+fn test_invalid_binary_data() {
+    // Too small
+    let too_small = vec![0u8; 4];
+    assert!(RingBuffer::deserialize_binary(&too_small).is_err());
+
+    // Size mismatch
+    let mut size_mismatch = vec![0u8; 100];
+    size_mismatch[0..4].copy_from_slice(&200u32.to_le_bytes()); // Claims 200 bytes
+    assert!(RingBuffer::deserialize_binary(&size_mismatch).is_err());
+
+    // Invalid cpos (beyond buffer)
+    let mut invalid_cpos = vec![0u8; 100];
+    invalid_cpos[0..4].copy_from_slice(&100u32.to_le_bytes()); // size = 100
+    invalid_cpos[4..8].copy_from_slice(&200u32.to_le_bytes()); // cpos = 200 (invalid)
+    assert!(RingBuffer::deserialize_binary(&invalid_cpos).is_err());
+}
+
+/// Test FNV-1a hash consistency
+///
+/// Verifies that node_digest and ident_digest are computed correctly
+#[test]
+fn test_hash_consistency() {
+    let entry1 = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "msg1").unwrap();
+    let entry2 = LogEntry::pack("node1", "root", "test", 2, 1001, 6, "msg2").unwrap();
+    let entry3 = LogEntry::pack("node2", "admin", "test", 3, 1002, 6, "msg3").unwrap();
+
+    // Same node should have same digest
+    assert_eq!(entry1.node_digest, entry2.node_digest);
+
+    // Same ident should have same digest
+    assert_eq!(entry1.ident_digest, entry2.ident_digest);
+
+    // Different node should have different digest
+    assert_ne!(entry1.node_digest, entry3.node_digest);
+
+    // Different ident should have different digest
+    assert_ne!(entry1.ident_digest, entry3.ident_digest);
+}
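The hash referenced here is the classic 64-bit FNV-1a. A generic standalone sketch with the standard constants; note that whether the C code also feeds the trailing NUL byte into the hash is a detail of its implementation that this sketch does not settle:

```rust
// Generic 64-bit FNV-1a over a byte slice, using the standard
// offset basis and prime.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

fn main() {
    // Empty input returns the offset basis by definition.
    assert_eq!(fnv1a_64(b""), 0xcbf2_9ce4_8422_2325);
    // Deterministic; distinct inputs yield distinct digests here.
    assert_eq!(fnv1a_64(b"node1"), fnv1a_64(b"node1"));
    assert_ne!(fnv1a_64(b"node1"), fnv1a_64(b"node2"));
    println!("ok");
}
```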
+
+/// Test priority validation
+///
+/// Priority must be 0-7 (syslog priority)
+#[test]
+fn test_priority_validation() {
+    // Valid priorities (0-7)
+    for pri in 0..=7 {
+        let result = LogEntry::pack("node1", "root", "test", 1, 1000, pri, "msg");
+        assert!(result.is_ok(), "Priority {} should be valid", pri);
+    }
+
+    // Invalid priority (8+)
+    let result = LogEntry::pack("node1", "root", "test", 1, 1000, 8, "msg");
+    assert!(result.is_err(), "Priority 8 should be invalid");
+}
+
+/// Test UTF-8 to ASCII conversion
+///
+/// Verifies control character and Unicode escaping (matches C implementation)
+#[test]
+fn test_utf8_escaping() {
+    // Control characters (C format: #XXX with 3 decimal digits)
+    let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Hello\x07World").unwrap();
+    assert!(
+        entry.message.contains("#007"),
+        "BEL should be escaped as #007"
+    );
+
+    // Unicode characters
+    let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Hello 世界").unwrap();
+    assert!(
+        entry.message.contains("\\u4e16"),
+        "世 should be escaped as \\u4e16"
+    );
+    assert!(
+        entry.message.contains("\\u754c"),
+        "界 should be escaped as \\u754c"
+    );
+
+    // Mixed content
+    let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Test\x01\n世").unwrap();
+    assert!(entry.message.contains("#001"), "SOH should be escaped");
+    assert!(entry.message.contains("#010"), "LF should be escaped");
+    assert!(
+        entry.message.contains("\\u4e16"),
+        "Unicode should be escaped"
+    );
+}
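A hypothetical sketch of the escaping scheme these assertions imply: control bytes become `#NNN` (three decimal digits) and non-ASCII characters become `\uXXXX`. Edge cases (for instance codepoints above U+FFFF) follow the C implementation, which this sketch does not claim to reproduce:

```rust
// Escape control characters as #NNN (3 decimal digits) and
// non-ASCII characters as \uXXXX; printable ASCII passes through.
// Illustrative only; the real rules live in the entry packer.
fn escape(msg: &str) -> String {
    let mut out = String::new();
    for c in msg.chars() {
        if c.is_ascii_control() {
            out.push_str(&format!("#{:03}", c as u32));
        } else if !c.is_ascii() {
            out.push_str(&format!("\\u{:04x}", c as u32));
        } else {
            out.push(c);
        }
    }
    out
}

fn main() {
    assert_eq!(escape("Hello\x07World"), "Hello#007World"); // BEL
    assert_eq!(escape("Test\n"), "Test#010"); // LF
    assert_eq!(escape("世"), "\\u4e16"); // BMP codepoint
    println!("ok");
}
```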
+
+/// Test that multi-entry serialization produces C-compatible prev pointer direction.
+///
+/// In C's format (produced by clog_sort + clog_alloc_entry):
+/// - Entries are laid out oldest-first in memory (oldest at offset 8).
+/// - `cpos` points to the newest (highest-offset) entry.
+/// - Each entry's `prev` points to the previous (older, lower-offset) entry.
+/// - The oldest entry has `prev = 0`.
+///
+/// This layout means that when C's deserialization loop starts at `cpos` and
+/// follows `prev` pointers, it sees `prev < current_pos` at every step, which
+/// keeps it inside the C guard condition.  The previous (buggy) Rust layout
+/// stored newest-first so `prev > cpos`, causing C's guard to exit after one
+/// entry.
+#[test]
+fn test_multi_entry_cpos_and_prev_direction() {
+    use pmxcfs_logger::LogEntry;
+
+    let mut buffer = RingBuffer::new(8192 * 16);
+
+    // Add two entries: "old" (time=1000) then "new" (time=1001)
+    let old_entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "old").unwrap();
+    let new_entry = LogEntry::pack("node1", "root", "test", 2, 1001, 6, "new").unwrap();
+    buffer.add_entry(&old_entry).unwrap();
+    buffer.add_entry(&new_entry).unwrap();
+
+    let blob = buffer.serialize_binary();
+
+    // Parse the header
+    let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
+    let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+
+    assert_eq!(size, blob.len());
+
+    // cpos must point PAST offset 8 (i.e., the newest entry is not at offset 8
+    // when there are two entries; the oldest is at offset 8 instead).
+    assert!(
+        cpos > 8,
+        "cpos ({cpos}) should be > 8 when entries are stored oldest-first"
+    );
+    assert!(cpos < size, "cpos must be within buffer bounds");
+
+    // Parse the newest entry at cpos and check its prev pointer.
+    // The newest entry's prev must point BACK (lower offset) to the older entry.
+    let newest_at_cpos = &blob[cpos..];
+    // prev is the first u32 in the entry
+    let prev_of_newest = u32::from_le_bytes(newest_at_cpos[0..4].try_into().unwrap()) as usize;
+
+    assert_eq!(
+        prev_of_newest, 8,
+        "newest entry's prev ({prev_of_newest}) should point to oldest entry at offset 8"
+    );
+
+    // Parse the oldest entry at offset 8 and check its prev is 0 (no predecessor).
+    let oldest_at_8 = &blob[8..];
+    let prev_of_oldest = u32::from_le_bytes(oldest_at_8[0..4].try_into().unwrap()) as usize;
+
+    assert_eq!(
+        prev_of_oldest, 0,
+        "oldest entry's prev ({prev_of_oldest}) should be 0 (no predecessor)"
+    );
+
+    // Finally, round-trip deserialization must recover both entries in newest-first order.
+    let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize");
+    assert_eq!(deserialized.len(), 2);
+    let entries: Vec<_> = deserialized.iter().collect();
+    assert_eq!(
+        entries[0].message, "new",
+        "First deserialized entry should be newest"
+    );
+    assert_eq!(
+        entries[1].message, "old",
+        "Second deserialized entry should be oldest"
+    );
+}
+
+// ============================================================================
+// Real C Fixture Tests
+//
+// Blobs generated by gen_fixtures.c using the real C logger (clog_pack +
+// clog_copy).  These test that Rust correctly parses actual C output, not
+// just a format we happened to write the same way.
+// ============================================================================
+
+/// C fixture: single entry, cpos == 8.
+///
+/// Verifies the happy path: one entry at the expected offset with all fields
+/// matching the values supplied to clog_pack.
+#[test]
+fn test_c_fixture_single_entry() {
+    let blob = include_bytes!("fixtures/single_entry.bin");
+
+    let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize;
+    let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+
+    assert_eq!(size, blob.len(), "header size matches blob length");
+    assert_eq!(cpos, 8, "single entry starts at offset 8");
+
+    let buffer =
+        RingBuffer::deserialize_binary(blob).expect("C single-entry blob should deserialize");
+    assert_eq!(buffer.len(), 1);
+
+    let entries: Vec<_> = buffer.iter().collect();
+    assert_eq!(entries[0].node, "node1");
+    assert_eq!(entries[0].ident, "root");
+    assert_eq!(entries[0].tag, "cluster");
+    assert_eq!(entries[0].pid, 123);
+    assert_eq!(entries[0].time, 1000);
+    assert_eq!(entries[0].priority, 6);
+    assert_eq!(entries[0].message, "Hello from C");
+}
+
+/// C fixture: three entries, cpos > 8.
+///
+/// Verifies multi-entry linked-list traversal with a cpos that is not the
+/// first possible offset.  Entries must arrive in newest-first order.
+#[test]
+fn test_c_fixture_multi_entry() {
+    let blob = include_bytes!("fixtures/multi_entry.bin");
+
+    let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
+    assert!(
+        cpos > 8,
+        "multi-entry fixture must have cpos > 8 (got {cpos})"
+    );
+
+    let buffer =
+        RingBuffer::deserialize_binary(blob).expect("C multi-entry blob should deserialize");
+    assert_eq!(buffer.len(), 3, "all three C entries should be recovered");
+
+    // Entries must arrive newest-first (C sorts by time descending from cpos)
+    let entries: Vec<_> = buffer.iter().collect();
+    assert_eq!(entries[0].message, "Entry three"); // time 1002 — newest
+    assert_eq!(entries[0].node, "node1");
+    assert_eq!(entries[1].message, "Entry two"); // time 1001
+    assert_eq!(entries[1].node, "node2");
+    assert_eq!(entries[2].message, "Entry one"); // time 1000 — oldest
+    assert_eq!(entries[2].node, "node1");
+}
+
+/// C fixture: non-ASCII message content.
+///
+/// C's utf8_to_ascii encodes:
+///   BEL (0x07) → #007
+///   世 (U+4E16) → \u4e16
+///   界 (U+754C) → \u754c
+///   " (double-quote) → \"
+///
+/// These escape sequences are stored verbatim in the binary entry.
+/// Rust must recover them unchanged.
+#[test]
+fn test_c_fixture_nonascii() {
+    let blob = include_bytes!("fixtures/nonascii.bin");
+
+    let buffer = RingBuffer::deserialize_binary(blob).expect("C nonascii blob should deserialize");
+    assert_eq!(buffer.len(), 1);
+
+    let entries: Vec<_> = buffer.iter().collect();
+    let msg = &entries[0].message;
+
+    // Must be pure ASCII after C's escaping
+    assert!(msg.is_ascii(), "message must be ASCII after C escaping");
+
+    // Verify each escape sequence the C implementation produces
+    assert!(msg.contains("#007"), "BEL must be escaped as #007");
+    assert!(msg.contains("\\u4e16"), "世 must be escaped as \\u4e16");
+    assert!(msg.contains("\\u754c"), "界 must be escaped as \\u754c");
+    assert!(msg.contains("\\\""), "quote must be escaped as \\\"");
+}
+
+/// C fixture: ring-buffer overflow.
+///
+/// 20 large (~3 KiB) entries were added to a minimum-size (40960-byte)
+/// buffer.  Only the most recent entries fit; older ones were evicted by the
+/// ring.  Verifies that Rust handles the truncated ring without panic or data
+/// corruption, and that it recovers fewer entries than were originally added.
+#[test]
+fn test_c_fixture_overflow() {
+    let blob = include_bytes!("fixtures/overflow.bin");
+
+    let buffer = RingBuffer::deserialize_binary(blob).expect("C overflow blob should deserialize");
+
+    // Fewer than 20 entries must have survived the eviction
+    assert!(
+        buffer.len() > 0,
+        "at least some entries must survive ring overflow"
+    );
+    assert!(
+        buffer.len() < 20,
+        "older entries must have been evicted (got {})",
+        buffer.len()
+    );
+
+    // The surviving entries should be the most recent ones (highest time values).
+    // time runs from 3000+0 to 3000+19 = 3019.  Oldest surviving entry must
+    // have a time greater than 3000 (i.e. not the very first entries added).
+    let entries: Vec<_> = buffer.iter().collect();
+    let oldest_time = entries.iter().map(|e| e.time).min().unwrap();
+    assert!(
+        oldest_time > 3000,
+        "oldest surviving entry (time={oldest_time}) should not be the very first one added"
+    );
+}
+
+/// C fixture: JSON output for single-entry log matches C's clog_dump_json.
+///
+/// Deserializes the binary fixture, calls Rust's dump_json, then compares
+/// the logical content against the JSON file produced by clog_dump_json.
+/// Field order and whitespace may differ; we compare parsed values.
+#[test]
+fn test_c_fixture_json_matches_c_output() {
+    let bin = include_bytes!("fixtures/single_entry.bin");
+    let c_json = include_str!("fixtures/single_entry.json");
+
+    let buffer = RingBuffer::deserialize_binary(bin).expect("should deserialize");
+    let rust_json = buffer.dump_json(None, 50);
+
+    let c_val: serde_json::Value = serde_json::from_str(c_json).expect("C JSON valid");
+    let rust_val: serde_json::Value = serde_json::from_str(&rust_json).expect("Rust JSON valid");
+
+    let c_entries = c_val["data"].as_array().expect("C has data array");
+    let rust_entries = rust_val["data"].as_array().expect("Rust has data array");
+
+    assert_eq!(
+        rust_entries.len(),
+        c_entries.len(),
+        "entry count must match"
+    );
+
+    // Compare every field that C emits
+    for (r, c) in rust_entries.iter().zip(c_entries.iter()) {
+        assert_eq!(r["uid"], c["uid"], "uid mismatch");
+        assert_eq!(r["time"], c["time"], "time mismatch");
+        assert_eq!(r["pri"], c["pri"], "priority mismatch");
+        assert_eq!(r["tag"], c["tag"], "tag mismatch");
+        assert_eq!(r["pid"], c["pid"], "pid mismatch");
+        assert_eq!(r["node"], c["node"], "node mismatch");
+        assert_eq!(r["user"], c["user"], "user/ident mismatch");
+        assert_eq!(r["msg"], c["msg"], "message mismatch");
+    }
+}
+
+/// C fixture: JSON output for non-ASCII content matches C's clog_dump_json.
+///
+/// The C-generated JSON contains C's exact escape sequences (#007, \uXXXX,
+/// \").  Rust must reproduce the same msg string when dumping JSON from the
+/// same binary data.
+#[test]
+fn test_c_fixture_nonascii_json_matches_c_output() {
+    let bin = include_bytes!("fixtures/nonascii.bin");
+    let c_json = include_str!("fixtures/nonascii.json");
+
+    let buffer = RingBuffer::deserialize_binary(bin).expect("should deserialize");
+    let rust_json = buffer.dump_json(None, 50);
+
+    let c_val: serde_json::Value = serde_json::from_str(c_json).expect("C JSON valid");
+    let rust_val: serde_json::Value = serde_json::from_str(&rust_json).expect("Rust JSON valid");
+
+    let c_msg = c_val["data"][0]["msg"].as_str().expect("C has msg");
+    let rust_msg = rust_val["data"][0]["msg"].as_str().expect("Rust has msg");
+
+    // The msg field must be identical — same C escape sequences, same quote escaping
+    assert_eq!(
+        rust_msg, c_msg,
+        "non-ASCII message must match C's clog_dump_json output"
+    );
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c
new file mode 100644
index 000000000..021edd2b0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/gen_fixtures.c
@@ -0,0 +1,144 @@
+/*
+ * C fixture generator for pmxcfs-logger binary compatibility tests.
+ *
+ * Uses clog_pack + clog_copy for full control over node/time values.
+ *
+ * Compile from src/pmxcfs/:
+ *   gcc -o /tmp/gen_fixtures /tmp/gen_fixtures.c \
+ *       -I. $(pkg-config --cflags glib-2.0) \
+ *       libpmxcfs.a $(pkg-config --libs glib-2.0) -lpthread
+ *
+ * Outputs (written to outdir argument, default "."):
+ *   single_entry.bin   — one entry; cpos == 8
+ *   multi_entry.bin    — three entries, cpos > 8
+ *   nonascii.bin       — BEL + UTF-8 CJK + quoted string
+ *   overflow.bin       — 40 KiB minimum-size buffer; not all 20 entries fit
+ *   single_entry.json  — clog_dump_json for single-entry clog
+ *   nonascii.json      — clog_dump_json showing C's escaping
+ */
+
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <glib.h>
+
+#include "cfs-utils.h"
+#include "logger.h"
+
+/* Stub for the global cfs_t used by cfs_log() in cfs-utils.c */
+cfs_t cfs = {
+    .nodename = "fixture-generator",
+    .ip       = "127.0.0.1",
+    .gid      = 0,
+    .debug    = 0,
+};
+
+/* Stub for qb_log_from_external_source (from corosync libqb) */
+void qb_log_from_external_source(
+    const char *function, const char *filename,
+    const char *format, uint8_t priority, uint32_t lineno,
+    uint32_t tags, ...)
+{
+    /* suppress all logging during fixture generation */
+    (void)function; (void)filename; (void)format;
+    (void)priority; (void)lineno;  (void)tags;
+}
+
+static void write_file(const char *path, const void *data, size_t len) {
+    FILE *f = fopen(path, "wb");
+    if (!f) { perror(path); exit(1); }
+    if (fwrite(data, 1, len, f) != len) { perror(path); exit(1); }
+    fclose(f);
+    fprintf(stderr, "Wrote %zu bytes -> %s\n", len, path);
+}
+
+/* Pack one entry and append it to the clog. */
+static void add_entry(
+    clog_base_t *clog,
+    const char *node, const char *ident, const char *tag,
+    uint32_t pid, time_t logtime, uint8_t priority, const char *msg)
+{
+    char buf[CLOG_MAX_ENTRY_SIZE];
+    clog_pack((clog_entry_t *)buf, node, ident, tag, pid, logtime, priority, msg);
+    clog_copy(clog, (clog_entry_t *)buf);
+}
+
+int main(int argc, char **argv) {
+    const char *outdir = argc > 1 ? argv[1] : ".";
+    char path[512];
+    clog_base_t *clog, *sorted;
+    GString *json_str;
+
+    /* ------------------------------------------------------------------ */
+    /* fixture 1: single_entry — one entry, cpos == 8                     */
+    /* ------------------------------------------------------------------ */
+    clog = clog_new(CLOG_DEFAULT_SIZE);
+    add_entry(clog, "node1", "root", "cluster", 123, 1000, 6, "Hello from C");
+    snprintf(path, sizeof(path), "%s/single_entry.bin", outdir);
+    write_file(path, clog, clog_size(clog));
+    /* JSON */
+    sorted = clog_sort(clog);
+    json_str = g_string_new(NULL);
+    clog_dump_json(sorted, json_str, NULL, 50);
+    snprintf(path, sizeof(path), "%s/single_entry.json", outdir);
+    write_file(path, json_str->str, json_str->len);
+    g_string_free(json_str, TRUE);
+    g_free(sorted);
+    g_free(clog);
+
+    /* ------------------------------------------------------------------ */
+    /* fixture 2: multi_entry — three entries, cpos > 8                   */
+    /* ------------------------------------------------------------------ */
+    clog = clog_new(CLOG_DEFAULT_SIZE);
+    add_entry(clog, "node1", "root",  "cluster", 101, 1000, 6, "Entry one");
+    add_entry(clog, "node2", "admin", "cluster", 102, 1001, 6, "Entry two");
+    add_entry(clog, "node1", "root",  "cluster", 103, 1002, 6, "Entry three");
+    snprintf(path, sizeof(path), "%s/multi_entry.bin", outdir);
+    write_file(path, clog, clog_size(clog));
+    g_free(clog);
+
+    /* ------------------------------------------------------------------ */
+    /* fixture 3: nonascii — BEL + UTF-8 CJK + quoted string              */
+    /* C escapes: BEL->#007, 世->\u4e16, 界->\u754c, "->\"               */
+    /* ------------------------------------------------------------------ */
+    clog = clog_new(CLOG_DEFAULT_SIZE);
+    add_entry(clog, "node1", "root", "cluster", 111, 2000, 6,
+              "Hello\x07World \xe4\xb8\x96\xe7\x95\x8c and \"quoted\"");
+    snprintf(path, sizeof(path), "%s/nonascii.bin", outdir);
+    write_file(path, clog, clog_size(clog));
+    /* JSON showing C's escaping */
+    sorted = clog_sort(clog);
+    json_str = g_string_new(NULL);
+    clog_dump_json(sorted, json_str, NULL, 50);
+    snprintf(path, sizeof(path), "%s/nonascii.json", outdir);
+    write_file(path, json_str->str, json_str->len);
+    g_string_free(json_str, TRUE);
+    g_free(sorted);
+    g_free(clog);
+
+    /* ------------------------------------------------------------------ */
+    /* fixture 4: overflow — minimum-size buffer, large entries            */
+    /* clog_new requires size >= CLOG_MAX_ENTRY_SIZE * 10 (= 40960).      */
+    /* Each entry is ~3 KiB so ~13 fit; adding 20 evicts the first 7+.   */
+    /* ------------------------------------------------------------------ */
+    {
+        /* Build a large message (~3000 chars) to consume space quickly */
+        char big_msg[3001];
+        memset(big_msg, 'X', 3000);
+        big_msg[3000] = '\0';
+
+        clog = clog_new(CLOG_MAX_ENTRY_SIZE * 10);
+        for (int i = 0; i < 20; i++) {
+            big_msg[0] = '0' + (i % 10);  /* vary first char for uniqueness */
+            big_msg[1] = 'A' + i;
+            add_entry(clog, "node1", "root", "cluster", 200 + i, 3000 + i, 6, big_msg);
+        }
+        snprintf(path, sizeof(path), "%s/overflow.bin", outdir);
+        write_file(path, clog, clog_size(clog));
+        g_free(clog);
+    }
+
+    return 0;
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/multi_entry.bin b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/multi_entry.bin
new file mode 100644
index 0000000000000000000000000000000000000000..6beabb4dcda1ce92e84c0819c4ebb76330a04ee3
GIT binary patch
literal 131072
zcmeIuJ4ypl6b9f!G(!+uOJTO4TM+H6Y_t?JK0s$+90~5grKEEM0U<WFX)G)ieD#8b
zh<6gO2)cvs!np?ym!EULh)xV+c6L+iq<US5`1-!f$4?*6H#?)%t;flBk>**}?JcEV
zR{dfv>Z)qu;Pm3WDeBlPoBA@Z$|CZO^dh2{s?AMN@s_T^aFUN#;$`%3f4g^8Tpy-+
zmSw+r>#^TIJ1OS^n?V(`ymq(GREw$JQ{Mc3N7KA+Z#ngU_iK*pqWy?NfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk
z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs
z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ
zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U
zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7
z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N
z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+
z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
v5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+z@Gv?lPg7&

literal 0
HcmV?d00001

diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.bin b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.bin
new file mode 100644
index 0000000000000000000000000000000000000000..9502f7335db8448e570793dab7ea121b43afc069
GIT binary patch
literal 131072
zcmeIuF-ikb6a~<SkeE#fLo96)h|Uhg#>&EG8p-@_$j8YHxHut@%F=8hf$YFe#1C%5
z;#_!KxbT`2aS^ll%Uh{QxzEdp@1KuqfBHJ_p5CwSkB7%x_UUvoyD7cNZCXa3^APK9
z+zzRahtzfXda<aVbKkX9Gp^F|t{Lmw)w+7wlwtll<{`Dsy!;CR0t5&UAV7cs0RjXF
z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk
z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs
z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ
zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U
zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7
z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N
z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+
z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk
r1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk|5M->_K6_A

literal 0
HcmV?d00001

diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.json b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.json
new file mode 100644
index 000000000..91dec616c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/nonascii.json
@@ -0,0 +1,5 @@
+{
+"data": [
+{"uid": 5, "time": 2000, "pri": 6, "tag": "cluster", "pid": 111, "node": "node1", "user": "root", "msg": "Hello#007World \u4e16\u754c and \"quoted\""}
+]
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/overflow.bin b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/overflow.bin
new file mode 100644
index 0000000000000000000000000000000000000000..4eeb77352bb6d949a4d3397558cbdb734b58c13b
GIT binary patch
literal 40960
zcmeI)xoXr=6u{xr7%sT)S_l?CK;o7JtR*gSONx{MO&1Kr2tI%hkO#2HYY0L#Zcz)3
z`_d`+26iHzA>hBc{Ob#T1H%kwntQ+M#~Ef~C8A8^b2*>eBKAha)2$J||EwQf`|<1N
z>ePcHXTNVue#rUI(3WyPKfAebV{RlC7Z#S{`uy!%OLL2H;?j%<0RjXF5FkK+009Dx
z3uM)AK1#lnLlOJ(^;xC=Kjyrr|4&|S{GyIWfB*pk1PBlyK!Cu^X8aG@KX?D-Sj7H(
z|Gd)wpZfd%)D@Tn2oNAZfB*pk1PHV)(Di@r{>yO0ft;*Y`u}r(|DV3v`c0jW009C7
z2oNAZfB=E6|8w_W&O{u{$%{(=f9dc4(Q7OT5FkK+009C72oPvppzHtK{SSTzoL>XP
z%S!)$?eG7wsn&1md;|y(AV7cs0RjXFbp4;Z|1uqMI47?v{r|1M|Hr3U5+Fc;009C7
z2oNC9x<J?egZ}`Xjo6-()k^=r?(hH6iPmrGd;|y(AV7cs0RjXFbp4;Z|1ux3BPS0l
z{r{%F|Hsa;BtU=w0RjXF5FkLHb%Cz`XS4tBMC{DTqe}m8^!NYx`POghd;|y(AV7cs
z0RjXFbp4;Z|8h5CS5DR{{r|SV|A#NIBtU=w0RjXF5FkLHb%Cz`bN64CBX;NHai#y?
z_4ohC#nx}?d;|y(AV7cs0RjXFbp4;Z|8g&4Pfngx`u}}@{~w=ZNq_(W0t5&UAV7dX
P>jMA!-x~r1{y%}ghvsR-

literal 0
HcmV?d00001

diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.bin b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.bin
new file mode 100644
index 0000000000000000000000000000000000000000..248c0de3108c6566fcef182ef0a843bdc0bcc472
GIT binary patch
literal 131072
zcmeIuK?*@(6b0amyi6=Xy)9USl8KRtInU_v*Tcxxlrl241xhwxCi%-M)OYH3>U2k6
zL_2!%%RE;r-?J0({#?rQ{q;D_j)U>-Iz8mQD7w9V?oC=&!)Q|4#iHJCcU2RUs;*PH
zYSOwK<qsi1fB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk
z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs
z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ
zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U
zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7
z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N
z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+
z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF
K5FkL{9|gX3I2TI*

literal 0
HcmV?d00001

diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.json b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.json
new file mode 100644
index 000000000..c5ac7b8d7
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/fixtures/single_entry.json
@@ -0,0 +1,5 @@
+{
+"data": [
+{"uid": 1, "time": 1000, "pri": 6, "tag": "cluster", "pid": 123, "node": "node1", "user": "root", "msg": "Hello from C"}
+]
+}
diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs b/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs
new file mode 100644
index 000000000..b89084d27
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs
@@ -0,0 +1,286 @@
+//! Performance tests for pmxcfs-logger
+//!
+//! These tests verify that the logger implementation scales properly
+//! and handles large log merges efficiently.
+
+use pmxcfs_logger::ClusterLog;
+
+/// Test merging large logs from multiple nodes
+///
+/// This test verifies:
+/// 1. Large log merge performance (multiple nodes with many entries)
+/// 2. Memory usage stays bounded
+/// 3. Deduplication works correctly at scale
+#[test]
+fn test_large_log_merge_performance() {
+    // Create 3 nodes with large logs
+    let node1 = ClusterLog::new();
+    let node2 = ClusterLog::new();
+    let node3 = ClusterLog::new();
+
+    // Add 1000 entries per node (3000 total)
+    for i in 0..1000 {
+        let _ = node1.add(
+            "node1",
+            "root",
+            "cluster",
+            1000 + i,
+            6,
+            1000000 + i,
+            &format!("Node1 entry {}", i),
+        );
+        let _ = node2.add(
+            "node2",
+            "admin",
+            "system",
+            2000 + i,
+            6,
+            1000000 + i,
+            &format!("Node2 entry {}", i),
+        );
+        let _ = node3.add(
+            "node3",
+            "user",
+            "service",
+            3000 + i,
+            6,
+            1000000 + i,
+            &format!("Node3 entry {}", i),
+        );
+    }
+
+    // Get remote buffers
+    let node2_buffer = node2.get_buffer();
+    let node3_buffer = node3.get_buffer();
+
+    // Merge all logs into node1
+    let start = std::time::Instant::now();
+    node1
+        .merge(vec![node2_buffer, node3_buffer], true)
+        .expect("Merge should succeed");
+    let duration = start.elapsed();
+
+    // Verify merge completed
+    let merged_count = node1.len();
+
+    // Should have merged entries (may be less than 3000 due to capacity limits)
+    assert!(
+        merged_count > 0,
+        "Should have some entries after merge (got {})",
+        merged_count
+    );
+
+    // Performance check: merge should complete in reasonable time
+    // For 3000 entries, should be well under 1 second
+    assert!(
+        duration.as_millis() < 1000,
+        "Large merge took too long: {:?}",
+        duration
+    );
+
+    println!(
+        "[OK] Merged 3000 entries from 3 nodes in {:?} (result: {} entries)",
+        duration, merged_count
+    );
+}
+
+/// Test deduplication performance with high duplicate rate
+///
+/// This test verifies that deduplication works efficiently when
+/// many duplicate entries are present.
+#[test]
+fn test_deduplication_performance() {
+    let log = ClusterLog::new();
+
+    // Add 500 entries from same node with overlapping times
+    // This creates many potential duplicates
+    for i in 0..500 {
+        let _ = log.add(
+            "node1",
+            "root",
+            "cluster",
+            1000 + i,
+            6,
+            1000 + (i / 10), // Reuse timestamps (50 unique times)
+            &format!("Entry {}", i),
+        );
+    }
+
+    // Create remote log with overlapping entries
+    let remote = ClusterLog::new();
+    for i in 0..500 {
+        let _ = remote.add(
+            "node1",
+            "root",
+            "cluster",
+            2000 + i,
+            6,
+            1000 + (i / 10), // Same timestamp pattern
+            &format!("Remote entry {}", i),
+        );
+    }
+
+    let remote_buffer = remote.get_buffer();
+
+    // Merge with deduplication
+    let start = std::time::Instant::now();
+    log.merge(vec![remote_buffer], true)
+        .expect("Merge should succeed");
+    let duration = start.elapsed();
+
+    let final_count = log.len();
+
+    // Should have deduplicated some entries
+    assert!(final_count > 0, "Should have entries after deduplication");
+
+    // Performance check
+    assert!(
+        duration.as_millis() < 500,
+        "Deduplication took too long: {:?}",
+        duration
+    );
+
+    println!(
+        "[OK] Deduplicated 1000 entries in {:?} (result: {} entries)",
+        duration, final_count
+    );
+}
+
+/// Test memory usage stays bounded during large operations
+///
+/// This test verifies that the ring buffer properly limits memory
+/// usage even when adding many entries.
+#[test]
+fn test_memory_bounded() {
+    // Create log with default capacity
+    let log = ClusterLog::new();
+
+    // Add many entries (more than capacity)
+    for i in 0..10000 {
+        let _ = log.add(
+            "node1",
+            "root",
+            "cluster",
+            1000 + i,
+            6,
+            1000000 + i,
+            &format!("Entry with some message content {}", i),
+        );
+    }
+
+    let entry_count = log.len();
+    let capacity = log.capacity();
+
+    // Buffer should not grow unbounded
+    // Entry count should be reasonable relative to capacity
+    assert!(
+        entry_count < 10000,
+        "Buffer should not store all 10000 entries (got {})",
+        entry_count
+    );
+
+    // Verify capacity is respected
+    assert!(capacity > 0, "Capacity should be set (got {})", capacity);
+
+    println!(
+        "[OK] Added 10000 entries, buffer contains {} (capacity: {} bytes)",
+        entry_count, capacity
+    );
+}
+
+/// Test JSON export performance with large logs
+///
+/// This test verifies that JSON export scales properly.
+#[test]
+fn test_json_export_performance() {
+    let log = ClusterLog::new();
+
+    // Add 1000 entries
+    for i in 0..1000 {
+        let _ = log.add(
+            "node1",
+            "root",
+            "cluster",
+            1000 + i,
+            6,
+            1000000 + i,
+            &format!("Test message {}", i),
+        );
+    }
+
+    // Export to JSON
+    let start = std::time::Instant::now();
+    let json = log.dump_json(None, 1000);
+    let duration = start.elapsed();
+
+    // Verify JSON is valid
+    let parsed: serde_json::Value = serde_json::from_str(&json).expect("Should be valid JSON");
+    let data = parsed["data"].as_array().expect("Should have data array");
+
+    assert!(!data.is_empty(), "Should have entries in JSON");
+
+    // Performance check
+    assert!(
+        duration.as_millis() < 500,
+        "JSON export took too long: {:?}",
+        duration
+    );
+
+    println!(
+        "[OK] Exported {} entries to JSON in {:?}",
+        data.len(),
+        duration
+    );
+}
+
+/// Test binary serialization performance
+///
+/// This test verifies that binary serialization/deserialization
+/// is efficient for large buffers.
+#[test]
+fn test_binary_serialization_performance() {
+    let log = ClusterLog::new();
+
+    // Add 500 entries
+    for i in 0..500 {
+        let _ = log.add(
+            "node1",
+            "root",
+            "cluster",
+            1000 + i,
+            6,
+            1000000 + i,
+            &format!("Entry {}", i),
+        );
+    }
+
+    // Serialize
+    let start = std::time::Instant::now();
+    let state = log.get_state().expect("Should serialize");
+    let serialize_duration = start.elapsed();
+
+    // Deserialize
+    let start = std::time::Instant::now();
+    let deserialized = ClusterLog::deserialize_state(&state).expect("Should deserialize");
+    let deserialize_duration = start.elapsed();
+
+    // Verify round-trip
+    assert_eq!(deserialized.len(), 500, "Should preserve entry count");
+
+    // Performance checks
+    assert!(
+        serialize_duration.as_millis() < 200,
+        "Serialization took too long: {:?}",
+        serialize_duration
+    );
+    assert!(
+        deserialize_duration.as_millis() < 200,
+        "Deserialization took too long: {:?}",
+        deserialize_duration
+    );
+
+    println!(
+        "[OK] Serialized 500 entries in {:?}, deserialized in {:?}",
+        serialize_duration, deserialize_duration
+    );
+}
-- 
2.47.3





Thread overview:
2026-03-23 11:32 [PATCH pve-cluster v3 00/13] Rewrite pmxcfs with Rust Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 01/13] pmxcfs-rs: add pmxcfs-api-types crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 02/13] pmxcfs-rs: add pmxcfs-config crate Kefu Chai
2026-03-23 11:32 ` Kefu Chai [this message]
2026-03-23 11:32 ` [PATCH pve-cluster v3 04/13] pmxcfs-rs: add pmxcfs-rrd crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 05/13] pmxcfs-rs: add pmxcfs-memdb crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 06/13] pmxcfs-rs: add pmxcfs-status and pmxcfs-test-utils crates Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 07/13] pmxcfs-rs: add pmxcfs-services crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 08/13] pmxcfs-rs: add pmxcfs-ipc crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 09/13] pmxcfs-rs: add pmxcfs-dfsm crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 10/13] pmxcfs-rs: vendor patched rust-corosync for CPG compatibility Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 11/13] pmxcfs-rs: add pmxcfs main daemon binary Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 13/13] pmxcfs-rs: add project documentation Kefu Chai
