From mboxrd@z Thu Jan 1 00:00:00 1970
Message-ID: <0b31340f-2b33-4aec-b9fa-2d92e38f3a40@proxmox.com>
Date: Mon, 2 Feb 2026 17:07:13 +0100
From: Samuel Rufinatscha <s.rufinatscha@proxmox.com>
To: Proxmox VE development discussion <pve-devel@lists.proxmox.com>,
 Kefu Chai <k.chai@proxmox.com>
Subject: Re: [pve-devel] [PATCH pve-cluster 06/15] pmxcfs-rs: add pmxcfs-status crate
References: <20260106142440.2368585-1-k.chai@proxmox.com>
 <20260106142440.2368585-7-k.chai@proxmox.com>
In-Reply-To: <20260106142440.2368585-7-k.chai@proxmox.com>
Content-Type: text/plain; charset=UTF-8; format=flowed
List-Id: Proxmox VE development discussion

Comments inline.

On 1/6/26 3:25 PM, Kefu Chai wrote:
> Add cluster status tracking and monitoring:
> - Status: Central status container (thread-safe)
> - Cluster membership tracking
> - VM/CT registry with version tracking
> - RRD data management
> - Cluster log integration
> - Quorum state tracking
> - Configuration file version tracking
>
> This integrates pmxcfs-memdb, pmxcfs-rrd, pmxcfs-logger, and
> pmxcfs-api-types to provide centralized cluster state management.
> It also uses procfs for system metrics collection.
> > Includes comprehensive unit tests for: > - VM registration and deletion > - Cluster membership updates > - Version tracking > - Configuration file monitoring > > Signed-off-by: Kefu Chai > --- > src/pmxcfs-rs/Cargo.toml | 1 + > src/pmxcfs-rs/pmxcfs-status/Cargo.toml | 40 + > src/pmxcfs-rs/pmxcfs-status/README.md | 142 ++ > src/pmxcfs-rs/pmxcfs-status/src/lib.rs | 54 + > src/pmxcfs-rs/pmxcfs-status/src/status.rs | 1561 +++++++++++++++++++++ > src/pmxcfs-rs/pmxcfs-status/src/traits.rs | 486 +++++++ > src/pmxcfs-rs/pmxcfs-status/src/types.rs | 62 + > 7 files changed, 2346 insertions(+) > create mode 100644 src/pmxcfs-rs/pmxcfs-status/Cargo.toml > create mode 100644 src/pmxcfs-rs/pmxcfs-status/README.md > create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/lib.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/status.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/traits.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-status/src/types.rs > > diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml > index 2e41ac93..b5191c31 100644 > --- a/src/pmxcfs-rs/Cargo.toml > +++ b/src/pmxcfs-rs/Cargo.toml > @@ -6,6 +6,7 @@ members = [ > "pmxcfs-logger", # Cluster log with ring buffer and deduplication > "pmxcfs-rrd", # RRD (Round-Robin Database) persistence > "pmxcfs-memdb", # In-memory database with SQLite persistence > + "pmxcfs-status", # Status monitoring and RRD data management > ] > resolver = "2" > > diff --git a/src/pmxcfs-rs/pmxcfs-status/Cargo.toml b/src/pmxcfs-rs/pmxcfs-status/Cargo.toml > new file mode 100644 > index 00000000..e4a817d7 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-status/Cargo.toml > @@ -0,0 +1,40 @@ > +[package] > +name = "pmxcfs-status" > +description = "Status monitoring and RRD data management for pmxcfs" > + > +version.workspace = true > +edition.workspace = true > +authors.workspace = true > +license.workspace = true > +repository.workspace = true > + > +[lints] > +workspace = true > + > +[dependencies] > +# Workspace dependencies > +pmxcfs-api-types.workspace = true > +pmxcfs-rrd.workspace = true > +pmxcfs-memdb.workspace = true > +pmxcfs-logger.workspace = true > + > +# Error handling > +anyhow.workspace = true > + > +# Async runtime > +tokio.workspace = true > + > +# Concurrency primitives > +parking_lot.workspace = true > + > +# Logging > +tracing.workspace = true > + > +# Utilities > +chrono.workspace = true this dependency is not used > + > +# System information (Linux /proc filesystem) > +procfs = "0.17" > + > +[dev-dependencies] > +tempfile.workspace = true > diff --git a/src/pmxcfs-rs/pmxcfs-status/README.md b/src/pmxcfs-rs/pmxcfs-status/README.md > new file mode 100644 > index 00000000..b6958af3 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-status/README.md > @@ -0,0 +1,142 @@ > +# pmxcfs-status > + > +**Cluster Status** tracking and monitoring for pmxcfs. > + > +This crate manages all runtime cluster state information including membership, VM lists, node status, RRD metrics, and cluster logs. It serves as the central repository for dynamic cluster information that changes during runtime. 
> + > +## Overview > + > +The Status subsystem tracks: > +- **Cluster membership**: Which nodes are in the cluster and their states > +- **VM/CT tracking**: Registry of all virtual machines and containers > +- **Node status**: Per-node health and resource information > +- **RRD data**: Performance metrics (CPU, memory, disk, network) > +- **Cluster log**: Centralized log aggregation > +- **Quorum state**: Whether cluster has quorum > +- **Version tracking**: Monitors configuration file changes > + > +## Usage > + > +### Initialization > + > +```rust > +use pmxcfs_status; > + > +// For tests or when RRD persistence is not needed > +let status = pmxcfs_status::init(); > + > +// For production with RRD file persistence > +let status = pmxcfs_status::init_with_rrd("/var/lib/rrdcached/db").await; > +``` > + > +The default `init()` is synchronous and doesn't require a directory parameter, making tests simpler. Use `init_with_rrd()` for production deployments that need RRD persistence. > + > +### Integration with Other Components > + > +**FUSE Plugins**: > +- `.version` plugin reads from Status > +- `.vmlist` plugin generates VM list from Status > +- `.members` plugin generates member list from Status > +- `.rrd` plugin accesses RRD data from Status > +- `.clusterlog` plugin reads cluster log from Status > + > +**DFSM Status Sync**: > +- `StatusSyncService` (pmxcfs-dfsm) broadcasts status updates > +- Uses `pve_kvstore_v1` CPG group > +- KV store data synchronized across nodes > + > +**IPC Server**: > +- `set_status` IPC call updates Status > +- Used by `pvecm`/`pvenode` tools > +- RRD data received via IPC > + > +**MemDb Integration**: > +- Scans VM configs to populate vmlist > +- Tracks version changes on file modifications > +- Used for `.version` plugin timestamps > + > +## Architecture > + > +### Module Structure > + > +| Module | Purpose | > +|--------|---------| > +| `lib.rs` | Public API and initialization | > +| `status.rs` | Core Status struct and operations | > +| `types.rs` | Type definitions (ClusterNode, ClusterInfo, etc.) 
| > + > +### Key Features > + > +**Thread-Safe**: All operations use `RwLock` or `AtomicU64` for concurrent access > +**Version Tracking**: Monotonically increasing counters for change detection > +**Structured Logging**: Field-based tracing for better observability > +**Optional RRD**: RRD persistence is opt-in, simplifying testing > + > +## C to Rust Mapping > + > +### Data Structures > + > +| C Type | Rust Type | Notes | > +|--------|-----------|-------| > +| `cfs_status_t` | `Status` | Main status container | > +| `cfs_clinfo_t` | `ClusterInfo` | Cluster membership info | > +| `cfs_clnode_t` | `ClusterNode` | Individual node info | > +| `vminfo_t` | `VmEntry` | VM/CT registry entry (in pmxcfs-api-types) | > +| `clog_entry_t` | `ClusterLogEntry` | Cluster log entry | > + > +### Core Functions > + > +| C Function | Rust Equivalent | Notes | > +|-----------|-----------------|-------| > +| `cfs_status_init()` | `init()` or `init_with_rrd()` | Two variants for flexibility | > +| `cfs_set_quorate()` | `Status::set_quorate()` | Quorum tracking | > +| `cfs_is_quorate()` | `Status::is_quorate()` | Quorum checking | > +| `vmlist_register_vm()` | `Status::register_vm()` | VM registration | > +| `vmlist_delete_vm()` | `Status::delete_vm()` | VM deletion | > +| `cfs_status_set()` | `Status::set_node_status()` | Status updates (including RRD) | > + > +## Key Differences from C Implementation > + > +### RRD Decoupling > + > +**C Version (status.c)**: > +- RRD code embedded in status.c > +- Async initialization always required > + > +**Rust Version**: > +- Separate `pmxcfs-rrd` crate > +- `init()` is synchronous (no RRD) > +- `init_with_rrd()` is async (with RRD) > +- Tests don't need temp directories > + > +### Concurrency > + > +**C Version**: > +- Single `GMutex` for entire status structure > + > +**Rust Version**: > +- Fine-grained `RwLock` for different data structures > +- `AtomicU64` for version counters > +- Better read parallelism > + > +## Configuration File Tracking > + > +Status tracks version numbers for these common Proxmox config files: > + > +- `corosync.conf`, `corosync.conf.new` > +- `storage.cfg`, `user.cfg`, `domains.cfg` > +- `datacenter.cfg`, `vzdump.cron`, `vzdump.conf` > +- `ha/` directory files (crm_commands, manager_status, resources.cfg, etc.) > +- `sdn/` directory files (vnets.cfg, zones.cfg, controllers.cfg, etc.) 
> +- And many more (see `Status::new()` in status.rs for complete list) > + > +## References > + > +### C Implementation > +- `src/pmxcfs/status.c` / `status.h` - Status tracking > + > +### Related Crates > +- **pmxcfs-rrd**: RRD file persistence > +- **pmxcfs-dfsm**: Status synchronization via StatusSyncService > +- **pmxcfs-logger**: Cluster log implementation > +- **pmxcfs**: FUSE plugins that read from Status > diff --git a/src/pmxcfs-rs/pmxcfs-status/src/lib.rs b/src/pmxcfs-rs/pmxcfs-status/src/lib.rs > new file mode 100644 > index 00000000..282e007d > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-status/src/lib.rs > @@ -0,0 +1,54 @@ > +/// Status information and monitoring > +/// > +/// This module manages: > +/// - Cluster membership (nodes, IPs, online status) > +/// - RRD (Round Robin Database) data for metrics > +/// - Cluster log > +/// - Node status information > +/// - VM/CT list tracking > +mod status; > +mod traits; > +mod types; > + > +// Re-export public types > +pub use pmxcfs_api_types::{VmEntry, VmType}; > +pub use types::{ClusterInfo, ClusterLogEntry, ClusterNode, NodeStatus}; > + > +// Re-export Status struct and trait > +pub use status::Status; > +pub use traits::{BoxFuture, MockStatus, StatusOps}; > + > +use std::sync::Arc; > + > +/// Initialize status subsystem without RRD persistence > +/// > +/// This is the default initialization that creates a Status instance > +/// without file-based RRD persistence. RRD data will be kept in memory only. > +pub fn init() -> Arc { > + tracing::info!("Status subsystem initialized (RRD persistence disabled)"); > + Arc::new(Status::new(None)) > +} > + > +/// Initialize status subsystem with RRD file persistence > +/// > +/// Creates a Status instance with RRD data written to disk in the specified directory. > +/// This requires the RRD directory to exist and be writable. > +pub async fn init_with_rrd>(rrd_dir: P) -> Arc { > + let rrd_dir_path = rrd_dir.as_ref(); > + let rrd_writer = match pmxcfs_rrd::RrdWriter::new(rrd_dir_path).await { > + Ok(writer) => { > + tracing::info!( > + directory = %rrd_dir_path.display(), > + "RRD file persistence enabled" > + ); > + Some(writer) > + } > + Err(e) => { > + tracing::warn!(error = %e, "RRD file persistence disabled"); > + None > + } > + }; > + > + tracing::info!("Status subsystem initialized"); > + Arc::new(Status::new(rrd_writer)) > +} > diff --git a/src/pmxcfs-rs/pmxcfs-status/src/status.rs b/src/pmxcfs-rs/pmxcfs-status/src/status.rs > new file mode 100644 > index 00000000..94b6483d > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-status/src/status.rs > @@ -0,0 +1,1561 @@ > +/// Status subsystem implementation > +use crate::types::{ClusterInfo, ClusterLogEntry, ClusterNode, NodeStatus, RrdEntry}; > +use anyhow::Result; > +use parking_lot::RwLock; > +use pmxcfs_api_types::{VmEntry, VmType}; > +use std::collections::HashMap; > +use std::sync::Arc; > +use std::sync::atomic::{AtomicU64, Ordering}; > +use std::time::{SystemTime, UNIX_EPOCH}; > + > +/// Status subsystem (matches C implementation's cfs_status_t) > +pub struct Status { > + /// Cluster information (nodes, membership) - matches C's clinfo > + cluster_info: RwLock>, > + > + /// Cluster info version counter - increments on membership changes (matches C's clinfo_version) > + cluster_version: AtomicU64, This field is used as a change counter in multiple places but gets overwritten in update_cluster_info(). In C we have clinfo_version vs cman_version. 
These need to be separate fields as in C, otherwise update_cluster_info overwrites the monotonic change counter that other call sites depend on.

> +
> +    /// VM list version counter - increments when VM list changes (matches C's vmlist_version)
> +    vmlist_version: AtomicU64,
> +
> +    /// MemDB path version counters (matches C's memdb_change_array)
> +    /// Tracks versions for specific config files like "corosync.conf", "user.cfg", etc.
> +    memdb_path_versions: RwLock<HashMap<String, AtomicU64>>,
> +
> +    /// Node status data by name
> +    node_status: RwLock<HashMap<String, NodeStatus>>,
> +
> +    /// Cluster log with ring buffer and deduplication (matches C's clusterlog_t)
> +    cluster_log: pmxcfs_logger::ClusterLog,
> +
> +    /// RRD entries by key (e.g., "pve2-node/nodename" or "pve2.3-vm/vmid")
> +    pub(crate) rrd_data: RwLock<HashMap<String, RrdEntry>>,
> +
> +    /// RRD file writer for persistent storage (using tokio RwLock for async compatibility)
> +    rrd_writer: Option<Arc<tokio::sync::RwLock<pmxcfs_rrd::RrdWriter>>>,
> +
> +    /// VM/CT list (vmid -> VmEntry)
> +    vmlist: RwLock<HashMap<u32, VmEntry>>,
> +
> +    /// Quorum status (matches C's cfs_status.quorate)
> +    quorate: RwLock<bool>,
> +
> +    /// Current cluster members (CPG membership)
> +    members: RwLock>,
> +
> +    /// Daemon start timestamp (UNIX epoch) - for .version plugin
> +    start_time: u64,
> +
> +    /// KV store data from nodes (nodeid -> key -> value)
> +    /// Matches C implementation's kvhash
> +    kvstore: RwLock<HashMap<u32, HashMap<String, Vec<u8>>>>,

C removes a kvstore entry when len == 0 and maintains a per-key entry->version counter (incremented on overwrite). Our kvstore currently stores only Vec<u8> and doesn't reflect these semantics; see the set_node_kv sketch further down.

> +}
> +
> +impl Status {
> +    /// Create a new Status instance
> +    ///
> +    /// For production use with RRD persistence, use `pmxcfs_status::init_with_rrd()`.
> +    /// For tests or when RRD persistence is not needed, use `pmxcfs_status::init()`.
> +    /// This constructor is public to allow custom initialization patterns.
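To make the clinfo_version/cman_version split concrete, roughly this (untested sketch, field naming borrowed from C):

    /// Monotonic change counter, bumped on every membership or
    /// online-status change (matches C's clinfo_version)
    cluster_version: AtomicU64,

    /// Corosync configuration version as last reported via CMAP
    /// (matches C's cman_version)
    cman_version: AtomicU64,

update_cluster_info() would then keep the two apart:

    // record the reported config version separately ...
    self.cman_version.store(config_version, Ordering::SeqCst);
    // ... and keep the change counter strictly monotonic
    self.cluster_version.fetch_add(1, Ordering::SeqCst);

That way get_cluster_version() callers keep seeing a counter that never goes backwards.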
> + pub fn new(rrd_writer: Option) -> Self { > + // Wrap RrdWriter in Arc if provided (for async compatibility) > + let rrd_writer = rrd_writer.map(|w| Arc::new(tokio::sync::RwLock::new(w))); > + > + // Initialize memdb path versions for common Proxmox config files > + // Matches C implementation's memdb_change_array (status.c:79-120) > + // These are the exact paths tracked by the C implementation > + let mut path_versions = HashMap::new(); > + let common_paths = vec![ > + "corosync.conf", > + "corosync.conf.new", > + "storage.cfg", > + "user.cfg", > + "domains.cfg", > + "notifications.cfg", > + "priv/notifications.cfg", > + "priv/shadow.cfg", > + "priv/acme/plugins.cfg", > + "priv/tfa.cfg", > + "priv/token.cfg", > + "datacenter.cfg", > + "vzdump.cron", > + "vzdump.conf", > + "jobs.cfg", > + "ha/crm_commands", > + "ha/manager_status", > + "ha/resources.cfg", > + "ha/rules.cfg", > + "ha/groups.cfg", > + "ha/fence.cfg", > + "status.cfg", > + "replication.cfg", > + "ceph.conf", > + "sdn/vnets.cfg", > + "sdn/zones.cfg", > + "sdn/controllers.cfg", > + "sdn/subnets.cfg", > + "sdn/ipams.cfg", > + "sdn/mac-cache.json", // SDN MAC address cache > + "sdn/pve-ipam-state.json", // SDN IPAM state > + "sdn/dns.cfg", // SDN DNS configuration > + "sdn/fabrics.cfg", // SDN fabrics configuration > + "sdn/.running-config", // SDN running configuration > + "virtual-guest/cpu-models.conf", // Virtual guest CPU models > + "virtual-guest/profiles.cfg", // Virtual guest profiles > + "firewall/cluster.fw", // Cluster firewall rules > + "mapping/directory.cfg", // Directory mappings > + "mapping/pci.cfg", // PCI device mappings > + "mapping/usb.cfg", // USB device mappings > + ]; > + > + for path in common_paths { > + path_versions.insert(path.to_string(), AtomicU64::new(0)); > + } > + > + // Get start time (matches C implementation's cfs_status.start_time) > + let start_time = SystemTime::now() > + .duration_since(UNIX_EPOCH) > + .unwrap_or_default() > + .as_secs(); > + > + Self { > + cluster_info: RwLock::new(None), > + cluster_version: AtomicU64::new(1), > + vmlist_version: AtomicU64::new(1), > + memdb_path_versions: RwLock::new(path_versions), > + node_status: RwLock::new(HashMap::new()), > + cluster_log: pmxcfs_logger::ClusterLog::new(), > + rrd_data: RwLock::new(HashMap::new()), > + rrd_writer, > + vmlist: RwLock::new(HashMap::new()), > + quorate: RwLock::new(false), > + members: RwLock::new(Vec::new()), > + start_time, > + kvstore: RwLock::new(HashMap::new()), > + } > + } > + > + /// Get node status > + pub fn get_node_status(&self, name: &str) -> Option { > + self.node_status.read().get(name).cloned() > + } > + > + /// Set node status (matches C implementation's cfs_status_set) > + /// > + /// This handles status updates received via IPC from external clients. > + /// If the key starts with "rrd/", it's RRD data that should be written to disk. > + /// Otherwise, it's generic node status data. 
> + pub async fn set_node_status(&self, name: String, data: Vec) -> Result<()> { we need to check for CFS_MAX_STATUS_SIZE, to avoid accepting unbounded payloads (and to avoid possible state divergence with C) > + // Check if this is RRD data (matching C's cfs_status_set behavior) > + if let Some(rrd_key) = name.strip_prefix("rrd/") { > + // Strip "rrd/" prefix to get the actual RRD key > + // Convert data to string (RRD data is text format) > + let data_str = String::from_utf8(data) > + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in RRD data: {e}"))?; We need to strip \0 as C payloads are NUL terminated and from_utf8 preserves it, so that it doesn't end up in RRD dump output > + > + // Write to RRD (stores in memory and writes to disk) > + self.set_rrd_data(rrd_key.to_string(), data_str).await?; > + } else { nodeip handling is missing here, C has a dedicated branch for it. The backing data structure iphash is also missing. > + // Regular node status (not RRD) > + let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); > + let status = NodeStatus { > + name: name.clone(), > + data, > + timestamp: now, > + }; > + self.node_status.write().insert(name, status); > + } > + > + Ok(()) > + } > + > + /// Add cluster log entry > + pub fn add_log_entry(&self, entry: ClusterLogEntry) { > + // Convert ClusterLogEntry to ClusterLog format and add > + // The ClusterLog handles size limits and deduplication internally > + let _ = self.cluster_log.add( > + &entry.node, > + &entry.ident, > + &entry.tag, > + 0, // pid not tracked in our entries > + entry.priority, > + entry.timestamp as u32, > + &entry.message, > + ); > + } > + > + /// Get cluster log entries > + pub fn get_log_entries(&self, max: usize) -> Vec { > + // Get entries from ClusterLog and convert to ClusterLogEntry > + self.cluster_log > + .get_entries(max) > + .into_iter() > + .map(|entry| ClusterLogEntry { > + timestamp: entry.time as u64, > + node: entry.node, > + priority: entry.priority, > + ident: entry.ident, > + tag: entry.tag, > + message: entry.message, > + }) > + .collect() > + } > + > + /// Clear all cluster log entries (for testing) > + pub fn clear_cluster_log(&self) { > + self.cluster_log.clear(); > + } > + > + /// Set RRD data (C-compatible format) > + /// Key format: "pve2-node/{nodename}" or "pve2.3-vm/{vmid}" > + /// Data format: "{timestamp}:{val1}:{val2}:..." 
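To make the three points above concrete, the entry of set_node_status() could look roughly like this (untested; the limit must be taken from C's status.h, and the nodeip branch is only stubbed here):

    /// Mirror of C's CFS_MAX_STATUS_SIZE from status.h (please double-check the value)
    const CFS_MAX_STATUS_SIZE: usize = 32 * 1024;

    pub async fn set_node_status(&self, name: String, mut data: Vec<u8>) -> Result<()> {
        if data.len() > CFS_MAX_STATUS_SIZE {
            anyhow::bail!("status data too large ({} bytes)", data.len());
        }

        if let Some(rrd_key) = name.strip_prefix("rrd/") {
            // C payloads are NUL-terminated and from_utf8 keeps the
            // trailing NUL, so strip it before parsing
            if data.last() == Some(&0) {
                data.pop();
            }
            let data_str = String::from_utf8(data)
                .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in RRD data: {e}"))?;
            self.set_rrd_data(rrd_key.to_string(), data_str).await?;
        } else if name == "nodeip" {
            // dedicated branch as in C's cfs_status_set(); needs an
            // iphash-equivalent map on Status to record per-node IPs
        } else {
            // generic node status, unchanged from the current code
        }

        Ok(())
    }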
> +    pub async fn set_rrd_data(&self, key: String, data: String) -> Result<()> {
> +        let now = SystemTime::now()
> +            .duration_since(UNIX_EPOCH)
> +            .unwrap_or_default()
> +            .as_secs();
> +
> +        let entry = RrdEntry {
> +            key: key.clone(),
> +            data: data.clone(),
> +            timestamp: now,
> +        };
> +
> +        // Store in memory for .rrd plugin file
> +        self.rrd_data.write().insert(key.clone(), entry);
> +
> +        // Also write to RRD file on disk (if persistence is enabled)
> +        if let Some(writer_lock) = &self.rrd_writer {
> +            let mut writer = writer_lock.write().await;
> +            writer.update(&key, &data).await?;
> +            tracing::trace!("Updated RRD file: {} -> {}", key, data);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Remove old RRD entries (older than 5 minutes)
> +    pub fn remove_old_rrd_data(&self) {
> +        let now = SystemTime::now()
> +            .duration_since(UNIX_EPOCH)
> +            .unwrap_or_default()
> +            .as_secs();
> +
> +        const EXPIRE_SECONDS: u64 = 60 * 5; // 5 minutes
> +
> +        self.rrd_data
> +            .write()
> +            .retain(|_, entry| now - entry.timestamp <= EXPIRE_SECONDS);

If the system clock jumps backwards, now can be less than entry.timestamp, and `now - entry.timestamp` then underflows (panics in debug builds); `now.saturating_sub(entry.timestamp)` would be safer here.

> +    }
> +
> +    /// Get RRD data dump (text format matching C implementation)
> +    pub fn get_rrd_dump(&self) -> String {

This rebuilds the dump on every call and takes the write lock via remove_old_rrd_data() each time. The result could be cached for a short interval to improve performance, similarly to what the C implementation does.

> +        // Remove old entries first
> +        self.remove_old_rrd_data();
> +
> +        let rrd = self.rrd_data.read();
> +        let mut result = String::new();
> +
> +        for entry in rrd.values() {
> +            result.push_str(&entry.key);
> +            result.push(':');
> +            result.push_str(&entry.data);
> +            result.push('\n');
> +        }
> +
> +        result
> +    }
> +
> +    /// Collect disk I/O statistics (bytes read, bytes written)
> +    ///
> +    /// Note: This is for future VM RRD implementation. Per C implementation:
> +    /// - Node RRD (rrd_def_node) has 12 fields and does NOT include diskread/diskwrite
> +    /// - VM RRD (rrd_def_vm) has 10 fields and DOES include diskread/diskwrite at indices 8-9
> +    ///
> +    /// This method will be used when implementing VM RRD collection.
> +    ///
> +    /// # Sector Size
> +    /// The Linux kernel reports disk statistics in /proc/diskstats using 512-byte sectors
> +    /// as the standard unit, regardless of the device's actual physical sector size.
> +    /// This is a kernel reporting convention (see Documentation/admin-guide/iostats.rst).
> +    #[allow(dead_code)]
> +    fn collect_disk_io() -> Result<(u64, u64)> {
> +        // /proc/diskstats always uses 512-byte sectors (kernel convention)
> +        const DISKSTATS_SECTOR_SIZE: u64 = 512;
> +
> +        let diskstats = procfs::diskstats()?;
> +
> +        let mut total_read = 0u64;
> +        let mut total_write = 0u64;
> +
> +        for stat in diskstats {
> +            // Skip partitions (only look at whole disks: sda, vda, etc.)
> +            if stat
> +                .name
> +                .chars()
> +                .last()
> +                .map(|c| c.is_numeric())
> +                .unwrap_or(false)
> +            {
> +                continue;
> +            }
> +
> +            // Convert sectors to bytes using kernel's reporting unit
> +            total_read += stat.sectors_read * DISKSTATS_SECTOR_SIZE;
> +            total_write += stat.sectors_written * DISKSTATS_SECTOR_SIZE;
> +        }
> +
> +        Ok((total_read, total_write))
> +    }
> +
> +    /// Register a VM/CT
> +    pub fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) {
> +        tracing::debug!(vmid, vmtype = ?vmtype, node = %node, "Registered VM");
> +
> +        // Get existing VM version or start at 1
> +        let version = self
> +            .vmlist
> +            .read()
> +            .get(&vmid)
> +            .map(|vm| vm.version + 1)

In C we have the global static uint32_t vminfo_version_counter; here we have per-VM counters instead. Why the difference? Wouldn't a global counter be more helpful here too, so the update order of VMs can be derived from it?

> +            .unwrap_or(1);
> +
> +        let entry = VmEntry {
> +            vmid,
> +            vmtype,
> +            node,
> +            version,
> +        };
> +        self.vmlist.write().insert(vmid, entry);

Between the read() and the write() we have a TOCTOU window, similar to the one in set_quorate (see the sketch below).

> +
> +        // Increment vmlist version counter
> +        self.increment_vmlist_version();
> +    }
> +
> +    /// Delete a VM/CT
> +    pub fn delete_vm(&self, vmid: u32) {
> +        if self.vmlist.write().remove(&vmid).is_some() {

This should bump the vmlist version unconditionally to match C.

> +            tracing::debug!(vmid, "Deleted VM");
> +
> +            // Increment vmlist version counter
> +            self.increment_vmlist_version();
> +        }
> +    }
> +
> +    /// Check if VM/CT exists
> +    pub fn vm_exists(&self, vmid: u32) -> bool {
> +        self.vmlist.read().contains_key(&vmid)
> +    }
> +
> +    /// Check if a different VM/CT exists (different node or type)
> +    pub fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool {
> +        if let Some(entry) = self.vmlist.read().get(&vmid) {
> +            entry.vmtype != vmtype || entry.node != node
> +        } else {
> +            false
> +        }
> +    }
> +
> +    /// Get VM list
> +    pub fn get_vmlist(&self) -> HashMap<u32, VmEntry> {
> +        self.vmlist.read().clone()
> +    }
> +
> +    /// Scan directories for VMs/CTs and update vmlist
> +    ///
> +    /// Uses memdb's `recreate_vmlist()` to properly scan nodes/*/qemu-server/
> +    /// and nodes/*/lxc/ directories to track which node each VM belongs to.
> +    pub fn scan_vmlist(&self, memdb: &pmxcfs_memdb::MemDb) {
> +        // Use the proper recreate_vmlist from memdb which scans nodes/*/qemu-server/ and nodes/*/lxc/
> +        match pmxcfs_memdb::recreate_vmlist(memdb) {
> +            Ok(new_vmlist) => {
> +                let vmlist_len = new_vmlist.len();
> +                let mut vmlist = self.vmlist.write();
> +                *vmlist = new_vmlist;

This replaces the entire HashMap, which resets all per-VM version counters.
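For the two register_vm() points above, stamping from a global counter under a single write lock, as C effectively does, would address both, and would also let recreate_vmlist() stamp fresh entries instead of resetting them — roughly:

    /// Global VM version counter (matches C's vminfo_version_counter);
    /// assuming VmEntry::version is widened to u64 (C uses uint32_t)
    vminfo_version_counter: AtomicU64,

    pub fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) {
        // unique, ordered stamp so the update order of VMs is recoverable
        let version = self.vminfo_version_counter.fetch_add(1, Ordering::SeqCst) + 1;
        // single write lock, no read-then-write gap
        self.vmlist
            .write()
            .insert(vmid, VmEntry { vmid, vmtype, node, version });
        self.increment_vmlist_version();
    }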
> + drop(vmlist); > + > + tracing::info!(vms = vmlist_len, "VM list scan complete"); > + > + // Increment vmlist version counter > + self.increment_vmlist_version(); > + } > + Err(err) => { > + tracing::error!(error = %err, "Failed to recreate vmlist"); > + } > + } > + } > + > + /// Initialize cluster information with cluster name > + pub fn init_cluster(&self, cluster_name: String) { > + let info = ClusterInfo::new(cluster_name); > + *self.cluster_info.write() = Some(info); > + self.cluster_version.fetch_add(1, Ordering::SeqCst); > + } > + > + /// Register a node in the cluster (name, ID, IP) > + pub fn register_node(&self, node_id: u32, name: String, ip: String) { > + tracing::debug!(node_id, node = %name, ip = %ip, "Registering cluster node"); > + > + let mut cluster_info = self.cluster_info.write(); > + if let Some(ref mut info) = *cluster_info { > + let node = ClusterNode { > + name, > + node_id, > + ip, > + online: false, // Will be updated by cluster module > + }; > + info.add_node(node); > + self.cluster_version.fetch_add(1, Ordering::SeqCst); > + } > + } > + > + /// Get cluster information (for .members plugin) > + pub fn get_cluster_info(&self) -> Option { > + self.cluster_info.read().clone() > + } > + > + /// Get cluster version > + pub fn get_cluster_version(&self) -> u64 { > + self.cluster_version.load(Ordering::SeqCst) > + } > + > + /// Increment cluster version (called when membership changes) > + pub fn increment_cluster_version(&self) { > + self.cluster_version.fetch_add(1, Ordering::SeqCst); > + } > + > + /// Update cluster info from CMAP (called by ClusterConfigService) > + pub fn update_cluster_info( > + &self, > + cluster_name: String, > + config_version: u64, > + nodes: Vec<(u32, String, String)>, > + ) -> Result<()> { > + let mut cluster_info = self.cluster_info.write(); > + > + // Create or update cluster info > + let mut info = cluster_info > + .take() > + .unwrap_or_else(|| ClusterInfo::new(cluster_name.clone())); > + > + // Update cluster name if changed > + if info.cluster_name != cluster_name { > + info.cluster_name = cluster_name; > + } > + > + // Clear existing nodes > + info.nodes_by_id.clear(); > + info.nodes_by_name.clear(); > + > + // Add updated nodes > + for (nodeid, name, ip) in nodes { > + let node = ClusterNode { > + name: name.clone(), > + node_id: nodeid, > + ip, > + online: false, // Will be updated by quorum module This drops online status. C's cfs_status_set_clinfo preserves it by copying from oldnode. This needs the same treatment here. > + }; > + info.add_node(node); > + } Do we need to cleanup kvstore on node removal? 
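If we do — and to also preserve the online flag, as C's cfs_status_set_clinfo does by copying from oldnode — update_cluster_info() could handle both in one pass (untested; mind the lock order if kvstore and cluster_info are ever taken together elsewhere):

    // keep the old nodes around so runtime state survives a config reload
    let old_nodes = std::mem::take(&mut info.nodes_by_id);
    info.nodes_by_name.clear();

    for (nodeid, name, ip) in nodes {
        let node = ClusterNode {
            name: name.clone(),
            node_id: nodeid,
            ip,
            // carry the online status over from the previous config
            online: old_nodes.get(&nodeid).is_some_and(|n| n.online),
        };
        info.add_node(node);
    }

    // drop kvstore data of nodes that left the configuration
    self.kvstore
        .write()
        .retain(|nodeid, _| info.nodes_by_id.contains_key(nodeid));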
> + > + *cluster_info = Some(info); > + > + // Update version to reflect configuration change > + self.cluster_version.store(config_version, Ordering::SeqCst); > + > + tracing::info!(version = config_version, "Updated cluster configuration"); > + Ok(()) > + } > + > + /// Update node online status (called by cluster module) > + pub fn set_node_online(&self, node_id: u32, online: bool) { > + let mut cluster_info = self.cluster_info.write(); > + if let Some(ref mut info) = *cluster_info > + && let Some(node) = info.nodes_by_id.get_mut(&node_id) > + && node.online != online > + { > + node.online = online; > + // Also update in nodes_by_name > + if let Some(name_node) = info.nodes_by_name.get_mut(&node.name) { > + name_node.online = online; > + } > + self.cluster_version.fetch_add(1, Ordering::SeqCst); > + tracing::debug!( > + node = %node.name, > + node_id, > + online = if online { "true" } else { "false" }, > + "Node online status changed" > + ); > + } > + } > + > + /// Check if cluster is quorate (matches C's cfs_is_quorate) > + pub fn is_quorate(&self) -> bool { > + *self.quorate.read() > + } > + > + /// Set quorum status (matches C's cfs_set_quorate) > + pub fn set_quorate(&self, quorate: bool) { > + let old_quorate = *self.quorate.read(); between this > + *self.quorate.write() = quorate; and this line we have a TOCTOU window. The * dereferences the bool out of it, and then the guard is dropped at the semicolon. So between line 1 and line 2, no lock is held. Putting both operations under the write lock would solve it. > + > + if old_quorate != quorate { > + if quorate { > + tracing::info!("Node has quorum"); > + } else { > + tracing::info!("Node lost quorum"); > + } > + } > + } > + > + /// Get current cluster members (CPG membership) > + pub fn get_members(&self) -> Vec { > + self.members.read().clone() > + } > + > + /// Update cluster members and sync online status (matches C's dfsm_confchg callback) > + /// > + /// This updates the CPG member list and synchronizes the online status > + /// in cluster_info to match current membership. 
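(Back to set_quorate() above: putting both operations under the write lock is all that is needed, e.g.:)

    pub fn set_quorate(&self, quorate: bool) {
        let mut guard = self.quorate.write();
        let old_quorate = *guard;
        *guard = quorate;
        drop(guard); // release the lock before logging

        if old_quorate != quorate {
            if quorate {
                tracing::info!("Node has quorum");
            } else {
                tracing::info!("Node lost quorum");
            }
        }
    }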
> + pub fn update_members(&self, members: Vec) { > + *self.members.write() = members.clone(); > + > + // Update online status in cluster_info based on members > + // (matches C implementation's dfsm_confchg in status.c:1989-2025) > + let mut cluster_info = self.cluster_info.write(); > + if let Some(ref mut info) = *cluster_info { > + // First mark all nodes as offline > + for node in info.nodes_by_id.values_mut() { > + node.online = false; > + } > + for node in info.nodes_by_name.values_mut() { > + node.online = false; > + } > + > + // Then mark active members as online > + for member in &members { > + if let Some(node) = info.nodes_by_id.get_mut(&member.node_id) { > + node.online = true; > + // Also update in nodes_by_name > + if let Some(name_node) = info.nodes_by_name.get_mut(&node.name) { > + name_node.online = true; > + } > + } > + } > + > + self.cluster_version.fetch_add(1, Ordering::SeqCst); > + } > + } > + > + /// Get daemon start timestamp (for .version plugin) > + pub fn get_start_time(&self) -> u64 { > + self.start_time > + } > + > + /// Increment VM list version (matches C's cfs_status.vmlist_version++) > + pub fn increment_vmlist_version(&self) { > + self.vmlist_version.fetch_add(1, Ordering::SeqCst); > + } > + > + /// Get VM list version > + pub fn get_vmlist_version(&self) -> u64 { > + self.vmlist_version.load(Ordering::SeqCst) > + } > + > + /// Increment version for a specific memdb path (matches C's record_memdb_change) > + pub fn increment_path_version(&self, path: &str) { > + let versions = self.memdb_path_versions.read(); > + if let Some(counter) = versions.get(path) { > + counter.fetch_add(1, Ordering::SeqCst); > + } > + } > + > + /// Get version for a specific memdb path > + pub fn get_path_version(&self, path: &str) -> u64 { > + let versions = self.memdb_path_versions.read(); > + versions > + .get(path) > + .map(|counter| counter.load(Ordering::SeqCst)) > + .unwrap_or(0) > + } > + > + /// Get all memdb path versions (for .version plugin) > + pub fn get_all_path_versions(&self) -> HashMap { > + let versions = self.memdb_path_versions.read(); > + versions > + .iter() > + .map(|(path, counter)| (path.clone(), counter.load(Ordering::SeqCst))) > + .collect() > + } > + > + /// Increment ALL configuration file versions (matches C's record_memdb_reload) > + /// > + /// Called when the entire database is reloaded from cluster peers. > + /// This ensures clients know that all configuration files should be re-read. > + pub fn increment_all_path_versions(&self) { > + let versions = self.memdb_path_versions.read(); > + for (_, counter) in versions.iter() { > + counter.fetch_add(1, Ordering::SeqCst); > + } > + } > + > + /// Set key-value data from a node (kvstore DFSM) > + /// > + /// Matches C implementation's cfs_kvstore_node_set in status.c. > + /// Stores ephemeral status data like RRD metrics, IP addresses, etc. > + pub fn set_node_kv(&self, nodeid: u32, key: String, value: Vec) { We accept unknown nodeids here, maybe something like this would work let cluster_info = self.cluster_info.read(); match &*cluster_info { Some(info) if info.nodes_by_id.contains_key(&nodeid) => {}, _ => return, } drop(cluster_info); Also, shouldn't we also have the same 3 checks here as set_node_status should have? Basically if let Some(rrd_key) = key.strip_prefix("rrd/") { .. } else if key == "nodeip" { .. } else { .. 
} > + let mut kvstore = self.kvstore.write(); > + kvstore.entry(nodeid).or_default().insert(key, value); > + } > + > + /// Get key-value data from a node > + pub fn get_node_kv(&self, nodeid: u32, key: &str) -> Option> { > + let kvstore = self.kvstore.read(); > + kvstore.get(&nodeid)?.get(key).cloned() > + } > + > + /// Add cluster log entry (called by kvstore DFSM) > + /// > + /// This is the wrapper for kvstore LOG messages. > + /// Matches C implementation's clusterlog_insert call. > + pub fn add_cluster_log( > + &self, > + timestamp: u32, > + priority: u8, > + tag: String, > + node: String, > + message: String, > + ) { > + let entry = ClusterLogEntry { > + timestamp: timestamp as u64, > + node, > + priority, > + ident: String::new(), // Not used in kvstore messages > + tag, > + message, > + }; > + self.add_log_entry(entry); > + } > + > + /// Update node online status based on CPG membership (kvstore DFSM confchg callback) > + /// > + /// This is called when kvstore CPG membership changes. > + /// Matches C implementation's dfsm_confchg in status.c. > + pub fn update_member_status(&self, member_list: &[u32]) { > + let mut cluster_info = self.cluster_info.write(); > + if let Some(ref mut info) = *cluster_info { > + // Mark all nodes as offline > + for node in info.nodes_by_id.values_mut() { > + node.online = false; > + } > + for node in info.nodes_by_name.values_mut() { > + node.online = false; > + } > + > + // Mark nodes in member_list as online > + for &nodeid in member_list { > + if let Some(node) = info.nodes_by_id.get_mut(&nodeid) { > + node.online = true; > + // Also update in nodes_by_name > + if let Some(name_node) = info.nodes_by_name.get_mut(&node.name) { > + name_node.online = true; > + } > + } > + } > + > + self.cluster_version.fetch_add(1, Ordering::SeqCst); > + } > + } > + > + /// Get cluster log state (for DFSM synchronization) > + /// > + /// Returns the cluster log in C-compatible binary format (clog_base_t). > + /// Matches C implementation's clusterlog_get_state() in logger.c:553-571. > + pub fn get_cluster_log_state(&self) -> Result> { > + self.cluster_log.get_state() > + } > + > + /// Merge cluster log states from remote nodes > + /// > + /// Deserializes binary states from remote nodes and merges them with the local log. > + /// Matches C implementation's dfsm_process_state_update() in status.c:2049-2074. 
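Coming back to set_node_kv() above: with the membership check, the rrd/nodeip dispatch and C's per-key versioning / empty-value removal folded in, it could look roughly like this — KvEntry is a hypothetical replacement for the plain byte values (kvstore would become RwLock<HashMap<u32, HashMap<String, KvEntry>>>), and the starting version would need to be checked against C:

    struct KvEntry {
        value: Vec<u8>,
        version: u64, // bumped on every overwrite, as in C
    }

    pub fn set_node_kv(&self, nodeid: u32, key: String, value: Vec<u8>) {
        // ignore data from nodes that are not part of the cluster config
        {
            let cluster_info = self.cluster_info.read();
            match &*cluster_info {
                Some(info) if info.nodes_by_id.contains_key(&nodeid) => {}
                _ => return,
            }
        }

        // the rrd/ and nodeip keys would be dispatched here, mirroring
        // set_node_status(); only the default branch is shown
        let mut kvstore = self.kvstore.write();
        let node_kv = kvstore.entry(nodeid).or_default();
        if value.is_empty() {
            // C removes the entry when len == 0
            node_kv.remove(&key);
        } else if let Some(entry) = node_kv.get_mut(&key) {
            entry.value = value;
            entry.version += 1;
        } else {
            node_kv.insert(key, KvEntry { value, version: 0 });
        }
    }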
> + pub fn merge_cluster_log_states( > + &self, > + states: &[pmxcfs_api_types::NodeSyncInfo], > + ) -> Result<()> { > + use pmxcfs_logger::ClusterLog; > + > + let mut remote_logs = Vec::new(); > + > + for state_info in states { > + // Check if this node has state data > + let state_data = match &state_info.state { > + Some(data) if !data.is_empty() => data, > + _ => continue, > + }; > + > + match ClusterLog::deserialize_state(state_data) { > + Ok(ring_buffer) => { > + tracing::debug!( > + "Deserialized cluster log from node {}: {} entries", > + state_info.nodeid, > + ring_buffer.len() > + ); > + remote_logs.push(ring_buffer); > + } > + Err(e) => { > + tracing::warn!( > + nodeid = state_info.nodeid, > + error = %e, > + "Failed to deserialize cluster log from node" > + ); > + } > + } > + } > + > + if !remote_logs.is_empty() { > + // Merge remote logs with local log (include_local = true) > + match self.cluster_log.merge(remote_logs, true) { > + Ok(merged) => { > + // Update our buffer with the merged result > + self.cluster_log.update_buffer(merged); > + tracing::debug!("Successfully merged cluster logs"); > + } > + Err(e) => { > + tracing::error!(error = %e, "Failed to merge cluster logs"); > + } > + } > + } > + > + Ok(()) > + } > + > + /// Add cluster log entry from remote node (kvstore LOG message) > + /// > + /// Matches C implementation's clusterlog_insert() via kvstore message handling. > + pub fn add_remote_cluster_log( > + &self, > + time: u32, > + priority: u8, > + node: String, > + ident: String, > + tag: String, > + message: String, > + ) -> Result<()> { > + self.cluster_log > + .add(&node, &ident, &tag, 0, priority, time, &message)?; > + Ok(()) > + } > +} > + > +// Implement StatusOps trait for Status > +impl crate::traits::StatusOps for Status { > + fn get_node_status(&self, name: &str) -> Option { > + self.get_node_status(name) > + } > + > + fn set_node_status<'a>( > + &'a self, > + name: String, > + data: Vec, > + ) -> crate::traits::BoxFuture<'a, Result<()>> { > + Box::pin(self.set_node_status(name, data)) > + } > + > + fn add_log_entry(&self, entry: ClusterLogEntry) { > + self.add_log_entry(entry) > + } > + > + fn get_log_entries(&self, max: usize) -> Vec { > + self.get_log_entries(max) > + } > + > + fn clear_cluster_log(&self) { > + self.clear_cluster_log() > + } > + > + fn add_cluster_log( > + &self, > + timestamp: u32, > + priority: u8, > + tag: String, > + node: String, > + msg: String, > + ) { > + self.add_cluster_log(timestamp, priority, tag, node, msg) > + } > + > + fn get_cluster_log_state(&self) -> Result> { > + self.get_cluster_log_state() > + } > + > + fn merge_cluster_log_states(&self, states: &[pmxcfs_api_types::NodeSyncInfo]) -> Result<()> { > + self.merge_cluster_log_states(states) > + } > + > + fn add_remote_cluster_log( > + &self, > + time: u32, > + priority: u8, > + node: String, > + ident: String, > + tag: String, > + message: String, > + ) -> Result<()> { > + self.add_remote_cluster_log(time, priority, node, ident, tag, message) > + } > + > + fn set_rrd_data<'a>( > + &'a self, > + key: String, > + data: String, > + ) -> crate::traits::BoxFuture<'a, Result<()>> { > + Box::pin(self.set_rrd_data(key, data)) > + } > + > + fn remove_old_rrd_data(&self) { > + self.remove_old_rrd_data() > + } > + > + fn get_rrd_dump(&self) -> String { > + self.get_rrd_dump() > + } > + > + fn register_vm(&self, vmid: u32, vmtype: VmType, node: String) { > + self.register_vm(vmid, vmtype, node) > + } > + > + fn delete_vm(&self, vmid: u32) { > + self.delete_vm(vmid) > + } > + > + 
fn vm_exists(&self, vmid: u32) -> bool { > + self.vm_exists(vmid) > + } > + > + fn different_vm_exists(&self, vmid: u32, vmtype: VmType, node: &str) -> bool { > + self.different_vm_exists(vmid, vmtype, node) > + } > + > + fn get_vmlist(&self) -> HashMap { > + self.get_vmlist() > + } > + > + fn scan_vmlist(&self, memdb: &pmxcfs_memdb::MemDb) { > + self.scan_vmlist(memdb) > + } > + > + fn init_cluster(&self, cluster_name: String) { > + self.init_cluster(cluster_name) > + } > + > + fn register_node(&self, node_id: u32, name: String, ip: String) { > + self.register_node(node_id, name, ip) > + } > + > + fn get_cluster_info(&self) -> Option { > + self.get_cluster_info() > + } > + > + fn get_cluster_version(&self) -> u64 { > + self.get_cluster_version() > + } > + > + fn increment_cluster_version(&self) { > + self.increment_cluster_version() > + } > + > + fn update_cluster_info( > + &self, > + cluster_name: String, > + config_version: u64, > + nodes: Vec<(u32, String, String)>, > + ) -> Result<()> { > + self.update_cluster_info(cluster_name, config_version, nodes) > + } > + > + fn set_node_online(&self, node_id: u32, online: bool) { > + self.set_node_online(node_id, online) > + } > + > + fn is_quorate(&self) -> bool { > + self.is_quorate() > + } > + > + fn set_quorate(&self, quorate: bool) { > + self.set_quorate(quorate) > + } > + > + fn get_members(&self) -> Vec { > + self.get_members() > + } > + > + fn update_members(&self, members: Vec) { > + self.update_members(members) > + } > + > + fn update_member_status(&self, member_list: &[u32]) { > + self.update_member_status(member_list) > + } > + > + fn get_start_time(&self) -> u64 { > + self.get_start_time() > + } > + > + fn increment_vmlist_version(&self) { > + self.increment_vmlist_version() > + } > + > + fn get_vmlist_version(&self) -> u64 { > + self.get_vmlist_version() > + } > + > + fn increment_path_version(&self, path: &str) { > + self.increment_path_version(path) > + } > + > + fn get_path_version(&self, path: &str) -> u64 { > + self.get_path_version(path) > + } > + > + fn get_all_path_versions(&self) -> HashMap { > + self.get_all_path_versions() > + } > + > + fn increment_all_path_versions(&self) { > + self.increment_all_path_versions() > + } > + > + fn set_node_kv(&self, nodeid: u32, key: String, value: Vec) { > + self.set_node_kv(nodeid, key, value) > + } > + > + fn get_node_kv(&self, nodeid: u32, key: &str) -> Option> { > + self.get_node_kv(nodeid, key) > + } > +} > + > +#[cfg(test)] > +mod tests { [..] > +} > diff --git a/src/pmxcfs-rs/pmxcfs-status/src/traits.rs b/src/pmxcfs-rs/pmxcfs-status/src/traits.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-status/src/types.rs b/src/pmxcfs-rs/pmxcfs-status/src/types.rs > new file mode 100644 > index 00000000..393ce63a > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-status/src/types.rs > @@ -0,0 +1,62 @@ > +/// Data types for the status module > +use std::collections::HashMap; > + > +/// Cluster node information (matches C implementation's cfs_clnode_t) > +#[derive(Debug, Clone)] > +pub struct ClusterNode { > + pub name: String, > + pub node_id: u32, > + pub ip: String, > + pub online: bool, > +} > + > +/// Cluster information (matches C implementation's cfs_clinfo_t) > +#[derive(Debug, Clone)] > +pub struct ClusterInfo { > + pub cluster_name: String, > + pub nodes_by_id: HashMap, > + pub nodes_by_name: HashMap, Mutation sites have to remember to update both maps. 
A safer pattern would be to make nodes_by_name just an index:

    pub nodes_by_id: HashMap<u32, ClusterNode>,
    pub nodes_by_name: HashMap<String, u32>,

> +}
> +
> +impl ClusterInfo {
> +    pub(crate) fn new(cluster_name: String) -> Self {
> +        Self {
> +            cluster_name,
> +            nodes_by_id: HashMap::new(),
> +            nodes_by_name: HashMap::new(),
> +        }
> +    }
> +
> +    /// Add or update a node in the cluster
> +    pub(crate) fn add_node(&mut self, node: ClusterNode) {
> +        self.nodes_by_name.insert(node.name.clone(), node.clone());
> +        self.nodes_by_id.insert(node.node_id, node);
> +    }
> +}
> +
> +/// Node status data
> +#[derive(Clone, Debug)]
> +pub struct NodeStatus {
> +    pub name: String,
> +    pub data: Vec<u8>,
> +    pub timestamp: u64,
> +}
> +
> +/// Cluster log entry
> +#[derive(Clone, Debug)]
> +pub struct ClusterLogEntry {
> +    pub timestamp: u64,
> +    pub node: String,
> +    pub priority: u8,
> +    pub ident: String,
> +    pub tag: String,
> +    pub message: String,
> +}
> +
> +/// RRD (Round Robin Database) entry
> +#[derive(Clone, Debug)]
> +pub(crate) struct RrdEntry {
> +    pub key: String,
> +    pub data: String,
> +    pub timestamp: u64,
> +}
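With nodes_by_name as an index, mutation sites only ever touch nodes_by_id, and lookups by name resolve through the index; a sketch of what I mean:

    impl ClusterInfo {
        /// Add or update a node; the name index only stores the node id
        pub(crate) fn add_node(&mut self, node: ClusterNode) {
            self.nodes_by_name.insert(node.name.clone(), node.node_id);
            self.nodes_by_id.insert(node.node_id, node);
        }

        /// Hypothetical lookup helper resolving the index
        pub(crate) fn node_by_name(&self, name: &str) -> Option<&ClusterNode> {
            self.nodes_by_id.get(self.nodes_by_name.get(name)?)
        }
    }

set_node_online(), update_members() and update_member_status() would then update a single ClusterNode instead of having to keep two copies in sync.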