From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Date: Tue, 6 Jan 2026 22:24:30 +0800
Message-ID: <20260106142440.2368585-7-k.chai@proxmox.com>
In-Reply-To: <20260106142440.2368585-1-k.chai@proxmox.com>
References: <20260106142440.2368585-1-k.chai@proxmox.com>
Subject: [pve-devel] [PATCH pve-cluster 06/15] pmxcfs-rs: add pmxcfs-status crate

Add cluster status tracking and monitoring:

- Status: Central status container (thread-safe)
- Cluster membership tracking
- VM/CT registry with version tracking
- RRD data management
- Cluster log integration
- Quorum state tracking
- Configuration file version tracking

This integrates pmxcfs-memdb, pmxcfs-rrd, pmxcfs-logger, and
pmxcfs-api-types to provide centralized cluster state management. It
also uses procfs for system metrics collection.
Includes comprehensive unit tests for:

- VM registration and deletion
- Cluster membership updates
- Version tracking
- Configuration file monitoring

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
 src/pmxcfs-rs/Cargo.toml                  |    1 +
 src/pmxcfs-rs/pmxcfs-status/Cargo.toml    |   40 +
 src/pmxcfs-rs/pmxcfs-status/README.md     |  142 ++
 src/pmxcfs-rs/pmxcfs-status/src/lib.rs    |   54 +
 src/pmxcfs-rs/pmxcfs-status/src/status.rs | 1561 +++++++++++++++++++++
 src/pmxcfs-rs/pmxcfs-status/src/traits.rs |  486 +++++++
 src/pmxcfs-rs/pmxcfs-status/src/types.rs  |   62 +
 7 files changed, 2346 insertions(+)
 create mode 100644 src/pmxcfs-rs/pmxcfs-status/Cargo.toml
 create mode 100644 src/pmxcfs-rs/pmxcfs-status/README.md
 create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/lib.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/status.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/traits.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/types.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 2e41ac93..b5191c31 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -6,6 +6,7 @@ members = [
     "pmxcfs-logger", # Cluster log with ring buffer and deduplication
     "pmxcfs-rrd",    # RRD (Round-Robin Database) persistence
     "pmxcfs-memdb",  # In-memory database with SQLite persistence
+    "pmxcfs-status", # Status monitoring and RRD data management
 ]
 resolver = "2"
diff --git a/src/pmxcfs-rs/pmxcfs-status/Cargo.toml b/src/pmxcfs-rs/pmxcfs-status/Cargo.toml
new file mode 100644
index 00000000..e4a817d7
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-status/Cargo.toml
@@ -0,0 +1,40 @@
+[package]
+name = "pmxcfs-status"
+description = "Status monitoring and RRD data management for pmxcfs"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+# Workspace dependencies
+pmxcfs-api-types.workspace = true
+pmxcfs-rrd.workspace = true
+pmxcfs-memdb.workspace = true
+pmxcfs-logger.workspace = true
+
+# Error handling
+anyhow.workspace = true
+
+# Async runtime
+tokio.workspace = true
+
+# Concurrency primitives
+parking_lot.workspace = true
+
+# Logging
+tracing.workspace = true
+
+# Utilities
+chrono.workspace = true
+
+# System information (Linux /proc filesystem)
+procfs = "0.17"
+
+[dev-dependencies]
+tempfile.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-status/README.md b/src/pmxcfs-rs/pmxcfs-status/README.md
new file mode 100644
index 00000000..b6958af3
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-status/README.md
@@ -0,0 +1,142 @@
+# pmxcfs-status
+
+**Cluster Status** tracking and monitoring for pmxcfs.
+
+This crate manages all runtime cluster state information including
+membership, VM lists, node status, RRD metrics, and cluster logs. It serves
+as the central repository for dynamic cluster information that changes
+during runtime.
+
+## Overview
+
+The Status subsystem tracks:
+
+- **Cluster membership**: Which nodes are in the cluster and their states
+- **VM/CT tracking**: Registry of all virtual machines and containers
+- **Node status**: Per-node health and resource information
+- **RRD data**: Performance metrics (CPU, memory, disk, network)
+- **Cluster log**: Centralized log aggregation
+- **Quorum state**: Whether the cluster has quorum
+- **Version tracking**: Monitors configuration file changes
+
+## Usage
+
+### Initialization
+
+```rust
+use pmxcfs_status;
+
+// For tests or when RRD persistence is not needed
+let status = pmxcfs_status::init();
+
+// For production with RRD file persistence
+let status = pmxcfs_status::init_with_rrd("/var/lib/rrdcached/db").await;
+```
+
+The default `init()` is synchronous and doesn't require a directory
+parameter, making tests simpler. Use `init_with_rrd()` for production
+deployments that need RRD persistence.
+
+### Integration with Other Components
+
+**FUSE Plugins**:
+- `.version` plugin reads from Status
+- `.vmlist` plugin generates the VM list from Status
+- `.members` plugin generates the member list from Status
+- `.rrd` plugin accesses RRD data from Status
+- `.clusterlog` plugin reads the cluster log from Status
+
+**DFSM Status Sync**:
+- `StatusSyncService` (pmxcfs-dfsm) broadcasts status updates
+- Uses the `pve_kvstore_v1` CPG group
+- KV store data is synchronized across nodes
+
+**IPC Server**:
+- The `set_status` IPC call updates Status
+- Used by the `pvecm`/`pvenode` tools
+- RRD data is received via IPC
+
+**MemDb Integration**:
+- Scans VM configs to populate the vmlist
+- Tracks version changes on file modifications
+- Used for `.version` plugin timestamps
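+
+As an illustration of the consumer side, here is a minimal sketch of how a
+read-only plugin could render the VM list from `Status`. The `render_vmlist`
+helper and its output format are invented for this example; the real
+`.vmlist` plugin lives in the pmxcfs crate and uses its own format:
+
+```rust
+use pmxcfs_status::{Status, VmType};
+
+// Hypothetical helper: formats the VM registry the way a read-only
+// FUSE plugin could expose it.
+fn render_vmlist(status: &Status) -> String {
+    let mut out = String::new();
+    for (vmid, vm) in status.get_vmlist() {
+        let kind = match vm.vmtype {
+            VmType::Qemu => "qemu",
+            VmType::Lxc => "lxc",
+        };
+        out.push_str(&format!("{vmid}: {kind} on {}\n", vm.node));
+    }
+    out
+}
+```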
+
+## Architecture
+
+### Module Structure
+
+| Module      | Purpose                                            |
+|-------------|----------------------------------------------------|
+| `lib.rs`    | Public API and initialization                      |
+| `status.rs` | Core Status struct and operations                  |
+| `traits.rs` | `StatusOps` trait and `MockStatus` for testing     |
+| `types.rs`  | Type definitions (ClusterNode, ClusterInfo, etc.)  |
+
+### Key Features
+
+- **Thread-Safe**: All operations use `RwLock` or `AtomicU64` for concurrent access
+- **Version Tracking**: Monotonically increasing counters for change detection
+- **Structured Logging**: Field-based tracing for better observability
+- **Optional RRD**: RRD persistence is opt-in, simplifying testing
+
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| `cfs_status_t` | `Status` | Main status container |
+| `cfs_clinfo_t` | `ClusterInfo` | Cluster membership info |
+| `cfs_clnode_t` | `ClusterNode` | Individual node info |
+| `vminfo_t` | `VmEntry` | VM/CT registry entry (in pmxcfs-api-types) |
+| `clog_entry_t` | `ClusterLogEntry` | Cluster log entry |
+
+### Core Functions
+
+| C Function | Rust Equivalent | Notes |
+|-----------|-----------------|-------|
+| `cfs_status_init()` | `init()` or `init_with_rrd()` | Two variants for flexibility |
+| `cfs_set_quorate()` | `Status::set_quorate()` | Quorum tracking |
+| `cfs_is_quorate()` | `Status::is_quorate()` | Quorum checking |
+| `vmlist_register_vm()` | `Status::register_vm()` | VM registration |
+| `vmlist_delete_vm()` | `Status::delete_vm()` | VM deletion |
+| `cfs_status_set()` | `Status::set_node_status()` | Status updates (including RRD) |
+
+## Key Differences from C Implementation
+
+### RRD Decoupling
+
+**C Version (status.c)**:
+- RRD code embedded in status.c
+- Async initialization always required
+
+**Rust Version**:
+- Separate `pmxcfs-rrd` crate
+- `init()` is synchronous (no RRD)
+- `init_with_rrd()` is async (with RRD)
+- Tests don't need temp directories
+
+### Concurrency
+
+**C Version**:
+- Single `GMutex` for the entire status structure
+
+**Rust Version**:
+- Fine-grained `RwLock`s for different data structures
+- `AtomicU64` for version counters
+- Better read parallelism
+
+## Configuration File Tracking
+
+Status tracks version numbers for these common Proxmox config files:
+
+- `corosync.conf`, `corosync.conf.new`
+- `storage.cfg`, `user.cfg`, `domains.cfg`
+- `datacenter.cfg`, `vzdump.cron`, `vzdump.conf`
+- `ha/` directory files (crm_commands, manager_status, resources.cfg, etc.)
+- `sdn/` directory files (vnets.cfg, zones.cfg, controllers.cfg, etc.)
+- And many more (see `Status::new()` in status.rs for the complete list)
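+
+For illustration, the version counters behave as follows (this mirrors the
+unit tests in status.rs; `init()` returns an in-memory `Status` whose
+tracked paths all start at version 0):
+
+```rust
+let status = pmxcfs_status::init();
+
+// A tracked file starts at 0 and is bumped on every recorded change
+// (the C side calls this record_memdb_change).
+assert_eq!(status.get_path_version("corosync.conf"), 0);
+status.increment_path_version("corosync.conf");
+assert_eq!(status.get_path_version("corosync.conf"), 1);
+
+// Paths outside the tracked set simply report version 0.
+assert_eq!(status.get_path_version("nonexistent.cfg"), 0);
+```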
+
+## References
+
+### C Implementation
+
+- `src/pmxcfs/status.c` / `status.h` - Status tracking
+
+### Related Crates
+
+- **pmxcfs-rrd**: RRD file persistence
+- **pmxcfs-dfsm**: Status synchronization via StatusSyncService
+- **pmxcfs-logger**: Cluster log implementation
+- **pmxcfs**: FUSE plugins that read from Status
diff --git a/src/pmxcfs-rs/pmxcfs-status/src/lib.rs b/src/pmxcfs-rs/pmxcfs-status/src/lib.rs
new file mode 100644
index 00000000..282e007d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-status/src/lib.rs
@@ -0,0 +1,54 @@
+/// Status information and monitoring
+///
+/// This module manages:
+/// - Cluster membership (nodes, IPs, online status)
+/// - RRD (Round Robin Database) data for metrics
+/// - Cluster log
+/// - Node status information
+/// - VM/CT list tracking
+mod status;
+mod traits;
+mod types;
+
+// Re-export public types
+pub use pmxcfs_api_types::{VmEntry, VmType};
+pub use types::{ClusterInfo, ClusterLogEntry, ClusterNode, NodeStatus};
+
+// Re-export Status struct and trait
+pub use status::Status;
+pub use traits::{BoxFuture, MockStatus, StatusOps};
+
+use std::sync::Arc;
+
+/// Initialize status subsystem without RRD persistence
+///
+/// This is the default initialization that creates a Status instance
+/// without file-based RRD persistence. RRD data will be kept in memory only.
+pub fn init() -> Arc<Status> {
+    tracing::info!("Status subsystem initialized (RRD persistence disabled)");
+    Arc::new(Status::new(None))
+}
+
+/// Initialize status subsystem with RRD file persistence
+///
+/// Creates a Status instance with RRD data written to disk in the specified
+/// directory. This requires the RRD directory to exist and be writable.
+pub async fn init_with_rrd<P: AsRef<std::path::Path>>(rrd_dir: P) -> Arc<Status> {
+    let rrd_dir_path = rrd_dir.as_ref();
+    let rrd_writer = match pmxcfs_rrd::RrdWriter::new(rrd_dir_path).await {
+        Ok(writer) => {
+            tracing::info!(
+                directory = %rrd_dir_path.display(),
+                "RRD file persistence enabled"
+            );
+            Some(writer)
+        }
+        Err(e) => {
+            tracing::warn!(error = %e, "RRD file persistence disabled");
+            None
+        }
+    };
+
+    tracing::info!("Status subsystem initialized");
+    Arc::new(Status::new(rrd_writer))
+}
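A note on the two init flavors above: both return a shared `Arc<Status>`, so
the handle can be cloned across tasks without any external locking. A minimal
sketch of that usage, assuming the crate layout from this patch (not part of
the patch itself):

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let status = pmxcfs_status::init(); // Arc<Status>, RRD kept in memory

    // Status synchronizes internally with fine-grained RwLocks and
    // atomic version counters, so tasks can share the Arc freely.
    let s = Arc::clone(&status);
    tokio::spawn(async move {
        s.register_vm(100, pmxcfs_status::VmType::Qemu, "node1".to_string());
    })
    .await
    .unwrap();

    assert!(status.vm_exists(100));
}
```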
diff --git a/src/pmxcfs-rs/pmxcfs-status/src/status.rs b/src/pmxcfs-rs/pmxcfs-status/src/status.rs
new file mode 100644
index 00000000..94b6483d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-status/src/status.rs
@@ -0,0 +1,1561 @@
+/// Status subsystem implementation
+use crate::types::{ClusterInfo, ClusterLogEntry, ClusterNode, NodeStatus, RrdEntry};
+use anyhow::Result;
+use parking_lot::RwLock;
+use pmxcfs_api_types::{VmEntry, VmType};
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+/// Status subsystem (matches C implementation's cfs_status_t)
+pub struct Status {
+    /// Cluster information (nodes, membership) - matches C's clinfo
+    cluster_info: RwLock<Option<ClusterInfo>>,
+
+    /// Cluster info version counter - increments on membership changes (matches C's clinfo_version)
+    cluster_version: AtomicU64,
+
+    /// VM list version counter - increments when VM list changes (matches C's vmlist_version)
+    vmlist_version: AtomicU64,
+
+    /// MemDB path version counters (matches C's memdb_change_array)
+    /// Tracks versions for specific config files like "corosync.conf", "user.cfg", etc.
+    memdb_path_versions: RwLock<HashMap<String, AtomicU64>>,
+
+    /// Node status data by name
+    node_status: RwLock<HashMap<String, NodeStatus>>,
+
+    /// Cluster log with ring buffer and deduplication (matches C's clusterlog_t)
+    cluster_log: pmxcfs_logger::ClusterLog,
+
+    /// RRD entries by key (e.g., "pve2-node/nodename" or "pve2.3-vm/vmid")
+    pub(crate) rrd_data: RwLock<HashMap<String, RrdEntry>>,
+
+    /// RRD file writer for persistent storage (using tokio RwLock for async compatibility)
+    rrd_writer: Option<Arc<tokio::sync::RwLock<pmxcfs_rrd::RrdWriter>>>,
+
+    /// VM/CT list (vmid -> VmEntry)
+    vmlist: RwLock<HashMap<u32, VmEntry>>,
+
+    /// Quorum status (matches C's cfs_status.quorate)
+    quorate: RwLock<bool>,
+
+    /// Current cluster members (CPG membership)
+    members: RwLock<Vec<pmxcfs_api_types::MemberInfo>>,
+
+    /// Daemon start timestamp (UNIX epoch) - for .version plugin
+    start_time: u64,
+
+    /// KV store data from nodes (nodeid -> key -> value)
+    /// Matches C implementation's kvhash
+    kvstore: RwLock<HashMap<u32, HashMap<String, Vec<u8>>>>,
+}
+
+impl Status {
+    /// Create a new Status instance
+    ///
+    /// For production use with RRD persistence, use `pmxcfs_status::init_with_rrd()`.
+    /// For tests or when RRD persistence is not needed, use `pmxcfs_status::init()`.
+    /// This constructor is public to allow custom initialization patterns.
+    pub fn new(rrd_writer: Option<pmxcfs_rrd::RrdWriter>) -> Self {
+        // Wrap RrdWriter in Arc if provided (for async compatibility)
+        let rrd_writer = rrd_writer.map(|w| Arc::new(tokio::sync::RwLock::new(w)));
+
+        // Initialize memdb path versions for common Proxmox config files
+        // Matches C implementation's memdb_change_array (status.c:79-120)
+        // These are the exact paths tracked by the C implementation
+        let mut path_versions = HashMap::new();
+        let common_paths = vec![
+            "corosync.conf",
+            "corosync.conf.new",
+            "storage.cfg",
+            "user.cfg",
+            "domains.cfg",
+            "notifications.cfg",
+            "priv/notifications.cfg",
+            "priv/shadow.cfg",
+            "priv/acme/plugins.cfg",
+            "priv/tfa.cfg",
+            "priv/token.cfg",
+            "datacenter.cfg",
+            "vzdump.cron",
+            "vzdump.conf",
+            "jobs.cfg",
+            "ha/crm_commands",
+            "ha/manager_status",
+            "ha/resources.cfg",
+            "ha/rules.cfg",
+            "ha/groups.cfg",
+            "ha/fence.cfg",
+            "status.cfg",
+            "replication.cfg",
+            "ceph.conf",
+            "sdn/vnets.cfg",
+            "sdn/zones.cfg",
+            "sdn/controllers.cfg",
+            "sdn/subnets.cfg",
+            "sdn/ipams.cfg",
+            "sdn/mac-cache.json",            // SDN MAC address cache
+            "sdn/pve-ipam-state.json",       // SDN IPAM state
+            "sdn/dns.cfg",                   // SDN DNS configuration
+            "sdn/fabrics.cfg",               // SDN fabrics configuration
+            "sdn/.running-config",           // SDN running configuration
+            "virtual-guest/cpu-models.conf", // Virtual guest CPU models
+            "virtual-guest/profiles.cfg",    // Virtual guest profiles
+            "firewall/cluster.fw",           // Cluster firewall rules
+            "mapping/directory.cfg",         // Directory mappings
+            "mapping/pci.cfg",               // PCI device mappings
+            "mapping/usb.cfg",               // USB device mappings
+        ];
+
+        for path in common_paths {
+            path_versions.insert(path.to_string(), AtomicU64::new(0));
+        }
+
+        // Get start time (matches C implementation's cfs_status.start_time)
+        let start_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+
+        Self {
+            cluster_info: RwLock::new(None),
+            cluster_version: AtomicU64::new(1),
+            vmlist_version: AtomicU64::new(1),
+            memdb_path_versions: RwLock::new(path_versions),
+            node_status: RwLock::new(HashMap::new()),
+            cluster_log: pmxcfs_logger::ClusterLog::new(),
+            rrd_data: RwLock::new(HashMap::new()),
+            rrd_writer,
+            vmlist: RwLock::new(HashMap::new()),
+            quorate: RwLock::new(false),
+            members: RwLock::new(Vec::new()),
+            start_time,
+            kvstore: RwLock::new(HashMap::new()),
+        }
+    }
+
+    /// Get node status
+    pub fn get_node_status(&self, name:
&str) -> Option { + self.node_status.read().get(name).cloned() + } + + /// Set node status (matches C implementation's cfs_status_set) + /// + /// This handles status updates received via IPC from external clients. + /// If the key starts with "rrd/", it's RRD data that should be written to disk. + /// Otherwise, it's generic node status data. + pub async fn set_node_status(&self, name: String, data: Vec) -> Result<()> { + // Check if this is RRD data (matching C's cfs_status_set behavior) + if let Some(rrd_key) = name.strip_prefix("rrd/") { + // Strip "rrd/" prefix to get the actual RRD key + // Convert data to string (RRD data is text format) + let data_str = String::from_utf8(data) + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in RRD data: {e}"))?; + + // Write to RRD (stores in memory and writes to disk) + self.set_rrd_data(rrd_key.to_string(), data_str).await?; + } else { + // Regular node status (not RRD) + let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); + let status = NodeStatus { + name: name.clone(), + data, + timestamp: now, + }; + self.node_status.write().insert(name, status); + } + + Ok(()) + } + + /// Add cluster log entry + pub fn add_log_entry(&self, entry: ClusterLogEntry) { + // Convert ClusterLogEntry to ClusterLog format and add + // The ClusterLog handles size limits and deduplication internally + let _ = self.cluster_log.add( + &entry.node, + &entry.ident, + &entry.tag, + 0, // pid not tracked in our entries + entry.priority, + entry.timestamp as u32, + &entry.message, + ); + } + + /// Get cluster log entries + pub fn get_log_entries(&self, max: usize) -> Vec { + // Get entries from ClusterLog and convert to ClusterLogEntry + self.cluster_log + .get_entries(max) + .into_iter() + .map(|entry| ClusterLogEntry { + timestamp: entry.time as u64, + node: entry.node, + priority: entry.priority, + ident: entry.ident, + tag: entry.tag, + message: entry.message, + }) + .collect() + } + + /// Clear all cluster log entries (for testing) + pub fn clear_cluster_log(&self) { + self.cluster_log.clear(); + } + + /// Set RRD data (C-compatible format) + /// Key format: "pve2-node/{nodename}" or "pve2.3-vm/{vmid}" + /// Data format: "{timestamp}:{val1}:{val2}:..." 
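+    /// For example (mirroring the unit tests below), a node entry uses the
+    /// key "pve2-node/testnode" with data like
+    /// "1700000000:0:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000"
+    /// (an illustrative timestamp followed by the metric values).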
+ pub async fn set_rrd_data(&self, key: String, data: String) -> Result<()> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let entry = RrdEntry { + key: key.clone(), + data: data.clone(), + timestamp: now, + }; + + // Store in memory for .rrd plugin file + self.rrd_data.write().insert(key.clone(), entry); + + // Also write to RRD file on disk (if persistence is enabled) + if let Some(writer_lock) = &self.rrd_writer { + let mut writer = writer_lock.write().await; + writer.update(&key, &data).await?; + tracing::trace!("Updated RRD file: {} -> {}", key, data); + } + + Ok(()) + } + + /// Remove old RRD entries (older than 5 minutes) + pub fn remove_old_rrd_data(&self) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + const EXPIRE_SECONDS: u64 = 60 * 5; // 5 minutes + + self.rrd_data + .write() + .retain(|_, entry| now - entry.timestamp <= EXPIRE_SECONDS); + } + + /// Get RRD data dump (text format matching C implementation) + pub fn get_rrd_dump(&self) -> String { + // Remove old entries first + self.remove_old_rrd_data(); + + let rrd = self.rrd_data.read(); + let mut result = String::new(); + + for entry in rrd.values() { + result.push_str(&entry.key); + result.push(':'); + result.push_str(&entry.data); + result.push('\n'); + } + + result + } + + /// Collect disk I/O statistics (bytes read, bytes written) + /// + /// Note: This is for future VM RRD implementation. Per C implementation: + /// - Node RRD (rrd_def_node) has 12 fields and does NOT include diskread/diskwrite + /// - VM RRD (rrd_def_vm) has 10 fields and DOES include diskread/diskwrite at indices 8-9 + /// + /// This method will be used when implementing VM RRD collection. + /// + /// # Sector Size + /// The Linux kernel reports disk statistics in /proc/diskstats using 512-byte sectors + /// as the standard unit, regardless of the device's actual physical sector size. + /// This is a kernel reporting convention (see Documentation/admin-guide/iostats.rst). + #[allow(dead_code)] + fn collect_disk_io() -> Result<(u64, u64)> { + // /proc/diskstats always uses 512-byte sectors (kernel convention) + const DISKSTATS_SECTOR_SIZE: u64 = 512; + + let diskstats = procfs::diskstats()?; + + let mut total_read = 0u64; + let mut total_write = 0u64; + + for stat in diskstats { + // Skip partitions (only look at whole disks: sda, vda, etc.) 
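+            // Note: this trailing-digit heuristic also skips whole disks whose
+            // names end in a digit (e.g. nvme0n1, mmcblk0), so such devices
+            // would not be counted by this check.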
+ if stat + .name + .chars() + .last() + .map(|c| c.is_numeric()) + .unwrap_or(false) + { + continue; + } + + // Convert sectors to bytes using kernel's reporting unit + total_read += stat.sectors_read * DISKSTATS_SECTOR_SIZE; + total_write += stat.sectors_written * DISKSTATS_SECTOR_SIZE; + } + + Ok((total_read, total_write)) + } + + /// Register a VM/CT + pub fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) { + tracing::debug!(vmid, vmtype = ?vmtype, node = %node, "Registered VM"); + + // Get existing VM version or start at 1 + let version = self + .vmlist + .read() + .get(&vmid) + .map(|vm| vm.version + 1) + .unwrap_or(1); + + let entry = VmEntry { + vmid, + vmtype, + node, + version, + }; + self.vmlist.write().insert(vmid, entry); + + // Increment vmlist version counter + self.increment_vmlist_version(); + } + + /// Delete a VM/CT + pub fn delete_vm(&self, vmid: u32) { + if self.vmlist.write().remove(&vmid).is_some() { + tracing::debug!(vmid, "Deleted VM"); + + // Increment vmlist version counter + self.increment_vmlist_version(); + } + } + + /// Check if VM/CT exists + pub fn vm_exists(&self, vmid: u32) -> bool { + self.vmlist.read().contains_key(&vmid) + } + + /// Check if a different VM/CT exists (different node or type) + pub fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool { + if let Some(entry) = self.vmlist.read().get(&vmid) { + entry.vmtype != vmtype || entry.node != node + } else { + false + } + } + + /// Get VM list + pub fn get_vmlist(&self) -> HashMap { + self.vmlist.read().clone() + } + + /// Scan directories for VMs/CTs and update vmlist + /// + /// Uses memdb's `recreate_vmlist()` to properly scan nodes/*/qemu-server/ + /// and nodes/*/lxc/ directories to track which node each VM belongs to. 
+ pub fn scan_vmlist(&self, memdb: &pmxcfs_memdb::MemDb) { + // Use the proper recreate_vmlist from memdb which scans nodes/*/qemu-server/ and nodes/*/lxc/ + match pmxcfs_memdb::recreate_vmlist(memdb) { + Ok(new_vmlist) => { + let vmlist_len = new_vmlist.len(); + let mut vmlist = self.vmlist.write(); + *vmlist = new_vmlist; + drop(vmlist); + + tracing::info!(vms = vmlist_len, "VM list scan complete"); + + // Increment vmlist version counter + self.increment_vmlist_version(); + } + Err(err) => { + tracing::error!(error = %err, "Failed to recreate vmlist"); + } + } + } + + /// Initialize cluster information with cluster name + pub fn init_cluster(&self, cluster_name: String) { + let info = ClusterInfo::new(cluster_name); + *self.cluster_info.write() = Some(info); + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + + /// Register a node in the cluster (name, ID, IP) + pub fn register_node(&self, node_id: u32, name: String, ip: String) { + tracing::debug!(node_id, node = %name, ip = %ip, "Registering cluster node"); + + let mut cluster_info = self.cluster_info.write(); + if let Some(ref mut info) = *cluster_info { + let node = ClusterNode { + name, + node_id, + ip, + online: false, // Will be updated by cluster module + }; + info.add_node(node); + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + } + + /// Get cluster information (for .members plugin) + pub fn get_cluster_info(&self) -> Option { + self.cluster_info.read().clone() + } + + /// Get cluster version + pub fn get_cluster_version(&self) -> u64 { + self.cluster_version.load(Ordering::SeqCst) + } + + /// Increment cluster version (called when membership changes) + pub fn increment_cluster_version(&self) { + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + + /// Update cluster info from CMAP (called by ClusterConfigService) + pub fn update_cluster_info( + &self, + cluster_name: String, + config_version: u64, + nodes: Vec<(u32, String, String)>, + ) -> Result<()> { + let mut cluster_info = self.cluster_info.write(); + + // Create or update cluster info + let mut info = cluster_info + .take() + .unwrap_or_else(|| ClusterInfo::new(cluster_name.clone())); + + // Update cluster name if changed + if info.cluster_name != cluster_name { + info.cluster_name = cluster_name; + } + + // Clear existing nodes + info.nodes_by_id.clear(); + info.nodes_by_name.clear(); + + // Add updated nodes + for (nodeid, name, ip) in nodes { + let node = ClusterNode { + name: name.clone(), + node_id: nodeid, + ip, + online: false, // Will be updated by quorum module + }; + info.add_node(node); + } + + *cluster_info = Some(info); + + // Update version to reflect configuration change + self.cluster_version.store(config_version, Ordering::SeqCst); + + tracing::info!(version = config_version, "Updated cluster configuration"); + Ok(()) + } + + /// Update node online status (called by cluster module) + pub fn set_node_online(&self, node_id: u32, online: bool) { + let mut cluster_info = self.cluster_info.write(); + if let Some(ref mut info) = *cluster_info + && let Some(node) = info.nodes_by_id.get_mut(&node_id) + && node.online != online + { + node.online = online; + // Also update in nodes_by_name + if let Some(name_node) = info.nodes_by_name.get_mut(&node.name) { + name_node.online = online; + } + self.cluster_version.fetch_add(1, Ordering::SeqCst); + tracing::debug!( + node = %node.name, + node_id, + online = if online { "true" } else { "false" }, + "Node online status changed" + ); + } + } + + /// Check if cluster is quorate (matches C's 
cfs_is_quorate) + pub fn is_quorate(&self) -> bool { + *self.quorate.read() + } + + /// Set quorum status (matches C's cfs_set_quorate) + pub fn set_quorate(&self, quorate: bool) { + let old_quorate = *self.quorate.read(); + *self.quorate.write() = quorate; + + if old_quorate != quorate { + if quorate { + tracing::info!("Node has quorum"); + } else { + tracing::info!("Node lost quorum"); + } + } + } + + /// Get current cluster members (CPG membership) + pub fn get_members(&self) -> Vec { + self.members.read().clone() + } + + /// Update cluster members and sync online status (matches C's dfsm_confchg callback) + /// + /// This updates the CPG member list and synchronizes the online status + /// in cluster_info to match current membership. + pub fn update_members(&self, members: Vec) { + *self.members.write() = members.clone(); + + // Update online status in cluster_info based on members + // (matches C implementation's dfsm_confchg in status.c:1989-2025) + let mut cluster_info = self.cluster_info.write(); + if let Some(ref mut info) = *cluster_info { + // First mark all nodes as offline + for node in info.nodes_by_id.values_mut() { + node.online = false; + } + for node in info.nodes_by_name.values_mut() { + node.online = false; + } + + // Then mark active members as online + for member in &members { + if let Some(node) = info.nodes_by_id.get_mut(&member.node_id) { + node.online = true; + // Also update in nodes_by_name + if let Some(name_node) = info.nodes_by_name.get_mut(&node.name) { + name_node.online = true; + } + } + } + + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + } + + /// Get daemon start timestamp (for .version plugin) + pub fn get_start_time(&self) -> u64 { + self.start_time + } + + /// Increment VM list version (matches C's cfs_status.vmlist_version++) + pub fn increment_vmlist_version(&self) { + self.vmlist_version.fetch_add(1, Ordering::SeqCst); + } + + /// Get VM list version + pub fn get_vmlist_version(&self) -> u64 { + self.vmlist_version.load(Ordering::SeqCst) + } + + /// Increment version for a specific memdb path (matches C's record_memdb_change) + pub fn increment_path_version(&self, path: &str) { + let versions = self.memdb_path_versions.read(); + if let Some(counter) = versions.get(path) { + counter.fetch_add(1, Ordering::SeqCst); + } + } + + /// Get version for a specific memdb path + pub fn get_path_version(&self, path: &str) -> u64 { + let versions = self.memdb_path_versions.read(); + versions + .get(path) + .map(|counter| counter.load(Ordering::SeqCst)) + .unwrap_or(0) + } + + /// Get all memdb path versions (for .version plugin) + pub fn get_all_path_versions(&self) -> HashMap { + let versions = self.memdb_path_versions.read(); + versions + .iter() + .map(|(path, counter)| (path.clone(), counter.load(Ordering::SeqCst))) + .collect() + } + + /// Increment ALL configuration file versions (matches C's record_memdb_reload) + /// + /// Called when the entire database is reloaded from cluster peers. + /// This ensures clients know that all configuration files should be re-read. + pub fn increment_all_path_versions(&self) { + let versions = self.memdb_path_versions.read(); + for (_, counter) in versions.iter() { + counter.fetch_add(1, Ordering::SeqCst); + } + } + + /// Set key-value data from a node (kvstore DFSM) + /// + /// Matches C implementation's cfs_kvstore_node_set in status.c. + /// Stores ephemeral status data like RRD metrics, IP addresses, etc. 
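+    /// For example, `set_node_kv(1, "ip".to_string(), b"192.168.1.10".to_vec())`
+    /// records the advertised IP of node 1 (see the kvstore unit test below).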
+ pub fn set_node_kv(&self, nodeid: u32, key: String, value: Vec) { + let mut kvstore = self.kvstore.write(); + kvstore.entry(nodeid).or_default().insert(key, value); + } + + /// Get key-value data from a node + pub fn get_node_kv(&self, nodeid: u32, key: &str) -> Option> { + let kvstore = self.kvstore.read(); + kvstore.get(&nodeid)?.get(key).cloned() + } + + /// Add cluster log entry (called by kvstore DFSM) + /// + /// This is the wrapper for kvstore LOG messages. + /// Matches C implementation's clusterlog_insert call. + pub fn add_cluster_log( + &self, + timestamp: u32, + priority: u8, + tag: String, + node: String, + message: String, + ) { + let entry = ClusterLogEntry { + timestamp: timestamp as u64, + node, + priority, + ident: String::new(), // Not used in kvstore messages + tag, + message, + }; + self.add_log_entry(entry); + } + + /// Update node online status based on CPG membership (kvstore DFSM confchg callback) + /// + /// This is called when kvstore CPG membership changes. + /// Matches C implementation's dfsm_confchg in status.c. + pub fn update_member_status(&self, member_list: &[u32]) { + let mut cluster_info = self.cluster_info.write(); + if let Some(ref mut info) = *cluster_info { + // Mark all nodes as offline + for node in info.nodes_by_id.values_mut() { + node.online = false; + } + for node in info.nodes_by_name.values_mut() { + node.online = false; + } + + // Mark nodes in member_list as online + for &nodeid in member_list { + if let Some(node) = info.nodes_by_id.get_mut(&nodeid) { + node.online = true; + // Also update in nodes_by_name + if let Some(name_node) = info.nodes_by_name.get_mut(&node.name) { + name_node.online = true; + } + } + } + + self.cluster_version.fetch_add(1, Ordering::SeqCst); + } + } + + /// Get cluster log state (for DFSM synchronization) + /// + /// Returns the cluster log in C-compatible binary format (clog_base_t). + /// Matches C implementation's clusterlog_get_state() in logger.c:553-571. + pub fn get_cluster_log_state(&self) -> Result> { + self.cluster_log.get_state() + } + + /// Merge cluster log states from remote nodes + /// + /// Deserializes binary states from remote nodes and merges them with the local log. + /// Matches C implementation's dfsm_process_state_update() in status.c:2049-2074. 
+ pub fn merge_cluster_log_states( + &self, + states: &[pmxcfs_api_types::NodeSyncInfo], + ) -> Result<()> { + use pmxcfs_logger::ClusterLog; + + let mut remote_logs = Vec::new(); + + for state_info in states { + // Check if this node has state data + let state_data = match &state_info.state { + Some(data) if !data.is_empty() => data, + _ => continue, + }; + + match ClusterLog::deserialize_state(state_data) { + Ok(ring_buffer) => { + tracing::debug!( + "Deserialized cluster log from node {}: {} entries", + state_info.nodeid, + ring_buffer.len() + ); + remote_logs.push(ring_buffer); + } + Err(e) => { + tracing::warn!( + nodeid = state_info.nodeid, + error = %e, + "Failed to deserialize cluster log from node" + ); + } + } + } + + if !remote_logs.is_empty() { + // Merge remote logs with local log (include_local = true) + match self.cluster_log.merge(remote_logs, true) { + Ok(merged) => { + // Update our buffer with the merged result + self.cluster_log.update_buffer(merged); + tracing::debug!("Successfully merged cluster logs"); + } + Err(e) => { + tracing::error!(error = %e, "Failed to merge cluster logs"); + } + } + } + + Ok(()) + } + + /// Add cluster log entry from remote node (kvstore LOG message) + /// + /// Matches C implementation's clusterlog_insert() via kvstore message handling. + pub fn add_remote_cluster_log( + &self, + time: u32, + priority: u8, + node: String, + ident: String, + tag: String, + message: String, + ) -> Result<()> { + self.cluster_log + .add(&node, &ident, &tag, 0, priority, time, &message)?; + Ok(()) + } +} + +// Implement StatusOps trait for Status +impl crate::traits::StatusOps for Status { + fn get_node_status(&self, name: &str) -> Option { + self.get_node_status(name) + } + + fn set_node_status<'a>( + &'a self, + name: String, + data: Vec, + ) -> crate::traits::BoxFuture<'a, Result<()>> { + Box::pin(self.set_node_status(name, data)) + } + + fn add_log_entry(&self, entry: ClusterLogEntry) { + self.add_log_entry(entry) + } + + fn get_log_entries(&self, max: usize) -> Vec { + self.get_log_entries(max) + } + + fn clear_cluster_log(&self) { + self.clear_cluster_log() + } + + fn add_cluster_log( + &self, + timestamp: u32, + priority: u8, + tag: String, + node: String, + msg: String, + ) { + self.add_cluster_log(timestamp, priority, tag, node, msg) + } + + fn get_cluster_log_state(&self) -> Result> { + self.get_cluster_log_state() + } + + fn merge_cluster_log_states(&self, states: &[pmxcfs_api_types::NodeSyncInfo]) -> Result<()> { + self.merge_cluster_log_states(states) + } + + fn add_remote_cluster_log( + &self, + time: u32, + priority: u8, + node: String, + ident: String, + tag: String, + message: String, + ) -> Result<()> { + self.add_remote_cluster_log(time, priority, node, ident, tag, message) + } + + fn set_rrd_data<'a>( + &'a self, + key: String, + data: String, + ) -> crate::traits::BoxFuture<'a, Result<()>> { + Box::pin(self.set_rrd_data(key, data)) + } + + fn remove_old_rrd_data(&self) { + self.remove_old_rrd_data() + } + + fn get_rrd_dump(&self) -> String { + self.get_rrd_dump() + } + + fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) { + self.register_vm(vmid, vmtype, node) + } + + fn delete_vm(&self, vmid: u32) { + self.delete_vm(vmid) + } + + fn vm_exists(&self, vmid: u32) -> bool { + self.vm_exists(vmid) + } + + fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool { + self.different_vm_exists(vmid, vmtype, node) + } + + fn get_vmlist(&self) -> HashMap { + self.get_vmlist() + } + + fn scan_vmlist(&self, memdb: 
&pmxcfs_memdb::MemDb) { + self.scan_vmlist(memdb) + } + + fn init_cluster(&self, cluster_name: String) { + self.init_cluster(cluster_name) + } + + fn register_node(&self, node_id: u32, name: String, ip: String) { + self.register_node(node_id, name, ip) + } + + fn get_cluster_info(&self) -> Option { + self.get_cluster_info() + } + + fn get_cluster_version(&self) -> u64 { + self.get_cluster_version() + } + + fn increment_cluster_version(&self) { + self.increment_cluster_version() + } + + fn update_cluster_info( + &self, + cluster_name: String, + config_version: u64, + nodes: Vec<(u32, String, String)>, + ) -> Result<()> { + self.update_cluster_info(cluster_name, config_version, nodes) + } + + fn set_node_online(&self, node_id: u32, online: bool) { + self.set_node_online(node_id, online) + } + + fn is_quorate(&self) -> bool { + self.is_quorate() + } + + fn set_quorate(&self, quorate: bool) { + self.set_quorate(quorate) + } + + fn get_members(&self) -> Vec { + self.get_members() + } + + fn update_members(&self, members: Vec) { + self.update_members(members) + } + + fn update_member_status(&self, member_list: &[u32]) { + self.update_member_status(member_list) + } + + fn get_start_time(&self) -> u64 { + self.get_start_time() + } + + fn increment_vmlist_version(&self) { + self.increment_vmlist_version() + } + + fn get_vmlist_version(&self) -> u64 { + self.get_vmlist_version() + } + + fn increment_path_version(&self, path: &str) { + self.increment_path_version(path) + } + + fn get_path_version(&self, path: &str) -> u64 { + self.get_path_version(path) + } + + fn get_all_path_versions(&self) -> HashMap { + self.get_all_path_versions() + } + + fn increment_all_path_versions(&self) { + self.increment_all_path_versions() + } + + fn set_node_kv(&self, nodeid: u32, key: String, value: Vec) { + self.set_node_kv(nodeid, key, value) + } + + fn get_node_kv(&self, nodeid: u32, key: &str) -> Option> { + self.get_node_kv(nodeid, key) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::ClusterLogEntry; + use pmxcfs_api_types::VmType; + + /// Test helper: Create Status without rrdcached daemon (for unit tests) + fn init_test_status() -> Arc { + // Don't try to connect to rrdcached daemon in unit tests + // RRD writer creation would be async, so just pass None for tests + // Status::new() already initializes path_versions internally + Arc::new(Status::new(None)) + } + + #[tokio::test] + async fn test_rrd_data_storage_and_retrieval() { + let status = init_test_status(); + + status.rrd_data.write().clear(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Test node RRD data format + let node_data = + format!("{now}:0:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000"); + let _ = status + .set_rrd_data("pve2-node/testnode".to_string(), node_data.clone()) + .await; + + // Test VM RRD data format + let vm_data = format!("{now}:1:60:4:2048:2048:10000:5000:1000:500:100:50"); + let _ = status + .set_rrd_data("pve2.3-vm/100".to_string(), vm_data.clone()) + .await; + + // Get RRD dump + let dump = status.get_rrd_dump(); + + // Verify both entries are present + assert!( + dump.contains("pve2-node/testnode"), + "Should contain node entry" + ); + assert!(dump.contains("pve2.3-vm/100"), "Should contain VM entry"); + + // Verify format: each line should be "key:data" + for line in dump.lines() { + assert!( + line.contains(':'), + "Each line should contain colon separator" + ); + let parts: Vec<&str> = line.split(':').collect(); + assert!(parts.len() > 1, 
"Each line should have key:data format"); + } + + assert_eq!(dump.lines().count(), 2, "Should have exactly 2 entries"); + } + + #[tokio::test] + async fn test_rrd_data_aging() { + let status = init_test_status(); + + status.rrd_data.write().clear(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let recent_data = + format!("{now}:0:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000"); + let _ = status + .set_rrd_data("pve2-node/recent".to_string(), recent_data) + .await; + + // Manually add an old entry (simulate time passing) + let old_timestamp = now - 400; // 400 seconds ago (> 5 minutes) + let old_data = format!( + "{old_timestamp}:0:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000" + ); + let entry = RrdEntry { + key: "pve2-node/old".to_string(), + data: old_data, + timestamp: old_timestamp, + }; + status + .rrd_data + .write() + .insert("pve2-node/old".to_string(), entry); + + // Get dump - should trigger aging and remove old entry + let dump = status.get_rrd_dump(); + + assert!( + dump.contains("pve2-node/recent"), + "Recent entry should be present" + ); + assert!( + !dump.contains("pve2-node/old"), + "Old entry should be aged out" + ); + } + + #[tokio::test] + async fn test_rrd_set_via_node_status() { + let status = init_test_status(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Simulate receiving RRD data via IPC (like pvestatd sends) + // Format matches C implementation: "timestamp:uptime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout" + let node_data = format!("{now}:12345:1.5:8:0.5:0.1:16000:8000:4000:0:100:50:1000:2000"); + + // Test the set_node_status method with "rrd/" prefix (matches C's cfs_status_set behavior) + let result = status + .set_node_status( + "rrd/pve2-node/testnode".to_string(), + node_data.as_bytes().to_vec(), + ) + .await; + assert!( + result.is_ok(), + "Should successfully set RRD data via node_status" + ); + + // Get the dump and verify + let dump = status.get_rrd_dump(); + assert!( + dump.contains("pve2-node/testnode"), + "Should contain node metrics" + ); + + // Verify the data has the expected number of fields + for line in dump.lines() { + if line.starts_with("pve2-node/") { + let parts: Vec<&str> = line.split(':').collect(); + // Format: key:timestamp:uptime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout + // That's 1 (key) + 14 fields = 15 parts minimum + assert!( + parts.len() >= 15, + "Node data should have at least 15 colon-separated fields, got {}", + parts.len() + ); + } + } + } + + #[tokio::test] + async fn test_rrd_multiple_updates() { + let status = init_test_status(); + + status.rrd_data.write().clear(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Add multiple entries + for i in 0..5 { + let data = format!( + "{}:{}:1.5:4:45.5:2.1:8000000000:6000000000:0:0:0:0:1000000:500000", + now + i, + i + ); + let _ = status + .set_rrd_data(format!("pve2-node/node{i}"), data) + .await; + } + + let dump = status.get_rrd_dump(); + let count = dump.lines().count(); + assert_eq!(count, 5, "Should have 5 entries"); + + // Verify each entry is present + for i in 0..5 { + assert!( + dump.contains(&format!("pve2-node/node{i}")), + "Should contain node{i}" + ); + } + } + + // ========== VM/CT Registry Tests ========== + + #[test] + fn test_vm_registration() { + let status = init_test_status(); + + // 
Register a QEMU VM + status.register_vm(100, VmType::Qemu, "node1".to_string()); + + // Verify it exists + assert!(status.vm_exists(100), "VM 100 should exist"); + + // Verify version incremented + let vmlist_version = status.get_vmlist_version(); + assert!(vmlist_version > 1, "VM list version should increment"); + + // Get VM list and verify entry + let vmlist = status.get_vmlist(); + assert_eq!(vmlist.len(), 1, "Should have 1 VM"); + + let vm = vmlist.get(&100).expect("VM 100 should be in list"); + assert_eq!(vm.vmid, 100); + assert_eq!(vm.vmtype, VmType::Qemu); + assert_eq!(vm.node, "node1"); + assert_eq!(vm.version, 1, "First registration should have version 1"); + } + + #[test] + fn test_vm_deletion() { + let status = init_test_status(); + + // Register and then delete + status.register_vm(100, VmType::Qemu, "node1".to_string()); + assert!(status.vm_exists(100), "VM should exist after registration"); + + let version_before = status.get_vmlist_version(); + status.delete_vm(100); + + assert!(!status.vm_exists(100), "VM should not exist after deletion"); + + let version_after = status.get_vmlist_version(); + assert!( + version_after > version_before, + "Version should increment on deletion" + ); + + let vmlist = status.get_vmlist(); + assert_eq!(vmlist.len(), 0, "VM list should be empty"); + } + + #[test] + fn test_vm_multiple_registrations() { + let status = init_test_status(); + + // Register multiple VMs + status.register_vm(100, VmType::Qemu, "node1".to_string()); + status.register_vm(101, VmType::Qemu, "node2".to_string()); + status.register_vm(200, VmType::Lxc, "node1".to_string()); + status.register_vm(201, VmType::Lxc, "node3".to_string()); + + let vmlist = status.get_vmlist(); + assert_eq!(vmlist.len(), 4, "Should have 4 VMs"); + + // Verify each VM + assert_eq!(vmlist.get(&100).unwrap().vmtype, VmType::Qemu); + assert_eq!(vmlist.get(&101).unwrap().node, "node2"); + assert_eq!(vmlist.get(&200).unwrap().vmtype, VmType::Lxc); + assert_eq!(vmlist.get(&201).unwrap().node, "node3"); + } + + #[test] + fn test_vm_re_registration_increments_version() { + let status = init_test_status(); + + // Register VM + status.register_vm(100, VmType::Qemu, "node1".to_string()); + let vmlist = status.get_vmlist(); + let version1 = vmlist.get(&100).unwrap().version; + assert_eq!(version1, 1, "First registration should have version 1"); + + // Re-register same VM + status.register_vm(100, VmType::Qemu, "node2".to_string()); + let vmlist = status.get_vmlist(); + let version2 = vmlist.get(&100).unwrap().version; + assert_eq!(version2, 2, "Second registration should increment version"); + assert_eq!( + vmlist.get(&100).unwrap().node, + "node2", + "Node should be updated" + ); + } + + #[test] + fn test_different_vm_exists() { + let status = init_test_status(); + + // Register VM 100 as QEMU on node1 + status.register_vm(100, VmType::Qemu, "node1".to_string()); + + // Check if different VM exists - same type, different node + assert!( + status.different_vm_exists(100, VmType::Qemu, "node2"), + "Should detect different node" + ); + + // Check if different VM exists - different type, same node + assert!( + status.different_vm_exists(100, VmType::Lxc, "node1"), + "Should detect different type" + ); + + // Check if different VM exists - same type and node (should be false) + assert!( + !status.different_vm_exists(100, VmType::Qemu, "node1"), + "Should not detect difference for identical VM" + ); + + // Check non-existent VM + assert!( + !status.different_vm_exists(999, VmType::Qemu, "node1"), + "Non-existent 
VM should return false" + ); + } + + // ========== Cluster Membership Tests ========== + + #[test] + fn test_cluster_initialization() { + let status = init_test_status(); + + // Initially no cluster info + assert!( + status.get_cluster_info().is_none(), + "Should have no cluster info initially" + ); + + // Initialize cluster + status.init_cluster("test-cluster".to_string()); + + let cluster_info = status.get_cluster_info(); + assert!( + cluster_info.is_some(), + "Cluster info should exist after init" + ); + assert_eq!(cluster_info.unwrap().cluster_name, "test-cluster"); + + let version = status.get_cluster_version(); + assert!(version > 1, "Cluster version should increment"); + } + + #[test] + fn test_node_registration() { + let status = init_test_status(); + + status.init_cluster("test-cluster".to_string()); + + // Register nodes + status.register_node(1, "node1".to_string(), "192.168.1.10".to_string()); + status.register_node(2, "node2".to_string(), "192.168.1.11".to_string()); + + let cluster_info = status + .get_cluster_info() + .expect("Cluster info should exist"); + assert_eq!(cluster_info.nodes_by_id.len(), 2, "Should have 2 nodes"); + assert_eq!( + cluster_info.nodes_by_name.len(), + 2, + "Should have 2 nodes by name" + ); + + let node1 = cluster_info + .nodes_by_id + .get(&1) + .expect("Node 1 should exist"); + assert_eq!(node1.name, "node1"); + assert_eq!(node1.ip, "192.168.1.10"); + assert!(!node1.online, "Node should be offline initially"); + } + + #[test] + fn test_node_online_status() { + let status = init_test_status(); + + status.init_cluster("test-cluster".to_string()); + status.register_node(1, "node1".to_string(), "192.168.1.10".to_string()); + + // Set online + status.set_node_online(1, true); + let cluster_info = status.get_cluster_info().unwrap(); + assert!( + cluster_info.nodes_by_id.get(&1).unwrap().online, + "Node should be online" + ); + assert!( + cluster_info.nodes_by_name.get("node1").unwrap().online, + "Node should be online in nodes_by_name too" + ); + + // Set offline + status.set_node_online(1, false); + let cluster_info = status.get_cluster_info().unwrap(); + assert!( + !cluster_info.nodes_by_id.get(&1).unwrap().online, + "Node should be offline" + ); + } + + #[test] + fn test_update_members() { + let status = init_test_status(); + + status.init_cluster("test-cluster".to_string()); + status.register_node(1, "node1".to_string(), "192.168.1.10".to_string()); + status.register_node(2, "node2".to_string(), "192.168.1.11".to_string()); + status.register_node(3, "node3".to_string(), "192.168.1.12".to_string()); + + // Simulate CPG membership: nodes 1 and 3 are online + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let members = vec![ + pmxcfs_api_types::MemberInfo { + node_id: 1, + pid: 1000, + joined_at: now, + }, + pmxcfs_api_types::MemberInfo { + node_id: 3, + pid: 1002, + joined_at: now, + }, + ]; + status.update_members(members); + + let cluster_info = status.get_cluster_info().unwrap(); + assert!( + cluster_info.nodes_by_id.get(&1).unwrap().online, + "Node 1 should be online" + ); + assert!( + !cluster_info.nodes_by_id.get(&2).unwrap().online, + "Node 2 should be offline" + ); + assert!( + cluster_info.nodes_by_id.get(&3).unwrap().online, + "Node 3 should be online" + ); + } + + #[test] + fn test_quorum_state() { + let status = init_test_status(); + + // Initially not quorate + assert!(!status.is_quorate(), "Should not be quorate initially"); + + // Set quorate + status.set_quorate(true); + 
assert!(status.is_quorate(), "Should be quorate"); + + // Unset quorate + status.set_quorate(false); + assert!(!status.is_quorate(), "Should not be quorate"); + } + + #[test] + fn test_path_version_tracking() { + let status = init_test_status(); + + // Initial version should be 0 + assert_eq!(status.get_path_version("corosync.conf"), 0); + + // Increment version + status.increment_path_version("corosync.conf"); + assert_eq!(status.get_path_version("corosync.conf"), 1); + + // Increment again + status.increment_path_version("corosync.conf"); + assert_eq!(status.get_path_version("corosync.conf"), 2); + + // Non-tracked path should return 0 + assert_eq!(status.get_path_version("nonexistent.cfg"), 0); + } + + #[test] + fn test_all_path_versions() { + let status = init_test_status(); + + // Increment a few paths + status.increment_path_version("corosync.conf"); + status.increment_path_version("corosync.conf"); + status.increment_path_version("storage.cfg"); + + let all_versions = status.get_all_path_versions(); + + // Should contain all tracked paths + assert!(all_versions.contains_key("corosync.conf")); + assert!(all_versions.contains_key("storage.cfg")); + assert!(all_versions.contains_key("user.cfg")); + + // Verify specific versions + assert_eq!(all_versions.get("corosync.conf"), Some(&2)); + assert_eq!(all_versions.get("storage.cfg"), Some(&1)); + assert_eq!(all_versions.get("user.cfg"), Some(&0)); + } + + #[test] + fn test_vmlist_version_tracking() { + let status = init_test_status(); + + let initial_version = status.get_vmlist_version(); + + status.increment_vmlist_version(); + assert_eq!(status.get_vmlist_version(), initial_version + 1); + + status.increment_vmlist_version(); + assert_eq!(status.get_vmlist_version(), initial_version + 2); + } + + #[test] + fn test_cluster_log_add_entry() { + let status = init_test_status(); + + let entry = ClusterLogEntry { + timestamp: 1234567890, + node: "node1".to_string(), + priority: 6, + ident: "pmxcfs".to_string(), + tag: "startup".to_string(), + message: "Test message".to_string(), + }; + + status.add_log_entry(entry); + + let entries = status.get_log_entries(10); + assert_eq!(entries.len(), 1, "Should have 1 log entry"); + assert_eq!(entries[0].node, "node1"); + assert_eq!(entries[0].message, "Test message"); + } + + #[test] + fn test_cluster_log_multiple_entries() { + let status = init_test_status(); + + // Add multiple entries + for i in 0..5 { + let entry = ClusterLogEntry { + timestamp: 1234567890 + i, + node: format!("node{i}"), + priority: 6, + ident: "test".to_string(), + tag: "test".to_string(), + message: format!("Message {i}"), + }; + status.add_log_entry(entry); + } + + let entries = status.get_log_entries(10); + assert_eq!(entries.len(), 5, "Should have 5 log entries"); + } + + #[test] + fn test_cluster_log_clear() { + let status = init_test_status(); + + // Add entries + for i in 0..3 { + let entry = ClusterLogEntry { + timestamp: 1234567890 + i, + node: "node1".to_string(), + priority: 6, + ident: "test".to_string(), + tag: "test".to_string(), + message: format!("Message {i}"), + }; + status.add_log_entry(entry); + } + + assert_eq!(status.get_log_entries(10).len(), 3, "Should have 3 entries"); + + // Clear + status.clear_cluster_log(); + + assert_eq!( + status.get_log_entries(10).len(), + 0, + "Should have 0 entries after clear" + ); + } + + #[test] + fn test_kvstore_operations() { + let status = init_test_status(); + + // Set some KV data + status.set_node_kv(1, "ip".to_string(), b"192.168.1.10".to_vec()); + status.set_node_kv(1, 
"status".to_string(), b"online".to_vec()); + status.set_node_kv(2, "ip".to_string(), b"192.168.1.11".to_vec()); + + // Get KV data + let ip1 = status.get_node_kv(1, "ip"); + assert_eq!(ip1, Some(b"192.168.1.10".to_vec())); + + let status1 = status.get_node_kv(1, "status"); + assert_eq!(status1, Some(b"online".to_vec())); + + let ip2 = status.get_node_kv(2, "ip"); + assert_eq!(ip2, Some(b"192.168.1.11".to_vec())); + + // Non-existent key + let nonexistent = status.get_node_kv(1, "nonexistent"); + assert_eq!(nonexistent, None); + + // Non-existent node + let nonexistent_node = status.get_node_kv(999, "ip"); + assert_eq!(nonexistent_node, None); + } + + #[test] + fn test_start_time() { + let status = init_test_status(); + + let start_time = status.get_start_time(); + assert!(start_time > 0, "Start time should be set"); + + // Verify it's a recent timestamp (within last hour) + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + assert!(now - start_time < 3600, "Start time should be recent"); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-status/src/traits.rs b/src/pmxcfs-rs/pmxcfs-status/src/traits.rs new file mode 100644 index 00000000..add2c440 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-status/src/traits.rs @@ -0,0 +1,486 @@ +use crate::types::{ClusterInfo, ClusterLogEntry, NodeStatus}; +use anyhow::Result; +use parking_lot::RwLock; +use pmxcfs_api_types::{VmEntry, VmType}; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +/// Traits for Status operations to enable mocking and testing +/// +/// Boxed future type for async trait methods +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + +/// Trait for Status operations +/// +/// This trait abstracts all Status operations to enable: +/// - Dependency injection in production code +/// - Easy mocking in unit tests +/// - Test isolation without global singleton +/// +/// The real `Status` struct implements this trait for production use. +/// `MockStatus` implements this trait for testing. 
+pub trait StatusOps: Send + Sync { + // Node status operations + fn get_node_status(&self, name: &str) -> Option; + fn set_node_status<'a>(&'a self, name: String, data: Vec) -> BoxFuture<'a, Result<()>>; + + // Cluster log operations + fn add_log_entry(&self, entry: ClusterLogEntry); + fn get_log_entries(&self, max: usize) -> Vec; + fn clear_cluster_log(&self); + fn add_cluster_log(&self, timestamp: u32, priority: u8, tag: String, node: String, msg: String); + fn get_cluster_log_state(&self) -> Result>; + fn merge_cluster_log_states(&self, states: &[pmxcfs_api_types::NodeSyncInfo]) -> Result<()>; + fn add_remote_cluster_log( + &self, + time: u32, + priority: u8, + node: String, + ident: String, + tag: String, + message: String, + ) -> Result<()>; + + // RRD operations + fn set_rrd_data<'a>(&'a self, key: String, data: String) -> BoxFuture<'a, Result<()>>; + fn remove_old_rrd_data(&self); + fn get_rrd_dump(&self) -> String; + + // VM list operations + fn register_vm(&self, vmid: u32, vmtype: VmType, node: String); + fn delete_vm(&self, vmid: u32); + fn vm_exists(&self, vmid: u32) -> bool; + fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool; + fn get_vmlist(&self) -> HashMap; + fn scan_vmlist(&self, memdb: &pmxcfs_memdb::MemDb); + + // Cluster info operations + fn init_cluster(&self, cluster_name: String); + fn register_node(&self, node_id: u32, name: String, ip: String); + fn get_cluster_info(&self) -> Option; + fn get_cluster_version(&self) -> u64; + fn increment_cluster_version(&self); + fn update_cluster_info( + &self, + cluster_name: String, + config_version: u64, + nodes: Vec<(u32, String, String)>, + ) -> Result<()>; + fn set_node_online(&self, node_id: u32, online: bool); + + // Quorum operations + fn is_quorate(&self) -> bool; + fn set_quorate(&self, quorate: bool); + + // Members operations + fn get_members(&self) -> Vec; + fn update_members(&self, members: Vec); + fn update_member_status(&self, member_list: &[u32]); + + // Version/timestamp operations + fn get_start_time(&self) -> u64; + fn increment_vmlist_version(&self); + fn get_vmlist_version(&self) -> u64; + fn increment_path_version(&self, path: &str); + fn get_path_version(&self, path: &str) -> u64; + fn get_all_path_versions(&self) -> HashMap; + fn increment_all_path_versions(&self); + + // KV store operations + fn set_node_kv(&self, nodeid: u32, key: String, value: Vec); + fn get_node_kv(&self, nodeid: u32, key: &str) -> Option>; +} + +/// Mock implementation of StatusOps for testing +/// +/// This provides a lightweight, isolated Status implementation for unit tests. 
+/// Mock implementation of StatusOps for testing
+///
+/// This provides a lightweight, isolated Status implementation for unit tests.
+/// Unlike the real Status, MockStatus:
+/// - Can be created independently without a global singleton
+/// - Has no RRD writer or async dependencies
+/// - Is completely isolated between test instances
+/// - Can be easily reset or configured for specific test scenarios
+///
+/// # Example
+/// ```
+/// use pmxcfs_status::{MockStatus, StatusOps};
+/// use std::sync::Arc;
+///
+/// # fn test_example() {
+/// let status: Arc<dyn StatusOps> = Arc::new(MockStatus::new());
+/// status.set_quorate(true);
+/// assert!(status.is_quorate());
+/// # }
+/// ```
+pub struct MockStatus {
+    vmlist: RwLock<HashMap<u32, VmEntry>>,
+    quorate: RwLock<bool>,
+    cluster_info: RwLock<Option<ClusterInfo>>,
+    members: RwLock<Vec<u32>>,
+    cluster_version: Arc<std::sync::atomic::AtomicU64>,
+    vmlist_version: Arc<std::sync::atomic::AtomicU64>,
+    path_versions: RwLock<HashMap<String, u64>>,
+    kvstore: RwLock<HashMap<u32, HashMap<String, Vec<u8>>>>,
+    cluster_log: RwLock<Vec<ClusterLogEntry>>,
+    rrd_data: RwLock<HashMap<String, String>>,
+    node_status: RwLock<HashMap<String, NodeStatus>>,
+    start_time: u64,
+}
+
+impl MockStatus {
+    /// Create a new MockStatus instance for testing
+    pub fn new() -> Self {
+        Self {
+            vmlist: RwLock::new(HashMap::new()),
+            quorate: RwLock::new(false),
+            cluster_info: RwLock::new(None),
+            members: RwLock::new(Vec::new()),
+            cluster_version: Arc::new(std::sync::atomic::AtomicU64::new(0)),
+            vmlist_version: Arc::new(std::sync::atomic::AtomicU64::new(0)),
+            path_versions: RwLock::new(HashMap::new()),
+            kvstore: RwLock::new(HashMap::new()),
+            cluster_log: RwLock::new(Vec::new()),
+            rrd_data: RwLock::new(HashMap::new()),
+            node_status: RwLock::new(HashMap::new()),
+            start_time: std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap()
+                .as_secs(),
+        }
+    }
+
+    /// Reset all mock state (useful for test cleanup)
+    pub fn reset(&self) {
+        self.vmlist.write().clear();
+        *self.quorate.write() = false;
+        *self.cluster_info.write() = None;
+        self.members.write().clear();
+        self.cluster_version
+            .store(0, std::sync::atomic::Ordering::SeqCst);
+        self.vmlist_version
+            .store(0, std::sync::atomic::Ordering::SeqCst);
+        self.path_versions.write().clear();
+        self.kvstore.write().clear();
+        self.cluster_log.write().clear();
+        self.rrd_data.write().clear();
+        self.node_status.write().clear();
+    }
+}
+
+impl Default for MockStatus {
+    fn default() -> Self {
+        Self::new()
+    }
+}
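
A minimal test sketch of the isolation/reset workflow (the test name is
hypothetical, and the `VmType::Qemu` variant is assumed to exist in
pmxcfs-api-types):

    use pmxcfs_api_types::VmType;
    use pmxcfs_status::{MockStatus, StatusOps};

    #[test]
    fn mock_status_is_isolated() {
        let status = MockStatus::new();

        // Each MockStatus instance starts empty.
        status.register_vm(100, VmType::Qemu, "node1".to_string());
        assert!(status.vm_exists(100));
        assert_eq!(status.get_vmlist_version(), 1);

        // reset() returns the mock to its initial state.
        status.reset();
        assert!(!status.vm_exists(100));
        assert_eq!(status.get_vmlist_version(), 0);
    }
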
+impl StatusOps for MockStatus {
+    fn get_node_status(&self, name: &str) -> Option<NodeStatus> {
+        self.node_status.read().get(name).cloned()
+    }
+
+    fn set_node_status<'a>(&'a self, name: String, data: Vec<u8>) -> BoxFuture<'a, Result<()>> {
+        Box::pin(async move {
+            // Simplified mock - just store the data
+            let now = std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap()
+                .as_secs();
+            self.node_status.write().insert(
+                name.clone(),
+                NodeStatus {
+                    name,
+                    data,
+                    timestamp: now,
+                },
+            );
+            Ok(())
+        })
+    }
+
+    fn add_log_entry(&self, entry: ClusterLogEntry) {
+        self.cluster_log.write().push(entry);
+    }
+
+    fn get_log_entries(&self, max: usize) -> Vec<ClusterLogEntry> {
+        let log = self.cluster_log.read();
+        log.iter().take(max).cloned().collect()
+    }
+
+    fn clear_cluster_log(&self) {
+        self.cluster_log.write().clear();
+    }
+
+    fn add_cluster_log(
+        &self,
+        timestamp: u32,
+        priority: u8,
+        tag: String,
+        node: String,
+        msg: String,
+    ) {
+        let entry = ClusterLogEntry {
+            timestamp: timestamp as u64,
+            node,
+            priority,
+            ident: "mock".to_string(),
+            tag,
+            message: msg,
+        };
+        self.add_log_entry(entry);
+    }
+
+    fn get_cluster_log_state(&self) -> Result<Vec<pmxcfs_api_types::NodeSyncInfo>> {
+        // Simplified mock
+        Ok(Vec::new())
+    }
+
+    fn merge_cluster_log_states(&self, _states: &[pmxcfs_api_types::NodeSyncInfo]) -> Result<()> {
+        // Simplified mock
+        Ok(())
+    }
+
+    fn add_remote_cluster_log(
+        &self,
+        time: u32,
+        priority: u8,
+        node: String,
+        ident: String,
+        tag: String,
+        message: String,
+    ) -> Result<()> {
+        let entry = ClusterLogEntry {
+            timestamp: time as u64,
+            node,
+            priority,
+            ident,
+            tag,
+            message,
+        };
+        self.add_log_entry(entry);
+        Ok(())
+    }
+
+    fn set_rrd_data<'a>(&'a self, key: String, data: String) -> BoxFuture<'a, Result<()>> {
+        Box::pin(async move {
+            self.rrd_data.write().insert(key, data);
+            Ok(())
+        })
+    }
+
+    fn remove_old_rrd_data(&self) {
+        // Mock does nothing
+    }
+
+    fn get_rrd_dump(&self) -> String {
+        let data = self.rrd_data.read();
+        data.iter().map(|(k, v)| format!("{k}: {v}\n")).collect()
+    }
+
+    fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) {
+        // Get existing version or start at 1
+        let version = self
+            .vmlist
+            .read()
+            .get(&vmid)
+            .map(|vm| vm.version + 1)
+            .unwrap_or(1);
+
+        self.vmlist.write().insert(
+            vmid,
+            VmEntry {
+                vmtype,
+                node,
+                vmid,
+                version,
+            },
+        );
+        self.increment_vmlist_version();
+    }
+
+    fn delete_vm(&self, vmid: u32) {
+        self.vmlist.write().remove(&vmid);
+        self.increment_vmlist_version();
+    }
+
+    fn vm_exists(&self, vmid: u32) -> bool {
+        self.vmlist.read().contains_key(&vmid)
+    }
+
+    fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool {
+        if let Some(entry) = self.vmlist.read().get(&vmid) {
+            entry.vmtype != vmtype || entry.node != node
+        } else {
+            false
+        }
+    }
+
+    fn get_vmlist(&self) -> HashMap<u32, VmEntry> {
+        self.vmlist.read().clone()
+    }
+
+    fn scan_vmlist(&self, _memdb: &pmxcfs_memdb::MemDb) {
+        // Mock does nothing - the real implementation scans /qemu-server and /lxc
+    }
+
+    fn init_cluster(&self, cluster_name: String) {
+        *self.cluster_info.write() = Some(ClusterInfo {
+            cluster_name,
+            nodes_by_id: HashMap::new(),
+            nodes_by_name: HashMap::new(),
+        });
+        self.increment_cluster_version();
+    }
+
+    fn register_node(&self, node_id: u32, name: String, ip: String) {
+        let mut info = self.cluster_info.write();
+        if let Some(cluster) = info.as_mut() {
+            let node = crate::types::ClusterNode {
+                name: name.clone(),
+                node_id,
+                ip,
+                online: true,
+            };
+            cluster.add_node(node);
+        }
+        self.increment_cluster_version();
+    }
+
+    fn get_cluster_info(&self) -> Option<ClusterInfo> {
+        self.cluster_info.read().clone()
+    }
+
+    fn get_cluster_version(&self) -> u64 {
+        self.cluster_version
+            .load(std::sync::atomic::Ordering::SeqCst)
+    }
+
+    fn increment_cluster_version(&self) {
+        self.cluster_version
+            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+    }
+
+    fn update_cluster_info(
+        &self,
+        cluster_name: String,
+        config_version: u64,
+        nodes: Vec<(u32, String, String)>,
+    ) -> Result<()> {
+        let mut cluster_info = self.cluster_info.write();
+
+        // Create or update cluster info
+        let mut info = cluster_info.take().unwrap_or_else(|| ClusterInfo {
+            cluster_name: cluster_name.clone(),
+            nodes_by_id: HashMap::new(),
+            nodes_by_name: HashMap::new(),
+        });
+
+        // Update cluster name if changed
+        if info.cluster_name != cluster_name {
+            info.cluster_name = cluster_name;
+        }
+
+        // Clear existing nodes
+        info.nodes_by_id.clear();
+        info.nodes_by_name.clear();
+
+        // Add updated nodes
+        for (nodeid, name, ip) in nodes {
+            let node = crate::types::ClusterNode {
+                name,
+                node_id: nodeid,
+                ip,
+                online: false,
+            };
+            info.add_node(node);
+        }
+
+        *cluster_info = Some(info);
+
+        // Update version to reflect the configuration change
+        self.cluster_version
+            .store(config_version, std::sync::atomic::Ordering::SeqCst);
+
+        Ok(())
+    }
+
+    fn set_node_online(&self, node_id: u32, online: bool) {
+        let mut info = self.cluster_info.write();
+        if let Some(cluster) = info.as_mut()
+            && let Some(node) = cluster.nodes_by_id.get_mut(&node_id)
+        {
+            node.online = online;
+            // Also update in nodes_by_name
+            if let Some(name_node) = cluster.nodes_by_name.get_mut(&node.name) {
+                name_node.online = online;
+            }
+        }
+    }
+
+    fn is_quorate(&self) -> bool {
+        *self.quorate.read()
+    }
+
+    fn set_quorate(&self, quorate: bool) {
+        *self.quorate.write() = quorate;
+    }
+
+    fn get_members(&self) -> Vec<u32> {
+        self.members.read().clone()
+    }
+
+    fn update_members(&self, members: Vec<u32>) {
+        *self.members.write() = members;
+    }
+
+    fn update_member_status(&self, _member_list: &[u32]) {
+        // Mock does nothing - the real implementation updates online status
+    }
+
+    fn get_start_time(&self) -> u64 {
+        self.start_time
+    }
+
+    fn increment_vmlist_version(&self) {
+        self.vmlist_version
+            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+    }
+
+    fn get_vmlist_version(&self) -> u64 {
+        self.vmlist_version
+            .load(std::sync::atomic::Ordering::SeqCst)
+    }
+
+    fn increment_path_version(&self, path: &str) {
+        let mut versions = self.path_versions.write();
+        let version = versions.entry(path.to_string()).or_insert(0);
+        *version += 1;
+    }
+
+    fn get_path_version(&self, path: &str) -> u64 {
+        *self.path_versions.read().get(path).unwrap_or(&0)
+    }
+
+    fn get_all_path_versions(&self) -> HashMap<String, u64> {
+        self.path_versions.read().clone()
+    }
+
+    fn increment_all_path_versions(&self) {
+        let mut versions = self.path_versions.write();
+        for version in versions.values_mut() {
+            *version += 1;
+        }
+    }
+
+    fn set_node_kv(&self, nodeid: u32, key: String, value: Vec<u8>) {
+        self.kvstore
+            .write()
+            .entry(nodeid)
+            .or_default()
+            .insert(key, value);
+    }
+
+    fn get_node_kv(&self, nodeid: u32, key: &str) -> Option<Vec<u8>> {
+        self.kvstore.read().get(&nodeid)?.get(key).cloned()
+    }
+}
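
The version counters above follow a simple lock-free pattern: writers bump an
atomic counter, readers poll it to detect changes without taking a lock. A
standalone sketch of the same idea (the `VersionedCell` type is hypothetical,
not part of this patch):

    use std::sync::atomic::{AtomicU64, Ordering};

    struct VersionedCell {
        version: AtomicU64,
    }

    impl VersionedCell {
        fn new() -> Self {
            Self { version: AtomicU64::new(0) }
        }

        fn bump(&self) {
            // fetch_add is atomic; SeqCst keeps a single global ordering.
            self.version.fetch_add(1, Ordering::SeqCst);
        }

        fn changed_since(&self, seen: u64) -> bool {
            self.version.load(Ordering::SeqCst) > seen
        }
    }
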
diff --git a/src/pmxcfs-rs/pmxcfs-status/src/types.rs b/src/pmxcfs-rs/pmxcfs-status/src/types.rs
new file mode 100644
index 00000000..393ce63a
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-status/src/types.rs
@@ -0,0 +1,62 @@
+//! Data types for the status module
+use std::collections::HashMap;
+
+/// Cluster node information (matches the C implementation's cfs_clnode_t)
+#[derive(Debug, Clone)]
+pub struct ClusterNode {
+    pub name: String,
+    pub node_id: u32,
+    pub ip: String,
+    pub online: bool,
+}
+
+/// Cluster information (matches the C implementation's cfs_clinfo_t)
+#[derive(Debug, Clone)]
+pub struct ClusterInfo {
+    pub cluster_name: String,
+    pub nodes_by_id: HashMap<u32, ClusterNode>,
+    pub nodes_by_name: HashMap<String, ClusterNode>,
+}
+
+impl ClusterInfo {
+    pub(crate) fn new(cluster_name: String) -> Self {
+        Self {
+            cluster_name,
+            nodes_by_id: HashMap::new(),
+            nodes_by_name: HashMap::new(),
+        }
+    }
+
+    /// Add or update a node in the cluster
+    pub(crate) fn add_node(&mut self, node: ClusterNode) {
+        self.nodes_by_name.insert(node.name.clone(), node.clone());
+        self.nodes_by_id.insert(node.node_id, node);
+    }
+}
+
+/// Node status data
+#[derive(Clone, Debug)]
+pub struct NodeStatus {
+    pub name: String,
+    pub data: Vec<u8>,
+    pub timestamp: u64,
+}
+
+/// Cluster log entry
+#[derive(Clone, Debug)]
+pub struct ClusterLogEntry {
+    pub timestamp: u64,
+    pub node: String,
+    pub priority: u8,
+    pub ident: String,
+    pub tag: String,
+    pub message: String,
+}
+
+/// RRD (Round Robin Database) entry
+#[derive(Clone, Debug)]
+pub(crate) struct RrdEntry {
+    pub key: String,
+    pub data: String,
+    pub timestamp: u64,
+}
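
A small in-crate sketch of how add_node keeps the two lookup maps consistent
(illustrative only; add_node is pub(crate), so this assumes code inside
pmxcfs-status):

    use std::collections::HashMap;

    let mut info = ClusterInfo {
        cluster_name: "demo".to_string(),
        nodes_by_id: HashMap::new(),
        nodes_by_name: HashMap::new(),
    };
    info.add_node(ClusterNode {
        name: "node1".to_string(),
        node_id: 1,
        ip: "192.168.1.10".to_string(),
        online: true,
    });
    // The same node is now reachable both by id and by name.
    assert_eq!(info.nodes_by_id[&1].name, "node1");
    assert!(info.nodes_by_name.contains_key("node1"));
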
-- 
2.47.3


_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel