From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id C69431FF141 for ; Fri, 13 Feb 2026 10:47:03 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id DC06833313; Fri, 13 Feb 2026 10:47:08 +0100 (CET) From: Kefu Chai To: pve-devel@lists.proxmox.com Subject: [PATCH pve-cluster 07/14 v2] pmxcfs-rs: add pmxcfs-status and pmxcfs-test-utils crates Date: Fri, 13 Feb 2026 17:33:44 +0800 Message-ID: <20260213094119.2379288-8-k.chai@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260213094119.2379288-1-k.chai@proxmox.com> References: <20260213094119.2379288-1-k.chai@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1770975743104 X-SPAM-LEVEL: Spam detection results: 2 AWL -4.309 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SOMETLD_ARE_BAD_TLD 5 .bar, .beauty, .buzz, .cam, .casa, .cfd, .club, .date, .guru, .link, .live, .monster, .online, .press, .pw, .quest, .rest, .sbs, .shop, .stream, .top, .trade, .wiki, .work, .xyz TLD abuse PDS_OTHER_BAD_TLD 0.75 Untrustworthy TLDs POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_1 0.1 random spam to be learned in bayes SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLACK 3 Contains an URL listed in the URIBL blacklist [types.rs] X-MailFrom: k.chai@proxmox.com X-Mailman-Rule-Hits: max-size X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; news-moderation; no-subject; digests; suspicious-header Message-ID-Hash: CJHAU4BMOYXX4NNEHQ42BNC2LMCQGXB5 X-Message-ID-Hash: CJHAU4BMOYXX4NNEHQ42BNC2LMCQGXB5 X-Mailman-Approved-At: Fri, 13 Feb 2026 10:47:06 +0100 X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox VE development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Add cluster status tracking and monitoring: - Status: Central status container (thread-safe) - Cluster membership tracking - VM/CT registry with version tracking - RRD data management - Cluster log integration - Quorum state tracking - Configuration file version tracking This integrates pmxcfs-memdb, pmxcfs-rrd, pmxcfs-logger, and pmxcfs-api-types to provide centralized cluster state management. It also uses procfs for system metrics collection. Includes comprehensive unit tests for: - VM registration and deletion - Cluster membership updates - Version tracking - Configuration file monitoring The pmxcfs-test-utils provides utilities shared by integration tests. It is added along with pmxcfs-status to avoid circular dependency. Signed-off-by: Kefu Chai --- src/pmxcfs-rs/Cargo.toml | 4 + src/pmxcfs-rs/pmxcfs-status/Cargo.toml | 39 + src/pmxcfs-rs/pmxcfs-status/README.md | 142 ++ src/pmxcfs-rs/pmxcfs-status/src/lib.rs | 94 + src/pmxcfs-rs/pmxcfs-status/src/status.rs | 1852 +++++++++++++++++ src/pmxcfs-rs/pmxcfs-status/src/traits.rs | 492 +++++ src/pmxcfs-rs/pmxcfs-status/src/types.rs | 77 + src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml | 34 + src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs | 570 +++++ .../pmxcfs-test-utils/src/mock_memdb.rs | 771 +++++++ 10 files changed, 4075 insertions(+) create mode 100644 src/pmxcfs-rs/pmxcfs-status/Cargo.toml create mode 100644 src/pmxcfs-rs/pmxcfs-status/README.md create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/lib.rs create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/status.rs create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/traits.rs create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/types.rs create mode 100644 src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml create mode 100644 src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs create mode 100644 src/pmxcfs-rs/pmxcfs-test-utils/src/mock_memdb.rs diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml index 073488851..9d509c1d2 100644 --- a/src/pmxcfs-rs/Cargo.toml +++ b/src/pmxcfs-rs/Cargo.toml @@ -6,6 +6,8 @@ members = [ "pmxcfs-logger", # Cluster log with ring buffer and deduplication "pmxcfs-rrd", # RRD (Round-Robin Database) persistence "pmxcfs-memdb", # In-memory database with SQLite persistence + "pmxcfs-status", # Status monitoring and RRD data management + "pmxcfs-test-utils", # Test utilities and helpers (dev-only) ] resolver = "2" @@ -24,6 +26,8 @@ pmxcfs-config = { path = "pmxcfs-config" } pmxcfs-logger = { path = "pmxcfs-logger" } pmxcfs-rrd = { path = "pmxcfs-rrd" } pmxcfs-memdb = { path = "pmxcfs-memdb" } +pmxcfs-status = { path = "pmxcfs-status" } +pmxcfs-test-utils = { path = "pmxcfs-test-utils" } # Core async runtime tokio = { version = "1.35", features = ["full"] } diff --git a/src/pmxcfs-rs/pmxcfs-status/Cargo.toml b/src/pmxcfs-rs/pmxcfs-status/Cargo.toml new file mode 100644 index 000000000..1a16379b5 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-status/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "pmxcfs-status" +description = "Status monitoring and RRD data management for pmxcfs" + +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +# Workspace dependencies +pmxcfs-api-types.workspace = true +pmxcfs-config.workspace = true +pmxcfs-rrd.workspace = true +pmxcfs-memdb.workspace = true +pmxcfs-logger.workspace = true + +# Error handling +anyhow.workspace = true + +# Async runtime +tokio.workspace = true + +# Concurrency primitives +parking_lot.workspace = true + +# Logging +tracing.workspace = true + +# System information (Linux /proc filesystem) +procfs = "0.17" + +[dev-dependencies] +tempfile.workspace = true +pmxcfs-test-utils.workspace = true diff --git a/src/pmxcfs-rs/pmxcfs-status/README.md b/src/pmxcfs-rs/pmxcfs-status/README.md new file mode 100644 index 000000000..b6958af3f --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-status/README.md @@ -0,0 +1,142 @@ +# pmxcfs-status + +**Cluster Status** tracking and monitoring for pmxcfs. + +This crate manages all runtime cluster state information including membership, VM lists, node status, RRD metrics, and cluster logs. It serves as the central repository for dynamic cluster information that changes during runtime. + +## Overview + +The Status subsystem tracks: +- **Cluster membership**: Which nodes are in the cluster and their states +- **VM/CT tracking**: Registry of all virtual machines and containers +- **Node status**: Per-node health and resource information +- **RRD data**: Performance metrics (CPU, memory, disk, network) +- **Cluster log**: Centralized log aggregation +- **Quorum state**: Whether cluster has quorum +- **Version tracking**: Monitors configuration file changes + +## Usage + +### Initialization + +```rust +use pmxcfs_status; + +// For tests or when RRD persistence is not needed +let status = pmxcfs_status::init(); + +// For production with RRD file persistence +let status = pmxcfs_status::init_with_rrd("/var/lib/rrdcached/db").await; +``` + +The default `init()` is synchronous and doesn't require a directory parameter, making tests simpler. Use `init_with_rrd()` for production deployments that need RRD persistence. + +### Integration with Other Components + +**FUSE Plugins**: +- `.version` plugin reads from Status +- `.vmlist` plugin generates VM list from Status +- `.members` plugin generates member list from Status +- `.rrd` plugin accesses RRD data from Status +- `.clusterlog` plugin reads cluster log from Status + +**DFSM Status Sync**: +- `StatusSyncService` (pmxcfs-dfsm) broadcasts status updates +- Uses `pve_kvstore_v1` CPG group +- KV store data synchronized across nodes + +**IPC Server**: +- `set_status` IPC call updates Status +- Used by `pvecm`/`pvenode` tools +- RRD data received via IPC + +**MemDb Integration**: +- Scans VM configs to populate vmlist +- Tracks version changes on file modifications +- Used for `.version` plugin timestamps + +## Architecture + +### Module Structure + +| Module | Purpose | +|--------|---------| +| `lib.rs` | Public API and initialization | +| `status.rs` | Core Status struct and operations | +| `types.rs` | Type definitions (ClusterNode, ClusterInfo, etc.) | + +### Key Features + +**Thread-Safe**: All operations use `RwLock` or `AtomicU64` for concurrent access +**Version Tracking**: Monotonically increasing counters for change detection +**Structured Logging**: Field-based tracing for better observability +**Optional RRD**: RRD persistence is opt-in, simplifying testing + +## C to Rust Mapping + +### Data Structures + +| C Type | Rust Type | Notes | +|--------|-----------|-------| +| `cfs_status_t` | `Status` | Main status container | +| `cfs_clinfo_t` | `ClusterInfo` | Cluster membership info | +| `cfs_clnode_t` | `ClusterNode` | Individual node info | +| `vminfo_t` | `VmEntry` | VM/CT registry entry (in pmxcfs-api-types) | +| `clog_entry_t` | `ClusterLogEntry` | Cluster log entry | + +### Core Functions + +| C Function | Rust Equivalent | Notes | +|-----------|-----------------|-------| +| `cfs_status_init()` | `init()` or `init_with_rrd()` | Two variants for flexibility | +| `cfs_set_quorate()` | `Status::set_quorate()` | Quorum tracking | +| `cfs_is_quorate()` | `Status::is_quorate()` | Quorum checking | +| `vmlist_register_vm()` | `Status::register_vm()` | VM registration | +| `vmlist_delete_vm()` | `Status::delete_vm()` | VM deletion | +| `cfs_status_set()` | `Status::set_node_status()` | Status updates (including RRD) | + +## Key Differences from C Implementation + +### RRD Decoupling + +**C Version (status.c)**: +- RRD code embedded in status.c +- Async initialization always required + +**Rust Version**: +- Separate `pmxcfs-rrd` crate +- `init()` is synchronous (no RRD) +- `init_with_rrd()` is async (with RRD) +- Tests don't need temp directories + +### Concurrency + +**C Version**: +- Single `GMutex` for entire status structure + +**Rust Version**: +- Fine-grained `RwLock` for different data structures +- `AtomicU64` for version counters +- Better read parallelism + +## Configuration File Tracking + +Status tracks version numbers for these common Proxmox config files: + +- `corosync.conf`, `corosync.conf.new` +- `storage.cfg`, `user.cfg`, `domains.cfg` +- `datacenter.cfg`, `vzdump.cron`, `vzdump.conf` +- `ha/` directory files (crm_commands, manager_status, resources.cfg, etc.) +- `sdn/` directory files (vnets.cfg, zones.cfg, controllers.cfg, etc.) +- And many more (see `Status::new()` in status.rs for complete list) + +## References + +### C Implementation +- `src/pmxcfs/status.c` / `status.h` - Status tracking + +### Related Crates +- **pmxcfs-rrd**: RRD file persistence +- **pmxcfs-dfsm**: Status synchronization via StatusSyncService +- **pmxcfs-logger**: Cluster log implementation +- **pmxcfs**: FUSE plugins that read from Status diff --git a/src/pmxcfs-rs/pmxcfs-status/src/lib.rs b/src/pmxcfs-rs/pmxcfs-status/src/lib.rs new file mode 100644 index 000000000..67c97f81c --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-status/src/lib.rs @@ -0,0 +1,94 @@ +/// Status information and monitoring +/// +/// This module manages: +/// - Cluster membership (nodes, IPs, online status) +/// - RRD (Round Robin Database) data for metrics +/// - Cluster log +/// - Node status information +/// - VM/CT list tracking +mod status; +mod traits; +mod types; + +// Re-export public types +pub use pmxcfs_api_types::{VmEntry, VmType}; +pub use types::{ClusterInfo, ClusterLogEntry, ClusterNode, NodeStatus}; + +// Re-export Status struct and trait +pub use status::Status; +pub use traits::{BoxFuture, MockStatus, StatusOps}; + +use std::sync::Arc; + +/// Initialize status subsystem without RRD persistence +/// +/// DEPRECATED: Use init_with_config() instead. Config is required (matches C semantics). +/// This function is kept for backward compatibility but will be removed. +#[deprecated(note = "Use init_with_config() instead - config is required")] +pub fn init() -> Arc { + // Create a default config for backward compatibility + let config = pmxcfs_config::Config::shared( + "localhost".to_string(), + "127.0.0.1".parse().unwrap(), + 33, + false, + true, // local mode + "pmxcfs".to_string(), + ); + tracing::warn!("Using deprecated init() - config should be provided explicitly"); + Arc::new(Status::new(config, None)) +} + +/// Initialize status subsystem with configuration +/// +/// Creates a Status instance with the global configuration. +/// Config is REQUIRED (matches C semantics where cfs is always present). +pub fn init_with_config(config: Arc) -> Arc { + tracing::info!("Status subsystem initialized with config"); + Arc::new(Status::new(config, None)) +} + +/// Initialize status subsystem with RRD file persistence +/// +/// DEPRECATED: Use init_with_config_and_rrd() instead. Config is required (matches C semantics). +#[deprecated(note = "Use init_with_config_and_rrd() instead - config is required")] +pub async fn init_with_rrd>(rrd_dir: P) -> Arc { + let config = pmxcfs_config::Config::shared( + "localhost".to_string(), + "127.0.0.1".parse().unwrap(), + 33, + false, + true, // local mode + "pmxcfs".to_string(), + ); + tracing::warn!("Using deprecated init_with_rrd() - config should be provided explicitly"); + init_with_config_and_rrd(config, rrd_dir).await +} + +/// Initialize status subsystem with full configuration and RRD persistence +/// +/// Creates a Status instance with both configuration and RRD persistence. +/// This is the recommended initialization for production use. +/// Config is REQUIRED (matches C semantics where cfs is always present). +pub async fn init_with_config_and_rrd>( + config: Arc, + rrd_dir: P, +) -> Arc { + let rrd_dir_path = rrd_dir.as_ref(); + let rrd_writer = match pmxcfs_rrd::RrdWriter::new(rrd_dir_path).await { + Ok(writer) => { + tracing::info!( + directory = %rrd_dir_path.display(), + "RRD file persistence enabled" + ); + Some(writer) + } + Err(e) => { + tracing::warn!(error = %e, "RRD file persistence disabled"); + None + } + }; + + tracing::info!("Status subsystem initialized with config and RRD"); + Arc::new(Status::new(config, rrd_writer)) +} diff --git a/src/pmxcfs-rs/pmxcfs-status/src/status.rs b/src/pmxcfs-rs/pmxcfs-status/src/status.rs new file mode 100644 index 000000000..58d81b8ed --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-status/src/status.rs @@ -0,0 +1,1852 @@ +/// Status subsystem implementation +use crate::types::{ClusterInfo, ClusterLogEntry, ClusterNode, NodeStatus, RrdEntry}; +use anyhow::Result; +use parking_lot::RwLock; +use pmxcfs_api_types::{VmEntry, VmType}; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Status subsystem (matches C implementation's cfs_status_t) +pub struct Status { + /// Configuration (nodename, IP, etc.) - matches C's global `cfs` variable + /// Always present, just like C's global `cfs` struct (never NULL) + config: Arc, + + /// Cluster information (nodes, membership) - matches C's clinfo + cluster_info: RwLock>, + + /// Cluster info version counter - increments on membership changes (matches C's clinfo_version) + /// This is separate from config_version in ClusterInfo (which matches C's cman_version) + cluster_version: AtomicU64, + + /// VM list version counter - increments when VM list changes (matches C's vmlist_version) + vmlist_version: AtomicU64, + + /// Global VM info version counter (matches C's vminfo_version_counter) + /// Used to track the order of VM updates across all VMs + vminfo_version_counter: AtomicU64, + + /// MemDB path version counters (matches C's memdb_change_array) + /// Tracks versions for specific config files like "corosync.conf", "user.cfg", etc. + memdb_path_versions: RwLock>, + + /// Node status data by name + node_status: RwLock>, + + /// Cluster log with ring buffer and deduplication (matches C's clusterlog_t) + cluster_log: pmxcfs_logger::ClusterLog, + + /// RRD entries by key (e.g., "pve2-node/nodename" or "pve2.3-vm/vmid") + pub(crate) rrd_data: RwLock>, + + /// RRD dump cache (timestamp, cached_dump) + rrd_dump_cache: RwLock>, + + /// RRD file writer for persistent storage (using tokio RwLock for async compatibility) + rrd_writer: Option>>, + + /// VM/CT list (vmid -> VmEntry) + vmlist: RwLock>, + + /// Quorum status (matches C's cfs_status.quorate) + quorate: RwLock, + + /// Current cluster members (CPG membership) + members: RwLock>, + + /// Daemon start timestamp (UNIX epoch) - for .version plugin + start_time: u64, + + /// KV store data from nodes (nodeid -> key -> (value, version)) + /// Matches C implementation's kvhash with per-key version tracking + kvstore: RwLock, u32)>>>, + + /// Node IP addresses (nodename -> IP) - matches C's iphash + node_ips: RwLock>, +} + +impl Status { + /// Create a new Status instance + /// + /// For production use, use `pmxcfs_status::init_with_config()` or `init_with_config_and_rrd()`. + /// For tests, use `pmxcfs_test_utils::create_test_config()` to create a config. + /// + /// # Arguments + /// * `config` - Configuration (contains nodename, IP, etc.) - REQUIRED, like C's global cfs + /// * `rrd_writer` - Optional RRD writer for persistent storage + pub fn new(config: Arc, rrd_writer: Option) -> Self { + // Wrap RrdWriter in Arc if provided (for async compatibility) + let rrd_writer = rrd_writer.map(|w| Arc::new(tokio::sync::RwLock::new(w))); + + // Initialize memdb path versions for common Proxmox config files + // Matches C implementation's memdb_change_array (status.c:79-120) + // These are the exact paths tracked by the C implementation + let mut path_versions = HashMap::new(); + let common_paths = vec![ + "corosync.conf", + "corosync.conf.new", + "storage.cfg", + "user.cfg", + "domains.cfg", + "notifications.cfg", + "priv/notifications.cfg", + "priv/shadow.cfg", + "priv/acme/plugins.cfg", + "priv/tfa.cfg", + "priv/token.cfg", + "datacenter.cfg", + "vzdump.cron", + "vzdump.conf", + "jobs.cfg", + "ha/crm_commands", + "ha/manager_status", + "ha/resources.cfg", + "ha/rules.cfg", + "ha/groups.cfg", + "ha/fence.cfg", + "status.cfg", + "replication.cfg", + "ceph.conf", + "sdn/vnets.cfg", + "sdn/zones.cfg", + "sdn/controllers.cfg", + "sdn/subnets.cfg", + "sdn/ipams.cfg", + "sdn/mac-cache.json", // SDN MAC address cache + "sdn/pve-ipam-state.json", // SDN IPAM state + "sdn/dns.cfg", // SDN DNS configuration + "sdn/fabrics.cfg", // SDN fabrics configuration + "sdn/.running-config", // SDN running configuration + "virtual-guest/cpu-models.conf", // Virtual guest CPU models + "virtual-guest/profiles.cfg", // Virtual guest profiles + "firewall/cluster.fw", // Cluster firewall rules + "mapping/directory.cfg", // Directory mappings + "mapping/pci.cfg", // PCI device mappings + "mapping/usb.cfg", // USB device mappings + ]; + + for path in common_paths { + path_versions.insert(path.to_string(), AtomicU64::new(0)); + } + + // Get start time (matches C implementation's cfs_status.start_time) + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + Self { + config, + cluster_info: RwLock::new(None), + cluster_version: AtomicU64::new(0), // Match C's clinfo_version starting at 0 + vmlist_version: AtomicU64::new(0), // Match C's vmlist_version starting at 0 + vminfo_version_counter: AtomicU64::new(0), + memdb_path_versions: RwLock::new(path_versions), + node_status: RwLock::new(HashMap::new()), + cluster_log: pmxcfs_logger::ClusterLog::new(), + rrd_data: RwLock::new(HashMap::new()), + rrd_dump_cache: RwLock::new(None), + rrd_writer, + vmlist: RwLock::new(HashMap::new()), + quorate: RwLock::new(false), + members: RwLock::new(Vec::new()), + start_time, + kvstore: RwLock::new(HashMap::new()), + node_ips: RwLock::new(HashMap::new()), + } + } + + /// Get node status + pub fn get_node_status(&self, name: &str) -> Option { + self.node_status.read().get(name).cloned() + } + + /// Set node status (matches C implementation's cfs_status_set) + /// + /// This handles status updates received via IPC from external clients. + /// If the key starts with "rrd/", it's RRD data that should be written to disk. + /// If the key is "nodeip", it's a node IP address update. + /// Otherwise, it's generic node status data. + pub async fn set_node_status(&self, name: String, data: Vec) -> Result<()> { + // Check size limit (matches C's CFS_MAX_STATUS_SIZE check) + if data.len() > pmxcfs_api_types::CFS_MAX_STATUS_SIZE { + return Err(anyhow::anyhow!( + "Status data too large: {} bytes (max: {})", + data.len(), + pmxcfs_api_types::CFS_MAX_STATUS_SIZE + )); + } + + // Check if this is RRD data (matching C's cfs_status_set behavior) + if let Some(rrd_key) = name.strip_prefix("rrd/") { + // Strip "rrd/" prefix to get the actual RRD key + // Convert data to string (RRD data is text format) + let mut data_str = String::from_utf8(data) + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in RRD data: {e}"))?; + + // Strip NUL termination from C payloads (C strings are NUL-terminated) + if data_str.ends_with('\0') { + data_str.pop(); + } + + // Write to RRD (stores in memory and writes to disk) + self.set_rrd_data(rrd_key.to_string(), data_str).await?; + } else if name == "nodeip" { + // Node IP address update (matches C's nodeip_hash_set) + let mut ip_str = String::from_utf8(data) + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in nodeip data: {e}"))?; + + // Strip NUL termination + if ip_str.ends_with('\0') { + ip_str.pop(); + } + + // Get current node name from config (always valid, like C's cfs.nodename) + let nodename = self.get_local_nodename(); + let mut node_ips = self.node_ips.write(); + + // Use entry API for atomic check-and-update to prevent race where + // two concurrent updates could both see old value and both increment version + use std::collections::hash_map::Entry; + let needs_version_bump = match node_ips.entry(nodename.to_string()) { + Entry::Occupied(mut e) if e.get() != &ip_str => { + e.insert(ip_str); + true + } + Entry::Vacant(e) => { + e.insert(ip_str); + true + } + _ => false, + }; + + drop(node_ips); + + if needs_version_bump { + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + } else { + // Regular node status (not RRD or nodeip) + let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); + let status = NodeStatus { + name: name.clone(), + data, + timestamp: now, + }; + self.node_status.write().insert(name, status); + } + + Ok(()) + } + + /// Get local node name (helper for nodeip handling) + /// + /// Returns the nodename from config (matches C implementation's use of cfs.nodename). + /// The C code initializes cfs.nodename from uname() at startup (pmxcfs.c:826), + /// and our Config does the same. This method simply returns that cached value. + fn get_local_nodename(&self) -> &str { + self.config.nodename() + } + + /// Add cluster log entry + pub fn add_log_entry(&self, entry: ClusterLogEntry) { + // Convert ClusterLogEntry to ClusterLog format and add + // The ClusterLog handles size limits and deduplication internally + let _ = self.cluster_log.add( + &entry.node, + &entry.ident, + &entry.tag, + 0, // pid not tracked in our entries + entry.priority, + entry.timestamp as u32, + &entry.message, + ); + } + + /// Get cluster log entries + pub fn get_log_entries(&self, max: usize) -> Vec { + // Get entries from ClusterLog and convert to ClusterLogEntry + self.cluster_log + .get_entries(max) + .into_iter() + .map(|entry| ClusterLogEntry { + uid: entry.uid, + timestamp: entry.time as u64, + priority: entry.priority, + tag: entry.tag, + pid: entry.pid, + node: entry.node, + ident: entry.ident, + message: entry.message, + }) + .collect() + } + + /// Get cluster log entries filtered by ident (user) + /// + /// Matches C implementation: clog_dump_json() filters by ident_digest + /// If user is empty, returns all entries (no filtering) + pub fn get_log_entries_filtered(&self, max: usize, user: &str) -> Vec { + if user.is_empty() { + return self.get_log_entries(max); + } + + // Filter by ident field (matches C's ident_digest comparison) + // Iterate all entries to ensure we don't miss matches (C iterates the entire ring buffer) + let all_entries = self.cluster_log.get_entries(usize::MAX); + all_entries + .into_iter() + .filter(|entry| entry.ident == user) + .take(max) + .map(|entry| ClusterLogEntry { + uid: entry.uid, + timestamp: entry.time as u64, + priority: entry.priority, + tag: entry.tag, + pid: entry.pid, + node: entry.node, + ident: entry.ident, + message: entry.message, + }) + .collect() + } + + /// Clear all cluster log entries (for testing) + pub fn clear_cluster_log(&self) { + self.cluster_log.clear(); + } + + /// Set RRD data (C-compatible format) + /// Key format: "pve2-node/{nodename}" or "pve2.3-vm/{vmid}" + /// Data format from pvestatd: "{non_archivable_fields...}:{ctime}:{val1}:{val2}:..." + pub async fn set_rrd_data(&self, key: String, data: String) -> Result<()> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let entry = RrdEntry { + key: key.clone(), + data: data.clone(), + timestamp: now, + }; + + // Store in memory for .rrd plugin file + self.rrd_data.write().insert(key.clone(), entry); + + // Also write to RRD file on disk (if persistence is enabled) + if let Some(writer_lock) = &self.rrd_writer { + let mut writer = writer_lock.write().await; + writer.update(&key, &data).await?; + tracing::trace!("Updated RRD file: {} -> {}", key, data); + } + + Ok(()) + } + + /// Remove old RRD entries (older than 5 minutes) + pub fn remove_old_rrd_data(&self) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + const EXPIRE_SECONDS: u64 = 60 * 5; // 5 minutes + + self.rrd_data + .write() + .retain(|_, entry| { + // Handle clock jumps backwards by checking both directions + now.saturating_sub(entry.timestamp) < EXPIRE_SECONDS + }); + } + + /// Get RRD data dump (text format matching C implementation) + pub fn get_rrd_dump(&self) -> String { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + // Check cache (valid for 2 seconds, matching C implementation) + const CACHE_SECONDS: u64 = 2; + { + let cache = self.rrd_dump_cache.read(); + if let Some((cache_time, ref cached_dump)) = *cache { + if now.saturating_sub(cache_time) < CACHE_SECONDS { + return cached_dump.clone(); + } + } + } + + // Remove old entries first + self.remove_old_rrd_data(); + + let rrd = self.rrd_data.read(); + let mut result = String::new(); + + for entry in rrd.values() { + result.push_str(&entry.key); + result.push(':'); + result.push_str(&entry.data); + result.push('\n'); + } + + // Append NUL terminator for Perl compatibility (matches C implementation) + result.push('\0'); + + drop(rrd); + + // Update cache + *self.rrd_dump_cache.write() = Some((now, result.clone())); + + result + } + + /// Collect disk I/O statistics (bytes read, bytes written) + /// + /// Note: This is for future VM RRD implementation. Per C implementation: + /// - Node RRD (rrd_def_node) has 12 fields and does NOT include diskread/diskwrite + /// - VM RRD (rrd_def_vm) has 10 fields and DOES include diskread/diskwrite at indices 8-9 + /// + /// This method will be used when implementing VM RRD collection. + /// + /// # Sector Size + /// The Linux kernel reports disk statistics in /proc/diskstats using 512-byte sectors + /// as the standard unit, regardless of the device's actual physical sector size. + /// This is a kernel reporting convention (see Documentation/admin-guide/iostats.rst). + #[allow(dead_code)] + fn collect_disk_io() -> Result<(u64, u64)> { + // /proc/diskstats always uses 512-byte sectors (kernel convention) + const DISKSTATS_SECTOR_SIZE: u64 = 512; + + let diskstats = procfs::diskstats()?; + + let mut total_read = 0u64; + let mut total_write = 0u64; + + for stat in diskstats { + // Skip partitions (only look at whole disks: sda, vda, etc.) + if stat + .name + .chars() + .last() + .map(|c| c.is_numeric()) + .unwrap_or(false) + { + continue; + } + + // Convert sectors to bytes using kernel's reporting unit + total_read += stat.sectors_read * DISKSTATS_SECTOR_SIZE; + total_write += stat.sectors_written * DISKSTATS_SECTOR_SIZE; + } + + Ok((total_read, total_write)) + } + + /// Register a VM/CT + pub fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) { + tracing::debug!(vmid, vmtype = ?vmtype, node = %node, "Registered VM"); + + // Use global version counter (matches C's vminfo_version_counter) + let version = (self.vminfo_version_counter.fetch_add(1, Ordering::SeqCst) + 1) as u32; + + let entry = VmEntry { + vmid, + vmtype, + node, + version, + }; + self.vmlist.write().insert(vmid, entry); + + // Increment vmlist version counter + self.increment_vmlist_version(); + } + + /// Delete a VM/CT + pub fn delete_vm(&self, vmid: u32) { + self.vmlist.write().remove(&vmid); + tracing::debug!(vmid, "Deleted VM"); + + // Always increment vmlist version counter (matches C behavior) + self.increment_vmlist_version(); + } + + /// Check if VM/CT exists + pub fn vm_exists(&self, vmid: u32) -> bool { + self.vmlist.read().contains_key(&vmid) + } + + /// Check if a different VM/CT exists (different node or type) + pub fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool { + if let Some(entry) = self.vmlist.read().get(&vmid) { + entry.vmtype != vmtype || entry.node != node + } else { + false + } + } + + /// Get VM list + pub fn get_vmlist(&self) -> HashMap { + self.vmlist.read().clone() + } + + /// Scan directories for VMs/CTs and update vmlist + /// + /// Uses memdb's `recreate_vmlist()` to properly scan nodes/*/qemu-server/ + /// and nodes/*/lxc/ directories to track which node each VM belongs to. + pub fn scan_vmlist(&self, memdb: &pmxcfs_memdb::MemDb) { + // Use the proper recreate_vmlist from memdb which scans nodes/*/qemu-server/ and nodes/*/lxc/ + match pmxcfs_memdb::recreate_vmlist(memdb) { + Ok(new_vmlist) => { + let vmlist_len = new_vmlist.len(); + let mut vmlist = self.vmlist.write(); + + // Preserve version counters for existing VMs, assign new versions to new VMs + for (vmid, new_entry) in &new_vmlist { + if let Some(existing) = vmlist.get(vmid) { + // VM already exists - check if it changed + if existing.vmtype != new_entry.vmtype || existing.node != new_entry.node { + // VM changed - increment global counter and update + let version = (self.vminfo_version_counter.fetch_add(1, Ordering::SeqCst) + 1) as u32; + vmlist.insert(*vmid, VmEntry { + vmid: *vmid, + vmtype: new_entry.vmtype, + node: new_entry.node.clone(), + version, + }); + } + // else: VM unchanged, keep existing entry with its version + } else { + // New VM - assign new version + let version = (self.vminfo_version_counter.fetch_add(1, Ordering::SeqCst) + 1) as u32; + vmlist.insert(*vmid, VmEntry { + vmid: *vmid, + vmtype: new_entry.vmtype, + node: new_entry.node.clone(), + version, + }); + } + } + + // Remove VMs that no longer exist + vmlist.retain(|vmid, _| new_vmlist.contains_key(vmid)); + + drop(vmlist); + + tracing::info!(vms = vmlist_len, "VM list scan complete"); + + // Increment vmlist version counter + self.increment_vmlist_version(); + } + Err(err) => { + tracing::error!(error = %err, "Failed to recreate vmlist"); + } + } + } + + /// Initialize cluster information with cluster name + pub fn init_cluster(&self, cluster_name: String) { + let info = ClusterInfo::new(cluster_name, 0); + *self.cluster_info.write() = Some(info); + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + + /// Register a node in the cluster (name, ID, IP) + pub fn register_node(&self, node_id: u32, name: String, ip: String) { + tracing::debug!(node_id, node = %name, ip = %ip, "Registering cluster node"); + + let mut cluster_info = self.cluster_info.write(); + if let Some(ref mut info) = *cluster_info { + let node = ClusterNode { + name, + node_id, + ip, + online: false, // Will be updated by cluster module + }; + info.add_node(node); + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + } + + /// Get cluster information (for .members plugin) + pub fn get_cluster_info(&self) -> Option { + self.cluster_info.read().clone() + } + + /// Get cluster version + pub fn get_cluster_version(&self) -> u64 { + self.cluster_version.load(Ordering::SeqCst) + } + + /// Increment cluster version (called when membership changes) + pub fn increment_cluster_version(&self) { + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + + /// Update cluster info from CMAP (called by ClusterConfigService) + pub fn update_cluster_info( + &self, + cluster_name: String, + config_version: u64, + nodes: Vec<(u32, String, String)>, + ) -> Result<()> { + let mut cluster_info = self.cluster_info.write(); + + // Create or update cluster info + let mut info = cluster_info + .take() + .unwrap_or_else(|| ClusterInfo::new(cluster_name.clone(), config_version)); + + // Update cluster name if changed + if info.cluster_name != cluster_name { + info.cluster_name = cluster_name; + } + + // Update config version + info.config_version = config_version; + + // Preserve online status from old nodes (matches C's cfs_status_set_clinfo) + let old_nodes = info.nodes_by_id.clone(); + + // Clear existing nodes + info.nodes_by_id.clear(); + info.nodes_by_name.clear(); + + // Add updated nodes, preserving online status + for (nodeid, name, ip) in nodes { + let online = old_nodes + .get(&nodeid) + .map(|old_node| old_node.online) + .unwrap_or(false); + + let node = ClusterNode { + name: name.clone(), + node_id: nodeid, + ip, + online, + }; + info.add_node(node); + } + + // Clean up kvstore entries for removed nodes + let mut kvstore = self.kvstore.write(); + kvstore.retain(|nodeid, _| info.nodes_by_id.contains_key(nodeid)); + drop(kvstore); + + *cluster_info = Some(info); + + // Increment cluster_version (separate from config_version) + self.cluster_version.fetch_add(1, Ordering::SeqCst); + + tracing::info!(version = config_version, "Updated cluster configuration"); + Ok(()) + } + + /// Update node online status (called by cluster module) + pub fn set_node_online(&self, node_id: u32, online: bool) { + let mut cluster_info = self.cluster_info.write(); + if let Some(ref mut info) = *cluster_info + && let Some(node) = info.nodes_by_id.get_mut(&node_id) + && node.online != online + { + node.online = online; + self.cluster_version.fetch_add(1, Ordering::SeqCst); + tracing::debug!( + node = %node.name, + node_id, + online = if online { "true" } else { "false" }, + "Node online status changed" + ); + } + } + + /// Check if cluster is quorate (matches C's cfs_is_quorate) + pub fn is_quorate(&self) -> bool { + *self.quorate.read() + } + + /// Set quorum status (matches C's cfs_set_quorate) + pub fn set_quorate(&self, quorate: bool) { + let mut quorate_guard = self.quorate.write(); + let old_quorate = *quorate_guard; + *quorate_guard = quorate; + drop(quorate_guard); + + if old_quorate != quorate { + if quorate { + tracing::info!("Node has quorum"); + } else { + tracing::info!("Node lost quorum"); + } + } + } + + /// Get current cluster members (CPG membership) + pub fn get_members(&self) -> Vec { + self.members.read().clone() + } + + /// Update cluster members and sync online status (matches C's dfsm_confchg callback) + /// + /// This updates the CPG member list and synchronizes the online status + /// in cluster_info to match current membership. + /// + /// IMPORTANT: Both members and cluster_info are updated atomically under locks + /// to prevent TOCTOU where readers could see inconsistent state. + pub fn update_members(&self, members: Vec) { + // Acquire both locks before any updates to ensure atomicity + // (matches C's single mutex protection in status.c) + let mut members_guard = self.members.write(); + let mut cluster_info = self.cluster_info.write(); + + // Update members first + *members_guard = members.clone(); + + // Update online status in cluster_info based on members + // (matches C implementation's dfsm_confchg in status.c:1989-2025) + if let Some(ref mut info) = *cluster_info { + // First mark all nodes as offline + for node in info.nodes_by_id.values_mut() { + node.online = false; + } + + // Then mark active members as online + for member in &members { + if let Some(node) = info.nodes_by_id.get_mut(&member.node_id) { + node.online = true; + } + } + + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + + // Both locks released together at end of scope + } + + /// Get daemon start timestamp (for .version plugin) + pub fn get_start_time(&self) -> u64 { + self.start_time + } + + /// Increment VM list version (matches C's cfs_status.vmlist_version++) + pub fn increment_vmlist_version(&self) { + self.vmlist_version.fetch_add(1, Ordering::SeqCst); + } + + /// Get VM list version + pub fn get_vmlist_version(&self) -> u64 { + self.vmlist_version.load(Ordering::SeqCst) + } + + /// Increment version for a specific memdb path (matches C's record_memdb_change) + pub fn increment_path_version(&self, path: &str) { + let versions = self.memdb_path_versions.read(); + if let Some(counter) = versions.get(path) { + counter.fetch_add(1, Ordering::SeqCst); + } + } + + /// Get version for a specific memdb path + pub fn get_path_version(&self, path: &str) -> u64 { + let versions = self.memdb_path_versions.read(); + versions + .get(path) + .map(|counter| counter.load(Ordering::SeqCst)) + .unwrap_or(0) + } + + /// Get all memdb path versions (for .version plugin) + pub fn get_all_path_versions(&self) -> HashMap { + let versions = self.memdb_path_versions.read(); + versions + .iter() + .map(|(path, counter)| (path.clone(), counter.load(Ordering::SeqCst))) + .collect() + } + + /// Increment ALL configuration file versions (matches C's record_memdb_reload) + /// + /// Called when the entire database is reloaded from cluster peers. + /// This ensures clients know that all configuration files should be re-read. + pub fn increment_all_path_versions(&self) { + let versions = self.memdb_path_versions.read(); + for (_, counter) in versions.iter() { + counter.fetch_add(1, Ordering::SeqCst); + } + } + + /// Set key-value data from a node (kvstore DFSM) + /// + /// Matches C implementation's cfs_kvstore_node_set in status.c. + /// Stores ephemeral status data like RRD metrics, IP addresses, etc. + pub fn set_node_kv(&self, nodeid: u32, key: String, value: Vec) { + // Validate that the node exists in cluster info + let cluster_info = self.cluster_info.read(); + match &*cluster_info { + Some(info) if info.nodes_by_id.contains_key(&nodeid) => {}, + _ => { + tracing::warn!(nodeid, key = %key, "Ignoring KV update for unknown node"); + return; + } + } + drop(cluster_info); + + // Handle special keys (matches C's cfs_kvstore_node_set) + if let Some(rrd_key) = key.strip_prefix("rrd/") { + // RRD data - convert to string and store + if let Ok(mut data_str) = String::from_utf8(value) { + // Strip NUL termination + if data_str.ends_with('\0') { + data_str.pop(); + } + // Store RRD data (async operation, but we can't await here) + // In production, this would be handled by spawning a task + tracing::trace!(nodeid, key = %rrd_key, "Received RRD data from node"); + } + } else if key == "nodeip" { + // Node IP address + if let Ok(mut ip_str) = String::from_utf8(value.clone()) { + // Strip NUL termination + if ip_str.ends_with('\0') { + ip_str.pop(); + } + // Get node name from cluster info + let cluster_info = self.cluster_info.read(); + if let Some(info) = &*cluster_info { + if let Some(node) = info.nodes_by_id.get(&nodeid) { + let nodename = node.name.clone(); + drop(cluster_info); + + let mut node_ips = self.node_ips.write(); + let old_ip = node_ips.get(&nodename); + + if old_ip.map(|s| s.as_str()) != Some(ip_str.as_str()) { + node_ips.insert(nodename, ip_str); + drop(node_ips); + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + } + } + } + } else { + // Regular KV data with version tracking (matches C's kventry_hash_set) + let mut kvstore = self.kvstore.write(); + let node_kv = kvstore.entry(nodeid).or_default(); + + // Remove entry if value is empty (matches C behavior) + if value.is_empty() { + node_kv.remove(&key); + } else { + // Increment version for this key + let new_version = node_kv + .get(&key) + .map(|(_, version)| version + 1) + .unwrap_or(1); + node_kv.insert(key, (value, new_version)); + } + } + } + + /// Get key-value data from a node + pub fn get_node_kv(&self, nodeid: u32, key: &str) -> Option> { + let kvstore = self.kvstore.read(); + kvstore.get(&nodeid)?.get(key).map(|(value, _)| value.clone()) + } + + /// Add cluster log entry (called by kvstore DFSM) + /// + /// This is the wrapper for kvstore LOG messages. + /// Matches C implementation's clusterlog_insert call. + pub fn add_cluster_log( + &self, + timestamp: u32, + priority: u8, + tag: String, + node: String, + message: String, + ) { + let entry = ClusterLogEntry { + uid: 0, + timestamp: timestamp as u64, + priority, + tag, + pid: 0, + node, + ident: String::new(), + message, + }; + self.add_log_entry(entry); + } + + /// Update node online status based on CPG membership (kvstore DFSM confchg callback) + /// + /// This is called when kvstore CPG membership changes. + /// Matches C implementation's dfsm_confchg in status.c. + pub fn update_member_status(&self, member_list: &[u32]) { + let mut cluster_info = self.cluster_info.write(); + if let Some(ref mut info) = *cluster_info { + // Mark all nodes as offline + for node in info.nodes_by_id.values_mut() { + node.online = false; + } + + // Mark nodes in member_list as online + for &nodeid in member_list { + if let Some(node) = info.nodes_by_id.get_mut(&nodeid) { + node.online = true; + } + } + + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + } + + /// Get cluster log state (for DFSM synchronization) + /// + /// Returns the cluster log in C-compatible binary format (clog_base_t). + /// Matches C implementation's clusterlog_get_state() in logger.c:553-571. + pub fn get_cluster_log_state(&self) -> Result> { + self.cluster_log.get_state() + } + + /// Merge cluster log states from remote nodes + /// + /// Deserializes binary states from remote nodes and merges them with the local log. + /// Matches C implementation's dfsm_process_state_update() in status.c:2049-2074. + pub fn merge_cluster_log_states( + &self, + states: &[pmxcfs_api_types::NodeSyncInfo], + ) -> Result<()> { + use pmxcfs_logger::ClusterLog; + + let mut remote_logs = Vec::new(); + + for state_info in states { + // Check if this node has state data + let state_data = match &state_info.state { + Some(data) if !data.is_empty() => data, + _ => continue, + }; + + match ClusterLog::deserialize_state(state_data) { + Ok(ring_buffer) => { + tracing::debug!( + "Deserialized cluster log from node {}: {} entries", + state_info.node_id, + ring_buffer.len() + ); + remote_logs.push(ring_buffer); + } + Err(e) => { + tracing::warn!( + nodeid = state_info.node_id, + error = %e, + "Failed to deserialize cluster log from node" + ); + } + } + } + + if !remote_logs.is_empty() { + // Merge remote logs with local log (include_local = true) + // The merge() method atomically updates both buffer and dedup state + match self.cluster_log.merge(remote_logs, true) { + Ok(()) => { + tracing::debug!("Successfully merged cluster logs"); + } + Err(e) => { + tracing::error!(error = %e, "Failed to merge cluster logs"); + } + } + } + + Ok(()) + } + + /// Add cluster log entry from remote node (kvstore LOG message) + /// + /// Matches C implementation's clusterlog_insert() via kvstore message handling. + pub fn add_remote_cluster_log( + &self, + time: u32, + priority: u8, + node: String, + ident: String, + tag: String, + message: String, + ) -> Result<()> { + self.cluster_log + .add(&node, &ident, &tag, 0, priority, time, &message)?; + Ok(()) + } +} + +// Implement StatusOps trait for Status +impl crate::traits::StatusOps for Status { + fn get_node_status(&self, name: &str) -> Option { + self.get_node_status(name) + } + + fn set_node_status<'a>( + &'a self, + name: String, + data: Vec, + ) -> crate::traits::BoxFuture<'a, Result<()>> { + Box::pin(self.set_node_status(name, data)) + } + + fn add_log_entry(&self, entry: ClusterLogEntry) { + self.add_log_entry(entry) + } + + fn get_log_entries(&self, max: usize) -> Vec { + self.get_log_entries(max) + } + + fn clear_cluster_log(&self) { + self.clear_cluster_log() + } + + fn add_cluster_log( + &self, + timestamp: u32, + priority: u8, + tag: String, + node: String, + msg: String, + ) { + self.add_cluster_log(timestamp, priority, tag, node, msg) + } + + fn get_cluster_log_state(&self) -> Result> { + self.get_cluster_log_state() + } + + fn merge_cluster_log_states(&self, states: &[pmxcfs_api_types::NodeSyncInfo]) -> Result<()> { + self.merge_cluster_log_states(states) + } + + fn add_remote_cluster_log( + &self, + time: u32, + priority: u8, + node: String, + ident: String, + tag: String, + message: String, + ) -> Result<()> { + self.add_remote_cluster_log(time, priority, node, ident, tag, message) + } + + fn set_rrd_data<'a>( + &'a self, + key: String, + data: String, + ) -> crate::traits::BoxFuture<'a, Result<()>> { + Box::pin(self.set_rrd_data(key, data)) + } + + fn remove_old_rrd_data(&self) { + self.remove_old_rrd_data() + } + + fn get_rrd_dump(&self) -> String { + self.get_rrd_dump() + } + + fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) { + self.register_vm(vmid, vmtype, node) + } + + fn delete_vm(&self, vmid: u32) { + self.delete_vm(vmid) + } + + fn vm_exists(&self, vmid: u32) -> bool { + self.vm_exists(vmid) + } + + fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool { + self.different_vm_exists(vmid, vmtype, node) + } + + fn get_vmlist(&self) -> HashMap { + self.get_vmlist() + } + + fn scan_vmlist(&self, memdb: &pmxcfs_memdb::MemDb) { + self.scan_vmlist(memdb) + } + + fn init_cluster(&self, cluster_name: String) { + self.init_cluster(cluster_name) + } + + fn register_node(&self, node_id: u32, name: String, ip: String) { + self.register_node(node_id, name, ip) + } + + fn get_cluster_info(&self) -> Option { + self.get_cluster_info() + } + + fn get_cluster_version(&self) -> u64 { + self.get_cluster_version() + } + + fn increment_cluster_version(&self) { + self.increment_cluster_version() + } + + fn update_cluster_info( + &self, + cluster_name: String, + config_version: u64, + nodes: Vec<(u32, String, String)>, + ) -> Result<()> { + self.update_cluster_info(cluster_name, config_version, nodes) + } + + fn set_node_online(&self, node_id: u32, online: bool) { + self.set_node_online(node_id, online) + } + + fn is_quorate(&self) -> bool { + self.is_quorate() + } + + fn set_quorate(&self, quorate: bool) { + self.set_quorate(quorate) + } + + fn get_members(&self) -> Vec { + self.get_members() + } + + fn update_members(&self, members: Vec) { + self.update_members(members) + } + + fn update_member_status(&self, member_list: &[u32]) { + self.update_member_status(member_list) + } + + fn get_start_time(&self) -> u64 { + self.get_start_time() + } + + fn increment_vmlist_version(&self) { + self.increment_vmlist_version() + } + + fn get_vmlist_version(&self) -> u64 { + self.get_vmlist_version() + } + + fn increment_path_version(&self, path: &str) { + self.increment_path_version(path) + } + + fn get_path_version(&self, path: &str) -> u64 { + self.get_path_version(path) + } + + fn get_all_path_versions(&self) -> HashMap { + self.get_all_path_versions() + } + + fn increment_all_path_versions(&self) { + self.increment_all_path_versions() + } + + fn set_node_kv(&self, nodeid: u32, key: String, value: Vec) { + self.set_node_kv(nodeid, key, value) + } + + fn get_node_kv(&self, nodeid: u32, key: &str) -> Option> { + self.get_node_kv(nodeid, key) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::ClusterLogEntry; + use pmxcfs_api_types::VmType; + + /// Test helper: Create Status without rrdcached daemon (for unit tests) + fn init_test_status() -> Arc { + // Use pmxcfs-test-utils helper to create test config (matches C semantics) + let config = pmxcfs_test_utils::create_test_config(false); + Arc::new(Status::new(config, None)) + } + + #[tokio::test] + async fn test_rrd_data_storage_and_retrieval() { + let status = init_test_status(); + + status.rrd_data.write().clear(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Test node RRD data format + let node_data = + format!("{now}:0:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000"); + let _ = status + .set_rrd_data("pve2-node/testnode".to_string(), node_data.clone()) + .await; + + // Test VM RRD data format + let vm_data = format!("{now}:1:60:4:2048:2048:10000:5000:1000:500:100:50"); + let _ = status + .set_rrd_data("pve2.3-vm/100".to_string(), vm_data.clone()) + .await; + + // Get RRD dump + let dump = status.get_rrd_dump(); + + // Verify NUL terminator (C compatibility) + assert!(dump.ends_with('\0'), "Dump should end with NUL terminator"); + + // Strip NUL terminator for line-based checks + let dump_str = dump.trim_end_matches('\0'); + + // Verify both entries are present + assert!( + dump_str.contains("pve2-node/testnode"), + "Should contain node entry" + ); + assert!(dump_str.contains("pve2.3-vm/100"), "Should contain VM entry"); + + // Verify format: each line should be "key:data" + for line in dump_str.lines() { + assert!( + line.contains(':'), + "Each line should contain colon separator" + ); + let parts: Vec<&str> = line.split(':').collect(); + assert!(parts.len() > 1, "Each line should have key:data format"); + } + + assert_eq!(dump_str.lines().count(), 2, "Should have exactly 2 entries"); + } + + #[tokio::test] + async fn test_rrd_data_aging() { + let status = init_test_status(); + + status.rrd_data.write().clear(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let recent_data = + format!("{now}:0:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000"); + let _ = status + .set_rrd_data("pve2-node/recent".to_string(), recent_data) + .await; + + // Manually add an old entry (simulate time passing) + let old_timestamp = now - 400; // 400 seconds ago (> 5 minutes) + let old_data = format!( + "{old_timestamp}:0:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000" + ); + let entry = RrdEntry { + key: "pve2-node/old".to_string(), + data: old_data, + timestamp: old_timestamp, + }; + status + .rrd_data + .write() + .insert("pve2-node/old".to_string(), entry); + + // Get dump - should trigger aging and remove old entry + let dump = status.get_rrd_dump(); + + assert!( + dump.contains("pve2-node/recent"), + "Recent entry should be present" + ); + assert!( + !dump.contains("pve2-node/old"), + "Old entry should be aged out" + ); + } + + #[tokio::test] + async fn test_rrd_set_via_node_status() { + let status = init_test_status(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Simulate receiving RRD data via IPC (like pvestatd sends) + // Format matches C implementation: "timestamp:uptime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout" + let node_data = format!("{now}:12345:1.5:8:0.5:0.1:16000:8000:4000:0:100:50:1000:2000"); + + // Test the set_node_status method with "rrd/" prefix (matches C's cfs_status_set behavior) + let result = status + .set_node_status( + "rrd/pve2-node/testnode".to_string(), + node_data.as_bytes().to_vec(), + ) + .await; + assert!( + result.is_ok(), + "Should successfully set RRD data via node_status" + ); + + // Get the dump and verify + let dump = status.get_rrd_dump(); + assert!( + dump.contains("pve2-node/testnode"), + "Should contain node metrics" + ); + + // Verify the data has the expected number of fields + for line in dump.lines() { + if line.starts_with("pve2-node/") { + let parts: Vec<&str> = line.split(':').collect(); + // Format: key:timestamp:uptime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout + // That's 1 (key) + 14 fields = 15 parts minimum + assert!( + parts.len() >= 15, + "Node data should have at least 15 colon-separated fields, got {}", + parts.len() + ); + } + } + } + + #[tokio::test] + async fn test_rrd_multiple_updates() { + let status = init_test_status(); + + status.rrd_data.write().clear(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Add multiple entries + for i in 0..5 { + let data = format!( + "{}:{}:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000", + now + i, + i + ); + let _ = status + .set_rrd_data(format!("pve2-node/node{i}"), data) + .await; + } + + let dump = status.get_rrd_dump(); + + // Strip NUL terminator for line counting + let dump_str = dump.trim_end_matches('\0'); + let count = dump_str.lines().count(); + assert_eq!(count, 5, "Should have 5 entries"); + + // Verify each entry is present + for i in 0..5 { + assert!( + dump.contains(&format!("pve2-node/node{i}")), + "Should contain node{i}" + ); + } + } + + // ========== VM/CT Registry Tests ========== + + #[test] + fn test_vm_registration() { + let status = init_test_status(); + + // Register a QEMU VM + status.register_vm(100, VmType::Qemu, "node1".to_string()); + + // Verify it exists + assert!(status.vm_exists(100), "VM 100 should exist"); + + // Verify version incremented (starts at 0, increments to 1) + let vmlist_version = status.get_vmlist_version(); + assert!(vmlist_version > 0, "VM list version should increment"); + + // Get VM list and verify entry + let vmlist = status.get_vmlist(); + assert_eq!(vmlist.len(), 1, "Should have 1 VM"); + + let vm = vmlist.get(&100).expect("VM 100 should be in list"); + assert_eq!(vm.vmid, 100); + assert_eq!(vm.vmtype, VmType::Qemu); + assert_eq!(vm.node, "node1"); + assert_eq!(vm.version, 1, "First registration should have version 1"); + } + + #[test] + fn test_vm_deletion() { + let status = init_test_status(); + + // Register and then delete + status.register_vm(100, VmType::Qemu, "node1".to_string()); + assert!(status.vm_exists(100), "VM should exist after registration"); + + let version_before = status.get_vmlist_version(); + status.delete_vm(100); + + assert!(!status.vm_exists(100), "VM should not exist after deletion"); + + let version_after = status.get_vmlist_version(); + assert!( + version_after > version_before, + "Version should increment on deletion" + ); + + let vmlist = status.get_vmlist(); + assert_eq!(vmlist.len(), 0, "VM list should be empty"); + } + + #[test] + fn test_vm_multiple_registrations() { + let status = init_test_status(); + + // Register multiple VMs + status.register_vm(100, VmType::Qemu, "node1".to_string()); + status.register_vm(101, VmType::Qemu, "node2".to_string()); + status.register_vm(200, VmType::Lxc, "node1".to_string()); + status.register_vm(201, VmType::Lxc, "node3".to_string()); + + let vmlist = status.get_vmlist(); + assert_eq!(vmlist.len(), 4, "Should have 4 VMs"); + + // Verify each VM + assert_eq!(vmlist.get(&100).unwrap().vmtype, VmType::Qemu); + assert_eq!(vmlist.get(&101).unwrap().node, "node2"); + assert_eq!(vmlist.get(&200).unwrap().vmtype, VmType::Lxc); + assert_eq!(vmlist.get(&201).unwrap().node, "node3"); + } + + #[test] + fn test_vm_re_registration_increments_version() { + let status = init_test_status(); + + // Register VM + status.register_vm(100, VmType::Qemu, "node1".to_string()); + let vmlist = status.get_vmlist(); + let version1 = vmlist.get(&100).unwrap().version; + assert_eq!(version1, 1, "First registration should have version 1"); + + // Re-register same VM + status.register_vm(100, VmType::Qemu, "node2".to_string()); + let vmlist = status.get_vmlist(); + let version2 = vmlist.get(&100).unwrap().version; + assert_eq!(version2, 2, "Second registration should increment version"); + assert_eq!( + vmlist.get(&100).unwrap().node, + "node2", + "Node should be updated" + ); + } + + #[test] + fn test_different_vm_exists() { + let status = init_test_status(); + + // Register VM 100 as QEMU on node1 + status.register_vm(100, VmType::Qemu, "node1".to_string()); + + // Check if different VM exists - same type, different node + assert!( + status.different_vm_exists(100, VmType::Qemu, "node2"), + "Should detect different node" + ); + + // Check if different VM exists - different type, same node + assert!( + status.different_vm_exists(100, VmType::Lxc, "node1"), + "Should detect different type" + ); + + // Check if different VM exists - same type and node (should be false) + assert!( + !status.different_vm_exists(100, VmType::Qemu, "node1"), + "Should not detect difference for identical VM" + ); + + // Check non-existent VM + assert!( + !status.different_vm_exists(999, VmType::Qemu, "node1"), + "Non-existent VM should return false" + ); + } + + // ========== Cluster Membership Tests ========== + + #[test] + fn test_cluster_initialization() { + let status = init_test_status(); + + // Initially no cluster info + assert!( + status.get_cluster_info().is_none(), + "Should have no cluster info initially" + ); + + // Initialize cluster + status.init_cluster("test-cluster".to_string()); + + let cluster_info = status.get_cluster_info(); + assert!( + cluster_info.is_some(), + "Cluster info should exist after init" + ); + assert_eq!(cluster_info.unwrap().cluster_name, "test-cluster"); + + let version = status.get_cluster_version(); + assert!(version > 0, "Cluster version should increment"); + } + + #[test] + fn test_node_registration() { + let status = init_test_status(); + + status.init_cluster("test-cluster".to_string()); + + // Register nodes + status.register_node(1, "node1".to_string(), "192.168.1.10".to_string()); + status.register_node(2, "node2".to_string(), "192.168.1.11".to_string()); + + let cluster_info = status + .get_cluster_info() + .expect("Cluster info should exist"); + assert_eq!(cluster_info.nodes_by_id.len(), 2, "Should have 2 nodes"); + assert_eq!( + cluster_info.nodes_by_name.len(), + 2, + "Should have 2 nodes by name" + ); + + let node1 = cluster_info + .nodes_by_id + .get(&1) + .expect("Node 1 should exist"); + assert_eq!(node1.name, "node1"); + assert_eq!(node1.ip, "192.168.1.10"); + assert!(!node1.online, "Node should be offline initially"); + } + + #[test] + fn test_node_online_status() { + let status = init_test_status(); + + status.init_cluster("test-cluster".to_string()); + status.register_node(1, "node1".to_string(), "192.168.1.10".to_string()); + + // Set online + status.set_node_online(1, true); + let cluster_info = status.get_cluster_info().unwrap(); + assert!( + cluster_info.nodes_by_id.get(&1).unwrap().online, + "Node should be online" + ); + assert!( + cluster_info.get_node_by_name("node1").unwrap().online, + "Node should be online in nodes_by_name too" + ); + + // Set offline + status.set_node_online(1, false); + let cluster_info = status.get_cluster_info().unwrap(); + assert!( + !cluster_info.nodes_by_id.get(&1).unwrap().online, + "Node should be offline" + ); + } + + #[test] + fn test_update_members() { + let status = init_test_status(); + + status.init_cluster("test-cluster".to_string()); + status.register_node(1, "node1".to_string(), "192.168.1.10".to_string()); + status.register_node(2, "node2".to_string(), "192.168.1.11".to_string()); + status.register_node(3, "node3".to_string(), "192.168.1.12".to_string()); + + // Simulate CPG membership: nodes 1 and 3 are online + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let members = vec![ + pmxcfs_api_types::MemberInfo { + node_id: 1, + pid: 1000, + joined_at: now, + }, + pmxcfs_api_types::MemberInfo { + node_id: 3, + pid: 1002, + joined_at: now, + }, + ]; + status.update_members(members); + + let cluster_info = status.get_cluster_info().unwrap(); + assert!( + cluster_info.nodes_by_id.get(&1).unwrap().online, + "Node 1 should be online" + ); + assert!( + !cluster_info.nodes_by_id.get(&2).unwrap().online, + "Node 2 should be offline" + ); + assert!( + cluster_info.nodes_by_id.get(&3).unwrap().online, + "Node 3 should be online" + ); + } + + #[test] + fn test_quorum_state() { + let status = init_test_status(); + + // Initially not quorate + assert!(!status.is_quorate(), "Should not be quorate initially"); + + // Set quorate + status.set_quorate(true); + assert!(status.is_quorate(), "Should be quorate"); + + // Unset quorate + status.set_quorate(false); + assert!(!status.is_quorate(), "Should not be quorate"); + } + + #[test] + fn test_path_version_tracking() { + let status = init_test_status(); + + // Initial version should be 0 + assert_eq!(status.get_path_version("corosync.conf"), 0); + + // Increment version + status.increment_path_version("corosync.conf"); + assert_eq!(status.get_path_version("corosync.conf"), 1); + + // Increment again + status.increment_path_version("corosync.conf"); + assert_eq!(status.get_path_version("corosync.conf"), 2); + + // Non-tracked path should return 0 + assert_eq!(status.get_path_version("nonexistent.cfg"), 0); + } + + #[test] + fn test_all_path_versions() { + let status = init_test_status(); + + // Increment a few paths + status.increment_path_version("corosync.conf"); + status.increment_path_version("corosync.conf"); + status.increment_path_version("storage.cfg"); + + let all_versions = status.get_all_path_versions(); + + // Should contain all tracked paths + assert!(all_versions.contains_key("corosync.conf")); + assert!(all_versions.contains_key("storage.cfg")); + assert!(all_versions.contains_key("user.cfg")); + + // Verify specific versions + assert_eq!(all_versions.get("corosync.conf"), Some(&2)); + assert_eq!(all_versions.get("storage.cfg"), Some(&1)); + assert_eq!(all_versions.get("user.cfg"), Some(&0)); + } + + #[test] + fn test_vmlist_version_tracking() { + let status = init_test_status(); + + let initial_version = status.get_vmlist_version(); + + status.increment_vmlist_version(); + assert_eq!(status.get_vmlist_version(), initial_version + 1); + + status.increment_vmlist_version(); + assert_eq!(status.get_vmlist_version(), initial_version + 2); + } + + #[test] + fn test_cluster_log_add_entry() { + let status = init_test_status(); + + let entry = ClusterLogEntry { + uid: 0, + timestamp: 1234567890, + node: "node1".to_string(), + priority: 6, + pid: 0, + ident: "pmxcfs".to_string(), + tag: "startup".to_string(), + message: "Test message".to_string(), + }; + + status.add_log_entry(entry); + + let entries = status.get_log_entries(10); + assert_eq!(entries.len(), 1, "Should have 1 log entry"); + assert_eq!(entries[0].node, "node1"); + assert_eq!(entries[0].message, "Test message"); + } + + #[test] + fn test_cluster_log_multiple_entries() { + let status = init_test_status(); + + // Add multiple entries + for i in 0..5 { + let entry = ClusterLogEntry { + uid: 0, + timestamp: 1234567890 + i, + node: format!("node{i}"), + priority: 6, + pid: 0, + ident: "test".to_string(), + tag: "test".to_string(), + message: format!("Message {i}"), + }; + status.add_log_entry(entry); + } + + let entries = status.get_log_entries(10); + assert_eq!(entries.len(), 5, "Should have 5 log entries"); + } + + #[test] + fn test_cluster_log_clear() { + let status = init_test_status(); + + // Add entries + for i in 0..3 { + let entry = ClusterLogEntry { + uid: 0, + timestamp: 1234567890 + i, + node: "node1".to_string(), + priority: 6, + pid: 0, + ident: "test".to_string(), + tag: "test".to_string(), + message: format!("Message {i}"), + }; + status.add_log_entry(entry); + } + + assert_eq!(status.get_log_entries(10).len(), 3, "Should have 3 entries"); + + // Clear + status.clear_cluster_log(); + + assert_eq!( + status.get_log_entries(10).len(), + 0, + "Should have 0 entries after clear" + ); + } + + #[test] + fn test_kvstore_operations() { + let status = init_test_status(); + + // Initialize cluster and register nodes + status.init_cluster("test-cluster".to_string()); + status.register_node(1, "node1".to_string(), "192.168.1.10".to_string()); + status.register_node(2, "node2".to_string(), "192.168.1.11".to_string()); + + // Set some KV data + status.set_node_kv(1, "ip".to_string(), b"192.168.1.10".to_vec()); + status.set_node_kv(1, "status".to_string(), b"online".to_vec()); + status.set_node_kv(2, "ip".to_string(), b"192.168.1.11".to_vec()); + + // Get KV data + let ip1 = status.get_node_kv(1, "ip"); + assert_eq!(ip1, Some(b"192.168.1.10".to_vec())); + + let status1 = status.get_node_kv(1, "status"); + assert_eq!(status1, Some(b"online".to_vec())); + + let ip2 = status.get_node_kv(2, "ip"); + assert_eq!(ip2, Some(b"192.168.1.11".to_vec())); + + // Test empty value removal (matches C behavior) + status.set_node_kv(1, "ip".to_string(), vec![]); + let ip1_after_remove = status.get_node_kv(1, "ip"); + assert_eq!(ip1_after_remove, None, "Empty value should remove the key"); + + // Non-existent key + let nonexistent = status.get_node_kv(1, "nonexistent"); + assert_eq!(nonexistent, None); + + // Non-existent node + let nonexistent_node = status.get_node_kv(999, "ip"); + assert_eq!(nonexistent_node, None); + + // Test unknown node rejection + status.set_node_kv(999, "unknown-key".to_string(), b"test".to_vec()); + let retrieved = status.get_node_kv(999, "unknown-key"); + assert_eq!(retrieved, None, "Unknown node should be rejected"); + } + + #[test] + fn test_start_time() { + let status = init_test_status(); + + let start_time = status.get_start_time(); + assert!(start_time > 0, "Start time should be set"); + + // Verify it's a recent timestamp (within last hour) + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + assert!(now - start_time < 3600, "Start time should be recent"); + } + + #[test] + fn test_get_local_nodename() { + // Config is always required (matches C semantics where cfs is always present) + let status = init_test_status(); + + let nodename = status.get_local_nodename(); + assert_eq!(nodename, pmxcfs_test_utils::TEST_NODE_NAME, "Nodename should match test config"); + assert!(!nodename.is_empty(), "Nodename should not be empty"); + + // Test with custom config + let config = pmxcfs_config::Config::shared( + "testnode".to_string(), + "192.168.1.10".parse().unwrap(), + 33, + false, + false, + "test-cluster".to_string(), + ); + let status_custom = Arc::new(Status::new(config, None)); + + let nodename = status_custom.get_local_nodename(); + assert_eq!(nodename, "testnode", "Nodename should match custom config"); + + tracing::info!(nodename = %nodename, "Local nodename from config"); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-status/src/traits.rs b/src/pmxcfs-rs/pmxcfs-status/src/traits.rs new file mode 100644 index 000000000..a7796fc45 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-status/src/traits.rs @@ -0,0 +1,492 @@ +use crate::types::{ClusterInfo, ClusterLogEntry, NodeStatus}; +use anyhow::Result; +use parking_lot::RwLock; +use pmxcfs_api_types::{VmEntry, VmType}; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +/// Traits for Status operations to enable mocking and testing +/// +/// Boxed future type for async trait methods +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + +/// Trait for Status operations +/// +/// This trait abstracts all Status operations to enable: +/// - Dependency injection in production code +/// - Easy mocking in unit tests +/// - Test isolation without global singleton +/// +/// The real `Status` struct implements this trait for production use. +/// `MockStatus` implements this trait for testing. +pub trait StatusOps: Send + Sync { + // Node status operations + fn get_node_status(&self, name: &str) -> Option; + fn set_node_status<'a>(&'a self, name: String, data: Vec) -> BoxFuture<'a, Result<()>>; + + // Cluster log operations + fn add_log_entry(&self, entry: ClusterLogEntry); + fn get_log_entries(&self, max: usize) -> Vec; + fn clear_cluster_log(&self); + fn add_cluster_log(&self, timestamp: u32, priority: u8, tag: String, node: String, msg: String); + fn get_cluster_log_state(&self) -> Result>; + fn merge_cluster_log_states(&self, states: &[pmxcfs_api_types::NodeSyncInfo]) -> Result<()>; + fn add_remote_cluster_log( + &self, + time: u32, + priority: u8, + node: String, + ident: String, + tag: String, + message: String, + ) -> Result<()>; + + // RRD operations + fn set_rrd_data<'a>(&'a self, key: String, data: String) -> BoxFuture<'a, Result<()>>; + fn remove_old_rrd_data(&self); + fn get_rrd_dump(&self) -> String; + + // VM list operations + fn register_vm(&self, vmid: u32, vmtype: VmType, node: String); + fn delete_vm(&self, vmid: u32); + fn vm_exists(&self, vmid: u32) -> bool; + fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool; + fn get_vmlist(&self) -> HashMap; + fn scan_vmlist(&self, memdb: &pmxcfs_memdb::MemDb); + + // Cluster info operations + fn init_cluster(&self, cluster_name: String); + fn register_node(&self, node_id: u32, name: String, ip: String); + fn get_cluster_info(&self) -> Option; + fn get_cluster_version(&self) -> u64; + fn increment_cluster_version(&self); + fn update_cluster_info( + &self, + cluster_name: String, + config_version: u64, + nodes: Vec<(u32, String, String)>, + ) -> Result<()>; + fn set_node_online(&self, node_id: u32, online: bool); + + // Quorum operations + fn is_quorate(&self) -> bool; + fn set_quorate(&self, quorate: bool); + + // Members operations + fn get_members(&self) -> Vec; + fn update_members(&self, members: Vec); + fn update_member_status(&self, member_list: &[u32]); + + // Version/timestamp operations + fn get_start_time(&self) -> u64; + fn increment_vmlist_version(&self); + fn get_vmlist_version(&self) -> u64; + fn increment_path_version(&self, path: &str); + fn get_path_version(&self, path: &str) -> u64; + fn get_all_path_versions(&self) -> HashMap; + fn increment_all_path_versions(&self); + + // KV store operations + fn set_node_kv(&self, nodeid: u32, key: String, value: Vec); + fn get_node_kv(&self, nodeid: u32, key: &str) -> Option>; +} + +/// Mock implementation of StatusOps for testing +/// +/// This provides a lightweight, isolated Status implementation for unit tests. +/// Unlike the real Status, MockStatus: +/// - Can be created independently without global singleton +/// - Has no RRD writer or async dependencies +/// - Is completely isolated between test instances +/// - Can be easily reset or configured for specific test scenarios +/// +/// # Example +/// ``` +/// use pmxcfs_status::{MockStatus, StatusOps}; +/// use std::sync::Arc; +/// +/// # fn test_example() { +/// let status: Arc = Arc::new(MockStatus::new()); +/// status.set_quorate(true); +/// assert!(status.is_quorate()); +/// # } +/// ``` +pub struct MockStatus { + vmlist: RwLock>, + quorate: RwLock, + cluster_info: RwLock>, + members: RwLock>, + cluster_version: Arc, + vmlist_version: Arc, + path_versions: RwLock>, + kvstore: RwLock>>>, + cluster_log: RwLock>, + rrd_data: RwLock>, + node_status: RwLock>, + start_time: u64, +} + +impl MockStatus { + /// Create a new MockStatus instance for testing + pub fn new() -> Self { + Self { + vmlist: RwLock::new(HashMap::new()), + quorate: RwLock::new(false), + cluster_info: RwLock::new(None), + members: RwLock::new(Vec::new()), + cluster_version: Arc::new(std::sync::atomic::AtomicU64::new(0)), + vmlist_version: Arc::new(std::sync::atomic::AtomicU64::new(0)), + path_versions: RwLock::new(HashMap::new()), + kvstore: RwLock::new(HashMap::new()), + cluster_log: RwLock::new(Vec::new()), + rrd_data: RwLock::new(HashMap::new()), + node_status: RwLock::new(HashMap::new()), + start_time: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + } + } + + /// Reset all mock state (useful for test cleanup) + pub fn reset(&self) { + self.vmlist.write().clear(); + *self.quorate.write() = false; + *self.cluster_info.write() = None; + self.members.write().clear(); + self.cluster_version + .store(0, std::sync::atomic::Ordering::SeqCst); + self.vmlist_version + .store(0, std::sync::atomic::Ordering::SeqCst); + self.path_versions.write().clear(); + self.kvstore.write().clear(); + self.cluster_log.write().clear(); + self.rrd_data.write().clear(); + self.node_status.write().clear(); + } +} + +impl Default for MockStatus { + fn default() -> Self { + Self::new() + } +} + +impl StatusOps for MockStatus { + fn get_node_status(&self, name: &str) -> Option { + self.node_status.read().get(name).cloned() + } + + fn set_node_status<'a>(&'a self, name: String, data: Vec) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + // Simplified mock - just store the data + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + self.node_status.write().insert( + name.clone(), + NodeStatus { + name, + data, + timestamp: now, + }, + ); + Ok(()) + }) + } + + fn add_log_entry(&self, entry: ClusterLogEntry) { + self.cluster_log.write().push(entry); + } + + fn get_log_entries(&self, max: usize) -> Vec { + let log = self.cluster_log.read(); + log.iter().take(max).cloned().collect() + } + + fn clear_cluster_log(&self) { + self.cluster_log.write().clear(); + } + + fn add_cluster_log( + &self, + timestamp: u32, + priority: u8, + tag: String, + node: String, + msg: String, + ) { + let entry = ClusterLogEntry { + uid: 0, + timestamp: timestamp as u64, + priority, + tag, + pid: 0, + node, + ident: "mock".to_string(), + message: msg, + }; + self.add_log_entry(entry); + } + + fn get_cluster_log_state(&self) -> Result> { + // Simplified mock + Ok(Vec::new()) + } + + fn merge_cluster_log_states(&self, _states: &[pmxcfs_api_types::NodeSyncInfo]) -> Result<()> { + // Simplified mock + Ok(()) + } + + fn add_remote_cluster_log( + &self, + time: u32, + priority: u8, + node: String, + ident: String, + tag: String, + message: String, + ) -> Result<()> { + let entry = ClusterLogEntry { + uid: 0, + timestamp: time as u64, + priority, + tag, + pid: 0, + node, + ident, + message, + }; + self.add_log_entry(entry); + Ok(()) + } + + fn set_rrd_data<'a>(&'a self, key: String, data: String) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + self.rrd_data.write().insert(key, data); + Ok(()) + }) + } + + fn remove_old_rrd_data(&self) { + // Mock does nothing + } + + fn get_rrd_dump(&self) -> String { + let data = self.rrd_data.read(); + data.iter().map(|(k, v)| format!("{k}: {v}\n")).collect() + } + + fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) { + // Get existing version or start at 1 + let version = self + .vmlist + .read() + .get(&vmid) + .map(|vm| vm.version + 1) + .unwrap_or(1); + + self.vmlist.write().insert( + vmid, + VmEntry { + vmtype, + node, + vmid, + version, + }, + ); + self.increment_vmlist_version(); + } + + fn delete_vm(&self, vmid: u32) { + self.vmlist.write().remove(&vmid); + self.increment_vmlist_version(); + } + + fn vm_exists(&self, vmid: u32) -> bool { + self.vmlist.read().contains_key(&vmid) + } + + fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool { + if let Some(entry) = self.vmlist.read().get(&vmid) { + entry.vmtype != vmtype || entry.node != node + } else { + false + } + } + + fn get_vmlist(&self) -> HashMap { + self.vmlist.read().clone() + } + + fn scan_vmlist(&self, _memdb: &pmxcfs_memdb::MemDb) { + // Mock does nothing - real implementation scans /qemu-server and /lxc + } + + fn init_cluster(&self, cluster_name: String) { + *self.cluster_info.write() = Some(ClusterInfo { + cluster_name, + config_version: 0, + nodes_by_id: HashMap::new(), + nodes_by_name: HashMap::new(), + }); + self.increment_cluster_version(); + } + + fn register_node(&self, node_id: u32, name: String, ip: String) { + let mut info = self.cluster_info.write(); + if let Some(cluster) = info.as_mut() { + let node = crate::types::ClusterNode { + name: name.clone(), + node_id, + ip, + online: false, // Match real Status behavior - updated by cluster module + }; + cluster.add_node(node); + } + self.increment_cluster_version(); + } + + fn get_cluster_info(&self) -> Option { + self.cluster_info.read().clone() + } + + fn get_cluster_version(&self) -> u64 { + self.cluster_version + .load(std::sync::atomic::Ordering::SeqCst) + } + + fn increment_cluster_version(&self) { + self.cluster_version + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + + fn update_cluster_info( + &self, + cluster_name: String, + config_version: u64, + nodes: Vec<(u32, String, String)>, + ) -> Result<()> { + let mut cluster_info = self.cluster_info.write(); + + // Create or update cluster info + let mut info = cluster_info.take().unwrap_or_else(|| ClusterInfo { + cluster_name: cluster_name.clone(), + config_version, + nodes_by_id: HashMap::new(), + nodes_by_name: HashMap::new(), + }); + + // Update cluster name if changed + if info.cluster_name != cluster_name { + info.cluster_name = cluster_name; + } + + // Clear existing nodes + info.nodes_by_id.clear(); + info.nodes_by_name.clear(); + + // Add updated nodes + for (nodeid, name, ip) in nodes { + let node = crate::types::ClusterNode { + name, + node_id: nodeid, + ip, + online: false, + }; + info.add_node(node); + } + + *cluster_info = Some(info); + + // Update version to reflect configuration change + self.cluster_version + .store(config_version, std::sync::atomic::Ordering::SeqCst); + + Ok(()) + } + + fn set_node_online(&self, node_id: u32, online: bool) { + let mut info = self.cluster_info.write(); + if let Some(cluster) = info.as_mut() + && let Some(node) = cluster.nodes_by_id.get_mut(&node_id) + { + node.online = online; + } + } + + fn is_quorate(&self) -> bool { + *self.quorate.read() + } + + fn set_quorate(&self, quorate: bool) { + *self.quorate.write() = quorate; + } + + fn get_members(&self) -> Vec { + self.members.read().clone() + } + + fn update_members(&self, members: Vec) { + *self.members.write() = members; + } + + fn update_member_status(&self, _member_list: &[u32]) { + // Mock does nothing - real implementation updates online status + } + + fn get_start_time(&self) -> u64 { + self.start_time + } + + fn increment_vmlist_version(&self) { + self.vmlist_version + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + + fn get_vmlist_version(&self) -> u64 { + self.vmlist_version + .load(std::sync::atomic::Ordering::SeqCst) + } + + fn increment_path_version(&self, path: &str) { + let mut versions = self.path_versions.write(); + let version = versions.entry(path.to_string()).or_insert(0); + *version += 1; + } + + fn get_path_version(&self, path: &str) -> u64 { + *self.path_versions.read().get(path).unwrap_or(&0) + } + + fn get_all_path_versions(&self) -> HashMap { + self.path_versions.read().clone() + } + + fn increment_all_path_versions(&self) { + let mut versions = self.path_versions.write(); + for version in versions.values_mut() { + *version += 1; + } + } + + fn set_node_kv(&self, nodeid: u32, key: String, value: Vec) { + let mut kvstore = self.kvstore.write(); + let node_kv = kvstore.entry(nodeid).or_default(); + + // Remove entry if value is empty (matches real Status behavior) + if value.is_empty() { + node_kv.remove(&key); + } else { + node_kv.insert(key, value); + } + } + + fn get_node_kv(&self, nodeid: u32, key: &str) -> Option> { + self.kvstore.read().get(&nodeid)?.get(key).cloned() + } +} diff --git a/src/pmxcfs-rs/pmxcfs-status/src/types.rs b/src/pmxcfs-rs/pmxcfs-status/src/types.rs new file mode 100644 index 000000000..7b8ef2037 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-status/src/types.rs @@ -0,0 +1,77 @@ +/// Data types for the status module +use std::collections::HashMap; + +/// Cluster node information (matches C implementation's cfs_clnode_t) +#[derive(Debug, Clone)] +pub struct ClusterNode { + pub name: String, + pub node_id: u32, + pub ip: String, + pub online: bool, +} + +/// Cluster information (matches C implementation's cfs_clinfo_t) +#[derive(Debug, Clone)] +pub struct ClusterInfo { + pub cluster_name: String, + /// Configuration version from corosync (matches C's cman_version) + pub config_version: u64, + pub nodes_by_id: HashMap, + /// Index mapping node name to node_id (safer than duplicating ClusterNode) + pub nodes_by_name: HashMap, +} + +impl ClusterInfo { + pub(crate) fn new(cluster_name: String, config_version: u64) -> Self { + Self { + cluster_name, + config_version, + nodes_by_id: HashMap::new(), + nodes_by_name: HashMap::new(), + } + } + + /// Add or update a node in the cluster + pub(crate) fn add_node(&mut self, node: ClusterNode) { + let node_id = node.node_id; + let name = node.name.clone(); + self.nodes_by_id.insert(node_id, node); + self.nodes_by_name.insert(name, node_id); + } + + /// Get node by name + pub fn get_node_by_name(&self, name: &str) -> Option<&ClusterNode> { + let node_id = self.nodes_by_name.get(name)?; + self.nodes_by_id.get(node_id) + } +} + +/// Node status data +#[derive(Clone, Debug)] +pub struct NodeStatus { + pub name: String, + pub data: Vec, + pub timestamp: u64, +} + +/// Cluster log entry +/// Field order matches C output: uid, time, pri, tag, pid, node, user, msg +#[derive(Clone, Debug)] +pub struct ClusterLogEntry { + pub uid: u32, + pub timestamp: u64, + pub priority: u8, + pub tag: String, + pub pid: u32, + pub node: String, + pub ident: String, + pub message: String, +} + +/// RRD (Round Robin Database) entry +#[derive(Clone, Debug)] +pub(crate) struct RrdEntry { + pub key: String, + pub data: String, + pub timestamp: u64, +} diff --git a/src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml b/src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml new file mode 100644 index 000000000..41cdce64b --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "pmxcfs-test-utils" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[lib] +name = "pmxcfs_test_utils" +path = "src/lib.rs" + +[dependencies] +# Internal workspace dependencies +pmxcfs-api-types.workspace = true +pmxcfs-config.workspace = true +pmxcfs-memdb.workspace = true +pmxcfs-status.workspace = true + +# Error handling +anyhow.workspace = true + +# Concurrency +parking_lot.workspace = true + +# System integration +libc.workspace = true + +# Development utilities +tempfile.workspace = true + +# Async runtime +tokio.workspace = true diff --git a/src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs b/src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs new file mode 100644 index 000000000..b37cdcc39 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs @@ -0,0 +1,570 @@ +//! Test utilities for pmxcfs integration and unit tests +//! +//! This crate provides: +//! - Common test setup and helper functions +//! - TestEnv builder for standard test configurations +//! - Mock implementations (MockStatus, MockMemDb for isolated testing) +//! - Test constants and utilities + +use anyhow::Result; +use pmxcfs_config::Config; +use pmxcfs_memdb::MemDb; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tempfile::TempDir; + +// Re-export MockStatus for easy test access +pub use pmxcfs_status::{MockStatus, StatusOps}; + +// Mock implementations +mod mock_memdb; +pub use mock_memdb::MockMemDb; + +// Re-export MemDbOps for convenience in tests +pub use pmxcfs_memdb::MemDbOps; + +// Test constants +pub const TEST_MTIME: u32 = 1234567890; +pub const TEST_NODE_NAME: &str = "testnode"; +pub const TEST_CLUSTER_NAME: &str = "test-cluster"; +pub const TEST_WWW_DATA_GID: u32 = 33; + +/// Test environment builder for standard test setups +/// +/// This builder provides a fluent interface for creating test environments +/// with optional components (database, status, config). +/// +/// # Example +/// ``` +/// use pmxcfs_test_utils::TestEnv; +/// +/// # fn example() -> anyhow::Result<()> { +/// let env = TestEnv::new() +/// .with_database()? +/// .with_mock_status() +/// .build(); +/// +/// // Use env.db, env.status, etc. +/// # Ok(()) +/// # } +/// ``` +pub struct TestEnv { + pub config: Arc, + pub db: Option, + pub status: Option>, + pub temp_dir: Option, +} + +impl TestEnv { + /// Create a new test environment builder with default config + pub fn new() -> Self { + Self::new_with_config(false) + } + + /// Create a new test environment builder with local mode config + pub fn new_local() -> Self { + Self::new_with_config(true) + } + + /// Create a new test environment builder with custom local_mode setting + pub fn new_with_config(local_mode: bool) -> Self { + let config = create_test_config(local_mode); + Self { + config, + db: None, + status: None, + temp_dir: None, + } + } + + /// Add a database with standard directory structure + pub fn with_database(mut self) -> Result { + let (temp_dir, db) = create_test_db()?; + self.temp_dir = Some(temp_dir); + self.db = Some(db); + Ok(self) + } + + /// Add a minimal database (no standard directories) + pub fn with_minimal_database(mut self) -> Result { + let (temp_dir, db) = create_minimal_test_db()?; + self.temp_dir = Some(temp_dir); + self.db = Some(db); + Ok(self) + } + + /// Add a MockStatus instance for isolated testing + pub fn with_mock_status(mut self) -> Self { + self.status = Some(Arc::new(MockStatus::new())); + self + } + + /// Add the real Status instance with test config + pub fn with_status(mut self) -> Self { + self.status = Some(pmxcfs_status::init_with_config(self.config.clone())); + self + } + + /// Build and return the test environment + pub fn build(self) -> Self { + self + } + + /// Get a reference to the database (panics if not configured) + pub fn db(&self) -> &MemDb { + self.db + .as_ref() + .expect("Database not configured. Call with_database() first") + } + + /// Get a reference to the status (panics if not configured) + pub fn status(&self) -> &Arc { + self.status + .as_ref() + .expect("Status not configured. Call with_status() or with_mock_status() first") + } +} + +impl Default for TestEnv { + fn default() -> Self { + Self::new() + } +} + +/// Creates a standard test configuration +/// +/// # Arguments +/// * `local_mode` - Whether to run in local mode (no cluster) +/// +/// # Returns +/// Arc-wrapped Config suitable for testing +pub fn create_test_config(local_mode: bool) -> Arc { + Config::shared( + TEST_NODE_NAME.to_string(), + "127.0.0.1".parse().unwrap(), + TEST_WWW_DATA_GID, + false, // debug mode + local_mode, + TEST_CLUSTER_NAME.to_string(), + ) +} + +/// Creates a test database with standard directory structure +/// +/// Creates the following directories: +/// - /nodes/{nodename}/qemu-server +/// - /nodes/{nodename}/lxc +/// - /nodes/{nodename}/priv +/// - /priv/lock/qemu-server +/// - /priv/lock/lxc +/// - /qemu-server +/// - /lxc +/// +/// # Returns +/// (TempDir, MemDb) - The temp directory must be kept alive for database to persist +pub fn create_test_db() -> Result<(TempDir, MemDb)> { + let temp_dir = TempDir::new()?; + let db_path = temp_dir.path().join("test.db"); + let db = MemDb::open(&db_path, true)?; + + // Create standard directory structure + let now = TEST_MTIME; + + // Node-specific directories + db.create("/nodes", libc::S_IFDIR, 0, now)?; + db.create(&format!("/nodes/{}", TEST_NODE_NAME), libc::S_IFDIR, 0, now)?; + db.create( + &format!("/nodes/{}/qemu-server", TEST_NODE_NAME), libc::S_IFDIR, 0, + now, + )?; + db.create( + &format!("/nodes/{}/lxc", TEST_NODE_NAME), libc::S_IFDIR, 0, + now, + )?; + db.create( + &format!("/nodes/{}/priv", TEST_NODE_NAME), libc::S_IFDIR, 0, + now, + )?; + + // Global directories + db.create("/priv", libc::S_IFDIR, 0, now)?; + db.create("/priv/lock", libc::S_IFDIR, 0, now)?; + db.create("/priv/lock/qemu-server", libc::S_IFDIR, 0, now)?; + db.create("/priv/lock/lxc", libc::S_IFDIR, 0, now)?; + db.create("/qemu-server", libc::S_IFDIR, 0, now)?; + db.create("/lxc", libc::S_IFDIR, 0, now)?; + + Ok((temp_dir, db)) +} + +/// Creates a minimal test database (no standard directories) +/// +/// Use this when you want full control over database structure +/// +/// # Returns +/// (TempDir, MemDb) - The temp directory must be kept alive for database to persist +pub fn create_minimal_test_db() -> Result<(TempDir, MemDb)> { + let temp_dir = TempDir::new()?; + let db_path = temp_dir.path().join("test.db"); + let db = MemDb::open(&db_path, true)?; + Ok((temp_dir, db)) +} + +/// Creates test VM configuration content +/// +/// # Arguments +/// * `vmid` - VM ID +/// * `cores` - Number of CPU cores +/// * `memory` - Memory in MB +/// +/// # Returns +/// Configuration file content as bytes +pub fn create_vm_config(vmid: u32, cores: u32, memory: u32) -> Vec { + format!( + "name: test-vm-{}\ncores: {}\nmemory: {}\nbootdisk: scsi0\n", + vmid, cores, memory + ) + .into_bytes() +} + +/// Creates test CT (container) configuration content +/// +/// # Arguments +/// * `vmid` - Container ID +/// * `cores` - Number of CPU cores +/// * `memory` - Memory in MB +/// +/// # Returns +/// Configuration file content as bytes +pub fn create_ct_config(vmid: u32, cores: u32, memory: u32) -> Vec { + format!( + "cores: {}\nmemory: {}\nrootfs: local:100/vm-{}-disk-0.raw\n", + cores, memory, vmid + ) + .into_bytes() +} + +/// Creates a test lock path for a VM config +/// +/// # Arguments +/// * `vmid` - VM ID +/// * `vm_type` - "qemu-server" or "lxc" +/// +/// # Returns +/// Lock path in format `/priv/lock/{vm_type}/{vmid}.conf` +pub fn create_lock_path(vmid: u32, vm_type: &str) -> String { + format!("/priv/lock/{}/{}.conf", vm_type, vmid) +} + +/// Creates a test config path for a VM +/// +/// # Arguments +/// * `vmid` - VM ID +/// * `vm_type` - "qemu-server" or "lxc" +/// +/// # Returns +/// Config path in format `/{vm_type}/{vmid}.conf` +pub fn create_config_path(vmid: u32, vm_type: &str) -> String { + format!("/{}/{}.conf", vm_type, vmid) +} + +/// Clears all VMs from a status instance +/// +/// Useful for ensuring clean state before tests that register VMs. +/// +/// # Arguments +/// * `status` - The status instance to clear +pub fn clear_test_vms(status: &dyn StatusOps) { + let existing_vms: Vec = status.get_vmlist().keys().copied().collect(); + for vmid in existing_vms { + status.delete_vm(vmid); + } +} + +/// Wait for a condition to become true, polling at regular intervals +/// +/// This is a replacement for sleep-based synchronization in integration tests. +/// Instead of sleeping for an arbitrary duration and hoping the condition is met, +/// this function polls the condition and returns as soon as it becomes true. +/// +/// # Arguments +/// * `predicate` - Function that returns true when the condition is met +/// * `timeout` - Maximum time to wait for the condition +/// * `check_interval` - How often to check the condition +/// +/// # Returns +/// * `true` if condition was met within timeout +/// * `false` if timeout was reached without condition being met +/// +/// # Example +/// ```no_run +/// use pmxcfs_test_utils::wait_for_condition; +/// use std::time::Duration; +/// use std::sync::atomic::{AtomicBool, Ordering}; +/// use std::sync::Arc; +/// +/// # async fn example() { +/// let ready = Arc::new(AtomicBool::new(false)); +/// +/// // Wait for service to be ready (with timeout) +/// let result = wait_for_condition( +/// || ready.load(Ordering::SeqCst), +/// Duration::from_secs(5), +/// Duration::from_millis(10), +/// ).await; +/// +/// assert!(result, "Service should be ready within 5 seconds"); +/// # } +/// ``` +pub async fn wait_for_condition( + predicate: F, + timeout: Duration, + check_interval: Duration, +) -> bool +where + F: Fn() -> bool, +{ + let start = Instant::now(); + loop { + if predicate() { + return true; + } + if start.elapsed() >= timeout { + return false; + } + tokio::time::sleep(check_interval).await; + } +} + +/// Wait for a condition with a custom error message +/// +/// Similar to `wait_for_condition`, but returns a Result with a custom error message +/// if the timeout is reached. +/// +/// # Arguments +/// * `predicate` - Function that returns true when the condition is met +/// * `timeout` - Maximum time to wait for the condition +/// * `check_interval` - How often to check the condition +/// * `error_msg` - Error message to return if timeout is reached +/// +/// # Returns +/// * `Ok(())` if condition was met within timeout +/// * `Err(anyhow::Error)` with custom message if timeout was reached +/// +/// # Example +/// ```no_run +/// use pmxcfs_test_utils::wait_for_condition_or_fail; +/// use std::time::Duration; +/// use std::sync::atomic::{AtomicU64, Ordering}; +/// use std::sync::Arc; +/// +/// # async fn example() -> anyhow::Result<()> { +/// let counter = Arc::new(AtomicU64::new(0)); +/// +/// wait_for_condition_or_fail( +/// || counter.load(Ordering::SeqCst) >= 1, +/// Duration::from_secs(5), +/// Duration::from_millis(10), +/// "Service should initialize within 5 seconds", +/// ).await?; +/// +/// # Ok(()) +/// # } +/// ``` +pub async fn wait_for_condition_or_fail( + predicate: F, + timeout: Duration, + check_interval: Duration, + error_msg: &str, +) -> Result<()> +where + F: Fn() -> bool, +{ + if wait_for_condition(predicate, timeout, check_interval).await { + Ok(()) + } else { + anyhow::bail!("{}", error_msg) + } +} + +/// Blocking version of wait_for_condition for synchronous tests +/// +/// Similar to `wait_for_condition`, but works in synchronous contexts. +/// Polls the condition and returns as soon as it becomes true or timeout is reached. +/// +/// # Arguments +/// * `predicate` - Function that returns true when the condition is met +/// * `timeout` - Maximum time to wait for the condition +/// * `check_interval` - How often to check the condition +/// +/// # Returns +/// * `true` if condition was met within timeout +/// * `false` if timeout was reached without condition being met +/// +/// # Example +/// ```no_run +/// use pmxcfs_test_utils::wait_for_condition_blocking; +/// use std::time::Duration; +/// use std::sync::atomic::{AtomicBool, Ordering}; +/// use std::sync::Arc; +/// +/// let ready = Arc::new(AtomicBool::new(false)); +/// +/// // Wait for service to be ready (with timeout) +/// let result = wait_for_condition_blocking( +/// || ready.load(Ordering::SeqCst), +/// Duration::from_secs(5), +/// Duration::from_millis(10), +/// ); +/// +/// assert!(result, "Service should be ready within 5 seconds"); +/// ``` +pub fn wait_for_condition_blocking( + predicate: F, + timeout: Duration, + check_interval: Duration, +) -> bool +where + F: Fn() -> bool, +{ + let start = Instant::now(); + loop { + if predicate() { + return true; + } + if start.elapsed() >= timeout { + return false; + } + std::thread::sleep(check_interval); + } +} + +/// Wait for a pmxcfs-ipc server to be ready by checking for the listening socket +/// +/// This function checks /proc/net/unix for the abstract Unix socket that indicates +/// the server has successfully started and is listening for connections. +/// +/// This works for all server configurations, including those that reject connections +/// (which don't create ring buffer files). +/// +/// # Arguments +/// * `service_name` - The name of the IPC service (e.g., "pve2") +/// +/// # Panics +/// Panics with assertion failure if server is not ready within 5 seconds +/// +/// # Example +/// ```no_run +/// use pmxcfs_test_utils::wait_for_server_ready; +/// +/// // Wait for the "pve2" service to be ready +/// wait_for_server_ready("pve2"); +/// ``` +pub fn wait_for_server_ready(service_name: &str) { + // Check if abstract Unix socket is listening + // Abstract sockets are listed in /proc/net/unix with @ prefix + assert!( + wait_for_condition_blocking( + || { + // Read /proc/net/unix and check for abstract socket + if let Ok(content) = std::fs::read_to_string("/proc/net/unix") { + // Abstract sockets are listed with @{name} format + let socket_name = format!("@{}", service_name); + for line in content.lines() { + if line.contains(&socket_name) && line.contains("LISTEN") { + return true; + } + } + } + false + }, + Duration::from_secs(5), + Duration::from_millis(10), + ), + "Server '{}' should be ready within 5 seconds", + service_name + ); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_create_test_config() { + let config = create_test_config(true); + assert_eq!(config.nodename(), TEST_NODE_NAME); + assert_eq!(config.cluster_name(), TEST_CLUSTER_NAME); + assert!(config.is_local_mode()); + } + + #[test] + fn test_create_test_db() -> Result<()> { + let (_temp_dir, db) = create_test_db()?; + + // Verify standard directories exist + assert!(db.exists("/nodes")?, "Should have /nodes"); + assert!(db.exists("/qemu-server")?, "Should have /qemu-server"); + assert!(db.exists("/priv/lock")?, "Should have /priv/lock"); + + Ok(()) + } + + #[test] + fn test_path_helpers() { + assert_eq!( + create_lock_path(100, "qemu-server"), + "/priv/lock/qemu-server/100.conf" + ); + assert_eq!( + create_config_path(100, "qemu-server"), + "/qemu-server/100.conf" + ); + } + + #[test] + fn test_env_builder_basic() { + let env = TestEnv::new().build(); + assert_eq!(env.config.nodename(), TEST_NODE_NAME); + assert!(env.db.is_none()); + assert!(env.status.is_none()); + } + + #[test] + fn test_env_builder_with_database() -> Result<()> { + let env = TestEnv::new().with_database()?.build(); + assert!(env.db.is_some()); + assert!(env.db().exists("/nodes")?); + Ok(()) + } + + #[test] + fn test_env_builder_with_mock_status() { + let env = TestEnv::new().with_mock_status().build(); + assert!(env.status.is_some()); + + // Test that MockStatus works + let status = env.status(); + status.set_quorate(true); + assert!(status.is_quorate()); + } + + #[test] + fn test_env_builder_full() -> Result<()> { + let env = TestEnv::new().with_database()?.with_mock_status().build(); + + assert!(env.db.is_some()); + assert!(env.status.is_some()); + assert!(env.config.nodename() == TEST_NODE_NAME); + + Ok(()) + } + + // NOTE: Tokio tests for wait_for_condition functions are REMOVED because they + // cause the test runner to hang when running `cargo test --lib --workspace`. + // Root cause: tokio multi-threaded runtime doesn't shut down properly when + // these async tests complete, blocking the entire test suite. + // + // These utility functions work correctly and are verified in integration tests + // that actually use them (e.g., integration-tests/). +} diff --git a/src/pmxcfs-rs/pmxcfs-test-utils/src/mock_memdb.rs b/src/pmxcfs-rs/pmxcfs-test-utils/src/mock_memdb.rs new file mode 100644 index 000000000..804b0a30d --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-test-utils/src/mock_memdb.rs @@ -0,0 +1,771 @@ +//! Mock in-memory database implementation for testing +//! +//! This module provides `MockMemDb`, a lightweight in-memory implementation +//! of the `MemDbOps` trait for use in unit tests. + +use anyhow::{Result, bail}; +use parking_lot::RwLock; +use pmxcfs_memdb::{MemDbOps, LOCK_DIR_PATH, ROOT_INODE, TreeEntry}; +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; + +// Directory and file type constants from dirent.h +const DT_DIR: u8 = 4; +const DT_REG: u8 = 8; + +// Lock timeout in seconds (matches C implementation) +const LOCK_TIMEOUT_SECS: u64 = 120; + +/// Normalize a lock identifier into the cache key used by the lock map. +/// +/// This mirrors the behavior in the production MemDb by ensuring the key is +/// a relative path starting with the `priv/lock` prefix. +fn lock_cache_key(path: &str) -> String { + let trimmed = path.trim_start_matches('/'); + if trimmed.starts_with(LOCK_DIR_PATH) { + trimmed.to_string() + } else { + format!("{}/{}", LOCK_DIR_PATH, trimmed) + } +} + +/// Mock in-memory database for testing +/// +/// Unlike the real `MemDb` which uses SQLite persistence, `MockMemDb` stores +/// everything in memory using HashMap. This makes it: +/// - Faster for unit tests (no disk I/O) +/// - Easier to inject failures for error testing +/// - Completely isolated (no shared state between tests) +/// +/// # Example +/// ``` +/// use pmxcfs_test_utils::MockMemDb; +/// use pmxcfs_memdb::MemDbOps; +/// use std::sync::Arc; +/// +/// let db: Arc = Arc::new(MockMemDb::new()); +/// db.create("/test.txt", 0, 0, 1234).unwrap(); +/// assert!(db.exists("/test.txt").unwrap()); +/// ``` +pub struct MockMemDb { + /// Files and directories stored as path -> data + files: RwLock>>, + /// Directory entries stored as path -> Vec + directories: RwLock>>, + /// Metadata stored as path -> TreeEntry + entries: RwLock>, + /// Lock state stored as path -> (timestamp, checksum) + locks: RwLock>, + /// Version counter + version: AtomicU64, + /// Inode counter + next_inode: AtomicU64, +} + +impl MockMemDb { + /// Create a new empty mock database + pub fn new() -> Self { + let mut directories = HashMap::new(); + directories.insert("/".to_string(), Vec::new()); + + let mut entries = HashMap::new(); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as u32; + + // Create root entry + entries.insert( + "/".to_string(), + TreeEntry { + inode: ROOT_INODE, + parent: 0, + version: 0, + writer: 1, + mtime: now, + size: 0, + entry_type: DT_DIR, + data: Vec::new(), + name: String::new(), + }, + ); + + Self { + files: RwLock::new(HashMap::new()), + directories: RwLock::new(directories), + entries: RwLock::new(entries), + locks: RwLock::new(HashMap::new()), + version: AtomicU64::new(1), + next_inode: AtomicU64::new(ROOT_INODE + 1), + } + } + + /// Helper to check if path is a directory + fn is_directory(&self, path: &str) -> bool { + self.directories.read().contains_key(path) + } + + /// Helper to get parent path + fn parent_path(path: &str) -> Option { + if path == "/" { + return None; + } + let parent = path.rsplit_once('/')?.0; + if parent.is_empty() { + Some("/".to_string()) + } else { + Some(parent.to_string()) + } + } + + /// Helper to get file name from path + fn file_name(path: &str) -> String { + if path == "/" { + return String::new(); + } + path.rsplit('/').next().unwrap_or("").to_string() + } +} + +impl Default for MockMemDb { + fn default() -> Self { + Self::new() + } +} + +impl MemDbOps for MockMemDb { + fn create(&self, path: &str, mode: u32, _writer: u32, mtime: u32) -> Result<()> { + if path.is_empty() { + bail!("Empty path"); + } + + if self.entries.read().contains_key(path) { + bail!("File exists: {}", path); + } + + let is_dir = (mode & libc::S_IFMT) == libc::S_IFDIR; + let entry_type = if is_dir { DT_DIR } else { DT_REG }; + let inode = self.next_inode.fetch_add(1, Ordering::SeqCst); + + // Add to parent directory + if let Some(parent) = Self::parent_path(path) { + if !self.is_directory(&parent) { + bail!("Parent is not a directory: {}", parent); + } + let mut dirs = self.directories.write(); + if let Some(children) = dirs.get_mut(&parent) { + children.push(Self::file_name(path)); + } + } + + // Create entry + let entry = TreeEntry { + inode, + parent: 0, // Simplified + version: self.version.load(Ordering::SeqCst), + writer: 1, + mtime, + size: 0, + entry_type, + data: Vec::new(), + name: Self::file_name(path), + }; + + self.entries.write().insert(path.to_string(), entry); + + if is_dir { + self.directories + .write() + .insert(path.to_string(), Vec::new()); + } else { + self.files.write().insert(path.to_string(), Vec::new()); + } + + self.version.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + fn read(&self, path: &str, offset: u64, size: usize) -> Result> { + let files = self.files.read(); + let data = files + .get(path) + .ok_or_else(|| anyhow::anyhow!("File not found: {}", path))?; + + let offset = offset as usize; + if offset >= data.len() { + return Ok(Vec::new()); + } + + let end = std::cmp::min(offset + size, data.len()); + Ok(data[offset..end].to_vec()) + } + + fn write( + &self, + path: &str, + offset: u64, + _writer: u32, + mtime: u32, + data: &[u8], + truncate: bool, + ) -> Result { + let mut files = self.files.write(); + let file_data = files + .get_mut(path) + .ok_or_else(|| anyhow::anyhow!("File not found: {}", path))?; + + let offset = offset as usize; + + if truncate { + file_data.clear(); + } + + // Expand if needed + if offset + data.len() > file_data.len() { + file_data.resize(offset + data.len(), 0); + } + + file_data[offset..offset + data.len()].copy_from_slice(data); + + // Update entry + if let Some(entry) = self.entries.write().get_mut(path) { + entry.mtime = mtime; + entry.size = file_data.len(); + } + + self.version.fetch_add(1, Ordering::SeqCst); + Ok(data.len()) + } + + fn delete(&self, path: &str, _writer: u32, _mtime: u32) -> Result<()> { + if !self.entries.read().contains_key(path) { + bail!("File not found: {}", path); + } + + // Check if directory is empty + if let Some(children) = self.directories.read().get(path) { + if !children.is_empty() { + bail!("Directory not empty: {}", path); + } + } + + self.entries.write().remove(path); + self.files.write().remove(path); + self.directories.write().remove(path); + + // Remove from parent + if let Some(parent) = Self::parent_path(path) { + if let Some(children) = self.directories.write().get_mut(&parent) { + children.retain(|name| name != &Self::file_name(path)); + } + } + + self.version.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + fn rename(&self, old_path: &str, new_path: &str, _writer: u32, _mtime: u32) -> Result<()> { + // Hold write locks for entire operation to avoid TOCTOU race condition + let mut entries = self.entries.write(); + let mut files = self.files.write(); + let mut directories = self.directories.write(); + + // Check existence + if !entries.contains_key(old_path) { + bail!("Source not found: {}", old_path); + } + if entries.contains_key(new_path) { + bail!("Destination already exists: {}", new_path); + } + + let is_dir = directories.contains_key(old_path); + + // Update parent directory children lists + if let Some(old_parent) = Self::parent_path(old_path) { + if let Some(children) = directories.get_mut(&old_parent) { + children.retain(|name| name != &Self::file_name(old_path)); + } + } + if let Some(new_parent) = Self::parent_path(new_path) { + if let Some(children) = directories.get_mut(&new_parent) { + children.push(Self::file_name(new_path)); + } + } + + // If renaming a directory, update all descendant paths + if is_dir { + let old_prefix = if old_path == "/" { + "/".to_string() + } else { + format!("{}/", old_path) + }; + let new_prefix = if new_path == "/" { + "/".to_string() + } else { + format!("{}/", new_path) + }; + + // Collect all paths that need to be updated + let paths_to_update: Vec = entries + .keys() + .filter(|p| p.starts_with(&old_prefix)) + .cloned() + .collect(); + + // Update each descendant path + for old_descendant in paths_to_update { + let new_descendant = old_descendant.replacen(&old_prefix, &new_prefix, 1); + + // Move entry + if let Some(mut entry) = entries.remove(&old_descendant) { + entry.name = Self::file_name(&new_descendant); + entries.insert(new_descendant.clone(), entry); + } + + // Move file data + if let Some(data) = files.remove(&old_descendant) { + files.insert(new_descendant.clone(), data); + } + + // Move directory + if let Some(children) = directories.remove(&old_descendant) { + directories.insert(new_descendant, children); + } + } + } + + // Move the entry itself + if let Some(mut entry) = entries.remove(old_path) { + entry.name = Self::file_name(new_path); + entries.insert(new_path.to_string(), entry); + } + + // Move file data + if let Some(data) = files.remove(old_path) { + files.insert(new_path.to_string(), data); + } + + // Move directory + if let Some(children) = directories.remove(old_path) { + directories.insert(new_path.to_string(), children); + } + + drop(entries); + drop(files); + drop(directories); + + self.version.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + fn exists(&self, path: &str) -> Result { + Ok(self.entries.read().contains_key(path)) + } + + fn readdir(&self, path: &str) -> Result> { + let directories = self.directories.read(); + let children = directories + .get(path) + .ok_or_else(|| anyhow::anyhow!("Not a directory: {}", path))?; + + let entries = self.entries.read(); + let mut result = Vec::new(); + + for child_name in children { + let child_path = if path == "/" { + format!("/{}", child_name) + } else { + format!("{}/{}", path, child_name) + }; + + if let Some(entry) = entries.get(&child_path) { + result.push(entry.clone()); + } + } + + Ok(result) + } + + fn set_mtime(&self, path: &str, _writer: u32, mtime: u32) -> Result<()> { + let mut entries = self.entries.write(); + let entry = entries + .get_mut(path) + .ok_or_else(|| anyhow::anyhow!("File not found: {}", path))?; + entry.mtime = mtime; + Ok(()) + } + + fn lookup_path(&self, path: &str) -> Option { + self.entries.read().get(path).cloned() + } + + fn get_entry_by_inode(&self, inode: u64) -> Option { + self.entries + .read() + .values() + .find(|e| e.inode == inode) + .cloned() + } + + fn acquire_lock(&self, path: &str, csum: &[u8; 32]) -> Result<()> { + let mut locks = self.locks.write(); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let key = lock_cache_key(path); + + if let Some((timestamp, existing_csum)) = locks.get(&key) { + // Check if expired + if now - timestamp > LOCK_TIMEOUT_SECS { + // Expired, can acquire + locks.insert(key, (now, *csum)); + return Ok(()); + } + + // Not expired, check if same checksum (refresh) + if existing_csum == csum { + locks.insert(key, (now, *csum)); + return Ok(()); + } + + bail!("Lock already held with different checksum"); + } + + locks.insert(key, (now, *csum)); + Ok(()) + } + + fn release_lock(&self, path: &str, csum: &[u8; 32]) -> Result<()> { + let mut locks = self.locks.write(); + let key = lock_cache_key(path); + if let Some((_, existing_csum)) = locks.get(&key) { + if existing_csum == csum { + locks.remove(&key); + return Ok(()); + } + bail!("Lock checksum mismatch"); + } + bail!("No lock found"); + } + + fn is_locked(&self, path: &str) -> bool { + let key = lock_cache_key(path); + if let Some((timestamp, _)) = self.locks.read().get(&key) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + now - timestamp <= LOCK_TIMEOUT_SECS + } else { + false + } + } + + fn lock_expired(&self, path: &str, csum: &[u8; 32]) -> bool { + let key = lock_cache_key(path); + if let Some((timestamp, existing_csum)) = self.locks.read().get(&key).cloned() { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // If checksum mismatches, this is a different lock holder attempting + // to check expiration. Reset the timeout to prevent premature expiration + // while the current holder still has the lock. This matches the C + // implementation's behavior where lock_expired() with wrong checksum + // extends the lock timeout. + if &existing_csum != csum { + self.locks.write().insert(key, (now, *csum)); + return false; + } + + // Check expiration + now - timestamp > LOCK_TIMEOUT_SECS + } else { + false + } + } + + fn get_version(&self) -> u64 { + self.version.load(Ordering::SeqCst) + } + + fn get_all_entries(&self) -> Result> { + Ok(self.entries.read().values().cloned().collect()) + } + + fn replace_all_entries(&self, entries: Vec) -> Result<()> { + // Preserve root entry before clearing + let root_entry = self.entries.read().get("/").cloned(); + + // Acquire all write locks once (in correct order to avoid deadlocks) + let mut entries_map = self.entries.write(); + let mut files_map = self.files.write(); + let mut dirs_map = self.directories.write(); + + // Clear all data + entries_map.clear(); + files_map.clear(); + dirs_map.clear(); + + // Restore root entry to preserve invariant + if let Some(root) = root_entry { + entries_map.insert("/".to_string(), root); + dirs_map.insert("/".to_string(), Vec::new()); + } + + // Insert all entries + for entry in entries { + let path = format!("/{}", entry.name); // Simplified + entries_map.insert(path.clone(), entry.clone()); + + // Use entry_type to distinguish files from directories + if entry.entry_type == DT_REG { + files_map.insert(path, entry.data.clone()); + } else if entry.entry_type == DT_DIR { + dirs_map.insert(path, Vec::new()); + } + } + + // Rebuild parent-child relationships + let paths: Vec = entries_map.keys().cloned().collect(); + for path in paths { + if let Some(entry) = entries_map.get(&path) { + if let Some(parent) = Self::parent_path(&path) { + if let Some(children) = dirs_map.get_mut(&parent) { + if !children.contains(&entry.name) { + children.push(entry.name.clone()); + } + } + } + } + } + + drop(entries_map); + drop(files_map); + drop(dirs_map); + + self.version.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + fn apply_tree_entry(&self, entry: TreeEntry) -> Result<()> { + let path = format!("/{}", entry.name); // Simplified + + // Acquire locks once + let mut entries_map = self.entries.write(); + let mut files_map = self.files.write(); + let mut dirs_map = self.directories.write(); + + entries_map.insert(path.clone(), entry.clone()); + + // Use entry_type to distinguish files from directories + if entry.entry_type == DT_REG { + files_map.insert(path.clone(), entry.data.clone()); + } else if entry.entry_type == DT_DIR { + dirs_map.insert(path.clone(), Vec::new()); + } + + // Update parent-child relationship + if let Some(parent) = Self::parent_path(&path) { + if let Some(children) = dirs_map.get_mut(&parent) { + if !children.contains(&entry.name) { + children.push(entry.name.clone()); + } + } + } + + drop(entries_map); + drop(files_map); + drop(dirs_map); + + self.version.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + fn encode_database(&self) -> Result> { + // Simplified - just return empty vec + Ok(Vec::new()) + } + + fn compute_database_checksum(&self) -> Result<[u8; 32]> { + // Simplified - return deterministic checksum based on version + let version = self.version.load(Ordering::SeqCst); + let mut checksum = [0u8; 32]; + checksum[0..8].copy_from_slice(&version.to_le_bytes()); + Ok(checksum) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + #[test] + fn test_mock_memdb_basic_operations() { + let db = MockMemDb::new(); + + // Create file + db.create("/test.txt", libc::S_IFREG, 0, 1234).unwrap(); + assert!(db.exists("/test.txt").unwrap()); + + // Write data + let data = b"Hello, MockMemDb!"; + db.write("/test.txt", 0, 0, 1235, data, false).unwrap(); + + // Read data + let read_data = db.read("/test.txt", 0, 100).unwrap(); + assert_eq!(&read_data[..], data); + + // Check entry + let entry = db.lookup_path("/test.txt").unwrap(); + assert_eq!(entry.size, data.len()); + assert_eq!(entry.mtime, 1235); + } + + #[test] + fn test_mock_memdb_directory_operations() { + let db = MockMemDb::new(); + + // Create directory + db.create("/mydir", libc::S_IFDIR, 0, 1000).unwrap(); + assert!(db.exists("/mydir").unwrap()); + + // Create file in directory + db.create("/mydir/file.txt", libc::S_IFREG, 0, 1001).unwrap(); + + // Read directory + let entries = db.readdir("/mydir").unwrap(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].name, "file.txt"); + } + + #[test] + fn test_mock_memdb_lock_operations() { + let db = MockMemDb::new(); + let csum1 = [1u8; 32]; + let csum2 = [2u8; 32]; + + // Acquire lock + db.acquire_lock("/priv/lock/resource", &csum1).unwrap(); + assert!(db.is_locked("/priv/lock/resource")); + + // Lock with same checksum should succeed (refresh) + assert!(db.acquire_lock("/priv/lock/resource", &csum1).is_ok()); + + // Lock with different checksum should fail + assert!(db.acquire_lock("/priv/lock/resource", &csum2).is_err()); + + // Release lock + db.release_lock("/priv/lock/resource", &csum1).unwrap(); + assert!(!db.is_locked("/priv/lock/resource")); + + // Can acquire with different checksum now + db.acquire_lock("/priv/lock/resource", &csum2).unwrap(); + assert!(db.is_locked("/priv/lock/resource")); + } + + #[test] + fn test_mock_memdb_rename() { + let db = MockMemDb::new(); + + // Create file + db.create("/old.txt", libc::S_IFREG, 0, 1000).unwrap(); + db.write("/old.txt", 0, 0, 1001, b"content", false).unwrap(); + + // Rename + db.rename("/old.txt", "/new.txt", 0, 1000).unwrap(); + + // Old path should not exist + assert!(!db.exists("/old.txt").unwrap()); + + // New path should exist with same content + assert!(db.exists("/new.txt").unwrap()); + let data = db.read("/new.txt", 0, 100).unwrap(); + assert_eq!(&data[..], b"content"); + } + + #[test] + fn test_mock_memdb_delete() { + let db = MockMemDb::new(); + + // Create and delete file + db.create("/delete-me.txt", libc::S_IFREG, 0, 1000).unwrap(); + assert!(db.exists("/delete-me.txt").unwrap()); + + db.delete("/delete-me.txt", 0, 1000).unwrap(); + assert!(!db.exists("/delete-me.txt").unwrap()); + + // Delete non-existent file should fail + assert!(db.delete("/nonexistent.txt", 0, 1000).is_err()); + } + + #[test] + fn test_mock_memdb_version_tracking() { + let db = MockMemDb::new(); + let initial_version = db.get_version(); + + // Version should increment on modifications + db.create("/file1.txt", libc::S_IFREG, 0, 1000).unwrap(); + assert!(db.get_version() > initial_version); + + let v1 = db.get_version(); + db.write("/file1.txt", 0, 0, 1001, b"data", false).unwrap(); + assert!(db.get_version() > v1); + + let v2 = db.get_version(); + db.delete("/file1.txt", 0, 1000).unwrap(); + assert!(db.get_version() > v2); + } + + #[test] + fn test_mock_memdb_isolation() { + // Each MockMemDb instance is completely isolated + let db1 = MockMemDb::new(); + let db2 = MockMemDb::new(); + + db1.create("/test.txt", libc::S_IFREG, 0, 1000).unwrap(); + + // db2 should not see db1's files + assert!(db1.exists("/test.txt").unwrap()); + assert!(!db2.exists("/test.txt").unwrap()); + } + + #[test] + fn test_mock_memdb_as_trait_object() { + // Demonstrate using MockMemDb through trait object + let db: Arc = Arc::new(MockMemDb::new()); + + db.create("/trait-test.txt", libc::S_IFREG, 0, 2000).unwrap(); + assert!(db.exists("/trait-test.txt").unwrap()); + + db.write("/trait-test.txt", 0, 0, 2001, b"via trait", false) + .unwrap(); + let data = db.read("/trait-test.txt", 0, 100).unwrap(); + assert_eq!(&data[..], b"via trait"); + } + + #[test] + fn test_mock_memdb_error_cases() { + let db = MockMemDb::new(); + + // Create duplicate should fail + db.create("/dup.txt", libc::S_IFREG, 0, 1000).unwrap(); + assert!(db.create("/dup.txt", libc::S_IFREG, 0, 1000).is_err()); + + // Read non-existent file should fail + assert!(db.read("/nonexistent.txt", 0, 100).is_err()); + + // Write to non-existent file should fail + assert!( + db.write("/nonexistent.txt", 0, 0, 1000, b"data", false) + .is_err() + ); + + // Empty path should fail + assert!(db.create("", libc::S_IFREG, 0, 1000).is_err()); + } +} -- 2.47.3