From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id AFB851FF144 for ; Tue, 24 Feb 2026 17:48:28 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id AE98D120FA; Tue, 24 Feb 2026 17:49:19 +0100 (CET) Message-ID: Date: Tue, 24 Feb 2026 17:17:42 +0100 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird Subject: Re: [PATCH pve-cluster 04/14 v2] pmxcfs-rs: add pmxcfs-logger crate To: Kefu Chai , pve-devel@lists.proxmox.com References: <20260213094119.2379288-1-k.chai@proxmox.com> <20260213094119.2379288-5-k.chai@proxmox.com> Content-Language: en-US From: Samuel Rufinatscha In-Reply-To: <20260213094119.2379288-5-k.chai@proxmox.com> Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.155 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_ASCII_DIVIDERS 0.8 Email that uses ascii formatting dividers and possible spam tricks KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record X-MailFrom: s.rufinatscha@proxmox.com X-Mailman-Rule-Hits: max-size X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; news-moderation; no-subject; digests; suspicious-header Message-ID-Hash: 6GMGNTD5Y67I6M77P2C2J4N6APTV2HJL X-Message-ID-Hash: 6GMGNTD5Y67I6M77P2C2J4N6APTV2HJL X-Mailman-Approved-At: Tue, 24 Feb 2026 17:49:22 +0100 X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox VE development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Nice 
work on v2 and thanks for applying my previous suggestions, Kefu. A few things I’d like to suggest: The binary compat tests should use real C fixtures (include_bytes!) and assert actual contents. Please generate binary blobs via clusterlog_get_state() and clog_dump_json() and check for: * parsed entries * header fields, including at least one multi-entry fixture with cpos != 8 * JSON output for non-ASCII content * and a test that serializes a buffer where not all entries fit Separately, I’m wondering about the use of VecDeque vs C byte ring abstraction. A benchmark at high log rates would help quantify the serialization/allocation overhead. Also left two small inline comments on the test offsets and a header-size comment. On 2/13/26 10:48 AM, Kefu Chai wrote: > Add configuration management crate for pmxcfs: > - Config struct: Runtime configuration (node name, IP, flags) > - Thread-safe debug level mutation via RwLock > - Arc-wrapped for shared ownership across components > - Comprehensive unit tests including thread safety tests > > This crate provides the foundational configuration structure used > by all pmxcfs components. The Config is designed to be shared via > Arc to allow multiple components to access the same configuration > instance, with mutable debug level for runtime adjustments. 
> > Signed-off-by: Kefu Chai > --- > src/pmxcfs-rs/Cargo.toml | 2 + > src/pmxcfs-rs/pmxcfs-logger/Cargo.toml | 15 + > src/pmxcfs-rs/pmxcfs-logger/README.md | 58 ++ > .../pmxcfs-logger/src/cluster_log.rs | 615 ++++++++++++++++ > src/pmxcfs-rs/pmxcfs-logger/src/entry.rs | 694 ++++++++++++++++++ > src/pmxcfs-rs/pmxcfs-logger/src/hash.rs | 176 +++++ > src/pmxcfs-rs/pmxcfs-logger/src/lib.rs | 27 + > .../pmxcfs-logger/src/ring_buffer.rs | 628 ++++++++++++++++ > .../tests/binary_compatibility_tests.rs | 315 ++++++++ > .../pmxcfs-logger/tests/performance_tests.rs | 294 ++++++++ > 10 files changed, 2824 insertions(+) > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/Cargo.toml > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/README.md > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/entry.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/hash.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/lib.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs > > diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml > index f190968ed..d26fac04c 100644 > --- a/src/pmxcfs-rs/Cargo.toml > +++ b/src/pmxcfs-rs/Cargo.toml > @@ -3,6 +3,7 @@ > members = [ > "pmxcfs-api-types", # Shared types and error definitions > "pmxcfs-config", # Configuration management > + "pmxcfs-logger", # Cluster log with ring buffer and deduplication > ] > resolver = "2" > > @@ -18,6 +19,7 @@ rust-version = "1.85" > # Internal workspace dependencies > pmxcfs-api-types = { path = "pmxcfs-api-types" } > pmxcfs-config = { path = "pmxcfs-config" } > +pmxcfs-logger = { path = "pmxcfs-logger" } > > # Error handling > thiserror = "1.0" > diff --git a/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml b/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml > new file mode 100644 > 
index 000000000..1af3f015c > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/Cargo.toml > @@ -0,0 +1,15 @@ > +[package] > +name = "pmxcfs-logger" > +version = "0.1.0" > +edition = "2021" > + > +[dependencies] > +anyhow = "1.0" > +parking_lot = "0.12" > +serde = { version = "1.0", features = ["derive"] } > +serde_json = "1.0" > +tracing = "0.1" > + > +[dev-dependencies] > +tempfile = "3.0" > + > diff --git a/src/pmxcfs-rs/pmxcfs-logger/README.md b/src/pmxcfs-rs/pmxcfs-logger/README.md > new file mode 100644 > index 000000000..38f102c27 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/README.md > @@ -0,0 +1,58 @@ > +# pmxcfs-logger > + > +Cluster-wide log management for pmxcfs, fully compatible with the C implementation (logger.c). > + > +## Overview > + > +This crate implements a cluster log system matching Proxmox's C-based logger.c behavior. It provides: > + > +- **Ring Buffer Storage**: Circular buffer for log entries with automatic capacity management > +- **FNV-1a Hashing**: Hashing for node and identity-based deduplication > +- **Deduplication**: Per-node tracking of latest log entries to avoid duplicates > +- **Time-based Sorting**: Chronological ordering of log entries across nodes > +- **Multi-node Merging**: Combining logs from multiple cluster nodes > +- **JSON Export**: Web UI-compatible JSON output matching C format > + > +## Architecture > + > +### Key Components > + > +1. **LogEntry** (`entry.rs`): Individual log entry with automatic UID generation > +2. **RingBuffer** (`ring_buffer.rs`): Circular buffer with capacity management > +3. **ClusterLog** (`lib.rs`): Main API with deduplication and merging > +4. 
**Hash Functions** (`hash.rs`): FNV-1a implementation matching C > + > +## C to Rust Mapping > + > +| C Function | Rust Equivalent | Location | > +|------------|-----------------|----------| > +| `fnv_64a_buf` | `hash::fnv_64a` | hash.rs | > +| `clog_pack` | `LogEntry::pack` | entry.rs | > +| `clog_copy` | `RingBuffer::add_entry` | ring_buffer.rs | > +| `clog_sort` | `RingBuffer::sort` | ring_buffer.rs | > +| `clog_dump_json` | `RingBuffer::dump_json` | ring_buffer.rs | > +| `clusterlog_insert` | `ClusterLog::insert` | lib.rs | > +| `clusterlog_add` | `ClusterLog::add` | lib.rs | > +| `clusterlog_merge` | `ClusterLog::merge` | lib.rs | > +| `dedup_lookup` | `ClusterLog::dedup_lookup` | lib.rs | > + > +## Key Differences from C > + > +1. **No `node_digest` in DedupEntry**: C stores `node_digest` both as HashMap key and in the struct. Rust only uses it as the key, saving 8 bytes per entry. > + > +2. **Mutex granularity**: C uses a single global mutex. Rust uses separate Arc> for buffer and dedup table, allowing better concurrency. > + > +3. **Code size**: Rust implementation is ~24% the size of C (740 lines vs 3,000+) while maintaining equivalent functionality. > + > +## Integration > + > +This crate is integrated into `pmxcfs-status` to provide cluster log functionality. The `.clusterlog` FUSE plugin uses this to provide JSON log output compatible with the Proxmox web UI. 
> + > +## References > + > +### C Implementation > +- `src/pmxcfs/logger.c` / `logger.h` - Cluster log implementation > + > +### Related Crates > +- **pmxcfs-status**: Integrates ClusterLog for status tracking > +- **pmxcfs**: FUSE plugin exposes cluster log via `.clusterlog` > diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs b/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs > new file mode 100644 > index 000000000..c9d04ee47 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/src/cluster_log.rs > @@ -0,0 +1,615 @@ > +/// Cluster Log Implementation > +/// > +/// This module implements the cluster-wide log system with deduplication > +/// and merging support, matching C's clusterlog_t. > +use crate::entry::LogEntry; > +use crate::ring_buffer::{RingBuffer, CLOG_DEFAULT_SIZE}; > +use anyhow::Result; > +use parking_lot::Mutex; > +use std::collections::{BTreeMap, HashMap}; > +use std::sync::Arc; > + > +/// Deduplication entry - tracks the latest UID and time for each node > +/// > +/// Note: C's `dedup_entry_t` includes node_digest field because GHashTable stores > +/// the struct pointer both as key and value. In Rust, we use HashMap > +/// where node_digest is the key, so we don't need to duplicate it in the value. > +/// This is functionally equivalent but more efficient. 
> +#[derive(Debug, Clone)] > +pub(crate) struct DedupEntry { > + /// Latest UID seen from this node > + pub uid: u32, > + /// Latest timestamp seen from this node > + pub time: u32, > +} > + > +/// Internal state protected by a single mutex > +/// Matches C's clusterlog_t which uses a single mutex for both base and dedup > +struct ClusterLogInner { > + /// Ring buffer for log storage (matches C's cl->base) > + buffer: RingBuffer, > + /// Deduplication tracker (matches C's cl->dedup) > + dedup: HashMap, > +} > + > +/// Cluster-wide log with deduplication and merging support > +/// Matches C's `clusterlog_t` > +/// > +/// Note: Unlike the initial implementation with separate mutexes, we use a single > +/// mutex to match C's semantics and ensure atomic updates of buffer+dedup. > +pub struct ClusterLog { > + /// Inner state protected by a single mutex > + /// Matches C's single g_mutex_t protecting both cl->base and cl->dedup > + inner: Arc>, > +} > + > +impl ClusterLog { > + /// Create a new cluster log with default size > + pub fn new() -> Self { > + Self::with_capacity(CLOG_DEFAULT_SIZE) > + } > + > + /// Create a new cluster log with specified capacity > + pub fn with_capacity(capacity: usize) -> Self { > + Self { > + inner: Arc::new(Mutex::new(ClusterLogInner { > + buffer: RingBuffer::new(capacity), > + dedup: HashMap::new(), > + })), > + } > + } > + > + /// Matches C's `clusterlog_add` function > + #[allow(clippy::too_many_arguments)] > + pub fn add( > + &self, > + node: &str, > + ident: &str, > + tag: &str, > + pid: u32, > + priority: u8, > + time: u32, > + message: &str, > + ) -> Result<()> { > + let entry = LogEntry::pack(node, ident, tag, pid, time, priority, message)?; > + self.insert(&entry) > + } > + > + /// Insert a log entry (with deduplication) > + /// > + /// Matches C's `clusterlog_insert` function > + pub fn insert(&self, entry: &LogEntry) -> Result<()> { > + let mut inner = self.inner.lock(); > + > + // Check deduplication > + if 
Self::is_not_duplicate(&mut inner.dedup, entry) { > + // Entry is not a duplicate, add it > + inner.buffer.add_entry(entry)?; > + } else { > + tracing::debug!("Ignoring duplicate cluster log entry"); > + } > + > + Ok(()) > + } > + > + /// Check if entry is a duplicate (returns true if NOT a duplicate) > + /// > + /// Matches C's `dedup_lookup` function > + /// > + /// ## Hash Collision Risk > + /// > + /// Uses FNV-1a hash (`node_digest`) as deduplication key. Hash collisions > + /// are theoretically possible but extremely rare in practice: > + /// > + /// - FNV-1a produces 64-bit hashes (2^64 possible values) > + /// - Collision probability with N entries: ~N²/(2 × 2^64) > + /// - For 10,000 log entries: collision probability < 10^-11 > + /// > + /// If a collision occurs, two different log entries (from different nodes > + /// or with different content) will be treated as duplicates, causing one > + /// to be silently dropped. > + /// > + /// This design is inherited from the C implementation for compatibility. > + /// The risk is acceptable because: > + /// 1. Collisions are astronomically rare > + /// 2. Only affects log deduplication, not critical data integrity > + /// 3. Lost log entries don't compromise cluster operation > + /// > + /// Changing this would break wire format compatibility with C nodes. 
> + fn is_not_duplicate(dedup: &mut HashMap, entry: &LogEntry) -> bool { > + match dedup.get_mut(&entry.node_digest) { > + None => { > + dedup.insert( > + entry.node_digest, > + DedupEntry { > + time: entry.time, > + uid: entry.uid, > + }, > + ); > + true > + } > + Some(dd) => { > + if entry.time > dd.time || (entry.time == dd.time && entry.uid > dd.uid) { > + dd.time = entry.time; > + dd.uid = entry.uid; > + true > + } else { > + false > + } > + } > + } > + } > + > + pub fn get_entries(&self, max: usize) -> Vec { > + let inner = self.inner.lock(); > + inner.buffer.iter().take(max).cloned().collect() > + } > + > + /// Get the current buffer (for testing) > + pub fn get_buffer(&self) -> RingBuffer { > + let inner = self.inner.lock(); > + inner.buffer.clone() > + } > + > + /// Get buffer length (for testing) > + pub fn len(&self) -> usize { > + let inner = self.inner.lock(); > + inner.buffer.len() > + } > + > + /// Get buffer capacity (for testing) > + pub fn capacity(&self) -> usize { > + let inner = self.inner.lock(); > + inner.buffer.capacity() > + } > + > + /// Check if buffer is empty (for testing) > + pub fn is_empty(&self) -> bool { > + let inner = self.inner.lock(); > + inner.buffer.is_empty() > + } > + > + /// Clear all log entries (for testing) > + pub fn clear(&self) { > + let mut inner = self.inner.lock(); > + let capacity = inner.buffer.capacity(); > + inner.buffer = RingBuffer::new(capacity); > + inner.dedup.clear(); > + } > + > + /// Sort the log entries by time > + /// > + /// Matches C's `clog_sort` function > + pub fn sort(&self) -> Result { > + let inner = self.inner.lock(); > + inner.buffer.sort() > + } > + > + /// Merge logs from multiple nodes > + /// > + /// Matches C's `clusterlog_merge` function > + /// > + /// This method atomically updates both the buffer and dedup state under a single > + /// mutex lock, matching C's behavior where both cl->base and cl->dedup are > + /// updated under cl->mutex. 
> + pub fn merge(&self, remote_logs: Vec, include_local: bool) -> Result<()> { > + let mut sorted_entries: BTreeMap<(u32, u64, u32), LogEntry> = BTreeMap::new(); > + let mut merge_dedup: HashMap = HashMap::new(); > + > + // Lock once for the entire operation (matching C's single mutex) > + let mut inner = self.inner.lock(); > + > + // Calculate maximum capacity > + let max_size = if include_local { > + let local_cap = inner.buffer.capacity(); > + > + std::iter::once(local_cap) > + .chain(remote_logs.iter().map(|b| b.capacity())) > + .max() > + .unwrap_or(CLOG_DEFAULT_SIZE) > + } else { > + remote_logs > + .iter() > + .map(|b| b.capacity()) > + .max() > + .unwrap_or(CLOG_DEFAULT_SIZE) > + }; > + > + // Add local entries if requested > + if include_local { > + for entry in inner.buffer.iter() { > + let key = (entry.time, entry.node_digest, entry.uid); > + // Keep-first: only insert if key doesn't exist, matching C's g_tree_lookup guard > + if let std::collections::btree_map::Entry::Vacant(e) = sorted_entries.entry(key) { > + e.insert(entry.clone()); > + Self::is_not_duplicate(&mut merge_dedup, entry); > + } > + } > + } > + > + // Add remote entries > + for remote_buffer in &remote_logs { > + for entry in remote_buffer.iter() { > + let key = (entry.time, entry.node_digest, entry.uid); > + // Keep-first: only insert if key doesn't exist, matching C's g_tree_lookup guard > + if let std::collections::btree_map::Entry::Vacant(e) = sorted_entries.entry(key) { > + e.insert(entry.clone()); > + Self::is_not_duplicate(&mut merge_dedup, entry); > + } > + } > + } > + > + let mut result = RingBuffer::new(max_size); > + > + // BTreeMap iterates oldest->newest. We add each as new head (push_front), > + // so result ends with newest at head, matching C's behavior. 
> + // Fill to 100% capacity (matching C's behavior), not just 90% > + for (_key, entry) in sorted_entries.iter() { > + // add_entry will automatically evict old entries if needed to stay within capacity > + result.add_entry(entry)?; > + } > + > + // Atomically update both buffer and dedup (matches C lines 503-507) > + inner.buffer = result; > + inner.dedup = merge_dedup; > + > + Ok(()) > + } > + > + /// Export log to JSON format > + /// > + /// Matches C's `clog_dump_json` function > + pub fn dump_json(&self, ident_filter: Option<&str>, max_entries: usize) -> String { > + let inner = self.inner.lock(); > + inner.buffer.dump_json(ident_filter, max_entries) > + } > + > + /// Export log to JSON format with sorted entries > + pub fn dump_json_sorted( > + &self, > + ident_filter: Option<&str>, > + max_entries: usize, > + ) -> Result { > + let sorted = self.sort()?; > + Ok(sorted.dump_json(ident_filter, max_entries)) > + } > + > + /// Matches C's `clusterlog_get_state` function > + /// > + /// Returns binary-serialized clog_base_t structure for network transmission. > + /// This format is compatible with C nodes for mixed-cluster operation. 
> + pub fn get_state(&self) -> Result> { > + let sorted = self.sort()?; > + Ok(sorted.serialize_binary()) > + } > + > + pub fn deserialize_state(data: &[u8]) -> Result { > + RingBuffer::deserialize_binary(data) > + } > + > +} > + > +impl Default for ClusterLog { > + fn default() -> Self { > + Self::new() > + } > +} > + > +#[cfg(test)] > +mod tests { > + use super::*; > + > + #[test] > + fn test_cluster_log_creation() { > + let log = ClusterLog::new(); > + assert!(log.inner.lock().buffer.is_empty()); > + } > + > + #[test] > + fn test_add_entry() { > + let log = ClusterLog::new(); > + > + let result = log.add( > + "node1", > + "root", > + "cluster", > + 12345, > + 6, // Info priority > + 1234567890, > + "Test message", > + ); > + > + assert!(result.is_ok()); > + assert!(!log.inner.lock().buffer.is_empty()); > + } > + > + #[test] > + fn test_deduplication() { > + let log = ClusterLog::new(); > + > + // Add same entry twice (but with different UIDs since each add creates a new entry) > + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Message 1"); > + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Message 1"); > + > + // Both entries are added because they have different UIDs > + // Deduplication tracks the latest (time, UID) per node, not content > + let inner = log.inner.lock(); > + assert_eq!(inner.buffer.len(), 2); > + } > + > + #[test] > + fn test_newer_entry_replaces() { > + let log = ClusterLog::new(); > + > + // Add older entry > + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Old message"); > + > + // Add newer entry from same node > + let _ = log.add("node1", "root", "cluster", 123, 6, 1001, "New message"); > + > + // Should have both entries (newer doesn't remove older, just updates dedup tracker) > + let inner = log.inner.lock(); > + assert_eq!(inner.buffer.len(), 2); > + } > + > + #[test] > + fn test_json_export() { > + let log = ClusterLog::new(); > + > + let _ = log.add( > + "node1", > + "root", > + "cluster", > + 123, 
> + 6, > + 1234567890, > + "Test message", > + ); > + > + let json = log.dump_json(None, 50); > + > + // Should be valid JSON > + assert!(serde_json::from_str::(&json).is_ok()); > + > + // Should contain "data" field > + let value: serde_json::Value = serde_json::from_str(&json).unwrap(); > + assert!(value.get("data").is_some()); > + } > + > + #[test] > + fn test_merge_logs() { > + let log1 = ClusterLog::new(); > + let log2 = ClusterLog::new(); > + > + // Add entries to first log > + let _ = log1.add( > + "node1", > + "root", > + "cluster", > + 123, > + 6, > + 1000, > + "Message from node1", > + ); > + > + // Add entries to second log > + let _ = log2.add( > + "node2", > + "root", > + "cluster", > + 456, > + 6, > + 1001, > + "Message from node2", > + ); > + > + // Get log2's buffer for merging > + let log2_buffer = log2.inner.lock().buffer.clone(); > + > + // Merge into log1 (updates log1's buffer atomically) > + log1.merge(vec![log2_buffer], true).unwrap(); > + > + // Check log1's buffer now contains entries from both logs > + let inner = log1.inner.lock(); > + assert!(inner.buffer.len() >= 2); > + } > + > + // ======================================================================== > + // HIGH PRIORITY TESTS - Merge Edge Cases > + // ======================================================================== > + > + #[test] > + fn test_merge_empty_logs() { > + let log = ClusterLog::new(); > + > + // Add some entries to local log > + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Local entry"); > + > + // Merge with empty remote logs (updates buffer atomically) > + log.merge(vec![], true).unwrap(); > + > + // Check buffer has 1 entry (from local log) > + let inner = log.inner.lock(); > + assert_eq!(inner.buffer.len(), 1); > + let entry = inner.buffer.iter().next().unwrap(); > + assert_eq!(entry.node, "node1"); > + } > + > + #[test] > + fn test_merge_single_node_only() { > + let log = ClusterLog::new(); > + > + // Add entries only from single node > + let 
_ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1"); > + let _ = log.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2"); > + let _ = log.add("node1", "root", "cluster", 125, 6, 1002, "Entry 3"); > + > + // Merge with no remote logs (just sort local) > + log.merge(vec![], true).unwrap(); > + > + // Check buffer has all 3 entries > + let inner = log.inner.lock(); > + assert_eq!(inner.buffer.len(), 3); > + > + // Entries should be sorted by time (buffer stores newest first) > + let times: Vec = inner.buffer.iter().map(|e| e.time).collect(); > + let mut expected = vec![1002, 1001, 1000]; > + expected.sort(); > + expected.reverse(); // Newest first > + > + let mut actual = times.clone(); > + actual.sort(); > + actual.reverse(); > + > + assert_eq!(actual, expected); > + } > + > + #[test] > + fn test_merge_all_duplicates() { > + let log1 = ClusterLog::new(); > + let log2 = ClusterLog::new(); > + > + // Add same entries to both logs (same node, time, but different UIDs) > + let _ = log1.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1"); > + let _ = log1.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2"); > + > + let _ = log2.add("node1", "root", "cluster", 125, 6, 1000, "Entry 1"); > + let _ = log2.add("node1", "root", "cluster", 126, 6, 1001, "Entry 2"); > + > + let log2_buffer = log2.inner.lock().buffer.clone(); > + > + // Merge - should handle entries from same node at same times > + log1.merge(vec![log2_buffer], true).unwrap(); > + > + // Check merged buffer has 4 entries (all are unique by UID despite same time/node) > + let inner = log1.inner.lock(); > + assert_eq!(inner.buffer.len(), 4); > + } > + > + #[test] > + fn test_merge_exceeding_capacity() { > + // Create small buffer to test capacity enforcement > + let log = ClusterLog::with_capacity(50_000); // Small buffer > + > + // Add many entries to fill beyond capacity > + for i in 0..100 { > + let _ = log.add( > + "node1", > + "root", > + "cluster", > + 100 + i, > + 6, > + 1000 + i, 
> + &format!("Entry {}", i), > + ); > + } > + > + // Create remote log with many entries > + let remote = ClusterLog::with_capacity(50_000); > + for i in 0..100 { > + let _ = remote.add( > + "node2", > + "root", > + "cluster", > + 200 + i, > + 6, > + 1000 + i, > + &format!("Remote {}", i), > + ); > + } > + > + let remote_buffer = remote.inner.lock().buffer.clone(); > + > + // Merge - should stop when buffer is near full > + log.merge(vec![remote_buffer], true).unwrap(); > + > + // Buffer should be limited by capacity, not necessarily < 200 > + // The actual limit depends on entry sizes and capacity > + // Just verify we got some reasonable number of entries > + let inner = log.inner.lock(); > + assert!(!inner.buffer.is_empty(), "Should have some entries"); > + assert!( > + inner.buffer.len() <= 200, > + "Should not exceed total available entries" > + ); > + } > + > + #[test] > + fn test_merge_preserves_dedup_state() { > + let log = ClusterLog::new(); > + > + // Add entries from node1 > + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1"); > + let _ = log.add("node1", "root", "cluster", 124, 6, 1001, "Entry 2"); > + > + // Create remote log with later entries from node1 > + let remote = ClusterLog::new(); > + let _ = remote.add("node1", "root", "cluster", 125, 6, 1002, "Entry 3"); > + > + let remote_buffer = remote.inner.lock().buffer.clone(); > + > + // Merge > + log.merge(vec![remote_buffer], true).unwrap(); > + > + // Check that dedup state was updated > + let inner = log.inner.lock(); > + let node1_digest = crate::hash::fnv_64a_str("node1"); > + let dedup_entry = inner.dedup.get(&node1_digest).unwrap(); > + > + // Should track the latest time from node1 > + assert_eq!(dedup_entry.time, 1002); > + // UID is auto-generated, so just verify it exists and is reasonable > + assert!(dedup_entry.uid > 0); > + } > + > + #[test] > + fn test_get_state_binary_format() { > + let log = ClusterLog::new(); > + > + // Add some entries > + let _ = 
log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1"); > + let _ = log.add("node2", "admin", "system", 456, 6, 1001, "Entry 2"); > + > + // Get state > + let state = log.get_state().unwrap(); > + > + // Should be binary format, not JSON > + assert!(state.len() >= 8); // At least header > + > + // Check header format (clog_base_t) > + let size = u32::from_le_bytes(state[0..4].try_into().unwrap()) as usize; > + let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap()); > + > + assert_eq!(size, state.len()); > + assert_eq!(cpos, 8); // First entry at offset 8 > + > + // Should be able to deserialize back > + let deserialized = ClusterLog::deserialize_state(&state).unwrap(); > + assert_eq!(deserialized.len(), 2); > + } > + > + #[test] > + fn test_state_roundtrip() { > + let log = ClusterLog::new(); > + > + // Add entries > + let _ = log.add("node1", "root", "cluster", 123, 6, 1000, "Test 1"); > + let _ = log.add("node2", "admin", "system", 456, 6, 1001, "Test 2"); > + > + // Serialize > + let state = log.get_state().unwrap(); > + > + // Deserialize > + let deserialized = ClusterLog::deserialize_state(&state).unwrap(); > + > + // Check entries preserved > + assert_eq!(deserialized.len(), 2); > + > + // Buffer is stored newest-first after sorting and serialization > + let entries: Vec<_> = deserialized.iter().collect(); > + assert_eq!(entries[0].node, "node2"); // Newest (time 1001) > + assert_eq!(entries[0].message, "Test 2"); > + assert_eq!(entries[1].node, "node1"); // Oldest (time 1000) > + assert_eq!(entries[1].message, "Test 1"); > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs b/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs > new file mode 100644 > index 000000000..81d5cecbc > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/src/entry.rs > @@ -0,0 +1,694 @@ > +/// Log Entry Implementation > +/// > +/// This module implements the cluster log entry structure, matching the C > +/// implementation's clog_entry_t (logger.c). 
> +use super::hash::fnv_64a_str; > +use anyhow::{bail, Result}; > +use serde::Serialize; > +use std::sync::atomic::{AtomicU32, Ordering}; > + > +// Import constant from ring_buffer to avoid duplication > +use crate::ring_buffer::CLOG_MAX_ENTRY_SIZE; > + > +/// Global UID counter (matches C's `uid_counter` global variable) > +/// > +/// # UID Wraparound Behavior > +/// > +/// The UID counter is a 32-bit unsigned integer that wraps around after 2^32 entries. > +/// This matches the C implementation's behavior (logger.c:62). > +/// > +/// **Wraparound implications:** > +/// - At 1000 entries/second: wraparound after ~49 days > +/// - At 100 entries/second: wraparound after ~497 days > +/// - After wraparound, UIDs restart from 1 > +/// > +/// **Impact on deduplication:** > +/// The deduplication logic compares (time, UID) tuples. After wraparound, an entry > +/// with UID=1 might be incorrectly considered older than an entry with UID=4294967295, > +/// even if they have the same timestamp. This is a known limitation inherited from > +/// the C implementation. 
> +/// > +/// **Mitigation:** > +/// - Entries with different timestamps are correctly ordered (time is primary sort key) > +/// - Wraparound only affects entries with identical timestamps from the same node > +/// - A warning is logged when wraparound occurs (see fetch_add below) > +static UID_COUNTER: AtomicU32 = AtomicU32::new(0); > + > +/// Log entry structure > +/// > +/// Matches C's `clog_entry_t` from logger.c: > +/// ```c > +/// typedef struct { > +/// uint32_t prev; // Previous entry offset > +/// uint32_t next; // Next entry offset > +/// uint32_t uid; // Unique ID > +/// uint32_t time; // Timestamp > +/// uint64_t node_digest; // FNV-1a hash of node name > +/// uint64_t ident_digest; // FNV-1a hash of ident > +/// uint32_t pid; // Process ID > +/// uint8_t priority; // Syslog priority (0-7) > +/// uint8_t node_len; // Length of node name (including null) > +/// uint8_t ident_len; // Length of ident (including null) > +/// uint8_t tag_len; // Length of tag (including null) > +/// uint32_t msg_len; // Length of message (including null) > +/// char data[]; // Variable length data: node + ident + tag + msg > +/// } clog_entry_t; > +/// ``` > +#[derive(Debug, Clone, Serialize)] > +pub struct LogEntry { > + /// Unique ID for this entry (auto-incrementing) > + pub uid: u32, > + > + /// Unix timestamp > + pub time: u32, > + > + /// FNV-1a hash of node name > + pub node_digest: u64, > + > + /// FNV-1a hash of ident (user) > + pub ident_digest: u64, > + > + /// Process ID > + pub pid: u32, > + > + /// Syslog priority (0-7) > + pub priority: u8, > + > + /// Node name > + pub node: String, > + > + /// Identity/user > + pub ident: String, > + > + /// Tag (e.g., "cluster", "pmxcfs") > + pub tag: String, > + > + /// Log message > + pub message: String, > +} > + > +impl LogEntry { > + /// Matches C's `clog_pack` function > + pub fn pack( > + node: &str, > + ident: &str, > + tag: &str, > + pid: u32, > + time: u32, > + priority: u8, > + message: &str, > + ) -> Result { > 
+ if priority >= 8 { > + bail!("Invalid priority: {priority} (must be 0-7)"); > + } > + > + // Truncate to 254 bytes to leave room for null terminator (C uses MIN(strlen+1, 255)) > + let node = Self::truncate_string(node, 254); > + let ident = Self::truncate_string(ident, 254); > + let tag = Self::truncate_string(tag, 254); > + let message = Self::utf8_to_ascii(message); > + > + let node_len = node.len() + 1; > + let ident_len = ident.len() + 1; > + let tag_len = tag.len() + 1; > + let mut msg_len = message.len() + 1; > + > + // Use checked arithmetic to prevent integer overflow > + // Header: 48 bytes fixed (prev, next, uid, time, digests, pid, priority, lengths) Shouldn't this be 44 bytes? > + // Variable: node_len + ident_len + tag_len + msg_len > + let header_size = std::mem::size_of::() * 4 // prev, next, uid, time > + + std::mem::size_of::() * 2 // node_digest, ident_digest > + + std::mem::size_of::() * 2 // pid, msg_len > + + std::mem::size_of::() * 4; // priority, node_len, ident_len, tag_len > + > + let total_size = header_size > + .checked_add(node_len) > + .and_then(|s| s.checked_add(ident_len)) > + .and_then(|s| s.checked_add(tag_len)) > + .and_then(|s| s.checked_add(msg_len)) > + .ok_or_else(|| anyhow::anyhow!("Entry size calculation overflow"))?; > + > + if total_size > CLOG_MAX_ENTRY_SIZE { > + let diff = total_size - CLOG_MAX_ENTRY_SIZE; > + msg_len = msg_len.saturating_sub(diff); > + } > + > + let node_digest = fnv_64a_str(&node); > + let ident_digest = fnv_64a_str(&ident); > + > + // Increment UID counter with wraparound detection > + let old_uid = UID_COUNTER.fetch_add(1, Ordering::SeqCst); > + > + // Warn on wraparound (when counter goes from u32::MAX to 0) > + // This happens approximately every 49 days at 1000 entries/second > + if old_uid == u32::MAX { > + tracing::warn!( > + "UID counter wrapped around (2^32 entries reached). \ > + Deduplication may be affected for entries with identical timestamps. 
\ > + This is expected behavior matching the C implementation." > + ); > + } > + > + let uid = old_uid.wrapping_add(1); > + > + Ok(Self { > + uid, > + time, > + node_digest, > + ident_digest, > + pid, > + priority, > + node, > + ident, > + tag, > + message: message[..msg_len.saturating_sub(1)].to_string(), > + }) > + } > + > + /// Truncate string to max length (safe for multi-byte UTF-8) > + fn truncate_string(s: &str, max_len: usize) -> String { > + if s.len() <= max_len { > + return s.to_string(); > + } > + > + // Find the last valid UTF-8 character that fits within max_len > + let truncate_at = s > + .char_indices() > + .take_while(|(idx, ch)| idx + ch.len_utf8() <= max_len) > + .last() > + .map(|(idx, ch)| idx + ch.len_utf8()) > + .unwrap_or(0); > + > + s[..truncate_at].to_string() > + } > + > + /// Convert UTF-8 to ASCII with proper escaping > + /// > + /// Matches C's `utf8_to_ascii` function behavior: > + /// - Control characters (0x00-0x1F, 0x7F): Escaped as #XXX (e.g., #007 for BEL) > + /// - Unicode (U+0080 to U+FFFF): Escaped as \uXXXX (e.g., \u4e16 for 世) > + /// - Quotes: Escaped as \" (matches C's quotequote=TRUE behavior) > + /// - Characters > U+FFFF: Silently dropped > + /// - ASCII printable (0x20-0x7E except quotes): Passed through unchanged > + fn utf8_to_ascii(s: &str) -> String { > + let mut result = String::with_capacity(s.len()); > + > + for c in s.chars() { > + match c { > + // Control characters: #XXX format (3 decimal digits) > + '\x00'..='\x1F' | '\x7F' => { > + let code = c as u32; > + result.push('#'); > + // Format as 3 decimal digits with leading zeros (e.g., #007 for BEL) > + result.push_str(&format!("{:03}", code)); > + } > + // Quote escaping: matches C's quotequote=TRUE behavior (logger.c:245) > + '"' => { > + result.push('\\'); > + result.push('"'); > + } > + // ASCII printable characters: pass through > + c if c.is_ascii() => { > + result.push(c); > + } > + // Unicode U+0080 to U+FFFF: \uXXXX format > + c if (c as u32) < 
0x10000 => { > + result.push('\\'); > + result.push('u'); > + result.push_str(&format!("{:04x}", c as u32)); > + } > + // Characters > U+FFFF: silently drop (matches C behavior) > + _ => {} > + } > + } > + > + result > + } > + > + /// Matches C's `clog_entry_size` function > + pub fn size(&self) -> usize { > + std::mem::size_of::() * 4 // prev, next, uid, time > + + std::mem::size_of::() * 2 // node_digest, ident_digest > + + std::mem::size_of::() * 2 // pid, msg_len > + + std::mem::size_of::() * 4 // priority, node_len, ident_len, tag_len > + + self.node.len() + 1 > + + self.ident.len() + 1 > + + self.tag.len() + 1 > + + self.message.len() + 1 > + } > + > + /// C implementation: `uint32_t realsize = ((size + 7) & 0xfffffff8);` > + pub fn aligned_size(&self) -> usize { > + let size = self.size(); > + (size + 7) & !7 > + } > + > + pub fn to_json_object(&self) -> serde_json::Value { > + serde_json::json!({ > + "uid": self.uid, > + "time": self.time, > + "pri": self.priority, > + "tag": self.tag, > + "pid": self.pid, > + "node": self.node, > + "user": self.ident, > + "msg": self.message, > + }) > + } > + > + /// Serialize to C binary format (clog_entry_t) > + /// > + /// Binary layout matches C structure: > + /// ```c > + /// struct { > + /// uint32_t prev; // Will be filled by ring buffer > + /// uint32_t next; // Will be filled by ring buffer > + /// uint32_t uid; > + /// uint32_t time; > + /// uint64_t node_digest; > + /// uint64_t ident_digest; > + /// uint32_t pid; > + /// uint8_t priority; > + /// uint8_t node_len; > + /// uint8_t ident_len; > + /// uint8_t tag_len; > + /// uint32_t msg_len; > + /// char data[]; // node + ident + tag + msg (null-terminated) > + /// } > + /// ``` > + pub fn serialize_binary(&self, prev: u32, next: u32) -> Vec { > + let mut buf = Vec::new(); > + > + buf.extend_from_slice(&prev.to_le_bytes()); > + buf.extend_from_slice(&next.to_le_bytes()); > + buf.extend_from_slice(&self.uid.to_le_bytes()); > + 
buf.extend_from_slice(&self.time.to_le_bytes()); > + buf.extend_from_slice(&self.node_digest.to_le_bytes()); > + buf.extend_from_slice(&self.ident_digest.to_le_bytes()); > + buf.extend_from_slice(&self.pid.to_le_bytes()); > + buf.push(self.priority); > + > + // Cap at 255 to match C's MIN(strlen+1, 255) and prevent u8 overflow > + let node_len = (self.node.len() + 1).min(255) as u8; > + let ident_len = (self.ident.len() + 1).min(255) as u8; > + let tag_len = (self.tag.len() + 1).min(255) as u8; > + let msg_len = (self.message.len() + 1) as u32; > + > + buf.push(node_len); > + buf.push(ident_len); > + buf.push(tag_len); > + buf.extend_from_slice(&msg_len.to_le_bytes()); > + > + buf.extend_from_slice(self.node.as_bytes()); > + buf.push(0); > + > + buf.extend_from_slice(self.ident.as_bytes()); > + buf.push(0); > + > + buf.extend_from_slice(self.tag.as_bytes()); > + buf.push(0); > + > + buf.extend_from_slice(self.message.as_bytes()); > + buf.push(0); > + > + buf > + } > + > + pub(crate) fn deserialize_binary(data: &[u8]) -> Result<(Self, u32, u32)> { > + if data.len() < 48 { > + bail!( > + "Entry too small: {} bytes (need at least 48 for header)", > + data.len() > + ); > + } > + > + let mut offset = 0; > + > + let prev = u32::from_le_bytes(data[offset..offset + 4].try_into()?); > + offset += 4; > + > + let next = u32::from_le_bytes(data[offset..offset + 4].try_into()?); > + offset += 4; > + > + let uid = u32::from_le_bytes(data[offset..offset + 4].try_into()?); > + offset += 4; > + > + let time = u32::from_le_bytes(data[offset..offset + 4].try_into()?); > + offset += 4; > + > + let node_digest = u64::from_le_bytes(data[offset..offset + 8].try_into()?); > + offset += 8; > + > + let ident_digest = u64::from_le_bytes(data[offset..offset + 8].try_into()?); > + offset += 8; > + > + let pid = u32::from_le_bytes(data[offset..offset + 4].try_into()?); > + offset += 4; > + > + let priority = data[offset]; > + offset += 1; > + > + let node_len = data[offset] as usize; > + offset 
+= 1; > + > + let ident_len = data[offset] as usize; > + offset += 1; > + > + let tag_len = data[offset] as usize; > + offset += 1; > + > + let msg_len = u32::from_le_bytes(data[offset..offset + 4].try_into()?) as usize; > + offset += 4; > + > + if offset + node_len + ident_len + tag_len + msg_len > data.len() { > + bail!("Entry data exceeds buffer size"); > + } > + > + let node = read_null_terminated(&data[offset..offset + node_len])?; > + offset += node_len; > + > + let ident = read_null_terminated(&data[offset..offset + ident_len])?; > + offset += ident_len; > + > + let tag = read_null_terminated(&data[offset..offset + tag_len])?; > + offset += tag_len; > + > + let message = read_null_terminated(&data[offset..offset + msg_len])?; > + > + Ok(( > + Self { > + uid, > + time, > + node_digest, > + ident_digest, > + pid, > + priority, > + node, > + ident, > + tag, > + message, > + }, > + prev, > + next, > + )) > + } > +} > + > +fn read_null_terminated(data: &[u8]) -> Result { > + let len = data.iter().position(|&b| b == 0).unwrap_or(data.len()); > + Ok(String::from_utf8_lossy(&data[..len]).into_owned()) > +} > + > +#[cfg(test)] > +pub fn reset_uid_counter() { > + UID_COUNTER.store(0, Ordering::SeqCst); > +} > + > +#[cfg(test)] > +mod tests { > + use super::*; > + > + #[test] > + fn test_pack_entry() { > + reset_uid_counter(); > + > + let entry = LogEntry::pack( > + "node1", > + "root", > + "cluster", > + 12345, > + 1234567890, > + 6, // Info priority > + "Test message", > + ) > + .unwrap(); > + > + assert_eq!(entry.uid, 1); > + assert_eq!(entry.time, 1234567890); > + assert_eq!(entry.node, "node1"); > + assert_eq!(entry.ident, "root"); > + assert_eq!(entry.tag, "cluster"); > + assert_eq!(entry.pid, 12345); > + assert_eq!(entry.priority, 6); > + assert_eq!(entry.message, "Test message"); > + } > + > + #[test] > + fn test_uid_increment() { > + reset_uid_counter(); > + > + let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg1").unwrap(); > + let entry2 = 
LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg2").unwrap(); > + > + assert_eq!(entry1.uid, 1); > + assert_eq!(entry2.uid, 2); > + } > + > + #[test] > + fn test_invalid_priority() { > + let result = LogEntry::pack("node1", "root", "tag", 0, 1000, 8, "message"); > + assert!(result.is_err()); > + } > + > + #[test] > + fn test_node_digest() { > + let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap(); > + let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg").unwrap(); > + let entry3 = LogEntry::pack("node2", "root", "tag", 0, 1000, 6, "msg").unwrap(); > + > + // Same node should have same digest > + assert_eq!(entry1.node_digest, entry2.node_digest); > + > + // Different node should have different digest > + assert_ne!(entry1.node_digest, entry3.node_digest); > + } > + > + #[test] > + fn test_ident_digest() { > + let entry1 = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap(); > + let entry2 = LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "msg").unwrap(); > + let entry3 = LogEntry::pack("node1", "admin", "tag", 0, 1000, 6, "msg").unwrap(); > + > + // Same ident should have same digest > + assert_eq!(entry1.ident_digest, entry2.ident_digest); > + > + // Different ident should have different digest > + assert_ne!(entry1.ident_digest, entry3.ident_digest); > + } > + > + #[test] > + fn test_utf8_to_ascii() { > + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "Hello 世界").unwrap(); > + assert!(entry.message.is_ascii()); > + // Unicode chars escaped as \uXXXX format (matches C implementation) > + assert!(entry.message.contains("\\u4e16")); // 世 = U+4E16 > + assert!(entry.message.contains("\\u754c")); // 界 = U+754C > + } > + > + #[test] > + fn test_utf8_control_chars() { > + // Test control character escaping > + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "Hello\x07World").unwrap(); > + assert!(entry.message.is_ascii()); > + // BEL (0x07) should be escaped as #007 
(matches C implementation) > + assert!(entry.message.contains("#007")); > + } > + > + #[test] > + fn test_utf8_mixed_content() { > + // Test mix of ASCII, Unicode, and control chars > + let entry = LogEntry::pack( > + "node1", > + "root", > + "tag", > + 0, > + 1000, > + 6, > + "Test\x01\nUnicode世\ttab", > + ) > + .unwrap(); > + assert!(entry.message.is_ascii()); > + // SOH (0x01) -> #001 > + assert!(entry.message.contains("#001")); > + // Newline (0x0A) -> #010 > + assert!(entry.message.contains("#010")); > + // Unicode 世 (U+4E16) -> \u4e16 > + assert!(entry.message.contains("\\u4e16")); > + // Tab (0x09) -> #009 > + assert!(entry.message.contains("#009")); > + } > + > + #[test] > + fn test_string_truncation() { > + let long_node = "a".repeat(300); > + let entry = LogEntry::pack(&long_node, "root", "tag", 0, 1000, 6, "msg").unwrap(); > + assert!(entry.node.len() <= 255); > + } > + > + #[test] > + fn test_truncate_multibyte_utf8() { > + // Test that truncate_string doesn't panic on multi-byte UTF-8 boundaries > + // "世" is 3 bytes in UTF-8 (0xE4 0xB8 0x96) > + let s = "x".repeat(253) + "世"; > + > + // This should not panic, even though 254 falls in the middle of "世" > + let entry = LogEntry::pack(&s, "root", "tag", 0, 1000, 6, "msg").unwrap(); > + > + // Should truncate to 253 bytes (before the multi-byte char) > + assert_eq!(entry.node.len(), 253); > + assert_eq!(entry.node, "x".repeat(253)); > + } > + > + #[test] > + fn test_message_truncation() { > + let long_message = "a".repeat(CLOG_MAX_ENTRY_SIZE); > + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, &long_message).unwrap(); > + // Entry should fit within max size > + assert!(entry.size() <= CLOG_MAX_ENTRY_SIZE); > + } > + > + #[test] > + fn test_aligned_size() { > + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg").unwrap(); > + let aligned = entry.aligned_size(); > + > + // Aligned size should be multiple of 8 > + assert_eq!(aligned % 8, 0); > + > + // Aligned size should be 
>= actual size > + assert!(aligned >= entry.size()); > + > + // Aligned size should be within 7 bytes of actual size > + assert!(aligned - entry.size() < 8); > + } > + > + #[test] > + fn test_json_export() { > + let entry = LogEntry::pack("node1", "root", "cluster", 123, 1234567890, 6, "Test").unwrap(); > + let json = entry.to_json_object(); > + > + assert_eq!(json["node"], "node1"); > + assert_eq!(json["user"], "root"); > + assert_eq!(json["tag"], "cluster"); > + assert_eq!(json["pid"], 123); > + assert_eq!(json["time"], 1234567890); > + assert_eq!(json["pri"], 6); > + assert_eq!(json["msg"], "Test"); > + } > + > + #[test] > + fn test_binary_serialization_roundtrip() { > + let entry = LogEntry::pack( > + "node1", > + "root", > + "cluster", > + 12345, > + 1234567890, > + 6, > + "Test message", > + ) > + .unwrap(); > + > + // Serialize with prev/next pointers > + let binary = entry.serialize_binary(100, 200); > + > + // Deserialize > + let (deserialized, prev, next) = LogEntry::deserialize_binary(&binary).unwrap(); > + > + // Check prev/next pointers > + assert_eq!(prev, 100); > + assert_eq!(next, 200); > + > + // Check entry fields > + assert_eq!(deserialized.uid, entry.uid); > + assert_eq!(deserialized.time, entry.time); > + assert_eq!(deserialized.node_digest, entry.node_digest); > + assert_eq!(deserialized.ident_digest, entry.ident_digest); > + assert_eq!(deserialized.pid, entry.pid); > + assert_eq!(deserialized.priority, entry.priority); > + assert_eq!(deserialized.node, entry.node); > + assert_eq!(deserialized.ident, entry.ident); > + assert_eq!(deserialized.tag, entry.tag); > + assert_eq!(deserialized.message, entry.message); > + } > + > + #[test] > + fn test_binary_format_header_size() { > + let entry = LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap(); > + let binary = entry.serialize_binary(0, 0); > + > + // Header should be exactly 48 bytes > + // prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) + > + // pid(4) + priority(1) + 
node_len(1) + ident_len(1) + tag_len(1) + msg_len(4) > + assert!(binary.len() >= 48); > + > + // First 48 bytes are header > + assert_eq!(&binary[0..4], &0u32.to_le_bytes()); // prev > + assert_eq!(&binary[4..8], &0u32.to_le_bytes()); // next > + } > + > + #[test] > + fn test_binary_deserialize_invalid_size() { > + let too_small = vec![0u8; 40]; // Less than 48 byte header > + let result = LogEntry::deserialize_binary(&too_small); > + assert!(result.is_err()); > + } > + > + #[test] > + fn test_binary_null_terminators() { > + let entry = LogEntry::pack("node1", "root", "tag", 123, 1000, 6, "message").unwrap(); > + let binary = entry.serialize_binary(0, 0); > + > + // Check that strings are null-terminated > + // Find null bytes in data section (after 48-byte header) > + let data_section = &binary[48..]; > + let null_count = data_section.iter().filter(|&&b| b == 0).count(); > + assert_eq!(null_count, 4); // 4 null terminators (node, ident, tag, msg) > + } > + > + #[test] > + fn test_length_field_overflow_prevention() { > + // Test that 255-byte strings are handled correctly (prevent u8 overflow) > + // C does: MIN(strlen(s) + 1, 255) to cap at 255 > + let long_string = "a".repeat(255); > + > + let entry = LogEntry::pack(&long_string, &long_string, &long_string, 123, 1000, 6, "msg").unwrap(); > + > + // Strings should be truncated to 254 bytes (leaving room for null) > + assert_eq!(entry.node.len(), 254); > + assert_eq!(entry.ident.len(), 254); > + assert_eq!(entry.tag.len(), 254); > + > + // Serialize and check length fields are capped at 255 (254 bytes + null) > + let binary = entry.serialize_binary(0, 0); > + > + // Extract length fields from header > + // Layout: prev(4) + next(4) + uid(4) + time(4) + node_digest(8) + ident_digest(8) + > + // pid(4) + priority(1) + node_len(1) + ident_len(1) + tag_len(1) + msg_len(4) > + // Offsets: node_len=37, ident_len=38, tag_len=39 > + let node_len = binary[37]; > + let ident_len = binary[38]; > + let tag_len = binary[39]; > 
+ > + assert_eq!(node_len, 255); // 254 bytes + 1 null = 255 > + assert_eq!(ident_len, 255); > + assert_eq!(tag_len, 255); > + } > + > + #[test] > + fn test_length_field_no_wraparound() { > + // Even if somehow a 255+ byte string gets through, serialize should cap at 255 > + // This tests the defensive .min(255) in serialize_binary > + let mut entry = LogEntry::pack("node", "ident", "tag", 123, 1000, 6, "msg").unwrap(); > + > + // Artificially create an edge case (though pack() already prevents this) > + entry.node = "x".repeat(254); // Max valid size > + > + let binary = entry.serialize_binary(0, 0); > + let node_len = binary[37]; // Offset 37 for node_len > + > + // Should be 255 (254 + 1 for null), not wrap to 0 > + assert_eq!(node_len, 255); > + assert_ne!(node_len, 0); // Ensure no wraparound > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs b/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs > new file mode 100644 > index 000000000..09dad6afd > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/src/hash.rs > @@ -0,0 +1,176 @@ > +/// FNV-1a (Fowler-Noll-Vo) 64-bit hash function > +/// > +/// This matches the C implementation's `fnv_64a_buf` function > +/// Used for generating node and ident digests for deduplication. 
> +/// FNV-1a 64-bit non-zero initial basis > +pub(crate) const FNV1A_64_INIT: u64 = 0xcbf29ce484222325; > + > +/// Compute 64-bit FNV-1a hash > +/// > +/// This is a faithful port of the C implementation's `fnv_64a_buf` function: > +/// ```c > +/// static inline uint64_t fnv_64a_buf(const void *buf, size_t len, uint64_t hval) { > +/// unsigned char *bp = (unsigned char *)buf; > +/// unsigned char *be = bp + len; > +/// while (bp < be) { > +/// hval ^= (uint64_t)*bp++; > +/// hval += (hval << 1) + (hval << 4) + (hval << 5) + (hval << 7) + (hval << 8) + (hval << 40); > +/// } > +/// return hval; > +/// } > +/// ``` > +/// > +/// # Arguments > +/// * `data` - The data to hash > +/// * `init` - Initial hash value (use FNV1A_64_INIT for first hash) > +/// > +/// # Returns > +/// 64-bit hash value > +/// > +/// Note: This function appears unused but is actually called via `fnv_64a_str` below, > +/// which provides the primary API for string hashing. Both functions share the core > +/// FNV-1a implementation logic. > +#[inline] > +#[allow(dead_code)] // Used via fnv_64a_str wrapper > +pub(crate) fn fnv_64a(data: &[u8], init: u64) -> u64 { > + let mut hval = init; > + > + for &byte in data { > + hval ^= byte as u64; > + // FNV magic prime multiplication done via shifts and adds > + // This is equivalent to: hval *= 0x100000001b3 (FNV 64-bit prime) > + hval = hval.wrapping_add( > + (hval << 1) > + .wrapping_add(hval << 4) > + .wrapping_add(hval << 5) > + .wrapping_add(hval << 7) > + .wrapping_add(hval << 8) > + .wrapping_add(hval << 40), > + ); > + } > + > + hval > +} > + > +/// Hash a null-terminated string (includes the null byte) > +/// > +/// The C implementation includes the null terminator in the hash: > +/// `fnv_64a_buf(node, node_len, FNV1A_64_INIT)` where node_len includes the '\0' > +/// > +/// This function adds a null byte to match that behavior. 
> +#[inline] > +pub(crate) fn fnv_64a_str(s: &str) -> u64 { > + let bytes = s.as_bytes(); > + let mut hval = FNV1A_64_INIT; > + > + for &byte in bytes { > + hval ^= byte as u64; > + hval = hval.wrapping_add( > + (hval << 1) > + .wrapping_add(hval << 4) > + .wrapping_add(hval << 5) > + .wrapping_add(hval << 7) > + .wrapping_add(hval << 8) > + .wrapping_add(hval << 40), > + ); > + } > + > + // Hash the null terminator to match C behavior > + // C implementation: `hval ^= (uint64_t)*bp++` where *bp is '\0' > + // Since XOR with 0 is a no-op (hval ^ 0 == hval), we skip it and proceed > + // directly to the multiplication step. This optimization produces identical > + // results to the C implementation while being more explicit about the intent. > + hval.wrapping_add( > + (hval << 1) > + .wrapping_add(hval << 4) > + .wrapping_add(hval << 5) > + .wrapping_add(hval << 7) > + .wrapping_add(hval << 8) > + .wrapping_add(hval << 40), > + ) > +} > + > +#[cfg(test)] > +mod tests { > + use super::*; > + > + #[test] > + fn test_fnv1a_init() { > + // Test that init constant matches C implementation > + assert_eq!(FNV1A_64_INIT, 0xcbf29ce484222325); > + } > + > + #[test] > + fn test_fnv1a_empty() { > + // Empty string with null terminator > + let hash = fnv_64a(&[0], FNV1A_64_INIT); > + assert_ne!(hash, FNV1A_64_INIT); // Should be different from init > + } > + > + #[test] > + fn test_fnv1a_consistency() { > + // Same input should produce same output > + let data = b"test"; > + let hash1 = fnv_64a(data, FNV1A_64_INIT); > + let hash2 = fnv_64a(data, FNV1A_64_INIT); > + assert_eq!(hash1, hash2); > + } > + > + #[test] > + fn test_fnv1a_different_data() { > + // Different input should (usually) produce different output > + let hash1 = fnv_64a(b"test1", FNV1A_64_INIT); > + let hash2 = fnv_64a(b"test2", FNV1A_64_INIT); > + assert_ne!(hash1, hash2); > + } > + > + #[test] > + fn test_fnv1a_str() { > + // Test string hashing with null terminator > + let hash1 = fnv_64a_str("node1"); > + let 
hash2 = fnv_64a_str("node1"); > + let hash3 = fnv_64a_str("node2"); > + > + assert_eq!(hash1, hash2); // Same string should hash the same > + assert_ne!(hash1, hash3); // Different strings should hash differently > + } > + > + #[test] > + fn test_fnv1a_node_names() { > + // Test with typical Proxmox node names > + let nodes = vec!["pve1", "pve2", "pve3"]; > + let mut hashes = Vec::new(); > + > + for node in &nodes { > + let hash = fnv_64a_str(node); > + hashes.push(hash); > + } > + > + // All hashes should be unique > + for i in 0..hashes.len() { > + for j in (i + 1)..hashes.len() { > + assert_ne!( > + hashes[i], hashes[j], > + "Hashes for {} and {} should differ", > + nodes[i], nodes[j] > + ); > + } > + } > + } > + > + #[test] > + fn test_fnv1a_chaining() { > + // Test that we can chain hashes > + let data1 = b"first"; > + let data2 = b"second"; > + > + let hash1 = fnv_64a(data1, FNV1A_64_INIT); > + let hash2 = fnv_64a(data2, hash1); // Use previous hash as init > + > + // Should produce a deterministic result > + let hash1_again = fnv_64a(data1, FNV1A_64_INIT); > + let hash2_again = fnv_64a(data2, hash1_again); > + > + assert_eq!(hash2, hash2_again); > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs b/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs > new file mode 100644 > index 000000000..964f0b3a6 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/src/lib.rs > @@ -0,0 +1,27 @@ > +/// Cluster Log Implementation > +/// > +/// This module provides a cluster-wide log system compatible with the C implementation. > +/// It maintains a ring buffer of log entries that can be merged from multiple nodes, > +/// deduplicated, and exported to JSON. 
> +/// > +/// Key features: > +/// - Ring buffer storage for efficient memory usage > +/// - FNV-1a hashing for node and ident tracking > +/// - Deduplication across nodes > +/// - Time-based sorting > +/// - Multi-node log merging > +/// - JSON export for web UI > +// Internal modules (not exposed) > +mod cluster_log; > +mod entry; > +mod hash; > +mod ring_buffer; > + > +// Public API - only expose what's needed externally > +pub use cluster_log::ClusterLog; > + > +// Re-export types only for testing or internal crate use > +#[doc(hidden)] > +pub use entry::LogEntry; > +#[doc(hidden)] > +pub use ring_buffer::RingBuffer; > diff --git a/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs b/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs > new file mode 100644 > index 000000000..2c82308c9 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/src/ring_buffer.rs > @@ -0,0 +1,628 @@ > +/// Ring Buffer Implementation for Cluster Log > +/// > +/// This module implements a circular buffer for storing log entries, > +/// matching the C implementation's clog_base_t structure. > +use super::entry::LogEntry; > +use super::hash::fnv_64a_str; > +use anyhow::{bail, Result}; > +use std::collections::VecDeque; > + > +/// Matches C's CLOG_DEFAULT_SIZE constant > +pub(crate) const CLOG_DEFAULT_SIZE: usize = 8192 * 16; // 131,072 bytes (128 KB) > + > +/// Matches C's CLOG_MAX_ENTRY_SIZE constant > +pub(crate) const CLOG_MAX_ENTRY_SIZE: usize = 4096; // 4,096 bytes (4 KB) > + > +/// Ring buffer for log entries > +/// > +/// This is a simplified Rust version of the C implementation's ring buffer. > +/// The C version uses a raw byte buffer with manual pointer arithmetic, > +/// but we use a VecDeque for safety and simplicity while maintaining > +/// the same conceptual behavior. 
> +/// > +/// C structure (clog_base_t): > +/// ```c > +/// struct clog_base { > +/// uint32_t size; // Total buffer size > +/// uint32_t cpos; // Current position > +/// char data[]; // Variable length data > +/// }; > +/// ``` > +#[derive(Debug, Clone)] > +pub struct RingBuffer { > + /// Maximum capacity in bytes > + capacity: usize, > + > + /// Current size in bytes (approximate) > + current_size: usize, > + > + /// Entries stored in the buffer (newest first) > + /// We use VecDeque for efficient push/pop at both ends > + entries: VecDeque, > +} > + > +impl RingBuffer { > + /// Create a new ring buffer with specified capacity > + pub fn new(capacity: usize) -> Self { > + // Ensure minimum capacity > + let capacity = if capacity < CLOG_MAX_ENTRY_SIZE * 10 { > + CLOG_DEFAULT_SIZE > + } else { > + capacity > + }; > + > + Self { > + capacity, > + current_size: 0, > + entries: VecDeque::new(), > + } > + } > + > + /// Add an entry to the buffer > + /// > + /// Matches C's `clog_copy` function which calls `clog_alloc_entry` > + /// to allocate space in the ring buffer. 
> + pub fn add_entry(&mut self, entry: &LogEntry) -> Result<()> { > + let entry_size = entry.aligned_size(); > + > + // Make room if needed (remove oldest entries) > + while self.current_size + entry_size > self.capacity && !self.entries.is_empty() { > + if let Some(old_entry) = self.entries.pop_back() { > + self.current_size = self.current_size.saturating_sub(old_entry.aligned_size()); > + } > + } > + > + // Add new entry at the front (newest first) > + self.entries.push_front(entry.clone()); > + self.current_size += entry_size; > + > + Ok(()) > + } > + > + /// Check if buffer is near full (>90% capacity) > + pub fn is_near_full(&self) -> bool { > + self.current_size > (self.capacity * 9 / 10) > + } > + > + /// Check if buffer is empty > + pub fn is_empty(&self) -> bool { > + self.entries.is_empty() > + } > + > + /// Get number of entries > + pub fn len(&self) -> usize { > + self.entries.len() > + } > + > + /// Get buffer capacity > + pub fn capacity(&self) -> usize { > + self.capacity > + } > + > + /// Iterate over entries (newest first) > + pub fn iter(&self) -> impl Iterator { > + self.entries.iter() > + } > + > + /// Sort entries by time, node_digest, and uid > + /// > + /// Matches C's `clog_sort` function > + /// > + /// C uses GTree with custom comparison function `clog_entry_sort_fn`: > + /// ```c > + /// if (entry1->time != entry2->time) { > + /// return entry1->time - entry2->time; > + /// } > + /// if (entry1->node_digest != entry2->node_digest) { > + /// return entry1->node_digest - entry2->node_digest; > + /// } > + /// return entry1->uid - entry2->uid; > + /// ``` > + pub fn sort(&self) -> Result { > + let mut new_buffer = Self::new(self.capacity); > + > + // Collect and sort entries > + let mut sorted: Vec = self.entries.iter().cloned().collect(); > + > + // Sort by time (ascending), then node_digest, then uid > + sorted.sort_by_key(|e| (e.time, e.node_digest, e.uid)); > + > + // Add sorted entries to new buffer > + // Since add_entry pushes to 
front, we add in forward order to get newest-first > + // sorted = [oldest...newest], add_entry pushes to front, so: > + // - Add oldest: [oldest] > + // - Add next: [next, oldest] > + // - Add newest: [newest, next, oldest] > + for entry in sorted.iter() { > + new_buffer.add_entry(entry)?; > + } > + > + Ok(new_buffer) > + } > + > + /// Dump buffer to JSON format > + /// > + /// Matches C's `clog_dump_json` function > + /// > + /// # Arguments > + /// * `ident_filter` - Optional ident filter (user filter) > + /// * `max_entries` - Maximum number of entries to include > + pub fn dump_json(&self, ident_filter: Option<&str>, max_entries: usize) -> String { > + // Compute ident digest if filter is provided > + let ident_digest = ident_filter.map(fnv_64a_str); > + > + let mut data = Vec::new(); > + let mut count = 0; > + > + // Iterate over entries (newest first, matching C's walk from cpos->prev) > + for entry in self.iter() { > + if count >= max_entries { > + break; > + } > + > + // Apply ident filter if specified > + if let Some(digest) = ident_digest { > + if digest != entry.ident_digest { > + continue; > + } > + } > + > + data.push(entry.to_json_object()); > + count += 1; > + } > + > + let result = serde_json::json!({ > + "data": data > + }); > + > + serde_json::to_string_pretty(&result).unwrap_or_else(|_| "{}".to_string()) > + } > + > + /// Dump buffer contents (for debugging) > + /// > + /// Matches C's `clog_dump` function > + #[allow(dead_code)] > + pub fn dump(&self) { > + for (idx, entry) in self.entries.iter().enumerate() { > + println!( > + "[{}] uid={:08x} time={} node={}{{{:016X}}} tag={}[{}{{{:016X}}}]: {}", > + idx, > + entry.uid, > + entry.time, > + entry.node, > + entry.node_digest, > + entry.tag, > + entry.ident, > + entry.ident_digest, > + entry.message > + ); > + } > + } > + > + /// Serialize to C binary format (clog_base_t) > + /// > + /// Returns a full memory dump of the ring buffer matching C's format. 
> + /// C's clusterlog_get_state() returns g_memdup2(cl->base, clog->size), > + /// which is the entire allocated buffer capacity, not just used space. > + /// > + /// Binary layout matches C structure: > + /// ```c > + /// struct clog_base { > + /// uint32_t size; // Total allocated buffer capacity > + /// uint32_t cpos; // Offset to newest entry (not always 8!) > + /// char data[]; // Ring buffer data (entries at various offsets) > + /// }; > + /// ``` > + /// > + /// Entry offsets and linkage: > + /// - entry.prev: offset to previous (older) entry > + /// - entry.next: end offset of THIS entry (offset + aligned_size), NOT pointer to next entry! > + pub fn serialize_binary(&self) -> Vec { > + // Allocate full buffer capacity (matching C's g_malloc0(size)) > + let mut buf = vec![0u8; self.capacity]; > + > + // Empty buffer case > + if self.entries.is_empty() { > + buf[0..4].copy_from_slice(&(self.capacity as u32).to_le_bytes()); // size > + buf[4..8].copy_from_slice(&0u32.to_le_bytes()); // cpos = 0 (empty) > + return buf; > + } > + > + // Calculate all offsets first > + let mut offsets = Vec::with_capacity(self.entries.len()); > + let mut current_offset = 8usize; > + > + for entry in self.iter() { > + let aligned_size = entry.aligned_size(); > + > + // Check if we have space > + if current_offset + aligned_size > self.capacity { > + break; > + } > + > + offsets.push(current_offset as u32); > + current_offset += aligned_size; > + } > + > + // Track where newest entry is (first entry at offset 8) > + let newest_offset = 8u32; > + > + // Write entries with correct prev/next pointers > + // Entries are in newest-first order: [newest, second-newest, ..., oldest] > + for (i, entry) in self.iter().enumerate() { > + let offset = offsets[i] as usize; > + let aligned_size = entry.aligned_size(); > + > + // entry.prev points to the next-older entry (or 0 if this is oldest) > + let prev = if i + 1 < offsets.len() { > + offsets[i + 1] > + } else { > + 0 // Oldest entry has 
prev = 0 > + }; > + > + // entry.next is the end offset of THIS entry > + let next = offset as u32 + aligned_size as u32; > + > + let entry_bytes = entry.serialize_binary(prev, next); > + > + // Write entry data > + buf[offset..offset + entry_bytes.len()].copy_from_slice(&entry_bytes); > + > + // Padding is already zeroed in vec![0u8; capacity] > + } > + > + // Write header > + buf[0..4].copy_from_slice(&(self.capacity as u32).to_le_bytes()); // size = full capacity > + buf[4..8].copy_from_slice(&newest_offset.to_le_bytes()); // cpos = offset to newest entry > + > + buf > + } > + > + /// Deserialize from C binary format > + /// > + /// Parses clog_base_t structure and extracts all entries. > + /// Includes wrap-around guards matching C's logic in `clog_dump`, `clog_dump_json`, > + /// and `clog_sort` functions. > + pub fn deserialize_binary(data: &[u8]) -> Result { > + if data.len() < 8 { > + bail!( > + "Buffer too small: {} bytes (need at least 8 for header)", > + data.len() > + ); > + } > + > + // Read header > + let size = u32::from_le_bytes(data[0..4].try_into()?) as usize; > + let initial_cpos = u32::from_le_bytes(data[4..8].try_into()?) 
as usize; > + > + if size != data.len() { > + bail!( > + "Size mismatch: header says {}, got {} bytes", > + size, > + data.len() > + ); > + } > + > + // Empty buffer (cpos == 0) > + if initial_cpos == 0 { > + return Ok(Self::new(size)); > + } > + > + // Validate cpos range > + if initial_cpos < 8 || initial_cpos >= size { > + bail!("Invalid cpos: {initial_cpos} (size: {size})"); > + } > + > + // Parse entries starting from cpos, walking backwards via prev pointers > + // Apply C's wrap-around guards from `clog_dump` and `clog_dump_json` > + let mut entries = VecDeque::new(); > + let mut current_pos = initial_cpos; > + let mut visited = std::collections::HashSet::new(); > + > + loop { > + // Guard against infinite loops > + if !visited.insert(current_pos) { > + break; // Already visited this position > + } > + > + // C guard: cpos must be non-zero > + if current_pos == 0 { > + break; > + } > + > + // Validate bounds > + if current_pos >= size { > + break; > + } > + > + // Parse entry at current_pos > + let entry_data = &data[current_pos..]; > + let (entry, prev, _next) = LogEntry::deserialize_binary(entry_data)?; > + > + // Add to back (we're walking backwards in time, newest to oldest) > + // VecDeque should end up as [newest, ..., oldest] > + entries.push_back(entry); > + > + // C wrap-around guard: if (cpos < cur->prev && cur->prev <= clog->cpos) break; > + // Detects when prev wraps around past initial position > + if current_pos < prev as usize && prev as usize <= initial_cpos { > + break; > + } > + > + current_pos = prev as usize; > + } > + > + // Create ring buffer with entries > + let mut buffer = Self::new(size); > + buffer.entries = entries; > + > + // Recalculate current_size > + buffer.current_size = buffer > + .entries > + .iter() > + .map(|e| e.aligned_size()) > + .sum(); > + > + Ok(buffer) > + } > +} > + > +impl Default for RingBuffer { > + fn default() -> Self { > + Self::new(CLOG_DEFAULT_SIZE) > + } > +} > + > +#[cfg(test)] > +mod tests { > + use 
super::*; > + > + #[test] > + fn test_ring_buffer_creation() { > + let buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + assert_eq!(buffer.capacity, CLOG_DEFAULT_SIZE); > + assert_eq!(buffer.len(), 0); > + assert!(buffer.is_empty()); > + } > + > + #[test] > + fn test_add_entry() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + let entry = LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "message").unwrap(); > + > + let result = buffer.add_entry(&entry); > + assert!(result.is_ok()); > + assert_eq!(buffer.len(), 1); > + assert!(!buffer.is_empty()); > + } > + > + #[test] > + fn test_ring_buffer_wraparound() { > + // Create a buffer with minimum required size (CLOG_MAX_ENTRY_SIZE * 10) > + // but fill it beyond 90% to trigger wraparound > + let mut buffer = RingBuffer::new(CLOG_MAX_ENTRY_SIZE * 10); > + > + // Add many small entries to fill the buffer > + // Each entry is small, so we need many to fill the buffer > + let initial_count = 50_usize; > + for i in 0..initial_count { > + let entry = > + LogEntry::pack("node1", "root", "tag", 0, 1000 + i as u32, 6, "msg").unwrap(); > + let _ = buffer.add_entry(&entry); > + } > + > + // All entries should fit initially > + let count_before = buffer.len(); > + assert_eq!(count_before, initial_count); > + > + // Now add entries with large messages to trigger wraparound > + // Make messages large enough to fill the buffer beyond capacity > + let large_msg = "x".repeat(7000); // Very large message (close to max) > + let large_entries_count = 20_usize; > + for i in 0..large_entries_count { > + let entry = > + LogEntry::pack("node1", "root", "tag", 0, 2000 + i as u32, 6, &large_msg).unwrap(); > + let _ = buffer.add_entry(&entry); > + } > + > + // Should have removed some old entries due to capacity limits > + assert!( > + buffer.len() < count_before + large_entries_count, > + "Expected wraparound to remove old entries (have {} entries, expected < {})", > + buffer.len(), > + count_before + large_entries_count > + ); > 
+ > + // Newest entry should be present > + let newest = buffer.iter().next().unwrap(); > + assert_eq!(newest.time, 2000 + large_entries_count as u32 - 1); // Last added entry > + } > + > + #[test] > + fn test_sort_by_time() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + > + // Add entries in random time order > + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "c").unwrap()); > + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap()); > + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "b").unwrap()); > + > + let sorted = buffer.sort().unwrap(); > + > + // Check that entries are sorted by time (oldest first after reversing) > + let times: Vec = sorted.iter().map(|e| e.time).collect(); > + let mut times_sorted = times.clone(); > + times_sorted.sort(); > + times_sorted.reverse(); // Newest first in buffer > + assert_eq!(times, times_sorted); > + } > + > + #[test] > + fn test_sort_by_node_digest() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + > + // Add entries with same time but different nodes > + let _ = buffer.add_entry(&LogEntry::pack("node3", "root", "tag", 0, 1000, 6, "c").unwrap()); > + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap()); > + let _ = buffer.add_entry(&LogEntry::pack("node2", "root", "tag", 0, 1000, 6, "b").unwrap()); > + > + let sorted = buffer.sort().unwrap(); > + > + // Entries with same time should be sorted by node_digest > + // Within same time, should be sorted > + for entries in sorted.iter().collect::>().windows(2) { > + if entries[0].time == entries[1].time { > + assert!(entries[0].node_digest >= entries[1].node_digest); > + } > + } > + } > + > + #[test] > + fn test_json_dump() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + let _ = buffer > + .add_entry(&LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "msg").unwrap()); > + > + let json = 
buffer.dump_json(None, 50); > + > + // Should be valid JSON > + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); > + assert!(parsed.get("data").is_some()); > + > + let data = parsed["data"].as_array().unwrap(); > + assert_eq!(data.len(), 1); > + > + let entry = &data[0]; > + assert_eq!(entry["node"], "node1"); > + assert_eq!(entry["user"], "root"); > + assert_eq!(entry["tag"], "cluster"); > + } > + > + #[test] > + fn test_json_dump_with_filter() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + > + // Add entries with different users > + let _ = > + buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "msg1").unwrap()); > + let _ = > + buffer.add_entry(&LogEntry::pack("node1", "admin", "tag", 0, 1001, 6, "msg2").unwrap()); > + let _ = > + buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "msg3").unwrap()); > + > + // Filter for "root" only > + let json = buffer.dump_json(Some("root"), 50); > + > + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); > + let data = parsed["data"].as_array().unwrap(); > + > + // Should only have 2 entries (the ones from "root") > + assert_eq!(data.len(), 2); > + > + for entry in data { > + assert_eq!(entry["user"], "root"); > + } > + } > + > + #[test] > + fn test_json_dump_max_entries() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + > + // Add 10 entries > + for i in 0..10 { > + let _ = buffer > + .add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000 + i, 6, "msg").unwrap()); > + } > + > + // Request only 5 entries > + let json = buffer.dump_json(None, 5); > + > + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); > + let data = parsed["data"].as_array().unwrap(); > + > + assert_eq!(data.len(), 5); > + } > + > + #[test] > + fn test_iterator() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + > + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1000, 6, "a").unwrap()); > + let _ 
= buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1001, 6, "b").unwrap()); > + let _ = buffer.add_entry(&LogEntry::pack("node1", "root", "tag", 0, 1002, 6, "c").unwrap()); > + > + let messages: Vec = buffer.iter().map(|e| e.message.clone()).collect(); > + > + // Should be in reverse order (newest first) > + assert_eq!(messages, vec!["c", "b", "a"]); > + } > + > + #[test] > + fn test_binary_serialization_roundtrip() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + > + let _ = buffer.add_entry( > + &LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "Entry 1").unwrap(), > + ); > + let _ = buffer.add_entry( > + &LogEntry::pack("node2", "admin", "system", 456, 1001, 5, "Entry 2").unwrap(), > + ); > + > + // Serialize > + let binary = buffer.serialize_binary(); > + > + // Deserialize > + let deserialized = RingBuffer::deserialize_binary(&binary).unwrap(); > + > + // Check entry count > + assert_eq!(deserialized.len(), buffer.len()); > + > + // Check entries match > + let orig_entries: Vec<_> = buffer.iter().collect(); > + let deser_entries: Vec<_> = deserialized.iter().collect(); > + > + for (orig, deser) in orig_entries.iter().zip(deser_entries.iter()) { > + assert_eq!(deser.uid, orig.uid); > + assert_eq!(deser.time, orig.time); > + assert_eq!(deser.node, orig.node); > + assert_eq!(deser.message, orig.message); > + } > + } > + > + #[test] > + fn test_binary_format_header() { > + let mut buffer = RingBuffer::new(CLOG_DEFAULT_SIZE); > + let _ = buffer.add_entry(&LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap()); > + > + let binary = buffer.serialize_binary(); > + > + // Check header format > + assert!(binary.len() >= 8); > + > + let size = u32::from_le_bytes(binary[0..4].try_into().unwrap()) as usize; > + let cpos = u32::from_le_bytes(binary[4..8].try_into().unwrap()); > + > + assert_eq!(size, binary.len()); > + assert_eq!(cpos, 8); // First entry at offset 8 > + } > + > + #[test] > + fn test_binary_empty_buffer() { > + let buffer = 
RingBuffer::new(CLOG_DEFAULT_SIZE); // Use default size to avoid capacity upgrade > + let binary = buffer.serialize_binary(); > + > + // Empty buffer returns full capacity (matching C's g_memdup2(cl->base, clog->size)) > + assert_eq!(binary.len(), CLOG_DEFAULT_SIZE); // Full capacity, not just header! > + > + // Check header > + let size = u32::from_le_bytes(binary[0..4].try_into().unwrap()) as usize; > + let cpos = u32::from_le_bytes(binary[4..8].try_into().unwrap()); > + > + assert_eq!(size, CLOG_DEFAULT_SIZE); > + assert_eq!(cpos, 0); // Empty buffer has cpos = 0 > + > + let deserialized = RingBuffer::deserialize_binary(&binary).unwrap(); > + assert_eq!(deserialized.len(), 0); > + assert_eq!(deserialized.capacity(), CLOG_DEFAULT_SIZE); > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs b/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs > new file mode 100644 > index 000000000..5185386dc > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/tests/binary_compatibility_tests.rs > @@ -0,0 +1,315 @@ > +//! Binary compatibility tests for pmxcfs-logger > +//! > +//! These tests verify that the Rust implementation can correctly > +//! serialize/deserialize binary data in a format compatible with > +//! the C implementation. 
> + > +use pmxcfs_logger::{ClusterLog, LogEntry, RingBuffer}; > + > +/// Test deserializing a minimal C-compatible binary blob > +/// > +/// This test uses a hand-crafted binary blob that matches C's clog_base_t format: > +/// - 8-byte header (size + cpos) > +/// - Single entry at offset 8 > +#[test] > +fn test_deserialize_minimal_c_blob() { > + // Create a minimal valid C binary blob > + // Header: size=8+entry_size, cpos=8 (points to first entry) > + // Entry: minimal valid entry with all required fields > + > + let entry = LogEntry::pack("node1", "root", "test", 123, 1000, 6, "msg").unwrap(); > + let entry_bytes = entry.serialize_binary(0, 0); // prev=0 (end), next=0 > + let entry_size = entry_bytes.len(); > + > + // Allocate buffer with capacity for header + entry > + let total_size = 8 + entry_size; > + let mut blob = vec![0u8; total_size]; > + > + // Write header > + blob[0..4].copy_from_slice(&(total_size as u32).to_le_bytes()); // size > + blob[4..8].copy_from_slice(&8u32.to_le_bytes()); // cpos = 8 > + > + // Write entry > + blob[8..8 + entry_size].copy_from_slice(&entry_bytes); > + > + // Deserialize > + let buffer = RingBuffer::deserialize_binary(&blob).expect("Should deserialize"); > + > + // Verify > + assert_eq!(buffer.len(), 1, "Should have 1 entry"); > + let entries: Vec<_> = buffer.iter().collect(); > + assert_eq!(entries[0].node, "node1"); > + assert_eq!(entries[0].message, "msg"); > +} > + > +/// Test round-trip: Rust serialize -> deserialize > +/// > +/// Verifies that Rust can serialize and deserialize its own format > +#[test] > +fn test_roundtrip_single_entry() { > + let mut buffer = RingBuffer::new(8192 * 16); > + > + let entry = LogEntry::pack("node1", "root", "cluster", 123, 1000, 6, "Test message").unwrap(); > + buffer.add_entry(&entry).unwrap(); > + > + // Serialize > + let blob = buffer.serialize_binary(); > + > + // Verify header > + let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as usize; > + let cpos = 
u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize; > + > + assert_eq!(size, blob.len(), "Size should match blob length"); > + assert_eq!(cpos, 8, "First entry should be at offset 8"); > + > + // Deserialize > + let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize"); > + > + // Verify > + assert_eq!(deserialized.len(), 1); > + let entries: Vec<_> = deserialized.iter().collect(); > + assert_eq!(entries[0].node, "node1"); > + assert_eq!(entries[0].ident, "root"); > + assert_eq!(entries[0].message, "Test message"); > +} > + > +/// Test round-trip with multiple entries > +/// > +/// Verifies linked list structure (prev/next pointers) > +#[test] > +fn test_roundtrip_multiple_entries() { > + let mut buffer = RingBuffer::new(8192 * 16); > + > + // Add 3 entries > + for i in 0..3 { > + let entry = LogEntry::pack( > + "node1", > + "root", > + "test", > + 100 + i, > + 1000 + i, > + 6, > + &format!("Message {}", i), > + ) > + .unwrap(); > + buffer.add_entry(&entry).unwrap(); > + } > + > + // Serialize > + let blob = buffer.serialize_binary(); > + > + // Deserialize > + let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize"); > + > + // Verify all entries preserved > + assert_eq!(deserialized.len(), 3); > + > + let entries: Vec<_> = deserialized.iter().collect(); > + // Entries are stored newest-first > + assert_eq!(entries[0].message, "Message 2"); // Newest > + assert_eq!(entries[1].message, "Message 1"); > + assert_eq!(entries[2].message, "Message 0"); // Oldest > +} > + > +/// Test empty buffer serialization > +/// > +/// C returns a buffer with size and cpos=0 for empty buffers > +#[test] > +fn test_empty_buffer_format() { > + let buffer = RingBuffer::new(8192 * 16); > + > + // Serialize empty buffer > + let blob = buffer.serialize_binary(); > + > + // Verify format > + assert_eq!(blob.len(), 8192 * 16, "Should be full capacity"); > + > + let size = u32::from_le_bytes(blob[0..4].try_into().unwrap()) as 
usize; > + let cpos = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize; > + > + assert_eq!(size, 8192 * 16, "Size should match capacity"); > + assert_eq!(cpos, 0, "Empty buffer should have cpos=0"); > + > + // Deserialize > + let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize"); > + assert_eq!(deserialized.len(), 0, "Should be empty"); > +} > + > +/// Test entry alignment (8-byte boundaries) > +/// > +/// C uses ((size + 7) & ~7) for alignment > +#[test] > +fn test_entry_alignment() { > + let entry = LogEntry::pack("n", "u", "t", 1, 1000, 6, "m").unwrap(); > + > + let aligned_size = entry.aligned_size(); > + > + // Should be multiple of 8 > + assert_eq!(aligned_size % 8, 0, "Aligned size should be multiple of 8"); > + > + // Should be >= actual size > + assert!(aligned_size >= entry.size()); > + > + // Should be within 7 bytes of actual size > + assert!(aligned_size - entry.size() < 8); > +} > + > +/// Test string length capping (prevents u8 overflow) > +/// > +/// node_len, ident_len, tag_len are u8 and must cap at 255 > +#[test] > +fn test_string_length_capping() { > + // Create entry with very long strings > + let long_node = "a".repeat(300); > + let long_ident = "b".repeat(300); > + let long_tag = "c".repeat(300); > + > + let entry = LogEntry::pack(&long_node, &long_ident, &long_tag, 1, 1000, 6, "msg").unwrap(); > + > + // Serialize > + let blob = entry.serialize_binary(0, 0); > + > + // Check length fields (at offsets 32, 33, 34 after header) > + let node_len = blob[32]; > + let ident_len = blob[33]; > + let tag_len = blob[34]; Shouldn't this be 37/38/39? > + > + // All should be capped at 255 (including null terminator) > + assert!(node_len <= 255, "node_len should be capped at 255"); > + assert!(ident_len <= 255, "ident_len should be capped at 255"); > + assert!(tag_len <= 255, "tag_len should be capped at 255"); Assert the expected value instead. 
> +} > + > +/// Test ClusterLog state serialization > +/// > +/// Verifies get_state() returns C-compatible format > +#[test] > +fn test_cluster_log_state_format() { > + let log = ClusterLog::new(); > + > + // Add some entries > + log.add("node1", "root", "cluster", 123, 6, 1000, "Entry 1") > + .unwrap(); > + log.add("node2", "admin", "system", 456, 6, 1001, "Entry 2") > + .unwrap(); > + > + // Get state > + let state = log.get_state().expect("Should serialize"); > + > + // Verify header format > + assert!(state.len() >= 8, "Should have at least header"); > + > + let size = u32::from_le_bytes(state[0..4].try_into().unwrap()) as usize; > + let cpos = u32::from_le_bytes(state[4..8].try_into().unwrap()) as usize; > + > + assert_eq!(size, state.len(), "Size should match blob length"); > + assert!(cpos >= 8, "cpos should point into data section"); > + assert!(cpos < size, "cpos should be within buffer"); > + > + // Deserialize and verify > + let deserialized = ClusterLog::deserialize_state(&state).expect("Should deserialize"); > + assert_eq!(deserialized.len(), 2, "Should have 2 entries"); > +} > + > +/// Test wrap-around detection in deserialization > +/// > +/// Verifies that circular buffer wrap-around is handled correctly > +#[test] > +fn test_wraparound_detection() { > + // Create a buffer with entries > + let mut buffer = RingBuffer::new(8192 * 16); > + > + for i in 0..5 { > + let entry = LogEntry::pack("node1", "root", "test", 100 + i, 1000 + i, 6, "msg").unwrap(); > + buffer.add_entry(&entry).unwrap(); > + } > + > + // Serialize > + let blob = buffer.serialize_binary(); > + > + // Deserialize (should handle prev pointers correctly) > + let deserialized = RingBuffer::deserialize_binary(&blob).expect("Should deserialize"); > + > + // Should get all entries > + assert_eq!(deserialized.len(), 5); > +} > + > +/// Test invalid binary data handling > +/// > +/// Verifies that malformed data is rejected > +#[test] > +fn test_invalid_binary_data() { > + // Too small > + 
let too_small = vec![0u8; 4]; > + assert!(RingBuffer::deserialize_binary(&too_small).is_err()); > + > + // Size mismatch > + let mut size_mismatch = vec![0u8; 100]; > + size_mismatch[0..4].copy_from_slice(&200u32.to_le_bytes()); // Claims 200 bytes > + assert!(RingBuffer::deserialize_binary(&size_mismatch).is_err()); > + > + // Invalid cpos (beyond buffer) > + let mut invalid_cpos = vec![0u8; 100]; > + invalid_cpos[0..4].copy_from_slice(&100u32.to_le_bytes()); // size = 100 > + invalid_cpos[4..8].copy_from_slice(&200u32.to_le_bytes()); // cpos = 200 (invalid) > + assert!(RingBuffer::deserialize_binary(&invalid_cpos).is_err()); > +} > + > +/// Test FNV-1a hash consistency > +/// > +/// Verifies that node_digest and ident_digest are computed correctly > +#[test] > +fn test_hash_consistency() { > + let entry1 = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "msg1").unwrap(); > + let entry2 = LogEntry::pack("node1", "root", "test", 2, 1001, 6, "msg2").unwrap(); > + let entry3 = LogEntry::pack("node2", "admin", "test", 3, 1002, 6, "msg3").unwrap(); > + > + // Same node should have same digest > + assert_eq!(entry1.node_digest, entry2.node_digest); > + > + // Same ident should have same digest > + assert_eq!(entry1.ident_digest, entry2.ident_digest); > + > + // Different node should have different digest > + assert_ne!(entry1.node_digest, entry3.node_digest); > + > + // Different ident should have different digest > + assert_ne!(entry1.ident_digest, entry3.ident_digest); > +} > + > +/// Test priority validation > +/// > +/// Priority must be 0-7 (syslog priority) > +#[test] > +fn test_priority_validation() { > + // Valid priorities (0-7) > + for pri in 0..=7 { > + let result = LogEntry::pack("node1", "root", "test", 1, 1000, pri, "msg"); > + assert!(result.is_ok(), "Priority {} should be valid", pri); > + } > + > + // Invalid priority (8+) > + let result = LogEntry::pack("node1", "root", "test", 1, 1000, 8, "msg"); > + assert!(result.is_err(), "Priority 8 should be 
invalid"); > +} > + > +/// Test UTF-8 to ASCII conversion > +/// > +/// Verifies control character and Unicode escaping (matches C implementation) > +#[test] > +fn test_utf8_escaping() { > + // Control characters (C format: #XXX with 3 decimal digits) > + let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Hello\x07World").unwrap(); > + assert!(entry.message.contains("#007"), "BEL should be escaped as #007"); > + > + // Unicode characters > + let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Hello 世界").unwrap(); > + assert!(entry.message.contains("\\u4e16"), "世 should be escaped as \\u4e16"); > + assert!(entry.message.contains("\\u754c"), "界 should be escaped as \\u754c"); > + > + // Mixed content > + let entry = LogEntry::pack("node1", "root", "test", 1, 1000, 6, "Test\x01\n世").unwrap(); > + assert!(entry.message.contains("#001"), "SOH should be escaped"); > + assert!(entry.message.contains("#010"), "LF should be escaped"); > + assert!(entry.message.contains("\\u4e16"), "Unicode should be escaped"); > +} > diff --git a/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs b/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs > new file mode 100644 > index 000000000..eec7470d3 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-logger/tests/performance_tests.rs > @@ -0,0 +1,294 @@ > +//! Performance tests for pmxcfs-logger > +//! > +//! These tests verify that the logger implementation scales properly > +//! and handles large log merges efficiently. > + > +use pmxcfs_logger::ClusterLog; > + > +/// Test merging large logs from multiple nodes > +/// > +/// This test verifies: > +/// 1. Large log merge performance (multiple nodes with many entries) > +/// 2. Memory usage stays bounded > +/// 3. 
Deduplication works correctly at scale > +#[test] > +fn test_large_log_merge_performance() { > + // Create 3 nodes with large logs > + let node1 = ClusterLog::new(); > + let node2 = ClusterLog::new(); > + let node3 = ClusterLog::new(); > + > + // Add 1000 entries per node (3000 total) > + for i in 0..1000 { > + let _ = node1.add( > + "node1", > + "root", > + "cluster", > + 1000 + i, > + 6, > + 1000000 + i, > + &format!("Node1 entry {}", i), > + ); > + let _ = node2.add( > + "node2", > + "admin", > + "system", > + 2000 + i, > + 6, > + 1000000 + i, > + &format!("Node2 entry {}", i), > + ); > + let _ = node3.add( > + "node3", > + "user", > + "service", > + 3000 + i, > + 6, > + 1000000 + i, > + &format!("Node3 entry {}", i), > + ); > + } > + > + // Get remote buffers > + let node2_buffer = node2.get_buffer(); > + let node3_buffer = node3.get_buffer(); > + > + // Merge all logs into node1 > + let start = std::time::Instant::now(); > + node1 > + .merge(vec![node2_buffer, node3_buffer], true) > + .expect("Merge should succeed"); > + let duration = start.elapsed(); > + > + // Verify merge completed > + let merged_count = node1.len(); > + > + // Should have merged entries (may be less than 3000 due to capacity limits) > + assert!( > + merged_count > 0, > + "Should have some entries after merge (got {})", > + merged_count > + ); > + > + // Performance check: merge should complete in reasonable time > + // For 3000 entries, should be well under 1 second > + assert!( > + duration.as_millis() < 1000, > + "Large merge took too long: {:?}", > + duration > + ); > + > + println!( > + "[OK] Merged 3000 entries from 3 nodes in {:?} (result: {} entries)", > + duration, merged_count > + ); > +} > + > +/// Test deduplication performance with high duplicate rate > +/// > +/// This test verifies that deduplication works efficiently when > +/// many duplicate entries are present. 
> +#[test] > +fn test_deduplication_performance() { > + let log = ClusterLog::new(); > + > + // Add 500 entries from same node with overlapping times > + // This creates many potential duplicates > + for i in 0..500 { > + let _ = log.add( > + "node1", > + "root", > + "cluster", > + 1000 + i, > + 6, > + 1000 + (i / 10), // Reuse timestamps (50 unique times) > + &format!("Entry {}", i), > + ); > + } > + > + // Create remote log with overlapping entries > + let remote = ClusterLog::new(); > + for i in 0..500 { > + let _ = remote.add( > + "node1", > + "root", > + "cluster", > + 2000 + i, > + 6, > + 1000 + (i / 10), // Same timestamp pattern > + &format!("Remote entry {}", i), > + ); > + } > + > + let remote_buffer = remote.get_buffer(); > + > + // Merge with deduplication > + let start = std::time::Instant::now(); > + log.merge(vec![remote_buffer], true) > + .expect("Merge should succeed"); > + let duration = start.elapsed(); > + > + let final_count = log.len(); > + > + // Should have deduplicated some entries > + assert!( > + final_count > 0, > + "Should have entries after deduplication" > + ); > + > + // Performance check > + assert!( > + duration.as_millis() < 500, > + "Deduplication took too long: {:?}", > + duration > + ); > + > + println!( > + "[OK] Deduplicated 1000 entries in {:?} (result: {} entries)", > + duration, final_count > + ); > +} > + > +/// Test memory usage stays bounded during large operations > +/// > +/// This test verifies that the ring buffer properly limits memory > +/// usage even when adding many entries. 
> +#[test] > +fn test_memory_bounded() { > + // Create log with default capacity > + let log = ClusterLog::new(); > + > + // Add many entries (more than capacity) > + for i in 0..10000 { > + let _ = log.add( > + "node1", > + "root", > + "cluster", > + 1000 + i, > + 6, > + 1000000 + i, > + &format!("Entry with some message content {}", i), > + ); > + } > + > + let entry_count = log.len(); > + let capacity = log.capacity(); > + > + // Buffer should not grow unbounded > + // Entry count should be reasonable relative to capacity > + assert!( > + entry_count < 10000, > + "Buffer should not store all 10000 entries (got {})", > + entry_count > + ); > + > + // Verify capacity is respected > + assert!( > + capacity > 0, > + "Capacity should be set (got {})", > + capacity > + ); > + > + println!( > + "[OK] Added 10000 entries, buffer contains {} (capacity: {} bytes)", > + entry_count, capacity > + ); > +} > + > +/// Test JSON export performance with large logs > +/// > +/// This test verifies that JSON export scales properly. 
> +#[test] > +fn test_json_export_performance() { > + let log = ClusterLog::new(); > + > + // Add 1000 entries > + for i in 0..1000 { > + let _ = log.add( > + "node1", > + "root", > + "cluster", > + 1000 + i, > + 6, > + 1000000 + i, > + &format!("Test message {}", i), > + ); > + } > + > + // Export to JSON > + let start = std::time::Instant::now(); > + let json = log.dump_json(None, 1000); > + let duration = start.elapsed(); > + > + // Verify JSON is valid > + let parsed: serde_json::Value = > + serde_json::from_str(&json).expect("Should be valid JSON"); > + let data = parsed["data"].as_array().expect("Should have data array"); > + > + assert!(data.len() > 0, "Should have entries in JSON"); > + > + // Performance check > + assert!( > + duration.as_millis() < 500, > + "JSON export took too long: {:?}", > + duration > + ); > + > + println!( > + "[OK] Exported {} entries to JSON in {:?}", > + data.len(), > + duration > + ); > +} > + > +/// Test binary serialization performance > +/// > +/// This test verifies that binary serialization/deserialization > +/// is efficient for large buffers. 
> +#[test] > +fn test_binary_serialization_performance() { > + let log = ClusterLog::new(); > + > + // Add 500 entries > + for i in 0..500 { > + let _ = log.add( > + "node1", > + "root", > + "cluster", > + 1000 + i, > + 6, > + 1000000 + i, > + &format!("Entry {}", i), > + ); > + } > + > + // Serialize > + let start = std::time::Instant::now(); > + let state = log.get_state().expect("Should serialize"); > + let serialize_duration = start.elapsed(); > + > + // Deserialize > + let start = std::time::Instant::now(); > + let deserialized = ClusterLog::deserialize_state(&state).expect("Should deserialize"); > + let deserialize_duration = start.elapsed(); > + > + // Verify round-trip > + assert_eq!(deserialized.len(), 500, "Should preserve entry count"); > + > + // Performance checks > + assert!( > + serialize_duration.as_millis() < 200, > + "Serialization took too long: {:?}", > + serialize_duration > + ); > + assert!( > + deserialize_duration.as_millis() < 200, > + "Deserialization took too long: {:?}", > + deserialize_duration > + ); > + > + println!( > + "[OK] Serialized 500 entries in {:?}, deserialized in {:?}", > + serialize_duration, deserialize_duration > + ); > +}