From mboxrd@z Thu Jan 1 00:00:00 1970
From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [PATCH pve-cluster 10/14 v2] pmxcfs-rs: add pmxcfs-dfsm crate
Date: Fri, 13 Feb 2026 17:33:47 +0800
Message-ID: <20260213094119.2379288-11-k.chai@proxmox.com>
X-Mailer: git-send-email 2.47.3
In-Reply-To: <20260213094119.2379288-1-k.chai@proxmox.com>
References: <20260213094119.2379288-1-k.chai@proxmox.com>
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
List-Id: Proxmox VE development discussion

Add Distributed Finite State Machine for cluster synchronization:

- Dfsm: Core state machine implementation
- ClusterDatabaseService: MemDb sync (pmxcfs_v1 CPG group)
- StatusSyncService: Status sync (pve_kvstore_v1 CPG group)
- Protocol: SyncStart, State, Update, UpdateComplete, Verify
- Leader election based on version and mtime
- Incremental updates for efficiency

This integrates pmxcfs-memdb, pmxcfs-services, and rust-corosync to
provide cluster-wide database synchronization. It implements the
wire-compatible protocol used by the C version.
Includes unit tests for:
- Index serialization and comparison
- Leader election logic
- Tree entry serialization
- Diff computation between indices

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
 src/pmxcfs-rs/Cargo.toml                        |    9 +
 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml            |   46 +
 src/pmxcfs-rs/pmxcfs-dfsm/README.md             |  340 +++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs      |   80 ++
 .../src/cluster_database_service.rs             |  116 ++
 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs    |  235 ++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs   |  722 ++++++++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs   |  194 +++
 .../pmxcfs-dfsm/src/kv_store_message.rs         |  387 +++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs            |   32 +
 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs        |   21 +
 .../pmxcfs-dfsm/src/state_machine.rs            | 1251 +++++++++++++++++
 .../pmxcfs-dfsm/src/status_sync_service.rs      |  118 ++
 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs          |  107 ++
 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs    |  279 ++++
 .../tests/multi_node_sync_tests.rs              |  563 ++++++++
 src/pmxcfs-rs/pmxcfs-memdb/src/database.rs      |    4 +-
 src/pmxcfs-rs/pmxcfs-status/src/status.rs       |    4 +-
 18 files changed, 4504 insertions(+), 4 deletions(-)
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/README.md
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 07c450fb4..4dfb1c1a8 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -10,6 +10,7 @@ members = [
     "pmxcfs-test-utils", # Test utilities and helpers (dev-only)
     "pmxcfs-services",   # Service framework for automatic retry and lifecycle management
     "pmxcfs-ipc",        # libqb-compatible IPC server
+    "pmxcfs-dfsm",       # Distributed Finite State Machine
 ]
 resolver = "2"
 
@@ -32,6 +33,10 @@ pmxcfs-status = { path = "pmxcfs-status" }
 pmxcfs-test-utils = { path = "pmxcfs-test-utils" }
 pmxcfs-services = { path = "pmxcfs-services" }
 pmxcfs-ipc = { path = "pmxcfs-ipc" }
+pmxcfs-dfsm = { path = "pmxcfs-dfsm" }
+
+# Corosync integration
+rust-corosync = "0.1"
 
 # Core async runtime
 tokio = { version = "1.35", features = ["full"] }
@@ -51,6 +56,7 @@ async-trait = "0.1"
 # Serialization
 serde = { version = "1.0", features = ["derive"] }
 bincode = "1.3"
+bytemuck = { version = "1.14", features = ["derive"] }
 
 # Network and cluster
 bytes = "1.5"
@@ -63,6 +69,9 @@ parking_lot = "0.12"
 libc = "0.2"
 nix = { version = "0.29", features = ["socket", "poll"] }
+
+# Utilities
+num_enum = "0.7"
 
 # Development dependencies
 tempfile = "3.8"
 
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
new file mode 100644
index 000000000..b495f6343
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
@@ -0,0 +1,46 @@
+[package]
+name = "pmxcfs-dfsm"
+description = "Distributed Finite State Machine for cluster state synchronization" + +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +# Internal dependencies +pmxcfs-api-types.workspace = true +pmxcfs-memdb.workspace = true +pmxcfs-services.workspace = true + +# Corosync integration +rust-corosync.workspace = true + +# Error handling +anyhow.workspace = true +thiserror.workspace = true + +# Async and concurrency +parking_lot.workspace = true +async-trait.workspace = true +tokio.workspace = true + +# Serialization +serde.workspace = true +bincode.workspace = true +bytemuck.workspace = true + +# Logging +tracing.workspace = true + +# Utilities +num_enum.workspace = true +libc.workspace = true + +[dev-dependencies] +tempfile.workspace = true +libc.workspace = true diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/README.md b/src/pmxcfs-rs/pmxcfs-dfsm/README.md new file mode 100644 index 000000000..a8412f1b0 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/README.md @@ -0,0 +1,340 @@ +# pmxcfs-dfsm + +**Distributed Finite State Machine** for cluster-wide state synchronization in pmxcfs. + +This crate implements the DFSM protocol used to replicate configuration changes and status updates across all nodes in a Proxmox cluster via Corosync CPG (Closed Process Group). + +## Overview + +The DFSM is the core mechanism for maintaining consistency across cluster nodes. It ensures that: + +- All nodes see filesystem operations (writes, creates, deletes) in the same order +- Database state remains synchronized even after network partitions +- Status information (VM states, RRD data) is broadcast to all nodes +- State verification catches inconsistencies + +## Architecture + +### Key Components + +### Module Structure + +| Module | Purpose | C Equivalent | +|--------|---------|--------------| +| `state_machine.rs` | Core DFSM logic, state transitions | `dfsm.c` | +| `cluster_database_service.rs` | MemDb sync service | `dcdb.c`, `loop.c:service_dcdb` | +| `status_sync_service.rs` | Status/kvstore sync service | `loop.c:service_status` | +| `cpg_service.rs` | Corosync CPG integration | `dfsm.c:cpg_callbacks` | +| `dfsm_message.rs` | Protocol message types | `dfsm.c:dfsm_message_*_header_t` | +| `message.rs` | Message trait and serialization | (inline in C) | +| `wire_format.rs` | C-compatible wire format | `dcdb.c:c_fuse_message_header_t` | +| `broadcast.rs` | Cluster-wide message broadcast | `dcdb.c:dcdb_send_fuse_message` | +| `types.rs` | Type definitions (modes, epochs) | `dfsm.c:dfsm_mode_t` | + +## C to Rust Mapping + +### Data Structures + +| C Type | Rust Type | Notes | +|--------|-----------|-------| +| `dfsm_t` | `Dfsm` | Main state machine | +| `dfsm_mode_t` | `DfsmMode` | Enum with type safety | +| `dfsm_node_info_t` | (internal) | Node state tracking | +| `dfsm_sync_info_t` | (internal) | Sync session info | +| `dfsm_callbacks_t` | Trait-based callbacks | Type-safe callbacks via traits | +| `dfsm_message_*_header_t` | `DfsmMessage` | Type-safe enum variants | + +### Functions + +#### Core DFSM Operations + +| C Function | Rust Equivalent | Location | +|-----------|-----------------|----------| +| `dfsm_new()` | `Dfsm::new()` | state_machine.rs | +| `dfsm_initialize()` | `Dfsm::init_cpg()` | state_machine.rs | +| `dfsm_join()` | (part of init_cpg) | state_machine.rs | +| `dfsm_dispatch()` | `Dfsm::dispatch_events()` | state_machine.rs | +| `dfsm_send_message()` | 
`Dfsm::send_message()` | state_machine.rs | +| `dfsm_send_update()` | `Dfsm::send_update()` | state_machine.rs | +| `dfsm_verify_request()` | `Dfsm::verify_request()` | state_machine.rs | +| `dfsm_finalize()` | `Dfsm::stop_services()` | state_machine.rs | + +#### DCDB (Cluster Database) Operations + +| C Function | Rust Equivalent | Location | +|-----------|-----------------|----------| +| `dcdb_new()` | `ClusterDatabaseService::new()` | cluster_database_service.rs | +| `dcdb_send_fuse_message()` | `broadcast()` | broadcast.rs | +| `dcdb_send_unlock()` | `FuseMessage::Unlock` + broadcast | broadcast.rs | +| `service_dcdb()` | `ClusterDatabaseService` | cluster_database_service.rs | + +#### Status Sync Operations + +| C Function | Rust Equivalent | Location | +|-----------|-----------------|----------| +| `service_status()` | `StatusSyncService` | status_sync_service.rs | +| (kvstore CPG group) | `StatusSyncService` | Uses separate CPG group | + +### Callback System + +**C Implementation:** + +**Rust Implementation:** +- Uses trait-based callbacks instead of function pointers +- Callbacks are implemented by `MemDbCallbacks` (memdb integration) +- Defined in external crates (pmxcfs-memdb) + +## Synchronization Protocol + +The DFSM ensures all nodes maintain consistent database state through a multi-phase synchronization protocol: + +### Protocol Phases + +#### Phase 1: Membership Change + +When nodes join or leave the cluster: + +1. **Corosync CPG** delivers membership change notification +2. **DFSM invalidates** cached checksums +3. **Message queues** are cleared +4. **Epoch counter** is incremented + +**CPG Leader** (lowest node ID): +- Initiates sync by sending `SyncStart` message +- Sends its own `State` (CPG doesn't loop back messages) + +**All Followers**: +- Respond to `SyncStart` by sending their `State` +- Wait for other nodes' states + +#### Phase 2: State Exchange + +Each node collects `State` messages containing serialized **MemDbIndex** (compact state summary using C-compatible wire format). + +State digests are computed using SHA-256 hashing to detect differences between nodes. + +#### Phase 3: Leader Election + +When all states are collected, `process_state_update()` is called: + +1. **Parse indices** from all node states +2. **Elect data leader** (may differ from CPG leader): + - Highest `version` wins + - If tied, highest `mtime` wins +3. **Identify synced nodes**: Nodes whose index matches leader exactly +4. **Determine own status**: + - If we're the data leader → send updates to followers + - If we're synced with leader → mark as Synced + - Otherwise → enter Update mode and wait + +**Leader Election Algorithm**: + +#### Phase 4: Incremental Updates + +**Data Leader** (node with highest version): + +1. **Compare indices** using `find_differences()` for each follower +2. **Serialize differing entries** to C-compatible TreeEntry format +3. **Send Update messages** via CPG +4. **Send UpdateComplete** when all updates sent + +**Followers** (out-of-sync nodes): + +1. **Receive Update messages** +2. **Deserialize TreeEntry** via `TreeEntry::deserialize_from_update()` +3. **Apply to database** via `MemDb::apply_tree_entry()`: + - INSERT OR REPLACE in SQLite + - Update in-memory structures + - Handle entry moves (parent/name changes) +4. 
**On UpdateComplete**: Transition to Synced mode + +#### Phase 5: Normal Operations + +When in **Synced** mode: + +- FUSE operations are broadcast via `send_fuse_message()` +- Messages are delivered immediately via `deliver_message()` +- Leader periodically sends `VerifyRequest` for checksum comparison +- Nodes respond with `Verify` containing SHA-256 of entire database +- Mismatches trigger cluster resync + +--- + +## Protocol Details + +### State Machine Transitions + +Based on analysis of C implementation (`dfsm.c` lines 795-1209): + +#### Critical Protocol Rules + +1. **Epoch Management**: + - Each node creates local epoch during confchg: `(counter++, time, own_nodeid, own_pid)` + - **Leader sends SYNC_START with its epoch** + - **Followers MUST adopt leader's epoch from SYNC_START** (`dfsm->sync_epoch = header->epoch`) + - All STATE messages in sync round use adopted epoch + - Epoch mismatch → message discarded (may lead to LEAVE) + +2. **Member List Validation**: + - Built from `member_list` in confchg callback + - Stored in `dfsm->sync_info->nodes[]` + - STATE sender MUST be in this list + - Non-member STATE → immediate LEAVE + +3. **Duplicate Detection**: + - Each node sends STATE exactly once per sync round + - Tracked via `ni->state` pointer (NULL = not received, non-NULL = received) + - Duplicate STATE from same nodeid/pid → immediate LEAVE + - ✅ **FIXED**: Rust implementation now matches C (see commit c321869cc) + +4. **Message Ordering** (one sync round): + +5. **Leader Selection**: + - Determined by `lowest_nodeid` from member list + - Set in confchg callback before any messages sent + - Used to validate SYNC_START sender (logged but not enforced) + - Re-elected during state processing based on DB versions + +### DFSM States (DfsmMode) + +| State | Value | Description | C Equivalent | +|-------|-------|-------------|--------------| +| `Start` | 0 | Initial connection | `DFSM_MODE_START` | +| `StartSync` | 1 | Beginning sync | `DFSM_MODE_START_SYNC` | +| `Synced` | 2 | Fully synchronized | `DFSM_MODE_SYNCED` | +| `Update` | 3 | Receiving updates | `DFSM_MODE_UPDATE` | +| `Leave` | 253 | Leaving group | `DFSM_MODE_LEAVE` | +| `VersionError` | 254 | Protocol mismatch | `DFSM_MODE_VERSION_ERROR` | +| `Error` | 255 | Error state | `DFSM_MODE_ERROR` | + +### Message Types (DfsmMessageType) + +| Type | Value | Purpose | +|------|-------|---------| +| `Normal` | 0 | Application messages (with header + payload) | +| `SyncStart` | 1 | Start sync (from leader) | +| `State` | 2 | Full state data | +| `Update` | 3 | Incremental update | +| `UpdateComplete` | 4 | End of updates | +| `VerifyRequest` | 5 | Request state verification | +| `Verify` | 6 | State checksum response | + +All messages use C-compatible wire format with headers and payloads. + +### Application Message Types + +The DFSM can carry two types of application messages: + +1. **Fuse Messages** (Filesystem operations) + - CPG Group: `pmxcfs_v1` (DCDB) + - Message types: `Write`, `Create`, `Delete`, `Mkdir`, `Rename`, `SetMtime`, `Unlock` + - Defined in: `pmxcfs-api-types::FuseMessage` + +2. **KvStore Messages** (Status/RRD sync) + - CPG Group: `pve_kvstore_v1` + - Message types: `Data` (key-value pairs for status sync) + - Defined in: `pmxcfs-api-types::KvStoreMessage` + +### Wire Format Compatibility + +All wire formats are **byte-compatible** with the C implementation. Messages include appropriate headers and payloads as defined in the C protocol. + +## Synchronization Flow + +### 1. Node Join + +### 2. 
Normal Operation + +### 3. State Verification (Periodic) + +## Key Differences from C Implementation + +### Event Loop Architecture + +**C Version:** +- Uses libqb's `qb_loop` for event loop +- CPG fd registered with `qb_loop_poll_add()` +- Dispatch called from qb_loop when fd is readable + +**Rust Version:** +- Uses tokio async runtime +- Service trait provides `dispatch()` method +- ServiceManager polls fd using tokio's async I/O +- No qb_loop dependency + +### CPG Instance Management + +**C Version:** +- Single DFSM struct with callbacks +- Two different CPG groups created separately + +**Rust Version:** +- Each CPG group gets its own `Dfsm` instance +- `ClusterDatabaseService` - manages `pmxcfs_v1` CPG group (MemDb) +- `StatusSyncService` - manages `pve_kvstore_v1` CPG group (Status/RRD) +- Both use same DFSM protocol but different callbacks + +## Error Handling + +### Split-Brain Prevention + +- Checksum verification detects divergence +- Automatic resync on mismatch +- Version monotonicity ensures forward progress + +### Network Partition Recovery + +- Membership changes trigger sync +- Highest version always wins +- Stale data is safely replaced + +### Consistency Guarantees + +- SQLite transactions ensure atomic updates +- In-memory structures updated atomically +- Version increments are monotonic +- All nodes converge to same state + +## Compatibility Matrix + +| Feature | C Version | Rust Version | Compatible | +|---------|-----------|--------------|------------| +| Wire format | `dfsm_message_*_header_t` | `DfsmMessage::serialize()` | Yes | +| CPG protocol | libcorosync | rust-corosync | Yes | +| Message types | 0-6 | `DfsmMessageType` | Yes | +| State machine | `dfsm_mode_t` | `DfsmMode` | Yes | +| Protocol version | 1 | 1 | Yes | +| Group names | `pmxcfs_v1`, `pve_kvstore_v1` | Same | Yes | + +## Known Issues / TODOs + +### Missing Features +- [ ] **Sync message batching**: C version can batch updates, Rust sends individually +- [ ] **Message queue limits**: C has MAX_QUEUE_LEN, Rust unbounded (potential memory issue) +- [ ] **Detailed error codes**: C returns specific CS_ERR_* codes, Rust uses anyhow errors + +### Behavioral Differences (Benign) +- **Logging**: Rust uses `tracing` instead of `qb_log` (compatible with journald) +- **Threading**: Rust uses tokio tasks, C uses qb_loop single-threaded model +- **Timers**: Rust uses tokio timers, C uses qb_loop timers (same timeout values) + +### Incompatibilities (None Known) +No incompatibilities have been identified. The Rust implementation is fully wire-compatible and can operate in a mixed C/Rust cluster. 
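+
+## Example: Leader Election Rule
+
+A minimal sketch of the Phase 3 election rule described above (highest
+`version` wins, `mtime` breaks ties). `IndexSummary` is a simplified,
+hypothetical stand-in for the real `MemDbIndex` in pmxcfs-memdb:
+
+```rust
+struct IndexSummary {
+    version: u64,
+    mtime: u64,
+}
+
+/// Pick the data leader among `(nodeid, index)` candidates.
+fn elect_leader(candidates: &[(u32, IndexSummary)]) -> Option<u32> {
+    candidates
+        .iter()
+        .max_by_key(|(_, idx)| (idx.version, idx.mtime))
+        .map(|(nodeid, _)| *nodeid)
+}
+```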
+ +## References + +### C Implementation +- `src/pmxcfs/dfsm.c` / `dfsm.h` - Core DFSM implementation +- `src/pmxcfs/dcdb.c` / `dcdb.h` - Distributed database coordination +- `src/pmxcfs/loop.c` / `loop.h` - Service loop and management + +### Related Crates +- **pmxcfs-memdb**: Database callbacks for DFSM +- **pmxcfs-status**: Status tracking and kvstore +- **pmxcfs-api-types**: Message type definitions +- **pmxcfs-services**: Service framework for lifecycle management +- **rust-corosync**: CPG bindings (external dependency) + +### Corosync Documentation +- CPG (Closed Process Group) API: https://github.com/corosync/corosync +- Group communication semantics: Total order, virtual synchrony diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs new file mode 100644 index 000000000..f2f48619b --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs @@ -0,0 +1,80 @@ +/// DFSM application callbacks +/// +/// This module defines the callback trait that application layers implement +/// to integrate with the DFSM state machine. +use crate::NodeSyncInfo; + +/// Callback trait for DFSM operations +/// +/// The application layer implements this to receive DFSM events. +/// The associated type `Message` specifies the message type this callback handles: +/// - `FuseMessage` for main database operations +/// - `KvStoreMessage` for status synchronization +/// +/// This provides type safety by ensuring each DFSM instance only delivers +/// the correct message type to its callbacks. +pub trait Callbacks: Send + Sync { + /// The message type this callback handles + type Message: crate::message::Message; + + /// Deliver an application message + /// + /// The message type is determined by the associated type: + /// - FuseMessage for main database operations + /// - KvStoreMessage for status synchronization + fn deliver_message( + &self, + nodeid: u32, + pid: u32, + message: Self::Message, + timestamp: u64, + ) -> anyhow::Result<(i32, bool)>; + + /// Compute state checksum for verification + fn compute_checksum(&self, output: &mut [u8; 32]) -> anyhow::Result<()>; + + /// Get current state for synchronization + /// + /// Called when we need to send our state to other nodes during sync. + fn get_state(&self) -> anyhow::Result>; + + /// Process state update during synchronization + fn process_state_update(&self, states: &[NodeSyncInfo]) -> anyhow::Result; + + /// Process incremental update from leader + /// + /// The leader sends individual TreeEntry updates during synchronization. + /// The data is serialized TreeEntry in C-compatible wire format. + fn process_update(&self, nodeid: u32, pid: u32, data: &[u8]) -> anyhow::Result<()>; + + /// Commit synchronized state + fn commit_state(&self) -> anyhow::Result<()>; + + /// Called when cluster becomes synced + fn on_synced(&self); + + /// Clean up sync resources (matches C's dfsm_cleanup_fn) + /// + /// Called to release resources allocated during state synchronization. + /// This is called when sync resources are being released, typically during + /// membership changes or when transitioning out of sync mode. + /// + /// Default implementation does nothing (Rust's RAII handles most cleanup). + fn cleanup_sync_resources(&self, _states: &[NodeSyncInfo]) { + // Default: no-op, Rust's Drop trait handles cleanup + } + + /// Called on membership changes (matches C's dfsm_confchg_fn) + /// + /// Notifies the application layer when cluster membership changes. 
+ /// This can be used for logging, monitoring, or application-specific + /// membership tracking. + /// + /// # Arguments + /// * `member_list` - Current list of cluster members after the change + /// + /// Default implementation does nothing (membership handled internally). + fn on_membership_change(&self, _member_list: &[pmxcfs_api_types::MemberInfo]) { + // Default: no-op, membership changes handled internally + } +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs new file mode 100644 index 000000000..f2847062d --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs @@ -0,0 +1,116 @@ +//! Cluster Database Service +//! +//! This service synchronizes the distributed cluster database (pmxcfs-memdb) across +//! all cluster nodes using DFSM (Distributed Finite State Machine). +//! +//! Equivalent to C implementation's service_dcdb (Distributed Cluster DataBase). +//! Provides automatic retry, event-driven CPG dispatching, and periodic state verification. + +use async_trait::async_trait; +use pmxcfs_services::{Service, ServiceError}; +use rust_corosync::CsError; +use std::sync::Arc; +use std::time::Duration; +use tracing::{debug, error, info, warn}; + +use crate::Dfsm; +use crate::message::Message; + +/// Cluster Database Service +/// +/// Synchronizes the distributed cluster database (pmxcfs-memdb) across all nodes. +/// Implements the Service trait to provide: +/// - Automatic retry if CPG initialization fails +/// - Event-driven CPG dispatching for database replication +/// - Periodic state verification via timer callback +/// +/// This is equivalent to C implementation's service_dcdb (Distributed Cluster DataBase). +/// +/// The generic parameter `M` specifies the message type this service handles. 
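+///
+/// A sketch of the intended wiring; `callbacks` stands in for whatever
+/// implements the `Callbacks` trait, and the exact `Dfsm` constructor
+/// arguments may differ:
+///
+/// ```ignore
+/// let dfsm = Arc::new(Dfsm::new(callbacks));
+/// let mut service = ClusterDatabaseService::new(dfsm);
+/// // Hand the service to the pmxcfs-services ServiceManager, which calls
+/// // initialize(), polls the returned fd, and invokes dispatch() on events.
+/// ```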
+pub struct ClusterDatabaseService { + dfsm: Arc>, + fd: Option, +} + +impl ClusterDatabaseService { + /// Create a new cluster database service + pub fn new(dfsm: Arc>) -> Self { + Self { dfsm, fd: None } + } +} + +#[async_trait] +impl Service for ClusterDatabaseService { + fn name(&self) -> &str { + "cluster-database" + } + + async fn initialize(&mut self) -> pmxcfs_services::Result { + info!("Initializing cluster database service (dcdb)"); + + // Initialize CPG connection (this also joins the group) + self.dfsm.init_cpg().map_err(|e| { + ServiceError::InitializationFailed(format!("DFSM CPG initialization failed: {e}")) + })?; + + // Get file descriptor for event monitoring + let fd = self.dfsm.fd_get().map_err(|e| { + self.dfsm.stop_services().ok(); + ServiceError::InitializationFailed(format!("Failed to get DFSM fd: {e}")) + })?; + + self.fd = Some(fd); + + info!( + "Cluster database service initialized successfully with fd {}", + fd + ); + Ok(fd) + } + + async fn dispatch(&mut self) -> pmxcfs_services::Result { + match self.dfsm.dispatch_events() { + Ok(_) => Ok(true), + Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => { + warn!("DFSM connection lost, requesting reinitialization"); + Ok(false) + } + Err(e) => { + error!("DFSM dispatch failed: {}", e); + Err(ServiceError::DispatchFailed(format!( + "DFSM dispatch failed: {e}" + ))) + } + } + } + + async fn finalize(&mut self) -> pmxcfs_services::Result<()> { + info!("Finalizing cluster database service"); + + self.fd = None; + + if let Err(e) = self.dfsm.stop_services() { + warn!("Error stopping cluster database services: {}", e); + } + + info!("Cluster database service finalized"); + Ok(()) + } + + async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> { + debug!("Cluster database timer callback: initiating state verification"); + + // Request state verification + if let Err(e) = self.dfsm.verify_request() { + warn!("DFSM state verification request failed: {}", e); + } + + Ok(()) + } + + fn timer_period(&self) -> Option { + // Match C implementation's DCDB_VERIFY_TIME (60 * 60 seconds) + // Periodic state verification happens once per hour + Some(Duration::from_secs(3600)) + } +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs new file mode 100644 index 000000000..6e74238e9 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs @@ -0,0 +1,235 @@ +//! Safe, idiomatic wrapper for Corosync CPG (Closed Process Group) +//! +//! This module provides a trait-based abstraction over the Corosync CPG C API, +//! handling the unsafe FFI boundary and callback lifecycle management internally. 
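+//!
+//! A minimal handler sketch (illustrative only; the real handler is the DFSM
+//! state machine):
+//!
+//! ```ignore
+//! struct LoggingHandler;
+//!
+//! impl CpgHandler for LoggingHandler {
+//!     fn on_deliver(&self, group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]) {
+//!         tracing::info!("{group_name}: {} bytes from {nodeid:?}/{pid}", msg.len());
+//!     }
+//!
+//!     fn on_confchg(
+//!         &self,
+//!         group_name: &str,
+//!         member_list: &[cpg::Address],
+//!         _left_list: &[cpg::Address],
+//!         _joined_list: &[cpg::Address],
+//!     ) {
+//!         tracing::info!("{group_name}: now {} members", member_list.len());
+//!     }
+//! }
+//! ```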
+ +use anyhow::Result; +use rust_corosync::{NodeId, cpg}; +use std::sync::Arc; + +/// Helper to extract CpgHandler from CPG context +/// +/// # Safety +/// - Context must point to a valid Arc leaked via Box::into_raw() +/// - Handler must still be alive (CpgService not dropped) +/// - Pointer must be properly aligned for Arc +/// +/// # Errors +/// Returns error if context is invalid, null, or misaligned +unsafe fn handler_from_context<'a>(handle: cpg::Handle) -> Result<&'a dyn CpgHandler> { + let context = cpg::context_get(handle) + .map_err(|e| anyhow::anyhow!("Failed to get CPG context: {e:?}"))?; + + if context == 0 { + return Err(anyhow::anyhow!("CPG context is null - not initialized")); + } + + // Validate pointer alignment + if context % std::mem::align_of::>() as u64 != 0 { + return Err(anyhow::anyhow!("CPG context pointer misaligned")); + } + + // Context points to a leaked Arc + // We borrow the Arc to get a reference to the handler + let arc_ptr = context as *const Arc; + let arc_ref: &Arc = unsafe { &*arc_ptr }; + Ok(arc_ref.as_ref()) +} + +/// Trait for handling CPG events in a safe, idiomatic way +/// +/// Implementors receive callbacks when CPG events occur. The trait handles +/// all unsafe pointer conversion and context management internally. +pub trait CpgHandler: Send + Sync + 'static { + fn on_deliver(&self, group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]); + + fn on_confchg( + &self, + group_name: &str, + member_list: &[cpg::Address], + left_list: &[cpg::Address], + joined_list: &[cpg::Address], + ); +} + +/// Safe wrapper for CPG handle that manages callback lifecycle +/// +/// This service registers callbacks with the CPG handle and ensures proper +/// cleanup when dropped. It uses Arc reference counting to safely manage +/// the handler lifetime across the FFI boundary. 
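+///
+/// Typical lifecycle (illustrative; `handler` implements `CpgHandler`):
+///
+/// ```ignore
+/// let service = CpgService::new(Arc::new(handler))?;
+/// service.join("pmxcfs_v1")?;
+/// // ... poll service.fd() and call service.dispatch() when readable ...
+/// service.leave("pmxcfs_v1")?;
+/// ```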
+pub struct CpgService { + handle: cpg::Handle, + handler: Arc, +} + +impl CpgService { + pub fn new(handler: Arc) -> Result { + fn cpg_deliver_callback( + handle: &cpg::Handle, + group_name: String, + nodeid: NodeId, + pid: u32, + msg: &[u8], + _msg_len: usize, + ) { + // Catch panics to prevent unwinding through C code (UB) + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + match unsafe { handler_from_context(*handle) } { + Ok(handler) => handler.on_deliver(&group_name, nodeid, pid, msg), + Err(e) => { + tracing::error!("CPG deliver callback error: {}", e); + } + } + })); + + if let Err(panic) = result { + tracing::error!("PANIC in CPG deliver callback: {:?}", panic); + } + } + + fn cpg_confchg_callback( + handle: &cpg::Handle, + group_name: &str, + member_list: Vec, + left_list: Vec, + joined_list: Vec, + ) { + // Catch panics to prevent unwinding through C code (UB) + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + match unsafe { handler_from_context(*handle) } { + Ok(handler) => handler.on_confchg(group_name, &member_list, &left_list, &joined_list), + Err(e) => { + tracing::error!("CPG confchg callback error: {}", e); + } + } + })); + + if let Err(panic) = result { + tracing::error!("PANIC in CPG confchg callback: {:?}", panic); + } + } + + let model_data = cpg::ModelData::ModelV1(cpg::Model1Data { + flags: cpg::Model1Flags::None, + deliver_fn: Some(cpg_deliver_callback), + confchg_fn: Some(cpg_confchg_callback), + totem_confchg_fn: None, + }); + + let handle = cpg::initialize(&model_data, 0)?; + + let handler_dyn: Arc = handler; + let leaked_arc = Box::new(Arc::clone(&handler_dyn)); + let arc_ptr = Box::into_raw(leaked_arc) as u64; + + // Set context with error handling to prevent Arc leak + if let Err(e) = cpg::context_set(handle, arc_ptr) { + // Recover the leaked Arc on error + unsafe { + let _ = Box::from_raw(arc_ptr as *mut Arc); + } + // Finalize CPG handle + let _ = cpg::finalize(handle); + return Err(e.into()); + } + + Ok(Self { + handle, + handler: handler_dyn, + }) + } + + pub fn join(&self, group_name: &str) -> Result<()> { + // Group names are hardcoded in the application, so assert they don't contain nulls + debug_assert!(!group_name.contains('\0'), "Group name cannot contain null bytes"); + + // IMPORTANT: C implementation uses strlen(name) + 1 for CPG name length, + // which includes the trailing nul. To ensure compatibility with C nodes, + // we must add \0 to the group name. 
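+        // For example, "pmxcfs_v1" goes on the wire as 10 bytes: 9 characters
+        // plus the trailing '\0'.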
+ // See src/pmxcfs/dfsm.c: dfsm->cpg_group_name.length = strlen(group_name) + 1; + let group_string = format!("{group_name}\0"); + tracing::warn!( + "CPG JOIN: Joining group '{}' (verify matches C's DCDB_CPG_GROUP_NAME='pve_dcdb_v1')", + group_name + ); + cpg::join(self.handle, &group_string)?; + tracing::info!("CPG JOIN: Successfully joined group '{}'", group_name); + Ok(()) + } + + pub fn leave(&self, group_name: &str) -> Result<()> { + // Group names are hardcoded in the application, so assert they don't contain nulls + debug_assert!(!group_name.contains('\0'), "Group name cannot contain null bytes"); + + // Include trailing nul to match C's behavior (see join() comment) + let group_string = format!("{group_name}\0"); + cpg::leave(self.handle, &group_string)?; + Ok(()) + } + + pub fn mcast(&self, guarantee: cpg::Guarantee, msg: &[u8]) -> Result<()> { + cpg::mcast_joined(self.handle, guarantee, msg)?; + Ok(()) + } + + pub fn dispatch(&self) -> Result<(), rust_corosync::CsError> { + cpg::dispatch(self.handle, rust_corosync::DispatchFlags::All) + } + + pub fn fd(&self) -> Result { + Ok(cpg::fd_get(self.handle)?) + } + + pub fn handler(&self) -> &Arc { + &self.handler + } + + pub fn handle(&self) -> cpg::Handle { + self.handle + } +} + +impl Drop for CpgService { + fn drop(&mut self) { + // CRITICAL: Finalize BEFORE recovering Arc to prevent race condition + // where callbacks could fire while we're deallocating the Arc + if let Err(e) = cpg::finalize(self.handle) { + tracing::error!("Failed to finalize CPG handle: {:?}", e); + } + + // Now safe to recover Arc - no more callbacks can fire + match cpg::context_get(self.handle) { + Ok(context) if context != 0 => { + unsafe { + let _boxed = Box::from_raw(context as *mut Arc); + } + } + Ok(_) => { + tracing::warn!("CPG context was null during drop"); + } + Err(e) => { + // Context_get might fail after finalize - this is acceptable + tracing::debug!("Context get failed during drop: {:?}", e); + } + } + } +} + +/// SAFETY: CpgService is thread-safe with the following guarantees: +/// +/// 1. cpg::Handle is thread-safe per Corosync 2.x/3.x library design +/// - CPG library uses internal mutex for concurrent access protection +/// +/// 2. Handler is protected by Arc reference counting +/// - Multiple threads can safely hold references to the handler +/// +/// 3. CpgHandler trait requires Send + Sync +/// - Implementations must handle concurrent callbacks safely +/// +/// 4. Methods join/leave/mcast are safe to call concurrently from multiple threads +/// +/// LIMITATIONS: +/// - Do NOT call dispatch() concurrently from multiple threads on the same handle +/// (CPG library dispatch is not reentrant) +unsafe impl Send for CpgService {} +unsafe impl Sync for CpgService {} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs new file mode 100644 index 000000000..5d03eae45 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs @@ -0,0 +1,722 @@ +/// DFSM Protocol Message Types +/// +/// This module defines the DfsmMessage enum which encapsulates all DFSM protocol messages +/// with their associated data, providing type-safe serialization and deserialization. +/// +/// Wire format matches C implementation's dfsm_message_*_header_t structures for compatibility. 
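+///
+/// Header sizes at a glance: Normal frames carry the 16-byte base header plus
+/// a u64 message counter (24 bytes) before the application payload; all state
+/// messages carry the 16-byte base header plus a 16-byte epoch header
+/// (32 bytes) before their payload.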
+use anyhow::Result; +use pmxcfs_memdb::TreeEntry; + +use super::message::Message; +use super::types::{DfsmMessageType, SyncEpoch}; + +/// DFSM protocol message with typed variants +/// +/// Each variant corresponds to a message type in the DFSM protocol and carries +/// the appropriate payload data. The wire format matches the C implementation: +/// +/// For Normal messages: dfsm_message_normal_header_t (24 bytes) + fuse_data +/// ```text +/// [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][fuse_data...] +/// ``` +/// +/// The generic parameter `M` specifies the application message type and must implement +/// the `Message` trait for serialization/deserialization: +/// - `DfsmMessage` for database operations +/// - `DfsmMessage` for status synchronization +#[derive(Debug, Clone)] +pub enum DfsmMessage { + /// Regular application message + /// + /// Contains a typed application message (FuseMessage or KvStoreMessage). + /// C wire format: dfsm_message_normal_header_t + application_message data + Normal { + msg_count: u64, + timestamp: u32, // Unix timestamp (matches C's u32) + protocol_version: u32, // Protocol version + message: M, // Typed message (FuseMessage or KvStoreMessage) + }, + + /// Start synchronization signal from leader (no payload) + /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + SyncStart { sync_epoch: SyncEpoch }, + + /// State data from another node during sync + /// + /// Wire format: dfsm_message_state_header_t (32 bytes) + [state_data: raw bytes] + State { + sync_epoch: SyncEpoch, + data: Vec, + }, + + /// State update from leader + /// + /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + TreeEntry fields + /// This is sent by the leader during synchronization to update followers + /// with individual database entries that differ from their state. + Update { + sync_epoch: SyncEpoch, + tree_entry: TreeEntry, + }, + + /// Update complete signal from leader (no payload) + /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + UpdateComplete { sync_epoch: SyncEpoch }, + + /// Verification request from leader + /// + /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64] + VerifyRequest { sync_epoch: SyncEpoch, csum_id: u64 }, + + /// Verification response with checksum + /// + /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64][checksum: [u8; 32]] + Verify { + sync_epoch: SyncEpoch, + csum_id: u64, + checksum: [u8; 32], + }, +} + +impl DfsmMessage { + /// Protocol version (should match cluster-wide) + pub const DEFAULT_PROTOCOL_VERSION: u32 = 1; + + /// Get the message type discriminant + pub fn message_type(&self) -> DfsmMessageType { + match self { + DfsmMessage::Normal { .. } => DfsmMessageType::Normal, + DfsmMessage::SyncStart { .. } => DfsmMessageType::SyncStart, + DfsmMessage::State { .. } => DfsmMessageType::State, + DfsmMessage::Update { .. } => DfsmMessageType::Update, + DfsmMessage::UpdateComplete { .. } => DfsmMessageType::UpdateComplete, + DfsmMessage::VerifyRequest { .. } => DfsmMessageType::VerifyRequest, + DfsmMessage::Verify { .. } => DfsmMessageType::Verify, + } + } + + /// Serialize message to C-compatible wire format + /// + /// For Normal/Update: dfsm_message_normal_header_t (24 bytes) + application_data + /// Format: [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][data...] 
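+    ///
+    /// State messages with no payload (SyncStart, UpdateComplete) serialize
+    /// to exactly 32 bytes: the base header followed by the epoch header.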
+ pub fn serialize(&self) -> Vec { + match self { + DfsmMessage::Normal { + msg_count, + timestamp, + protocol_version, + message, + } => self.serialize_normal_message(*msg_count, *timestamp, *protocol_version, message), + _ => self.serialize_state_message(), + } + } + + /// Serialize a Normal message with C-compatible header + fn serialize_normal_message( + &self, + msg_count: u64, + timestamp: u32, + protocol_version: u32, + message: &M, + ) -> Vec { + let msg_type = self.message_type() as u16; + let subtype = message.message_type(); + let app_data = message.serialize(); + + // C header: type (u16) + subtype (u16) + protocol (u32) + time (u32) + reserved (u32) + count (u64) = 24 bytes + let mut message = Vec::with_capacity(24 + app_data.len()); + + // dfsm_message_header_t fields + message.extend_from_slice(&msg_type.to_le_bytes()); + message.extend_from_slice(&subtype.to_le_bytes()); + message.extend_from_slice(&protocol_version.to_le_bytes()); + message.extend_from_slice(×tamp.to_le_bytes()); + message.extend_from_slice(&0u32.to_le_bytes()); // reserved + + // count field + message.extend_from_slice(&msg_count.to_le_bytes()); + + // application message data + message.extend_from_slice(&app_data); + + message + } + + /// Serialize state messages (non-Normal) with C-compatible header + /// C wire format: dfsm_message_state_header_t (32 bytes) + payload + /// Header breakdown: base (16 bytes) + epoch (16 bytes) + fn serialize_state_message(&self) -> Vec { + let msg_type = self.message_type() as u16; + let (sync_epoch, payload) = self.extract_epoch_and_payload(); + + // For state messages: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + payload + let mut message = Vec::with_capacity(32 + payload.len()); + + // Base header (16 bytes): type, subtype, protocol, time, reserved + message.extend_from_slice(&msg_type.to_le_bytes()); + message.extend_from_slice(&0u16.to_le_bytes()); // subtype (unused) + message.extend_from_slice(&Self::DEFAULT_PROTOCOL_VERSION.to_le_bytes()); + + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as u32; + message.extend_from_slice(×tamp.to_le_bytes()); + message.extend_from_slice(&0u32.to_le_bytes()); // reserved + + // Epoch header (16 bytes): epoch, time, nodeid, pid + message.extend_from_slice(&sync_epoch.serialize()); + + // Payload + message.extend_from_slice(&payload); + + message + } + + /// Extract sync_epoch and payload from state messages + fn extract_epoch_and_payload(&self) -> (SyncEpoch, Vec) { + match self { + DfsmMessage::Normal { .. 
} => { + unreachable!("Normal messages use serialize_normal_message") + } + DfsmMessage::SyncStart { sync_epoch } => (*sync_epoch, Vec::new()), + DfsmMessage::State { sync_epoch, data } => (*sync_epoch, data.clone()), + DfsmMessage::Update { + sync_epoch, + tree_entry, + } => (*sync_epoch, tree_entry.serialize_for_update()), + DfsmMessage::UpdateComplete { sync_epoch } => (*sync_epoch, Vec::new()), + DfsmMessage::VerifyRequest { + sync_epoch, + csum_id, + } => (*sync_epoch, csum_id.to_le_bytes().to_vec()), + DfsmMessage::Verify { + sync_epoch, + csum_id, + checksum, + } => { + let mut data = Vec::with_capacity(8 + 32); + data.extend_from_slice(&csum_id.to_le_bytes()); + data.extend_from_slice(checksum); + (*sync_epoch, data) + } + } + } + + /// Deserialize message from C-compatible wire format + /// + /// Normal messages: [base header: 16 bytes][count: u64][app data] + /// State messages: [base header: 16 bytes][epoch: 16 bytes][payload] + /// + /// # Arguments + /// * `data` - Raw message bytes from CPG + pub fn deserialize(data: &[u8]) -> Result { + if data.len() < 16 { + anyhow::bail!( + "Message too short: {} bytes (need at least 16 for header)", + data.len() + ); + } + + // Parse dfsm_message_header_t (16 bytes) + let msg_type = u16::from_le_bytes([data[0], data[1]]); + let subtype = u16::from_le_bytes([data[2], data[3]]); + let protocol_version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]); + let timestamp = u32::from_le_bytes([data[8], data[9], data[10], data[11]]); + let _reserved = u32::from_le_bytes([data[12], data[13], data[14], data[15]]); + + let dfsm_type = DfsmMessageType::try_from(msg_type)?; + + // Normal messages have different structure than state messages + if dfsm_type == DfsmMessageType::Normal { + // Normal: [base: 16][count: 8][app_data: ...] + let payload = &data[16..]; + Self::deserialize_normal_message(subtype, protocol_version, timestamp, payload) + } else { + // State messages: [base: 16][epoch: 16][payload: ...] 
+ if data.len() < 32 { + anyhow::bail!( + "State message too short: {} bytes (need at least 32 for state header)", + data.len() + ); + } + let sync_epoch = SyncEpoch::deserialize(&data[16..32]) + .map_err(|e| anyhow::anyhow!("Failed to deserialize sync epoch: {e}"))?; + let payload = &data[32..]; + Self::deserialize_state_message(dfsm_type, sync_epoch, payload) + } + } + + /// Deserialize a Normal message + fn deserialize_normal_message( + subtype: u16, + protocol_version: u32, + timestamp: u32, + payload: &[u8], + ) -> Result { + // Normal messages have count field (u64) after base header + if payload.len() < 8 { + anyhow::bail!("Normal message too short: need count field"); + } + let msg_count = u64::from_le_bytes(payload[0..8].try_into().unwrap()); + let app_data = &payload[8..]; + + // Deserialize using the Message trait + let message = M::deserialize(subtype, app_data)?; + + Ok(DfsmMessage::Normal { + msg_count, + timestamp, + protocol_version, + message, + }) + } + + /// Deserialize a state message (with epoch) + fn deserialize_state_message( + dfsm_type: DfsmMessageType, + sync_epoch: SyncEpoch, + payload: &[u8], + ) -> Result { + match dfsm_type { + DfsmMessageType::Normal => { + unreachable!("Normal messages use deserialize_normal_message") + } + DfsmMessageType::Update => { + let tree_entry = TreeEntry::deserialize_from_update(payload)?; + Ok(DfsmMessage::Update { + sync_epoch, + tree_entry, + }) + } + DfsmMessageType::SyncStart => Ok(DfsmMessage::SyncStart { sync_epoch }), + DfsmMessageType::State => Ok(DfsmMessage::State { + sync_epoch, + data: payload.to_vec(), + }), + DfsmMessageType::UpdateComplete => Ok(DfsmMessage::UpdateComplete { sync_epoch }), + DfsmMessageType::VerifyRequest => { + if payload.len() < 8 { + anyhow::bail!("VerifyRequest message too short"); + } + let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap()); + Ok(DfsmMessage::VerifyRequest { + sync_epoch, + csum_id, + }) + } + DfsmMessageType::Verify => { + if payload.len() < 40 { + anyhow::bail!("Verify message too short"); + } + let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap()); + let mut checksum = [0u8; 32]; + checksum.copy_from_slice(&payload[8..40]); + Ok(DfsmMessage::Verify { + sync_epoch, + csum_id, + checksum, + }) + } + } + } + + /// Helper to create a Normal message from an application message + pub fn from_message(msg_count: u64, message: M, protocol_version: u32) -> Self { + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as u32; + + DfsmMessage::Normal { + msg_count, + timestamp, + protocol_version, + message, + } + } + + /// Helper to create an Update message from a TreeEntry + /// + /// Used by the leader during synchronization to send individual database entries + /// to nodes that need to catch up. Matches C's dcdb_send_update_inode(). 
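+    ///
+    /// ```ignore
+    /// // Illustrative: `entry` and `sync_epoch` come from the sync session.
+    /// let msg = DfsmMessage::<FuseMessage>::from_tree_entry(entry, sync_epoch);
+    /// let wire_bytes = msg.serialize();
+    /// ```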
+ pub fn from_tree_entry(tree_entry: TreeEntry, sync_epoch: SyncEpoch) -> Self { + DfsmMessage::Update { + sync_epoch, + tree_entry, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::FuseMessage; + + #[test] + fn test_sync_start_roundtrip() { + let sync_epoch = SyncEpoch { + epoch: 1, + time: 1234567890, + nodeid: 1, + pid: 1000, + }; + let msg: DfsmMessage = DfsmMessage::SyncStart { sync_epoch }; + let serialized = msg.serialize(); + let deserialized = DfsmMessage::::deserialize(&serialized).unwrap(); + + assert!( + matches!(deserialized, DfsmMessage::SyncStart { sync_epoch: e } if e == sync_epoch) + ); + } + + #[test] + fn test_normal_roundtrip() { + let fuse_msg = FuseMessage::Create { + path: "/test/file".to_string(), + }; + + let msg: DfsmMessage = DfsmMessage::Normal { + msg_count: 42, + timestamp: 1234567890, + protocol_version: DfsmMessage::::DEFAULT_PROTOCOL_VERSION, + message: fuse_msg.clone(), + }; + + let serialized = msg.serialize(); + let deserialized = DfsmMessage::::deserialize(&serialized).unwrap(); + + match deserialized { + DfsmMessage::Normal { + msg_count, + timestamp, + protocol_version, + message, + } => { + assert_eq!(msg_count, 42); + assert_eq!(timestamp, 1234567890); + assert_eq!( + protocol_version, + DfsmMessage::::DEFAULT_PROTOCOL_VERSION + ); + assert_eq!(message, fuse_msg); + } + _ => panic!("Wrong message type"), + } + } + + #[test] + fn test_verify_request_roundtrip() { + let sync_epoch = SyncEpoch { + epoch: 2, + time: 1234567891, + nodeid: 2, + pid: 2000, + }; + let msg: DfsmMessage = DfsmMessage::VerifyRequest { + sync_epoch, + csum_id: 0x123456789ABCDEF0, + }; + let serialized = msg.serialize(); + let deserialized = DfsmMessage::::deserialize(&serialized).unwrap(); + + match deserialized { + DfsmMessage::VerifyRequest { + sync_epoch: e, + csum_id, + } => { + assert_eq!(e, sync_epoch); + assert_eq!(csum_id, 0x123456789ABCDEF0); + } + _ => panic!("Wrong message type"), + } + } + + #[test] + fn test_verify_roundtrip() { + let sync_epoch = SyncEpoch { + epoch: 3, + time: 1234567892, + nodeid: 3, + pid: 3000, + }; + let checksum = [42u8; 32]; + let msg: DfsmMessage = DfsmMessage::Verify { + sync_epoch, + csum_id: 0x1122334455667788, + checksum, + }; + let serialized = msg.serialize(); + let deserialized = DfsmMessage::::deserialize(&serialized).unwrap(); + + match deserialized { + DfsmMessage::Verify { + sync_epoch: e, + csum_id, + checksum: recv_checksum, + } => { + assert_eq!(e, sync_epoch); + assert_eq!(csum_id, 0x1122334455667788); + assert_eq!(recv_checksum, checksum); + } + _ => panic!("Wrong message type"), + } + } + + #[test] + fn test_invalid_magic() { + let data = vec![0xAA, 0x00, 0x01, 0x02]; + assert!(DfsmMessage::::deserialize(&data).is_err()); + } + + #[test] + fn test_too_short() { + let data = vec![0xFF]; + assert!(DfsmMessage::::deserialize(&data).is_err()); + } + + // ===== Edge Case Tests ===== + + #[test] + fn test_state_message_too_short() { + // State messages need at least 32 bytes (16 base + 16 epoch) + let mut data = vec![0u8; 31]; // One byte short + // Set message type to State (2) + data[0..2].copy_from_slice(&2u16.to_le_bytes()); + + let result = DfsmMessage::::deserialize(&data); + assert!(result.is_err(), "State message with 31 bytes should fail"); + assert!(result.unwrap_err().to_string().contains("too short")); + } + + #[test] + fn test_normal_message_missing_count() { + // Normal messages need count field (u64) after 16-byte header + let mut data = vec![0u8; 20]; // Header + 4 bytes (not enough for u64 
count) + // Set message type to Normal (0) + data[0..2].copy_from_slice(&0u16.to_le_bytes()); + + let result = DfsmMessage::::deserialize(&data); + assert!( + result.is_err(), + "Normal message without full count field should fail" + ); + } + + #[test] + fn test_verify_message_truncated_checksum() { + // Verify messages need csum_id (8 bytes) + checksum (32 bytes) = 40 bytes payload + let sync_epoch = SyncEpoch { + epoch: 1, + time: 123, + nodeid: 1, + pid: 100, + }; + let mut data = Vec::new(); + + // Base header (16 bytes) + data.extend_from_slice(&6u16.to_le_bytes()); // Verify message type + data.extend_from_slice(&0u16.to_le_bytes()); // subtype + data.extend_from_slice(&1u32.to_le_bytes()); // protocol + data.extend_from_slice(&123u32.to_le_bytes()); // time + data.extend_from_slice(&0u32.to_le_bytes()); // reserved + + // Epoch (16 bytes) + data.extend_from_slice(&sync_epoch.serialize()); + + // Truncated payload (only 39 bytes instead of 40) + data.extend_from_slice(&0x12345678u64.to_le_bytes()); + data.extend_from_slice(&[0u8; 31]); // Only 31 bytes of checksum + + let result = DfsmMessage::::deserialize(&data); + assert!( + result.is_err(), + "Verify message with truncated checksum should fail" + ); + } + + #[test] + fn test_update_message_with_tree_entry() { + use pmxcfs_memdb::TreeEntry; + + // Create a valid tree entry with matching size + let data = vec![1, 2, 3, 4, 5]; + let tree_entry = TreeEntry { + inode: 42, + parent: 0, + version: 1, + writer: 0, + name: "testfile".to_string(), + mtime: 1234567890, + size: data.len(), // size must match data.len() + entry_type: 8, // DT_REG (regular file) + data, + }; + + let sync_epoch = SyncEpoch { + epoch: 5, + time: 999, + nodeid: 2, + pid: 200, + }; + let msg: DfsmMessage = DfsmMessage::Update { + sync_epoch, + tree_entry: tree_entry.clone(), + }; + + let serialized = msg.serialize(); + let deserialized = DfsmMessage::::deserialize(&serialized).unwrap(); + + match deserialized { + DfsmMessage::Update { + sync_epoch: e, + tree_entry: recv_entry, + } => { + assert_eq!(e, sync_epoch); + assert_eq!(recv_entry.inode, tree_entry.inode); + assert_eq!(recv_entry.name, tree_entry.name); + assert_eq!(recv_entry.size, tree_entry.size); + } + _ => panic!("Wrong message type"), + } + } + + #[test] + fn test_update_complete_roundtrip() { + let sync_epoch = SyncEpoch { + epoch: 10, + time: 5555, + nodeid: 3, + pid: 300, + }; + let msg: DfsmMessage = DfsmMessage::UpdateComplete { sync_epoch }; + + let serialized = msg.serialize(); + assert_eq!( + serialized.len(), + 32, + "UpdateComplete should be exactly 32 bytes (header + epoch)" + ); + + let deserialized = DfsmMessage::::deserialize(&serialized).unwrap(); + + assert!( + matches!(deserialized, DfsmMessage::UpdateComplete { sync_epoch: e } if e == sync_epoch) + ); + } + + #[test] + fn test_state_message_with_large_payload() { + let sync_epoch = SyncEpoch { + epoch: 7, + time: 7777, + nodeid: 4, + pid: 400, + }; + // Create a large payload (1MB) + let large_data = vec![0xAB; 1024 * 1024]; + + let msg: DfsmMessage = DfsmMessage::State { + sync_epoch, + data: large_data.clone(), + }; + + let serialized = msg.serialize(); + // Should be 32 bytes header + 1MB data + assert_eq!(serialized.len(), 32 + 1024 * 1024); + + let deserialized = DfsmMessage::::deserialize(&serialized).unwrap(); + + match deserialized { + DfsmMessage::State { + sync_epoch: e, + data, + } => { + assert_eq!(e, sync_epoch); + assert_eq!(data.len(), large_data.len()); + assert_eq!(data, large_data); + } + _ => panic!("Wrong message 
type"), + } + } + + #[test] + fn test_message_type_detection() { + let sync_epoch = SyncEpoch { + epoch: 1, + time: 100, + nodeid: 1, + pid: 50, + }; + + let sync_start: DfsmMessage = DfsmMessage::SyncStart { sync_epoch }; + assert_eq!(sync_start.message_type(), DfsmMessageType::SyncStart); + + let state: DfsmMessage = DfsmMessage::State { + sync_epoch, + data: vec![1, 2, 3], + }; + assert_eq!(state.message_type(), DfsmMessageType::State); + + let update_complete: DfsmMessage = DfsmMessage::UpdateComplete { sync_epoch }; + assert_eq!( + update_complete.message_type(), + DfsmMessageType::UpdateComplete + ); + } + + #[test] + fn test_from_message_helper() { + let fuse_msg = FuseMessage::Mkdir { + path: "/new/dir".to_string(), + }; + let msg_count = 123; + let protocol_version = DfsmMessage::::DEFAULT_PROTOCOL_VERSION; + + let dfsm_msg = DfsmMessage::from_message(msg_count, fuse_msg.clone(), protocol_version); + + match dfsm_msg { + DfsmMessage::Normal { + msg_count: count, + timestamp: _, + protocol_version: pv, + message, + } => { + assert_eq!(count, msg_count); + assert_eq!(pv, protocol_version); + assert_eq!(message, fuse_msg); + } + _ => panic!("from_message should create Normal variant"), + } + } + + #[test] + fn test_verify_request_with_max_csum_id() { + let sync_epoch = SyncEpoch { + epoch: 99, + time: 9999, + nodeid: 5, + pid: 500, + }; + let max_csum_id = u64::MAX; // Test with maximum value + + let msg: DfsmMessage = DfsmMessage::VerifyRequest { + sync_epoch, + csum_id: max_csum_id, + }; + + let serialized = msg.serialize(); + let deserialized = DfsmMessage::::deserialize(&serialized).unwrap(); + + match deserialized { + DfsmMessage::VerifyRequest { + sync_epoch: e, + csum_id, + } => { + assert_eq!(e, sync_epoch); + assert_eq!(csum_id, max_csum_id); + } + _ => panic!("Wrong message type"), + } + } +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs new file mode 100644 index 000000000..4d639ea1f --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs @@ -0,0 +1,194 @@ +/// FUSE message types for cluster synchronization +/// +/// These are the high-level operations that get broadcast through the cluster +/// via the main database DFSM (pmxcfs_v1 CPG group). +use anyhow::{Context, Result}; + +use crate::message::Message; +use crate::wire_format::{CFuseMessage, CMessageType}; + +#[derive(Debug, Clone, PartialEq)] +pub enum FuseMessage { + /// Create a regular file + Create { path: String }, + /// Create a directory + Mkdir { path: String }, + /// Write data to a file + Write { + path: String, + offset: u64, + data: Vec, + }, + /// Delete a file or directory + Delete { path: String }, + /// Rename/move a file or directory + Rename { from: String, to: String }, + /// Update modification time + /// + /// Note: mtime is sent via offset field in CFuseMessage (C: dcdb.c:900) + /// + /// WARNING: mtime is limited to u32 (matching C implementation). + /// This will overflow in 2038 (Year 2038 problem). + /// Consider migrating to u64 timestamps in a future protocol version. + Mtime { path: String, mtime: u32 }, + /// Request unlock (not yet implemented) + UnlockRequest { path: String }, + /// Unlock (not yet implemented) + Unlock { path: String }, +} + +impl Message for FuseMessage { + fn message_type(&self) -> u16 { + match self { + FuseMessage::Create { .. } => CMessageType::Create as u16, + FuseMessage::Mkdir { .. } => CMessageType::Mkdir as u16, + FuseMessage::Write { .. 
} => CMessageType::Write as u16, + FuseMessage::Delete { .. } => CMessageType::Delete as u16, + FuseMessage::Rename { .. } => CMessageType::Rename as u16, + FuseMessage::Mtime { .. } => CMessageType::Mtime as u16, + FuseMessage::UnlockRequest { .. } => CMessageType::UnlockRequest as u16, + FuseMessage::Unlock { .. } => CMessageType::Unlock as u16, + } + } + + fn serialize(&self) -> Vec { + let c_msg = match self { + FuseMessage::Create { path } => CFuseMessage { + size: 0, + offset: 0, + flags: 0, + path: path.clone(), + to: None, + data: Vec::new(), + }, + FuseMessage::Mkdir { path } => CFuseMessage { + size: 0, + offset: 0, + flags: 0, + path: path.clone(), + to: None, + data: Vec::new(), + }, + FuseMessage::Write { path, offset, data } => CFuseMessage { + size: data.len() as u32, + offset: *offset as u32, + flags: 0, + path: path.clone(), + to: None, + data: data.clone(), + }, + FuseMessage::Delete { path } => CFuseMessage { + size: 0, + offset: 0, + flags: 0, + path: path.clone(), + to: None, + data: Vec::new(), + }, + FuseMessage::Rename { from, to } => CFuseMessage { + size: 0, + offset: 0, + flags: 0, + path: from.clone(), + to: Some(to.clone()), + data: Vec::new(), + }, + FuseMessage::Mtime { path, mtime } => CFuseMessage { + size: 0, + offset: *mtime, // mtime is sent via offset field (C: dcdb.c:900) + flags: 0, + path: path.clone(), + to: None, + data: Vec::new(), + }, + FuseMessage::UnlockRequest { path } => CFuseMessage { + size: 0, + offset: 0, + flags: 0, + path: path.clone(), + to: None, + data: Vec::new(), + }, + FuseMessage::Unlock { path } => CFuseMessage { + size: 0, + offset: 0, + flags: 0, + path: path.clone(), + to: None, + data: Vec::new(), + }, + }; + + c_msg.serialize() + } + + fn deserialize(message_type: u16, data: &[u8]) -> Result { + let c_msg = CFuseMessage::parse(data).context("Failed to parse C FUSE message")?; + let msg_type = CMessageType::try_from(message_type).context("Invalid C message type")?; + + Ok(match msg_type { + CMessageType::Create => FuseMessage::Create { path: c_msg.path }, + CMessageType::Mkdir => FuseMessage::Mkdir { path: c_msg.path }, + CMessageType::Write => FuseMessage::Write { + path: c_msg.path, + offset: c_msg.offset as u64, + data: c_msg.data, + }, + CMessageType::Delete => FuseMessage::Delete { path: c_msg.path }, + CMessageType::Rename => FuseMessage::Rename { + from: c_msg.path, + to: c_msg.to.unwrap_or_default(), + }, + CMessageType::Mtime => FuseMessage::Mtime { + path: c_msg.path, + mtime: c_msg.offset, // mtime is sent via offset field (C: dcdb.c:900) + }, + CMessageType::UnlockRequest => FuseMessage::UnlockRequest { path: c_msg.path }, + CMessageType::Unlock => FuseMessage::Unlock { path: c_msg.path }, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fuse_message_create() { + let msg = FuseMessage::Create { + path: "/test/file".to_string(), + }; + assert_eq!(msg.message_type(), CMessageType::Create as u16); + + let serialized = msg.serialize(); + let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap(); + assert_eq!(msg, deserialized); + } + + #[test] + fn test_fuse_message_write() { + let msg = FuseMessage::Write { + path: "/test/file".to_string(), + offset: 100, + data: vec![1, 2, 3, 4, 5], + }; + assert_eq!(msg.message_type(), CMessageType::Write as u16); + + let serialized = msg.serialize(); + let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap(); + assert_eq!(msg, deserialized); + } + + #[test] + fn test_fuse_message_rename() { + 
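+        // Rename is the only variant that uses the `to` field: `path` carries
+        // the source and `to` the destination.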
+ + #[test] + fn test_fuse_message_rename() { + let msg = FuseMessage::Rename { + from: "/old/path".to_string(), + to: "/new/path".to_string(), + }; + assert_eq!(msg.message_type(), CMessageType::Rename as u16); + + let serialized = msg.serialize(); + let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap(); + assert_eq!(msg, deserialized); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs new file mode 100644 index 000000000..79196eca7 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs @@ -0,0 +1,387 @@ +/// KvStore message types for DFSM status synchronization +/// +/// This module defines the KvStore message types that are delivered through +/// the status DFSM state machine (pve_kvstore_v1 CPG group). +use anyhow::Context; + +use crate::message::Message; + +/// Key size in kvstore update messages (matches C's hardcoded 256 byte buffer) +const KVSTORE_KEY_SIZE: usize = 256; + +/// Wire format header for clog_entry_t messages +/// +/// This represents only the portion that's actually transmitted over the network. +/// The full C struct includes prev/next/uid/digests/pid fields, but those are only +/// used in the in-memory representation and not sent on the wire. +#[repr(C)] +#[derive(Copy, Clone)] +struct ClogEntryHeader { + time: u32, + priority: u8, + padding: [u8; 3], + node_len: u32, + ident_len: u32, + tag_len: u32, + msg_len: u32, +} + +impl ClogEntryHeader { + /// Convert the header to bytes for serialization + /// + /// SAFETY: This is safe because: + /// 1. ClogEntryHeader is #[repr(C)] with explicit layout + /// 2. All fields are plain data types (u32, u8, [u8; 3]) + /// 3. No padding issues - padding is explicit in the struct + /// 4. Target platform (x86_64) is little-endian, matching wire format + fn to_bytes(&self) -> [u8; std::mem::size_of::<ClogEntryHeader>()] { + unsafe { std::mem::transmute(*self) } + } + + /// Parse header from bytes + /// + /// SAFETY: This is safe because: + /// 1. We validate the input size before transmuting + /// 2. ClogEntryHeader is #[repr(C)] with well-defined layout + /// 3. All bit patterns are valid for the struct's field types + /// 4. Source platform (x86_64) is little-endian, matching wire format + fn from_bytes(data: &[u8]) -> anyhow::Result<Self> { + if data.len() < std::mem::size_of::<ClogEntryHeader>() { + anyhow::bail!("LOG message too short: {} < {}", data.len(), std::mem::size_of::<ClogEntryHeader>()); + } + + let header_bytes: [u8; std::mem::size_of::<ClogEntryHeader>()] = data[..std::mem::size_of::<ClogEntryHeader>()] + .try_into() + .expect("slice length verified above"); + + Ok(unsafe { std::mem::transmute(header_bytes) }) + } +} + + +/// KvStore message type IDs (matches C's kvstore_message_t enum) +#[derive( + Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive, +)] +#[repr(u16)] +enum KvStoreMessageType { + Update = 1, // KVSTORE_MESSAGE_UPDATE + UpdateComplete = 2, // KVSTORE_MESSAGE_UPDATE_COMPLETE + Log = 3, // KVSTORE_MESSAGE_LOG +}
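Sketch, not in the patch: since to_bytes/from_bytes transmute the header, a small layout guard would make the 24-byte wire size explicit and catch accidental field reordering; the field values below are arbitrary.

#[test]
fn test_clog_entry_header_layout() {
    // 4 (time) + 1 (priority) + 3 (explicit padding) + 4 * 4 (length fields) = 24 bytes
    assert_eq!(std::mem::size_of::<ClogEntryHeader>(), 24);

    let header = ClogEntryHeader {
        time: 1_700_000_000,
        priority: 6,
        padding: [0u8; 3],
        node_len: 6,
        ident_len: 7,
        tag_len: 5,
        msg_len: 13,
    };
    let bytes = header.to_bytes();
    let parsed = ClogEntryHeader::from_bytes(&bytes).unwrap();
    assert_eq!(parsed.time, header.time);
    assert_eq!(parsed.msg_len, header.msg_len);
}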
+ +/// KvStore message types for ephemeral status synchronization +/// +/// These messages are used by the kvstore DFSM (pve_kvstore_v1 CPG group) +/// to synchronize ephemeral data like RRD metrics, node IPs, and cluster logs. +/// +/// Matches C implementation's KVSTORE_MESSAGE_* types in status.c +#[derive(Debug, Clone, PartialEq)] +pub enum KvStoreMessage { + /// Update key-value data from a node + /// + /// Wire format: key (256 bytes, null-terminated) + value (variable length) + /// Matches C's KVSTORE_MESSAGE_UPDATE + Update { key: String, value: Vec<u8> }, + + /// Cluster log entry + /// + /// Wire format: clog_entry_t struct + /// Matches C's KVSTORE_MESSAGE_LOG + Log { + time: u32, + priority: u8, + node: String, + ident: String, + tag: String, + message: String, + }, + + /// Update complete signal (not currently used) + /// + /// Matches C's KVSTORE_MESSAGE_UPDATE_COMPLETE + UpdateComplete, +} + +impl KvStoreMessage { + /// Get message type ID (matches C's kvstore_message_t enum) + pub fn message_type(&self) -> u16 { + let msg_type = match self { + KvStoreMessage::Update { .. } => KvStoreMessageType::Update, + KvStoreMessage::UpdateComplete => KvStoreMessageType::UpdateComplete, + KvStoreMessage::Log { .. } => KvStoreMessageType::Log, + }; + msg_type.into() + } + + /// Serialize to C-compatible wire format + /// + /// Update format: key (256 bytes, null-terminated) + value (variable) + /// Log format: clog_entry_t struct + pub fn serialize(&self) -> Vec<u8> { + match self { + KvStoreMessage::Update { key, value } => { + // C format: char key[KVSTORE_KEY_SIZE] + data + let mut buf = vec![0u8; KVSTORE_KEY_SIZE]; + let key_bytes = key.as_bytes(); + let copy_len = key_bytes.len().min(KVSTORE_KEY_SIZE - 1); // Leave room for null terminator + buf[..copy_len].copy_from_slice(&key_bytes[..copy_len]); + // buf is already zero-filled, so null terminator is automatic + + buf.extend_from_slice(value); + buf + } + KvStoreMessage::Log { + time, + priority, + node, + ident, + tag, + message, + } => { + let node_bytes = node.as_bytes(); + let ident_bytes = ident.as_bytes(); + let tag_bytes = tag.as_bytes(); + let msg_bytes = message.as_bytes(); + + let node_len = (node_bytes.len() + 1) as u32; // +1 for null + let ident_len = (ident_bytes.len() + 1) as u32; + let tag_len = (tag_bytes.len() + 1) as u32; + let msg_len = (msg_bytes.len() + 1) as u32; + + let total_len = std::mem::size_of::<ClogEntryHeader>() as u32 + node_len + ident_len + tag_len + msg_len; + let mut buf = Vec::with_capacity(total_len as usize); + + // Serialize header using the struct + let header = ClogEntryHeader { + time: *time, + priority: *priority, + padding: [0u8; 3], + node_len, + ident_len, + tag_len, + msg_len, + }; + buf.extend_from_slice(&header.to_bytes()); + + // Append string data (all null-terminated) + buf.extend_from_slice(node_bytes); + buf.push(0); // null terminator + buf.extend_from_slice(ident_bytes); + buf.push(0); + buf.extend_from_slice(tag_bytes); + buf.push(0); + buf.extend_from_slice(msg_bytes); + buf.push(0); + + buf + } + KvStoreMessage::UpdateComplete => { + // No payload + Vec::new() + } + } + }
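Sketch, not in the patch: serialize() silently truncates keys longer than 255 bytes so the trailing NUL always fits the fixed C buffer; a test could document that edge case explicitly.

#[test]
fn test_update_key_truncation() {
    // A 300-byte key must be cut to KVSTORE_KEY_SIZE - 1 bytes plus NUL,
    // matching C's fixed char key[256] buffer.
    let msg = KvStoreMessage::Update {
        key: "k".repeat(300),
        value: Vec::new(),
    };
    let buf = msg.serialize();
    assert_eq!(buf.len(), KVSTORE_KEY_SIZE);
    assert_eq!(buf[KVSTORE_KEY_SIZE - 1], 0); // NUL terminator survives truncation
}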
+ + /// Deserialize from C-compatible wire format + pub fn deserialize(msg_type: u16, data: &[u8]) -> anyhow::Result<Self> { + use KvStoreMessageType::*; + + let msg_type = KvStoreMessageType::try_from(msg_type) + .map_err(|_| anyhow::anyhow!("Unknown kvstore message type: {msg_type}"))?; + + match msg_type { + Update => { + if data.len() < KVSTORE_KEY_SIZE { + anyhow::bail!("UPDATE message too short: {} < {}", data.len(), KVSTORE_KEY_SIZE); + } + + // Find null terminator in first KVSTORE_KEY_SIZE bytes + let key_end = data[..KVSTORE_KEY_SIZE] + .iter() + .position(|&b| b == 0) + .ok_or_else(|| anyhow::anyhow!("UPDATE key not null-terminated"))?; + + let key = std::str::from_utf8(&data[..key_end]) + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in UPDATE key: {e}"))? + .to_string(); + + let value = data[KVSTORE_KEY_SIZE..].to_vec(); + + Ok(KvStoreMessage::Update { key, value }) + } + UpdateComplete => Ok(KvStoreMessage::UpdateComplete), + Log => { + // Parse header using the struct + let header = ClogEntryHeader::from_bytes(data)?; + + let node_len = header.node_len as usize; + let ident_len = header.ident_len as usize; + let tag_len = header.tag_len as usize; + let msg_len = header.msg_len as usize; + + // Validate individual field lengths are non-zero (strings must have at least null terminator) + if node_len == 0 || ident_len == 0 || tag_len == 0 || msg_len == 0 { + anyhow::bail!("LOG message contains zero-length field"); + } + + // Check for integer overflow in total size calculation + let expected_len = std::mem::size_of::<ClogEntryHeader>() + .checked_add(node_len) + .and_then(|s| s.checked_add(ident_len)) + .and_then(|s| s.checked_add(tag_len)) + .and_then(|s| s.checked_add(msg_len)) + .ok_or_else(|| anyhow::anyhow!("LOG message size overflow"))?; + + if data.len() != expected_len { + anyhow::bail!( + "LOG message size mismatch: {} != {}", + data.len(), + expected_len + ); + } + + let mut offset = std::mem::size_of::<ClogEntryHeader>(); + + // Safe string extraction with bounds checking + let extract_string = |offset: &mut usize, len: usize| -> anyhow::Result<String> { + let end = offset.checked_add(len) + .ok_or_else(|| anyhow::anyhow!("String offset overflow"))?; + + if end > data.len() { + return Err(anyhow::anyhow!("String exceeds buffer")); + } + + // len includes null terminator, so read len-1 bytes + let s = std::str::from_utf8(&data[*offset..end - 1]) + .map_err(|e| anyhow::anyhow!("Invalid UTF-8: {e}"))? + .to_string(); + + *offset = end; + Ok(s) + }; + + let node = extract_string(&mut offset, node_len)?; + let ident = extract_string(&mut offset, ident_len)?; + let tag = extract_string(&mut offset, tag_len)?; + let message = extract_string(&mut offset, msg_len)?; + + Ok(KvStoreMessage::Log { + time: header.time, + priority: header.priority, + node, + ident, + tag, + message, + }) + } + } + } +} + +impl Message for KvStoreMessage { + fn message_type(&self) -> u16 { + // Delegate to the existing method + KvStoreMessage::message_type(self) + } + + fn serialize(&self) -> Vec<u8> { + // Delegate to the existing method + KvStoreMessage::serialize(self) + } + + fn deserialize(message_type: u16, data: &[u8]) -> anyhow::Result<Self> { + // Delegate to the existing method + KvStoreMessage::deserialize(message_type, data) + .context("Failed to deserialize KvStoreMessage") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kvstore_message_update_serialization() { + let msg = KvStoreMessage::Update { + key: "test_key".to_string(), + value: vec![1, 2, 3, 4, 5], + }; + + let serialized = msg.serialize(); + assert_eq!(serialized.len(), KVSTORE_KEY_SIZE + 5); + assert_eq!(&serialized[..8], b"test_key"); + assert_eq!(serialized[8], 0); // null terminator + assert_eq!(&serialized[KVSTORE_KEY_SIZE..], &[1, 2, 3, 4, 5]); + + let deserialized = KvStoreMessage::deserialize(1, &serialized).unwrap(); + assert_eq!(msg, deserialized); + } + + #[test] + fn test_kvstore_message_log_serialization() { + let msg = KvStoreMessage::Log { + time: 1234567890, + priority: 5, + node: "node1".to_string(), + ident: "pmxcfs".to_string(), + tag: "info".to_string(), + message: "test message".to_string(), + }; + + let serialized = msg.serialize(); + let deserialized = KvStoreMessage::deserialize(3,
&serialized).unwrap(); + assert_eq!(msg, deserialized); + } + + #[test] + fn test_kvstore_message_type() { + assert_eq!( + KvStoreMessage::Update { + key: "".into(), + value: vec![] + } + .message_type(), + 1 + ); + assert_eq!(KvStoreMessage::UpdateComplete.message_type(), 2); + assert_eq!( + KvStoreMessage::Log { + time: 0, + priority: 0, + node: "".into(), + ident: "".into(), + tag: "".into(), + message: "".into() + } + .message_type(), + 3 + ); + } + + #[test] + fn test_kvstore_message_type_roundtrip() { + // Test that message_type() and deserialize() are consistent + use super::KvStoreMessageType; + + assert_eq!(u16::from(KvStoreMessageType::Update), 1); + assert_eq!(u16::from(KvStoreMessageType::UpdateComplete), 2); + assert_eq!(u16::from(KvStoreMessageType::Log), 3); + + assert_eq!( + KvStoreMessageType::try_from(1).unwrap(), + KvStoreMessageType::Update + ); + assert_eq!( + KvStoreMessageType::try_from(2).unwrap(), + KvStoreMessageType::UpdateComplete + ); + assert_eq!( + KvStoreMessageType::try_from(3).unwrap(), + KvStoreMessageType::Log + ); + + assert!(KvStoreMessageType::try_from(0).is_err()); + assert!(KvStoreMessageType::try_from(4).is_err()); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs new file mode 100644 index 000000000..892404833 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs @@ -0,0 +1,32 @@ +/// Distributed Finite State Machine (DFSM) for cluster state synchronization +/// +/// This crate implements the state machine for synchronizing configuration +/// changes across the cluster nodes using Corosync CPG. +/// +/// The DFSM handles: +/// - State synchronization between nodes +/// - Message ordering and queuing +/// - Leader-based state updates +/// - Split-brain prevention +/// - Membership change handling +mod callbacks; +pub mod cluster_database_service; +mod cpg_service; +mod dfsm_message; +mod fuse_message; +mod kv_store_message; +mod message; +mod state_machine; +pub mod status_sync_service; +mod types; +mod wire_format; + +// Re-export public API +pub use callbacks::Callbacks; +pub use cluster_database_service::ClusterDatabaseService; +pub use cpg_service::{CpgHandler, CpgService}; +pub use fuse_message::FuseMessage; +pub use kv_store_message::KvStoreMessage; +pub use state_machine::{Dfsm, DfsmBroadcast}; +pub use status_sync_service::StatusSyncService; +pub use types::NodeSyncInfo; diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs new file mode 100644 index 000000000..a2401d030 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs @@ -0,0 +1,21 @@ +/// High-level message abstraction for DFSM +/// +/// This module provides a Message trait for working with cluster messages +/// at a higher abstraction level than raw bytes. +use anyhow::Result; + +/// Trait for messages that can be sent through DFSM +pub trait Message: Clone + std::fmt::Debug + Send + Sync + Sized + 'static { + /// Get the message type identifier + fn message_type(&self) -> u16; + + /// Serialize the message to bytes (application message payload only) + /// + /// This serializes only the application-level payload. The DFSM protocol + /// headers (msg_count, timestamp, protocol_version, etc.) are added by + /// DfsmMessage::serialize() when wrapping in DfsmMessage::Normal. 
+ fn serialize(&self) -> Vec<u8>; + + /// Deserialize from bytes given a message type + fn deserialize(message_type: u16, data: &[u8]) -> Result<Self>; +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs new file mode 100644 index 000000000..d4eecc690 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs @@ -0,0 +1,1251 @@ +/// DFSM state machine implementation +/// +/// This module contains the main Dfsm struct and its implementation +/// for managing distributed state synchronization. +use anyhow::{Context, Result}; +use parking_lot::{Mutex as ParkingMutex, RwLock}; +use pmxcfs_api_types::MemberInfo; +use rust_corosync::{NodeId, cpg}; +use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::oneshot; + +use super::cpg_service::{CpgHandler, CpgService}; +use super::dfsm_message::DfsmMessage; +use super::message::Message; +use super::types::{DfsmMode, QueuedMessage, SyncEpoch}; +use crate::{Callbacks, NodeSyncInfo}; + +/// Maximum queue length to prevent memory exhaustion +/// C implementation uses unbounded GSequence/GList, but we add a limit for safety +/// This value should be tuned based on production workload +const MAX_QUEUE_LEN: usize = 500; + +/// Result of a synchronous message send +/// Matches C's dfsm_result_t structure +#[derive(Debug, Clone)] +pub struct MessageResult { + /// Message count for tracking + pub msgcount: u64, + /// Result code from deliver callback (0 = success, negative = errno) + pub result: i32, + /// Whether the message was processed successfully + pub processed: bool, +}
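Illustration, not part of the patch: the deliver callback reports errno-style codes, so a caller of the synchronous send path defined further down in this file might map a negative result back to an io::Error like this (helper name is hypothetical).

fn check_result(res: &MessageResult) -> std::io::Result<()> {
    if res.result < 0 {
        // `result` carries a negative errno, mirroring how the C side
        // hands -errno back to FUSE
        return Err(std::io::Error::from_raw_os_error(-res.result));
    }
    Ok(())
}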
+ +/// Extension trait to add broadcast() method to Option<Arc<Dfsm<M>>> +/// +/// This allows calling `.broadcast()` directly on Option<Arc<Dfsm<M>>> fields +/// without explicit None checking at call sites. +pub trait DfsmBroadcast<M: Message> { + fn broadcast(&self, msg: M); +} + +impl<M: Message> DfsmBroadcast<M> for Option<Arc<Dfsm<M>>> { + fn broadcast(&self, msg: M) { + if let Some(dfsm) = self { + let _ = dfsm.broadcast(msg); + } + } +} + +/// DFSM state machine +/// +/// The generic parameter `M` specifies the message type this DFSM handles: +/// - `Dfsm<FuseMessage>` for main database operations +/// - `Dfsm<KvStoreMessage>` for status synchronization +pub struct Dfsm<M: Message> { + /// CPG service for cluster communication (matching C's dfsm_t->cpg_handle) + cpg_service: RwLock<Option<Arc<CpgService>>>, + + /// Cluster group name for CPG + cluster_name: String, + + /// Callbacks for application integration + callbacks: Arc<dyn Callbacks<Message = M>>, + + /// Current operating mode + mode: RwLock<DfsmMode>, + + /// Current sync epoch + sync_epoch: RwLock<SyncEpoch>, + + /// Local epoch counter + local_epoch_counter: ParkingMutex<u32>, + + /// Node synchronization info + sync_nodes: RwLock<Vec<NodeSyncInfo>>, + + /// Message queue (ordered by count) + msg_queue: ParkingMutex<BTreeMap<u64, QueuedMessage<M>>>, + + /// Sync queue for messages during update mode + sync_queue: ParkingMutex<VecDeque<QueuedMessage<M>>>, + + /// Message counter for ordering (atomic for lock-free increment) + msg_counter: AtomicU64, + + /// Lowest node ID in cluster (leader) + lowest_nodeid: RwLock<u32>, + + /// Our node ID (set during init_cpg via cpg_local_get) + nodeid: AtomicU32, + + /// Our process ID + pid: u32, + + /// Protocol version for cluster compatibility + protocol_version: u32, + + /// State verification - SHA-256 checksum + checksum: ParkingMutex<[u8; 32]>, + + /// Checksum epoch (when it was computed) + checksum_epoch: ParkingMutex<SyncEpoch>, + + /// Checksum ID for verification + checksum_id: ParkingMutex<u64>, + + /// Checksum counter for verify requests + checksum_counter: ParkingMutex<u64>, + + /// Message count received (for synchronous send tracking) + /// Matches C's dfsm->msgcount_rcvd + msgcount_rcvd: AtomicU64, + + /// Pending message results for synchronous sends + /// Matches C's dfsm->results (GHashTable) + /// Maps msgcount -> oneshot sender for result delivery + /// Uses tokio oneshot channels - the idiomatic pattern for one-time async notifications + message_results: ParkingMutex<HashMap<u64, oneshot::Sender<MessageResult>>>, +} + +impl<M: Message> Dfsm<M> { + /// Create a new DFSM instance + /// + /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg() + pub fn new(cluster_name: String, callbacks: Arc<dyn Callbacks<Message = M>>) -> Result<Self> { + Self::new_with_protocol_version(cluster_name, callbacks, DfsmMessage::<M>::DEFAULT_PROTOCOL_VERSION) + }
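Sketch only, with a made-up helper name (the real wiring lives in the daemon integration): this shows the intended construction order, i.e. build the Dfsm first, then call init_cpg() (defined near the end of this file) so the node id is learned via cpg_local_get before the group is joined.

fn wire_up_database_dfsm(
    callbacks: Arc<dyn Callbacks<Message = FuseMessage>>,
) -> Result<Arc<Dfsm<FuseMessage>>> {
    // "pmxcfs_v1" is the CPG group used by the cluster database DFSM
    let dfsm = Arc::new(Dfsm::new("pmxcfs_v1".to_string(), callbacks)?);
    dfsm.init_cpg()?; // obtains our node id and joins the group
    Ok(dfsm)
}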
+ + /// Create a new DFSM instance with a specific protocol version + /// + /// This is used when the DFSM needs to use a non-default protocol version, + /// such as the status/kvstore DFSM which uses protocol version 0 for + /// compatibility with the C implementation. + /// + /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg() + pub fn new_with_protocol_version( + cluster_name: String, + callbacks: Arc<dyn Callbacks<Message = M>>, + protocol_version: u32, + ) -> Result<Self> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as u32; + let pid = std::process::id(); + + Ok(Self { + cpg_service: RwLock::new(None), + cluster_name, + callbacks, + mode: RwLock::new(DfsmMode::Start), + sync_epoch: RwLock::new(SyncEpoch { + epoch: 0, + time: now, + nodeid: 0, + pid, + }), + local_epoch_counter: ParkingMutex::new(0), + sync_nodes: RwLock::new(Vec::new()), + msg_queue: ParkingMutex::new(BTreeMap::new()), + sync_queue: ParkingMutex::new(VecDeque::new()), + msg_counter: AtomicU64::new(0), + lowest_nodeid: RwLock::new(0), + nodeid: AtomicU32::new(0), // Will be set by init_cpg() using cpg_local_get() + pid, + protocol_version, + checksum: ParkingMutex::new([0u8; 32]), + checksum_epoch: ParkingMutex::new(SyncEpoch { + epoch: 0, + time: 0, + nodeid: 0, + pid: 0, + }), + checksum_id: ParkingMutex::new(0), + checksum_counter: ParkingMutex::new(0), + msgcount_rcvd: AtomicU64::new(0), + message_results: ParkingMutex::new(HashMap::new()), + }) + } + + pub fn get_mode(&self) -> DfsmMode { + *self.mode.read() + } + + pub fn set_mode(&self, new_mode: DfsmMode) { + let mut mode = self.mode.write(); + let old_mode = *mode; + + // Match C's dfsm_set_mode logic (dfsm.c:450-456): + // Allow transition if: + // 1. new_mode < DFSM_ERROR_MODE_START (normal modes), OR + // 2. (old_mode < DFSM_ERROR_MODE_START OR new_mode >= old_mode) + // - If already in error mode, only allow transitions to higher error codes + if old_mode != new_mode { + let allow_transition = !new_mode.is_error() || + (!old_mode.is_error() || new_mode >= old_mode); + + if !allow_transition { + tracing::debug!( + "DFSM: blocking transition from {:?} to {:?} (error mode can only go to higher codes)", + old_mode, new_mode + ); + return; + } + } else { + // No-op transition + return; + } + + *mode = new_mode; + drop(mode); + + if new_mode.is_error() { + tracing::error!("DFSM: {}", new_mode); + } else { + tracing::info!("DFSM: {}", new_mode); + } + } + + pub fn is_leader(&self) -> bool { + let lowest = *self.lowest_nodeid.read(); + lowest > 0 && lowest == self.nodeid.load(Ordering::Relaxed) + } + + pub fn get_nodeid(&self) -> u32 { + self.nodeid.load(Ordering::Relaxed) + } + + pub fn get_pid(&self) -> u32 { + self.pid + } + + /// Check if DFSM is synced and ready + pub fn is_synced(&self) -> bool { + self.get_mode() == DfsmMode::Synced + } + + /// Check if DFSM encountered an error + pub fn is_error(&self) -> bool { + self.get_mode().is_error() + } +} + +impl<M: Message> Dfsm<M> { + fn send_sync_start(&self) -> Result<()> { + tracing::debug!("DFSM: sending SYNC_START message"); + let sync_epoch = *self.sync_epoch.read(); + self.send_dfsm_message(&DfsmMessage::<M>::SyncStart { sync_epoch }) + } + + fn send_state(&self) -> Result<()> { + tracing::debug!("DFSM: generating and sending state"); + + let state_data = self + .callbacks + .get_state() + .context("Failed to get state from callbacks")?; + + tracing::info!("DFSM: sending state ({} bytes)", state_data.len()); + + let sync_epoch = *self.sync_epoch.read(); + let dfsm_msg: DfsmMessage<M> = DfsmMessage::State { + sync_epoch, + data: state_data, + }; + self.send_dfsm_message(&dfsm_msg)?; + + Ok(()) + } + + pub(super) fn send_dfsm_message(&self, message: &DfsmMessage<M>) -> Result<()> { + let serialized = message.serialize(); + + if let Some(ref
service) = *self.cpg_service.read() { + service + .mcast(cpg::Guarantee::TypeAgreed, &serialized) + .context("Failed to broadcast DFSM message")?; + Ok(()) + } else { + anyhow::bail!("CPG not initialized") + } + } + + pub fn process_state(&self, nodeid: u32, pid: u32, state: &[u8]) -> Result<()> { + tracing::debug!( + "DFSM: processing state from node {}/{} ({} bytes)", + nodeid, + pid, + state.len() + ); + + let mut sync_nodes = self.sync_nodes.write(); + + // Find node in sync_nodes + let node_info = sync_nodes + .iter_mut() + .find(|n| n.node_id == nodeid && n.pid == pid); + + let node_info = match node_info { + Some(ni) => ni, + None => { + // Non-member sent state - immediate LEAVE (matches C: dfsm.c:823-828) + tracing::error!( + "DFSM: received state from non-member {}/{} - entering LEAVE mode", + nodeid, pid + ); + drop(sync_nodes); + self.set_mode(DfsmMode::Leave); + return Err(anyhow::anyhow!("State from non-member")); + } + }; + + // Check for duplicate state (matches C: dfsm.c:830-835) + if node_info.state.is_some() { + tracing::error!( + "DFSM: received duplicate state from member {}/{} - entering LEAVE mode", + nodeid, pid + ); + drop(sync_nodes); + self.set_mode(DfsmMode::Leave); + return Err(anyhow::anyhow!("Duplicate state from member")); + } + + // Store state + node_info.state = Some(state.to_vec()); + + let all_received = sync_nodes.iter().all(|n| n.state.is_some()); + drop(sync_nodes); + + if all_received { + tracing::info!("DFSM: received all states, processing synchronization"); + self.process_state_sync()?; + } + + Ok(()) + } + + fn process_state_sync(&self) -> Result<()> { + tracing::info!("DFSM: processing state synchronization"); + + let sync_nodes = self.sync_nodes.read().clone(); + + match self.callbacks.process_state_update(&sync_nodes) { + Ok(synced) => { + if synced { + tracing::info!("DFSM: state synchronization successful"); + + let my_nodeid = self.nodeid.load(Ordering::Relaxed); + let mut sync_nodes_write = self.sync_nodes.write(); + if let Some(node) = sync_nodes_write + .iter_mut() + .find(|n| n.node_id == my_nodeid && n.pid == self.pid) + { + node.synced = true; + } + drop(sync_nodes_write); + + self.set_mode(DfsmMode::Synced); + self.callbacks.on_synced(); + self.deliver_message_queue()?; + } else { + tracing::info!("DFSM: entering UPDATE mode, waiting for leader"); + self.set_mode(DfsmMode::Update); + self.deliver_message_queue()?; + } + } + Err(e) => { + tracing::error!("DFSM: state synchronization failed: {}", e); + self.set_mode(DfsmMode::Error); + return Err(e); + } + } + + Ok(()) + } + + pub fn queue_message(&self, nodeid: u32, pid: u32, msg_count: u64, message: M, timestamp: u64) + where + M: Clone, + { + tracing::debug!( + "DFSM: queueing message {} from {}/{}", + msg_count, + nodeid, + pid + ); + + let qm = QueuedMessage { + nodeid, + pid, + _msg_count: msg_count, + message, + timestamp, + }; + + // Hold mode read lock during queueing decision to prevent TOCTOU race + // This ensures mode cannot change between check and queue selection + let mode_guard = self.mode.read(); + let mode = *mode_guard; + + let node_synced = self + .sync_nodes + .read() + .iter() + .find(|n| n.node_id == nodeid && n.pid == pid) + .map(|n| n.synced) + .unwrap_or(false); + + if mode == DfsmMode::Update && node_synced { + let mut sync_queue = self.sync_queue.lock(); + + // Check sync queue size limit + // Queues use a bounded size (MAX_QUEUE_LEN=500) to prevent memory exhaustion + // from slow or stuck nodes. When full, oldest messages are dropped. 
+ // This matches distributed system semantics where old updates can be superseded. + // + // Monitoring: Track queue depth via metrics/logs to detect congestion: + // - Sustained high queue depth indicates slow message processing + // - Frequent drops indicate network partitions or overload + if sync_queue.len() >= MAX_QUEUE_LEN { + tracing::warn!( + "DFSM: sync queue full ({} messages), dropping oldest - possible network congestion or slow node", + sync_queue.len() + ); + sync_queue.pop_front(); + } + + sync_queue.push_back(qm); + } else { + let mut msg_queue = self.msg_queue.lock(); + + // Check message queue size limit (same rationale as sync queue) + if msg_queue.len() >= MAX_QUEUE_LEN { + tracing::warn!( + "DFSM: message queue full ({} messages), dropping oldest - possible network congestion or slow node", + msg_queue.len() + ); + // Drop oldest message (lowest count) + if let Some((&oldest_count, _)) = msg_queue.iter().next() { + msg_queue.remove(&oldest_count); + } + } + + msg_queue.insert(msg_count, qm); + } + + // Release mode lock after queueing decision completes + drop(mode_guard); + } + + pub(super) fn deliver_message_queue(&self) -> Result<()> + where + M: Clone, + { + let mut queue = self.msg_queue.lock(); + if queue.is_empty() { + return Ok(()); + } + + tracing::info!("DFSM: delivering {} queued messages", queue.len()); + + // Hold mode lock during iteration to prevent mode changes mid-delivery + let mode_guard = self.mode.read(); + let mode = *mode_guard; + let sync_nodes = self.sync_nodes.read().clone(); + + let mut to_remove = Vec::new(); + let mut to_sync_queue = Vec::new(); + + for (count, qm) in queue.iter() { + let node_info = sync_nodes + .iter() + .find(|n| n.node_id == qm.nodeid && n.pid == qm.pid); + + let Some(info) = node_info else { + tracing::debug!( + "DFSM: removing message from non-member {}/{}", + qm.nodeid, + qm.pid + ); + to_remove.push(*count); + continue; + }; + + if mode == DfsmMode::Synced && info.synced { + tracing::debug!("DFSM: delivering message {}", count); + + match self.callbacks.deliver_message( + qm.nodeid, + qm.pid, + qm.message.clone(), + qm.timestamp, + ) { + Ok((result, processed)) => { + tracing::debug!( + "DFSM: message delivered, result={}, processed={}", + result, + processed + ); + // Record result for synchronous sends + self.record_message_result(*count, result, processed); + } + Err(e) => { + tracing::error!("DFSM: failed to deliver message: {}", e); + // Record error result + self.record_message_result(*count, -libc::EIO, false); + } + } + + to_remove.push(*count); + } else if mode == DfsmMode::Update && info.synced { + // Collect messages to move instead of acquiring sync_queue lock + // while holding msg_queue lock to prevent deadlock + to_sync_queue.push(qm.clone()); + to_remove.push(*count); + } + } + + // Remove processed messages from queue + for count in to_remove { + queue.remove(&count); + } + + // Release locks before acquiring sync_queue to prevent deadlock + drop(mode_guard); + drop(queue); + + // Now move messages to sync_queue without holding msg_queue + if !to_sync_queue.is_empty() { + let mut sync_queue = self.sync_queue.lock(); + for qm in to_sync_queue { + sync_queue.push_back(qm); + } + } + + Ok(()) + } + + pub(super) fn deliver_sync_queue(&self) -> Result<()> { + let mut sync_queue = self.sync_queue.lock(); + let queue_len = sync_queue.len(); + + if queue_len == 0 { + return Ok(()); + } + + tracing::info!("DFSM: delivering {} sync queue messages", queue_len); + + while let Some(qm) = sync_queue.pop_front() 
{ + tracing::debug!( + "DFSM: delivering sync message from {}/{}", + qm.nodeid, + qm.pid + ); + + match self + .callbacks + .deliver_message(qm.nodeid, qm.pid, qm.message, qm.timestamp) + { + Ok((result, processed)) => { + tracing::debug!( + "DFSM: sync message delivered, result={}, processed={}", + result, + processed + ); + // Record result for synchronous sends + self.record_message_result(qm._msg_count, result, processed); + } + Err(e) => { + tracing::error!("DFSM: failed to deliver sync message: {}", e); + // Record error result + self.record_message_result(qm._msg_count, -libc::EIO, false); + } + } + } + + Ok(()) + } + + /// Send a message to the cluster + /// + /// Creates a properly formatted Normal message with C-compatible headers. + pub fn send_message(&self, message: M) -> Result<u64> { + let msg_count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1; + + tracing::debug!("DFSM: sending message {}", msg_count); + + let dfsm_msg = DfsmMessage::from_message(msg_count, message, self.protocol_version); + + self.send_dfsm_message(&dfsm_msg)?; + + Ok(msg_count) + } + + /// Send a message to the cluster and wait for delivery result + /// + /// This is the async equivalent of send_message(), matching C's dfsm_send_message_sync(). + /// It broadcasts the message via CPG and waits for it to be delivered to the local node, + /// returning the result from the deliver callback. + /// + /// Uses tokio oneshot channels - the idiomatic pattern for one-time async result delivery. + /// This avoids any locking or notification complexity. + /// + /// # Cancellation Safety + /// If this future is dropped before completion, the cleanup guard ensures the HashMap + /// entry is removed, preventing memory leaks. + /// + /// # Arguments + /// * `message` - The message to send + /// * `timeout` - Maximum time to wait for delivery (typically 10 seconds) + /// + /// # Returns + /// * `Ok(MessageResult)` - The result from the local deliver callback + /// - Caller should check `result.result < 0` for errno-based errors + /// * `Err(_)` - If send failed, timeout occurred, or channel closed unexpectedly + pub async fn send_message_sync(&self, message: M, timeout: Duration) -> Result<MessageResult> { + let msg_count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1; + + tracing::debug!("DFSM: sending synchronous message {}", msg_count); + + // Create oneshot channel for result delivery (tokio best practice) + let (tx, rx) = oneshot::channel(); + + // Register the sender before broadcasting + self.message_results.lock().insert(msg_count, tx); + + // RAII guard ensures cleanup on timeout, send error, or cancellation + // (record_message_result also removes, so double-remove is harmless) + struct CleanupGuard<'a> { + msg_count: u64, + results: &'a ParkingMutex<HashMap<u64, oneshot::Sender<MessageResult>>>, + } + impl Drop for CleanupGuard<'_> { + fn drop(&mut self) { + self.results.lock().remove(&self.msg_count); + } + } + let _guard = CleanupGuard { + msg_count, + results: &self.message_results, + }; + + // Send the message + let dfsm_msg = DfsmMessage::from_message(msg_count, message, self.protocol_version); + self.send_dfsm_message(&dfsm_msg)?; + + // Wait for delivery with timeout (clean tokio pattern) + match tokio::time::timeout(timeout, rx).await { + Ok(Ok(result)) => { + // Got result successfully - return it to caller + // Caller should check result.result < 0 for errno-based errors + Ok(result) + } + Ok(Err(_)) => { + // Channel closed without sending - shouldn't happen + anyhow::bail!("DFSM: message {} sender dropped", msg_count); + } + Err(_) => { + //
Timeout - guard will clean up + anyhow::bail!("DFSM: message {} timed out after {:?}", msg_count, timeout); + } + } + // On cancellation (future dropped), guard cleans up automatically + } + + /// Record the result of a delivered message (for synchronous sends) + /// + /// Called from deliver_message_queue() when a message is delivered. + /// Matches C's dfsm_record_local_result(). + /// + /// Uses tokio oneshot channel to send result - clean, non-blocking, and can't fail. + fn record_message_result(&self, msg_count: u64, result: i32, processed: bool) { + tracing::debug!( + "DFSM: recording result for message {}: result={}, processed={}", + msg_count, + result, + processed + ); + + // Update msgcount_rcvd + self.msgcount_rcvd.store(msg_count, Ordering::SeqCst); + + // Send result via oneshot channel if someone is waiting + let mut results = self.message_results.lock(); + if let Some(tx) = results.remove(&msg_count) { + let msg_result = MessageResult { + msgcount: msg_count, + result, + processed, + }; + + // Send result through oneshot channel (non-blocking, infallible) + // If receiver was dropped (timeout), this silently fails - which is fine + let _ = tx.send(msg_result); + } + } + + /// Send a TreeEntry update to the cluster (leader only, during synchronization) + /// + /// This is used by the leader to send individual database entries to followers + /// that need to catch up. Matches C's dfsm_send_update(). + pub fn send_update(&self, tree_entry: pmxcfs_memdb::TreeEntry) -> Result<()> { + tracing::debug!("DFSM: sending Update for inode {}", tree_entry.inode); + + let sync_epoch = *self.sync_epoch.read(); + let dfsm_msg: DfsmMessage = DfsmMessage::from_tree_entry(tree_entry, sync_epoch); + self.send_dfsm_message(&dfsm_msg)?; + + Ok(()) + } + + /// Send UpdateComplete signal to cluster (leader only, after sending all updates) + /// + /// Signals to followers that all Update messages have been sent and they can + /// now transition to Synced mode. Matches C's dfsm_send_update_complete(). 
+ pub fn send_update_complete(&self) -> Result<()> { + tracing::info!("DFSM: sending UpdateComplete"); + + let sync_epoch = *self.sync_epoch.read(); + let dfsm_msg: DfsmMessage = DfsmMessage::UpdateComplete { sync_epoch }; + self.send_dfsm_message(&dfsm_msg)?; + + Ok(()) + } + + /// Request checksum verification (leader only) + /// This should be called periodically by the leader to verify cluster state consistency + pub fn verify_request(&self) -> Result<()> { + // Only leader should send verify requests + if !self.is_leader() { + return Ok(()); + } + + // Only verify when synced + if self.get_mode() != DfsmMode::Synced { + return Ok(()); + } + + // Check if we need to wait for previous verification to complete + let checksum_counter = *self.checksum_counter.lock(); + let checksum_id = *self.checksum_id.lock(); + + if checksum_counter != checksum_id { + tracing::debug!( + "DFSM: delaying verify request {:016x}", + checksum_counter + 1 + ); + return Ok(()); + } + + // Increment counter and send verify request + *self.checksum_counter.lock() = checksum_counter + 1; + let new_counter = checksum_counter + 1; + + tracing::debug!("DFSM: sending verify request {:016x}", new_counter); + + // Send VERIFY_REQUEST message with counter + let sync_epoch = *self.sync_epoch.read(); + let dfsm_msg: DfsmMessage = DfsmMessage::VerifyRequest { + sync_epoch, + csum_id: new_counter, + }; + self.send_dfsm_message(&dfsm_msg)?; + + Ok(()) + } + + /// Handle verify request from leader + pub fn handle_verify_request(&self, message_epoch: SyncEpoch, csum_id: u64) -> Result<()> { + tracing::debug!("DFSM: received verify request {:016x}", csum_id); + + // Compute current state checksum + let mut checksum = [0u8; 32]; + self.callbacks.compute_checksum(&mut checksum)?; + + // Save checksum info + // Store the epoch FROM THE MESSAGE (matching C: dfsm.c:736) + *self.checksum.lock() = checksum; + *self.checksum_epoch.lock() = message_epoch; + *self.checksum_id.lock() = csum_id; + + // Send the checksum verification response + tracing::debug!("DFSM: sending verify response"); + + let sync_epoch = *self.sync_epoch.read(); + let dfsm_msg = DfsmMessage::Verify { + sync_epoch, + csum_id, + checksum, + }; + self.send_dfsm_message(&dfsm_msg)?; + + Ok(()) + } + + /// Handle verify response from a node + pub fn handle_verify( + &self, + message_epoch: SyncEpoch, + csum_id: u64, + received_checksum: &[u8; 32], + ) -> Result<()> { + tracing::debug!("DFSM: received verify response"); + + let our_checksum_id = *self.checksum_id.lock(); + let our_checksum_epoch = *self.checksum_epoch.lock(); + + // Check if this verification matches our saved checksum + // Compare with MESSAGE epoch, not current epoch (matching C: dfsm.c:766-767) + if our_checksum_id == csum_id && our_checksum_epoch == message_epoch { + let our_checksum = *self.checksum.lock(); + + // Compare checksums + if our_checksum != *received_checksum { + tracing::error!( + "DFSM: checksum mismatch! 
Expected {:016x?}, got {:016x?}", + &our_checksum[..8], + &received_checksum[..8] + ); + tracing::error!("DFSM: data divergence detected - restarting cluster sync"); + self.set_mode(DfsmMode::Leave); + return Err(anyhow::anyhow!("Checksum verification failed")); + } else { + tracing::info!("DFSM: data verification successful"); + } + } else { + tracing::debug!("DFSM: skipping verification - no checksum saved or epoch mismatch"); + } + + Ok(()) + } + + /// Invalidate saved checksum (called on membership changes) + pub fn invalidate_checksum(&self) { + let counter = *self.checksum_counter.lock(); + *self.checksum_id.lock() = counter; + + // Reset checksum epoch + *self.checksum_epoch.lock() = SyncEpoch { + epoch: 0, + time: 0, + nodeid: 0, + pid: 0, + }; + + tracing::debug!("DFSM: checksum invalidated"); + } + + /// Broadcast a message to the cluster + /// + /// Checks if the cluster is synced before broadcasting. + /// If not synced, the message is silently dropped. + pub fn broadcast(&self, msg: M) -> Result<()> { + if !self.is_synced() { + return Ok(()); + } + + tracing::debug!("Broadcasting {:?}", msg); + self.send_message(msg)?; + tracing::debug!("Broadcast successful"); + + Ok(()) + } + + /// Handle incoming DFSM message from cluster (called by CpgHandler) + fn handle_dfsm_message( + &self, + nodeid: u32, + pid: u32, + message: DfsmMessage, + ) -> anyhow::Result<()> { + // Validate epoch for state messages (all except Normal and SyncStart) + // This matches C implementation's epoch checking in dfsm.c:665-673 + let should_validate_epoch = !matches!( + message, + DfsmMessage::Normal { .. } | DfsmMessage::SyncStart { .. } + ); + + if should_validate_epoch { + let current_epoch = *self.sync_epoch.read(); + let message_epoch = match &message { + DfsmMessage::State { sync_epoch, .. } + | DfsmMessage::Update { sync_epoch, .. } + | DfsmMessage::UpdateComplete { sync_epoch } + | DfsmMessage::VerifyRequest { sync_epoch, .. } + | DfsmMessage::Verify { sync_epoch, .. 
} => *sync_epoch, + _ => unreachable!(), + }; + + if message_epoch != current_epoch { + tracing::debug!( + "DFSM: ignoring message with wrong epoch (expected {:?}, got {:?})", + current_epoch, + message_epoch + ); + return Ok(()); + } + } + + // Match on typed message variants + match message { + DfsmMessage::Normal { + msg_count, + timestamp, + protocol_version: _, + message: app_msg, + } => self.handle_normal_message(nodeid, pid, msg_count, timestamp, app_msg), + DfsmMessage::SyncStart { sync_epoch } => self.handle_sync_start(nodeid, sync_epoch), + DfsmMessage::State { + sync_epoch: _, + data, + } => self.process_state(nodeid, pid, &data), + DfsmMessage::Update { + sync_epoch: _, + tree_entry, + } => self.handle_update(nodeid, pid, tree_entry), + DfsmMessage::UpdateComplete { sync_epoch: _ } => self.handle_update_complete(), + DfsmMessage::VerifyRequest { + sync_epoch, + csum_id, + } => self.handle_verify_request(sync_epoch, csum_id), + DfsmMessage::Verify { + sync_epoch, + csum_id, + checksum, + } => self.handle_verify(sync_epoch, csum_id, &checksum), + } + } + + /// Handle membership change notification (called by CpgHandler) + fn handle_membership_change(&self, members: &[MemberInfo]) -> anyhow::Result<()> { + tracing::info!( + "DFSM: handling membership change ({} members)", + members.len() + ); + + // Invalidate saved checksum + self.invalidate_checksum(); + + // Update epoch + let mut counter = self.local_epoch_counter.lock(); + *counter += 1; + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as u32; + + let new_epoch = SyncEpoch { + epoch: *counter, + time: now, + nodeid: self.nodeid.load(Ordering::Relaxed), + pid: self.pid, + }; + + *self.sync_epoch.write() = new_epoch; + drop(counter); + + // Find lowest node ID (leader) + let lowest = members.iter().map(|m| m.node_id).min().unwrap_or(0); + *self.lowest_nodeid.write() = lowest; + + // Call cleanup callback before releasing sync resources (matches C: dfsm.c:512-514) + let old_sync_nodes = self.sync_nodes.read().clone(); + if !old_sync_nodes.is_empty() { + self.callbacks.cleanup_sync_resources(&old_sync_nodes); + } + + // Initialize sync nodes + let mut sync_nodes = self.sync_nodes.write(); + sync_nodes.clear(); + + for member in members { + sync_nodes.push(NodeSyncInfo { + node_id: member.node_id, + pid: member.pid, + state: None, + synced: false, + }); + } + drop(sync_nodes); + + // Clear queues + self.sync_queue.lock().clear(); + + // Call membership change callback (matches C: dfsm.c:1180-1182) + self.callbacks.on_membership_change(members); + + // Determine next mode + if members.len() == 1 { + // Single node - already synced + tracing::info!("DFSM: single node cluster, marking as synced"); + self.set_mode(DfsmMode::Synced); + + // Mark ourselves as synced + let mut sync_nodes = self.sync_nodes.write(); + if let Some(node) = sync_nodes.first_mut() { + node.synced = true; + } + + // Deliver queued messages + self.deliver_message_queue()?; + } else { + // Multi-node - start synchronization + tracing::info!("DFSM: multi-node cluster, starting sync"); + self.set_mode(DfsmMode::StartSync); + + // If we're the leader, initiate sync + if self.is_leader() { + tracing::info!("DFSM: we are leader, sending sync start"); + self.send_sync_start()?; + + // Leader also needs to send its own state + // (CPG doesn't loop back messages to sender) + self.send_state().context("Failed to send leader state")?; + } + } + + Ok(()) + } + + /// Handle normal application message + fn 
handle_normal_message( + &self, + nodeid: u32, + pid: u32, + msg_count: u64, + timestamp: u32, + message: M, + ) -> Result<()> { + // C version: deliver immediately if in Synced mode, otherwise queue + if self.get_mode() == DfsmMode::Synced { + // Deliver immediately - message is already deserialized + match self.callbacks.deliver_message( + nodeid, + pid, + message, + timestamp as u64, // Convert back to u64 for callback compatibility + ) { + Ok((result, processed)) => { + tracing::debug!( + "DFSM: message delivered immediately, result={}, processed={}", + result, + processed + ); + // Record result for synchronous sends + self.record_message_result(msg_count, result, processed); + } + Err(e) => { + tracing::error!("DFSM: failed to deliver message: {}", e); + // Record error result + self.record_message_result(msg_count, -libc::EIO, false); + } + } + } else { + // Queue for later delivery - store typed message directly + self.queue_message(nodeid, pid, msg_count, message, timestamp as u64); + } + Ok(()) + } + + /// Handle SyncStart message from leader + fn handle_sync_start(&self, nodeid: u32, new_epoch: SyncEpoch) -> Result<()> { + tracing::info!( + "DFSM: received SyncStart from node {} with epoch {:?}", + nodeid, + new_epoch + ); + + // Adopt the new epoch from the leader (critical for sync protocol!) + // This matches C implementation which updates dfsm->sync_epoch + *self.sync_epoch.write() = new_epoch; + tracing::debug!("DFSM: adopted new sync epoch from leader"); + + // Send our state back to the cluster + // BUT: don't send if we're the leader (we already sent our state in handle_membership_change) + let my_nodeid = self.nodeid.load(Ordering::Relaxed); + if nodeid != my_nodeid { + self.send_state() + .context("Failed to send state in response to SyncStart")?; + tracing::debug!("DFSM: sent state in response to SyncStart"); + } else { + tracing::debug!("DFSM: skipping state send (we're the leader who already sent state)"); + } + + Ok(()) + } + + /// Handle Update message from leader + fn handle_update( + &self, + nodeid: u32, + pid: u32, + tree_entry: pmxcfs_memdb::TreeEntry, + ) -> Result<()> { + // Serialize TreeEntry for callback (process_update expects raw bytes for now) + let serialized = tree_entry.serialize_for_update(); + if let Err(e) = self.callbacks.process_update(nodeid, pid, &serialized) { + tracing::error!("DFSM: failed to process update: {}", e); + } + Ok(()) + } + + /// Handle UpdateComplete message + fn handle_update_complete(&self) -> Result<()> { + tracing::info!("DFSM: received UpdateComplete from leader"); + self.deliver_sync_queue()?; + self.set_mode(DfsmMode::Synced); + self.callbacks.on_synced(); + Ok(()) + } +} + +/// Implementation of CpgHandler trait for DFSM +/// +/// This allows Dfsm to receive CPG callbacks in an idiomatic Rust way, +/// with all unsafe pointer handling managed by the CpgService. 
+impl<M: Message> CpgHandler for Dfsm<M> { + fn on_deliver(&self, _group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]) { + tracing::debug!( + "DFSM CPG message from node {} (pid {}): {} bytes", + u32::from(nodeid), + pid, + msg.len() + ); + + // Deserialize DFSM protocol message + match DfsmMessage::<M>::deserialize(msg) { + Ok(dfsm_msg) => { + if let Err(e) = self.handle_dfsm_message(u32::from(nodeid), pid, dfsm_msg) { + tracing::error!("Error handling DFSM message: {}", e); + } + } + Err(e) => { + tracing::error!("Failed to deserialize DFSM message: {}", e); + } + } + } + + fn on_confchg( + &self, + _group_name: &str, + member_list: &[cpg::Address], + _left_list: &[cpg::Address], + _joined_list: &[cpg::Address], + ) { + tracing::info!("DFSM CPG membership change: {} members", member_list.len()); + + // Build MemberInfo list from CPG addresses + let members: Vec<MemberInfo> = member_list + .iter() + .map(|addr| MemberInfo { + node_id: u32::from(addr.nodeid), + pid: addr.pid, + joined_at: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + }) + .collect(); + + // Notify DFSM of membership change + if let Err(e) = self.handle_membership_change(&members) { + tracing::error!("Failed to handle membership change: {}", e); + } + } +}
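Rough sketch, not in the patch: outside the Service framework, the accessors defined just below could drive the DFSM from a plain loop. The real daemon waits on the file descriptor from fd_get() rather than sleeping; the helper name here is made up.

fn pump_events<M: Message>(dfsm: &Dfsm<M>) {
    loop {
        // dispatch_events() drains pending CPG callbacks (deliver/confchg)
        if let Err(e) = dfsm.dispatch_events() {
            tracing::warn!("CPG dispatch failed: {}", e);
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
}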
+ +impl<M: Message> Dfsm<M> { + /// Initialize CPG (Closed Process Group) for cluster communication + /// + /// Uses the idiomatic CpgService wrapper which handles all unsafe FFI + /// and callback management internally. + pub fn init_cpg(self: &Arc<Self>) -> Result<()> { + tracing::info!("DFSM: Initializing CPG"); + + // Create CPG service with this Dfsm as the handler + // CpgService handles all callback registration and context management + let cpg_service = Arc::new(CpgService::new(Arc::clone(self))?); + + // Get our node ID from CPG (matches C's cpg_local_get) + // This MUST be done after cpg_initialize but before joining the group + let nodeid = cpg::local_get(cpg_service.handle())?; + let nodeid_u32 = u32::from(nodeid); + self.nodeid.store(nodeid_u32, Ordering::Relaxed); + tracing::info!("DFSM: Got node ID {} from CPG", nodeid_u32); + + // Join the CPG group + let group_name = &self.cluster_name; + cpg_service + .join(group_name) + .context("Failed to join CPG group")?; + + tracing::info!("DFSM joined CPG group '{}'", group_name); + + // Store the service + *self.cpg_service.write() = Some(cpg_service); + + // Dispatch once to get initial membership + if let Some(ref service) = *self.cpg_service.read() + && let Err(e) = service.dispatch() + { + tracing::warn!("Failed to dispatch CPG events: {:?}", e); + } + + tracing::info!("DFSM CPG initialized successfully"); + Ok(()) + } + + /// Dispatch CPG events (should be called periodically from event loop) + /// Matching C's service_dfsm_dispatch + pub fn dispatch_events(&self) -> Result<(), rust_corosync::CsError> { + if let Some(ref service) = *self.cpg_service.read() { + service.dispatch() + } else { + Ok(()) + } + } + + /// Get CPG file descriptor for event monitoring + pub fn fd_get(&self) -> Result<i32> { + if let Some(ref service) = *self.cpg_service.read() { + service.fd() + } else { + Err(anyhow::anyhow!("CPG service not initialized")) + } + } + + /// Stop DFSM services (leave CPG group and finalize) + pub fn stop_services(&self) -> Result<()> { + tracing::info!("DFSM: Stopping services"); + + // Leave the CPG group before dropping the service + let group_name = self.cluster_name.clone(); + if let Some(ref service) = *self.cpg_service.read() + && let Err(e) = service.leave(&group_name) + { + tracing::warn!("Error leaving CPG group: {:?}", e); + } + + // Drop the service (CpgService::drop handles finalization) + *self.cpg_service.write() = None; + + tracing::info!("DFSM services stopped"); + Ok(()) + } +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs new file mode 100644 index 000000000..203fe208b --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs @@ -0,0 +1,118 @@ +//! Status Sync Service +//! +//! This service synchronizes ephemeral status data across the cluster using a separate +//! DFSM instance with the "pve_kvstore_v1" CPG group. +//! +//! Equivalent to C implementation's service_status (the kvstore DFSM). +//! Handles synchronization of: +//! - RRD data (performance metrics from each node) +//! - Node IP addresses +//! - Cluster log entries +//! - Other ephemeral status key-value data + +use async_trait::async_trait; +use pmxcfs_services::{Service, ServiceError}; +use rust_corosync::CsError; +use std::sync::Arc; +use std::time::Duration; +use tracing::{error, info, warn}; + +use crate::Dfsm; +use crate::message::Message; + +/// Status Sync Service +/// +/// Synchronizes ephemeral status data across all nodes using a separate DFSM instance. +/// Uses CPG group "pve_kvstore_v1" (separate from main config database "pmxcfs_v1"). +/// +/// This implements the Service trait to provide: +/// - Automatic retry if CPG initialization fails +/// - Event-driven CPG dispatching for status replication +/// - Separation of status data from config data for better performance +/// +/// This is equivalent to C implementation's service_status (the kvstore DFSM). +/// +/// The generic parameter `M` specifies the message type this service handles. +pub struct StatusSyncService<M: Message> { + dfsm: Arc<Dfsm<M>>, + fd: Option<i32>, +} + +impl<M: Message> StatusSyncService<M> { + /// Create a new status sync service + pub fn new(dfsm: Arc<Dfsm<M>>) -> Self { + Self { dfsm, fd: None } + } +} + +#[async_trait] +impl<M: Message> Service for StatusSyncService<M> { + fn name(&self) -> &str { + "status-sync" + } + + async fn initialize(&mut self) -> pmxcfs_services::Result<i32> { + info!("Initializing status sync service (kvstore)"); + + // Initialize CPG connection for kvstore group + self.dfsm.init_cpg().map_err(|e| { + ServiceError::InitializationFailed(format!( + "Status sync CPG initialization failed: {e}" + )) + })?; + + // Get file descriptor for event monitoring + let fd = self.dfsm.fd_get().map_err(|e| { + self.dfsm.stop_services().ok(); + ServiceError::InitializationFailed(format!("Failed to get status sync fd: {e}")) + })?; + + self.fd = Some(fd); + + info!( + "Status sync service initialized successfully with fd {}", + fd + ); + Ok(fd) + } + + async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> { + match self.dfsm.dispatch_events() { + Ok(_) => Ok(true), + Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => { + warn!("Status sync connection lost, requesting reinitialization"); + Ok(false) + } + Err(e) => { + error!("Status sync dispatch failed: {}", e); + Err(ServiceError::DispatchFailed(format!( + "Status sync dispatch failed: {e}" + ))) + } + } + } + + async fn finalize(&mut self) -> pmxcfs_services::Result<()> { + info!("Finalizing status sync service"); + + self.fd = None; + + if let Err(e) = self.dfsm.stop_services() { + warn!("Error stopping status sync services: {}", e); + } + + info!("Status sync service finalized"); + Ok(()) + } + + async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> { + // Status sync doesn't need periodic verification like the main
database + // Status data is ephemeral and doesn't require the same consistency guarantees + Ok(()) + } + + fn timer_period(&self) -> Option<Duration> { + // No periodic timer needed for status sync + None + } +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs new file mode 100644 index 000000000..5a2eb9645 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs @@ -0,0 +1,107 @@ +/// DFSM type definitions +/// +/// This module contains all type definitions used by the DFSM state machine. +/// DFSM operating modes +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum DfsmMode { + /// Initial state - starting cluster connection + Start = 0, + + /// Starting data synchronization + StartSync = 1, + + /// All data is up to date + Synced = 2, + + /// Waiting for updates from leader + Update = 3, + + /// Error states (>= 128) + Leave = 253, + VersionError = 254, + Error = 255, +} + +impl DfsmMode { + /// Check if this is an error mode + pub fn is_error(&self) -> bool { + (*self as u8) >= 128 + } +} + +impl std::fmt::Display for DfsmMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DfsmMode::Start => write!(f, "start cluster connection"), + DfsmMode::StartSync => write!(f, "starting data synchronization"), + DfsmMode::Synced => write!(f, "all data is up to date"), + DfsmMode::Update => write!(f, "waiting for updates from leader"), + DfsmMode::Leave => write!(f, "leaving cluster"), + DfsmMode::VersionError => write!(f, "protocol version mismatch"), + DfsmMode::Error => write!(f, "serious internal error"), + } + } +} + +/// DFSM message types (internal protocol messages) +/// Matches C's dfsm_message_t enum values +#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)] +#[repr(u16)] +pub enum DfsmMessageType { + Normal = 0, + SyncStart = 1, + State = 2, + Update = 3, + UpdateComplete = 4, + VerifyRequest = 5, + Verify = 6, +} + +/// Sync epoch - identifies a synchronization session +/// Matches C's dfsm_sync_epoch_t structure (16 bytes total) +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct SyncEpoch { + pub epoch: u32, + pub time: u32, + pub nodeid: u32, + pub pid: u32, +} + +impl SyncEpoch { + /// Serialize to C-compatible wire format (16 bytes) + /// Format: [epoch: u32][time: u32][nodeid: u32][pid: u32] + pub fn serialize(&self) -> [u8; 16] { + let mut bytes = [0u8; 16]; + bytes[0..4].copy_from_slice(&self.epoch.to_le_bytes()); + bytes[4..8].copy_from_slice(&self.time.to_le_bytes()); + bytes[8..12].copy_from_slice(&self.nodeid.to_le_bytes()); + bytes[12..16].copy_from_slice(&self.pid.to_le_bytes()); + bytes + } + + /// Deserialize from C-compatible wire format (16 bytes) + pub fn deserialize(bytes: &[u8]) -> Result<Self, &'static str> { + if bytes.len() < 16 { + return Err("SyncEpoch requires 16 bytes"); + } + Ok(SyncEpoch { + epoch: u32::from_le_bytes(bytes[0..4].try_into().unwrap()), + time: u32::from_le_bytes(bytes[4..8].try_into().unwrap()), + nodeid: u32::from_le_bytes(bytes[8..12].try_into().unwrap()), + pid: u32::from_le_bytes(bytes[12..16].try_into().unwrap()), + }) + } +} + +/// Queued message awaiting delivery +#[derive(Debug, Clone)] +pub(super) struct QueuedMessage<M> { + pub nodeid: u32, + pub pid: u32, + pub _msg_count: u64, + pub message: M, + pub timestamp: u64, +} + +// Re-export NodeSyncInfo from pmxcfs-api-types for use in Callbacks trait +pub use pmxcfs_api_types::NodeSyncInfo;
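Sketch, not in the patch: the epoch is exactly 16 little-endian bytes, so a round-trip test is cheap insurance against field reordering; the values are arbitrary.

#[test]
fn test_sync_epoch_roundtrip() {
    let epoch = SyncEpoch {
        epoch: 7,
        time: 1_700_000_000,
        nodeid: 1,
        pid: 4242,
    };
    let bytes = epoch.serialize();
    assert_eq!(bytes.len(), 16);
    // deserialize() accepts the same [epoch][time][nodeid][pid] layout
    assert_eq!(SyncEpoch::deserialize(&bytes).unwrap(), epoch);
}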
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs new file mode 100644 index 000000000..3022d9704 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs @@ -0,0 +1,279 @@ +/// C-compatible wire format for cluster communication +/// +/// This module implements the exact wire protocol used by the C version of pmxcfs +/// to ensure compatibility with C-based cluster nodes. +/// +/// The C version uses a simple format with iovec arrays containing raw C types. +use anyhow::{Context, Result}; +use bytemuck::{Pod, Zeroable}; +use std::ffi::CStr; + +/// C message types (must match dcdb.h) +#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)] +#[repr(u16)] +pub enum CMessageType { + Write = 1, + Mkdir = 2, + Delete = 3, + Rename = 4, + Create = 5, + Mtime = 6, + UnlockRequest = 7, + Unlock = 8, +} + +/// C-compatible FUSE message header +/// Layout matches the iovec array from C: [size][offset][pathlen][tolen][flags] +#[derive(Debug, Clone, Copy, Pod, Zeroable)] +#[repr(C)] +struct CFuseMessageHeader { + size: u32, + offset: u32, + pathlen: u32, + tolen: u32, + flags: u32, +} + +/// Parsed C FUSE message +#[derive(Debug, Clone)] +pub struct CFuseMessage { + pub size: u32, + pub offset: u32, + pub flags: u32, + pub path: String, + pub to: Option<String>, + pub data: Vec<u8>, +} + +impl CFuseMessage { + /// Maximum message size to prevent DoS attacks (16MB) + pub const MAX_MESSAGE_SIZE: u32 = 16 * 1024 * 1024; + + /// Parse a C FUSE message from raw bytes + pub fn parse(data: &[u8]) -> Result<Self> { + if data.len() < std::mem::size_of::<CFuseMessageHeader>() { + return Err(anyhow::anyhow!( + "Message too short: {} < {}", + data.len(), + std::mem::size_of::<CFuseMessageHeader>() + )); + } + + // Parse header manually to avoid alignment issues + let header = CFuseMessageHeader { + size: u32::from_le_bytes([data[0], data[1], data[2], data[3]]), + offset: u32::from_le_bytes([data[4], data[5], data[6], data[7]]), + pathlen: u32::from_le_bytes([data[8], data[9], data[10], data[11]]), + tolen: u32::from_le_bytes([data[12], data[13], data[14], data[15]]), + flags: u32::from_le_bytes([data[16], data[17], data[18], data[19]]), + }; + + // Check for integer overflow in total size calculation + let total_size = header + .pathlen + .checked_add(header.tolen) + .and_then(|s| s.checked_add(header.size)) + .ok_or_else(|| anyhow::anyhow!("Integer overflow in message size calculation"))?; + + // Validate total size is reasonable (prevent DoS) + if total_size > Self::MAX_MESSAGE_SIZE { + return Err(anyhow::anyhow!( + "Message size {total_size} exceeds maximum {}", + Self::MAX_MESSAGE_SIZE + )); + } + + // Validate total size matches actual buffer size (prevent reading beyond buffer) + let header_size = std::mem::size_of::<CFuseMessageHeader>(); + let expected_total = header_size + .checked_add(total_size as usize) + .ok_or_else(|| anyhow::anyhow!("Total message size overflow"))?; + + if expected_total != data.len() { + return Err(anyhow::anyhow!( + "Message size mismatch: expected {}, got {}", + expected_total, + data.len() + )); + } + + let mut offset = header_size; + + // Parse path with overflow-checked arithmetic + let path = if header.pathlen > 0 { + let end_offset = offset + .checked_add(header.pathlen as usize) + .ok_or_else(|| anyhow::anyhow!("Integer overflow in path offset"))?; + + if end_offset > data.len() { + return Err(anyhow::anyhow!( + "Invalid path length: {} bytes at offset {} exceeds message size {}", + header.pathlen, + offset, + data.len() + )); + } + let path_bytes = &data[offset..end_offset]; + offset = end_offset; + + // C strings are
null-terminated + CStr::from_bytes_until_nul(path_bytes) + .context("Invalid path string")? + .to_str() + .context("Path not valid UTF-8")? + .to_string() + } else { + String::new() + }; + + // Parse 'to' (for rename operations) with overflow-checked arithmetic + let to = if header.tolen > 0 { + let end_offset = offset + .checked_add(header.tolen as usize) + .ok_or_else(|| anyhow::anyhow!("Integer overflow in 'to' offset"))?; + + if end_offset > data.len() { + return Err(anyhow::anyhow!( + "Invalid 'to' length: {} bytes at offset {} exceeds message size {}", + header.tolen, + offset, + data.len() + )); + } + let to_bytes = &data[offset..end_offset]; + offset = end_offset; + + Some( + CStr::from_bytes_until_nul(to_bytes) + .context("Invalid to string")? + .to_str() + .context("To path not valid UTF-8")? + .to_string(), + ) + } else { + None + }; + + // Parse data buffer with overflow-checked arithmetic + let buf_data = if header.size > 0 { + let end_offset = offset + .checked_add(header.size as usize) + .ok_or_else(|| anyhow::anyhow!("Integer overflow in data offset"))?; + + if end_offset > data.len() { + return Err(anyhow::anyhow!( + "Invalid data size: {} bytes at offset {} exceeds message size {}", + header.size, + offset, + data.len() + )); + } + data[offset..end_offset].to_vec() + } else { + Vec::new() + }; + + Ok(CFuseMessage { + size: header.size, + offset: header.offset, + flags: header.flags, + path, + to, + data: buf_data, + }) + } + + /// Serialize to C wire format + pub fn serialize(&self) -> Vec { + let path_bytes = self.path.as_bytes(); + let pathlen = if path_bytes.is_empty() { + 0 + } else { + (path_bytes.len() + 1) as u32 // +1 for null terminator + }; + + let to_bytes = self.to.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]); + let tolen = if to_bytes.is_empty() { + 0 + } else { + (to_bytes.len() + 1) as u32 + }; + + let header = CFuseMessageHeader { + size: self.size, + offset: self.offset, + pathlen, + tolen, + flags: self.flags, + }; + + let mut result = Vec::new(); + + // Serialize header + result.extend_from_slice(bytemuck::bytes_of(&header)); + + // Serialize path (with null terminator) + if pathlen > 0 { + result.extend_from_slice(path_bytes); + result.push(0); // null terminator + } + + // Serialize 'to' (with null terminator) + if tolen > 0 { + result.extend_from_slice(to_bytes); + result.push(0); // null terminator + } + + // Serialize data + if self.size > 0 { + result.extend_from_slice(&self.data); + } + + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_serialize_deserialize_write() { + let msg = CFuseMessage { + size: 13, + offset: 0, + flags: 0, + path: "/test.txt".to_string(), + to: None, + data: b"Hello, World!".to_vec(), + }; + + let serialized = msg.serialize(); + let parsed = CFuseMessage::parse(&serialized).unwrap(); + + assert_eq!(parsed.size, msg.size); + assert_eq!(parsed.offset, msg.offset); + assert_eq!(parsed.flags, msg.flags); + assert_eq!(parsed.path, msg.path); + assert_eq!(parsed.to, msg.to); + assert_eq!(parsed.data, msg.data); + } + + #[test] + fn test_serialize_deserialize_rename() { + let msg = CFuseMessage { + size: 0, + offset: 0, + flags: 0, + path: "/old.txt".to_string(), + to: Some("/new.txt".to_string()), + data: Vec::new(), + }; + + let serialized = msg.serialize(); + let parsed = CFuseMessage::parse(&serialized).unwrap(); + + assert_eq!(parsed.path, msg.path); + assert_eq!(parsed.to, msg.to); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs 
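To make the header layout concrete, here is a hand-built buffer exercised
against parse() above (illustrative sketch only, not part of the patch):

    // Wire order: [size][offset][pathlen][tolen][flags], all u32
    // little-endian, then pathlen bytes of NUL-terminated path, tolen
    // bytes of 'to', then size bytes of file data.
    let path = b"/testdir\0";
    let mut buf = Vec::new();
    buf.extend_from_slice(&0u32.to_le_bytes());                // size: no data
    buf.extend_from_slice(&0u32.to_le_bytes());                // offset
    buf.extend_from_slice(&(path.len() as u32).to_le_bytes()); // pathlen (incl. NUL)
    buf.extend_from_slice(&0u32.to_le_bytes());                // tolen: no rename target
    buf.extend_from_slice(&0u32.to_le_bytes());                // flags
    buf.extend_from_slice(path);

    let msg = CFuseMessage::parse(&buf).unwrap();
    assert_eq!(msg.path, "/testdir");
    assert_eq!(msg.to, None);
    assert!(msg.data.is_empty());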
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
new file mode 100644
index 000000000..3a371ad86
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
@@ -0,0 +1,563 @@
+/// Multi-node integration tests for DFSM cluster synchronization
+///
+/// These tests simulate multi-node clusters to verify the complete synchronization
+/// protocol works correctly with multiple Rust nodes exchanging state.
+use anyhow::Result;
+use pmxcfs_dfsm::{Callbacks, FuseMessage, NodeSyncInfo};
+use pmxcfs_memdb::{MemDb, MemDbIndex, ROOT_INODE, TreeEntry};
+use std::sync::{Arc, Mutex};
+use tempfile::TempDir;
+
+/// Mock callbacks for testing DFSM without full pmxcfs integration
+struct MockCallbacks {
+    memdb: MemDb,
+    states_received: Arc<Mutex<Vec<NodeSyncInfo>>>,
+    updates_received: Arc<Mutex<Vec<TreeEntry>>>,
+    synced_count: Arc<Mutex<usize>>,
+}
+
+impl MockCallbacks {
+    fn new(memdb: MemDb) -> Self {
+        Self {
+            memdb,
+            states_received: Arc::new(Mutex::new(Vec::new())),
+            updates_received: Arc::new(Mutex::new(Vec::new())),
+            synced_count: Arc::new(Mutex::new(0)),
+        }
+    }
+
+    #[allow(dead_code)]
+    fn get_states(&self) -> Vec<NodeSyncInfo> {
+        self.states_received.lock().unwrap().clone()
+    }
+
+    #[allow(dead_code)]
+    fn get_updates(&self) -> Vec<TreeEntry> {
+        self.updates_received.lock().unwrap().clone()
+    }
+
+    #[allow(dead_code)]
+    fn get_synced_count(&self) -> usize {
+        *self.synced_count.lock().unwrap()
+    }
+}
+
+impl Callbacks for MockCallbacks {
+    type Message = FuseMessage;
+
+    fn deliver_message(
+        &self,
+        _node_id: u32,
+        _pid: u32,
+        _message: FuseMessage,
+        _timestamp: u64,
+    ) -> Result<(i32, bool)> {
+        Ok((0, true))
+    }
+
+    fn compute_checksum(&self, output: &mut [u8; 32]) -> Result<()> {
+        let checksum = self.memdb.compute_database_checksum()?;
+        output.copy_from_slice(&checksum);
+        Ok(())
+    }
+
+    fn get_state(&self) -> Result<Vec<u8>> {
+        let index = self.memdb.encode_index()?;
+        Ok(index.serialize())
+    }
+
+    fn process_state_update(&self, states: &[NodeSyncInfo]) -> Result<bool> {
+        // Store received states for verification
+        *self.states_received.lock().unwrap() = states.to_vec();
+
+        // Parse indices from states
+        let mut indices: Vec<(u32, u32, MemDbIndex)> = Vec::new();
+        for node in states {
+            if let Some(state_data) = &node.state {
+                match MemDbIndex::deserialize(state_data) {
+                    Ok(index) => indices.push((node.node_id, node.pid, index)),
+                    Err(_) => continue,
+                }
+            }
+        }
+
+        if indices.is_empty() {
+            return Ok(true);
+        }
+
+        // Find leader (highest version, or if tie, highest mtime)
+        let mut leader_idx = 0;
+        for i in 1..indices.len() {
+            let (_, _, current_index) = &indices[i];
+            let (_, _, leader_index) = &indices[leader_idx];
+            if current_index > leader_index {
+                leader_idx = i;
+            }
+        }
+
+        let (_leader_nodeid, _leader_pid, leader_index) = &indices[leader_idx];
+
+        // Check if WE are synced with leader
+        let our_index = self.memdb.encode_index()?;
+        let we_are_synced = our_index.version == leader_index.version
+            && our_index.mtime == leader_index.mtime
+            && our_index.size == leader_index.size
+            && our_index.entries.len() == leader_index.entries.len()
+            && our_index
+                .entries
+                .iter()
+                .zip(leader_index.entries.iter())
+                .all(|(a, b)| a.inode == b.inode && a.digest == b.digest);
+
+        Ok(we_are_synced)
+    }
+
+    fn process_update(&self, _node_id: u32, _pid: u32, data: &[u8]) -> Result<()> {
+        // Deserialize and store update
+        let tree_entry = TreeEntry::deserialize_from_update(data)?;
+        self.updates_received
+            .lock()
+            .unwrap()
+            .push(tree_entry.clone());
+
+        // Apply to database
+        self.memdb.apply_tree_entry(tree_entry)?;
+        Ok(())
+    }
+
+    fn commit_state(&self) -> Result<()> {
+        Ok(())
+    }
+
+    fn on_synced(&self) {
+        *self.synced_count.lock().unwrap() += 1;
+    }
+}
+
+fn create_test_node(node_id: u32) -> Result<(MemDb, TempDir, Arc<MockCallbacks>)> {
+    let temp_dir = TempDir::new()?;
+    let db_path = temp_dir.path().join(format!("node{node_id}.db"));
+    let memdb = MemDb::open(&db_path, true)?;
+    // Note: Local operations always use writer=0 (matching C implementation)
+    // Remote DFSM updates use the writer field from the incoming TreeEntry
+
+    let callbacks = Arc::new(MockCallbacks::new(memdb.clone()));
+    Ok((memdb, temp_dir, callbacks))
+}
+
+#[test]
+fn test_two_node_empty_sync() -> Result<()> {
+    // Create two nodes with empty databases
+    let (_memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+    // Generate states from both nodes
+    let state1 = callbacks1.get_state()?;
+    let state2 = callbacks2.get_state()?;
+
+    // Simulate state exchange
+    let states = vec![
+        NodeSyncInfo {
+            node_id: 1,
+            pid: 1000,
+            state: Some(state1),
+            synced: false,
+        },
+        NodeSyncInfo {
+            node_id: 2,
+            pid: 2000,
+            state: Some(state2),
+            synced: false,
+        },
+    ];
+
+    // Both nodes process states
+    let synced1 = callbacks1.process_state_update(&states)?;
+    let synced2 = callbacks2.process_state_update(&states)?;
+
+    // Both should be synced (empty databases are identical)
+    assert!(synced1, "Node 1 should be synced");
+    assert!(synced2, "Node 2 should be synced");
+
+    Ok(())
+}
+
+#[test]
+fn test_two_node_leader_election() -> Result<()> {
+    // Create two nodes
+    let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+    // Node 1 has more data (higher version)
+    memdb1.create("/file1.txt", 0, 0, 1000)?;
+    memdb1.write("/file1.txt", 0, 0, 1001, b"data from node 1", false)?;
+
+    // Generate states
+    let state1 = callbacks1.get_state()?;
+    let state2 = callbacks2.get_state()?;
+
+    // Parse to check versions
+    let index1 = MemDbIndex::deserialize(&state1)?;
+    let index2 = MemDbIndex::deserialize(&state2)?;
+
+    // Node 1 should have higher version
+    assert!(
+        index1.version > index2.version,
+        "Node 1 version {} should be > Node 2 version {}",
+        index1.version,
+        index2.version
+    );
+
+    // Simulate state exchange
+    let states = vec![
+        NodeSyncInfo {
+            node_id: 1,
+            pid: 1000,
+            state: Some(state1),
+            synced: false,
+        },
+        NodeSyncInfo {
+            node_id: 2,
+            pid: 2000,
+            state: Some(state2),
+            synced: false,
+        },
+    ];
+
+    // Process states
+    let synced1 = callbacks1.process_state_update(&states)?;
+    let synced2 = callbacks2.process_state_update(&states)?;
+
+    // Node 1 (leader) should be synced, Node 2 (follower) should not
+    assert!(synced1, "Node 1 (leader) should be synced");
+    assert!(!synced2, "Node 2 (follower) should not be synced");
+
+    Ok(())
+}
+
+#[test]
+fn test_incremental_update_transfer() -> Result<()> {
+    // Create leader and follower
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // Leader has data
+    leader_db.create("/config", libc::S_IFDIR, 0, 1000)?;
+    leader_db.create("/config/node.conf", 0, 0, 1001)?;
+    leader_db.write("/config/node.conf", 0, 0, 1002, b"hostname=pve1", false)?;
+
+    // Get entries from leader
+    let leader_entries = leader_db.get_all_entries()?;
+
+    // Simulate sending updates to follower
+    for entry in leader_entries {
+        if entry.inode == ROOT_INODE {
+            continue; // Skip root (both have it)
+        }
+
+        // Serialize as update message
+        let update_msg = entry.serialize_for_update();
+
+        // Follower receives and processes update
+        follower_callbacks.process_update(1, 1000, &update_msg)?;
+    }
+
+    // Verify follower has the data
+    let config_dir = follower_db.lookup_path("/config");
+    assert!(
+        config_dir.is_some(),
+        "Follower should have /config directory"
+    );
+    assert!(config_dir.unwrap().is_dir());
+
+    let config_file = follower_db.lookup_path("/config/node.conf");
+    assert!(
+        config_file.is_some(),
+        "Follower should have /config/node.conf"
+    );
+
+    let config_data = follower_db.read("/config/node.conf", 0, 1024)?;
+    assert_eq!(
+        config_data, b"hostname=pve1",
+        "Follower should have correct data"
+    );
+
+    Ok(())
+}
+
+#[test]
+fn test_three_node_sync() -> Result<()> {
+    // Create three nodes
+    let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (memdb2, _temp2, callbacks2) = create_test_node(2)?;
+    let (_memdb3, _temp3, callbacks3) = create_test_node(3)?;
+
+    // Node 1 has the most recent data
+    memdb1.create("/cluster.conf", 0, 0, 5000)?;
+    memdb1.write("/cluster.conf", 0, 0, 5001, b"version=3", false)?;
+
+    // Node 2 has older data
+    memdb2.create("/cluster.conf", 0, 0, 4000)?;
+    memdb2.write("/cluster.conf", 0, 0, 4001, b"version=2", false)?;
+
+    // Node 3 is empty (new node joining)
+
+    // Generate states
+    let state1 = callbacks1.get_state()?;
+    let state2 = callbacks2.get_state()?;
+    let state3 = callbacks3.get_state()?;
+
+    let states = vec![
+        NodeSyncInfo {
+            node_id: 1,
+            pid: 1000,
+            state: Some(state1.clone()),
+            synced: false,
+        },
+        NodeSyncInfo {
+            node_id: 2,
+            pid: 2000,
+            state: Some(state2.clone()),
+            synced: false,
+        },
+        NodeSyncInfo {
+            node_id: 3,
+            pid: 3000,
+            state: Some(state3.clone()),
+            synced: false,
+        },
+    ];
+
+    // All nodes process states
+    let synced1 = callbacks1.process_state_update(&states)?;
+    let synced2 = callbacks2.process_state_update(&states)?;
+    let synced3 = callbacks3.process_state_update(&states)?;
+
+    // Node 1 (leader) should be synced
+    assert!(synced1, "Node 1 (leader) should be synced");
+
+    // Nodes 2 and 3 need updates
+    assert!(!synced2, "Node 2 should need updates");
+    assert!(!synced3, "Node 3 should need updates");
+
+    // Verify leader has highest version
+    let index1 = MemDbIndex::deserialize(&state1)?;
+    let index2 = MemDbIndex::deserialize(&state2)?;
+    let index3 = MemDbIndex::deserialize(&state3)?;
+
+    assert!(index1.version >= index2.version);
+    assert!(index1.version >= index3.version);
+
+    Ok(())
+}
+
+#[test]
+fn test_update_message_wire_format_compatibility() -> Result<()> {
+    // Verify our wire format matches C implementation exactly
+    let entry = TreeEntry {
+        inode: 42,
+        parent: 1,
+        version: 100,
+        writer: 2,
+        mtime: 12345,
+        size: 11,
+        entry_type: 8, // DT_REG
+        name: "test.conf".to_string(),
+        data: b"hello world".to_vec(),
+    };
+
+    let serialized = entry.serialize_for_update();
+
+    // Verify header size (41 bytes)
+    // parent(8) + inode(8) + version(8) + writer(4) + mtime(4) + size(4) + namelen(4) + type(1)
+    let expected_header_size = 8 + 8 + 8 + 4 + 4 + 4 + 4 + 1;
+    assert_eq!(expected_header_size, 41);
+
+    // Verify total size
+    let namelen = "test.conf".len() + 1; // Include null terminator
+    let expected_total = expected_header_size + namelen + 11;
+    assert_eq!(serialized.len(), expected_total);
+
+    // Verify we can deserialize it back
+    let deserialized = TreeEntry::deserialize_from_update(&serialized)?;
+    assert_eq!(deserialized.inode, entry.inode);
+    assert_eq!(deserialized.parent, entry.parent);
+    assert_eq!(deserialized.version, entry.version);
+    assert_eq!(deserialized.writer, entry.writer);
+    assert_eq!(deserialized.mtime, entry.mtime);
+    assert_eq!(deserialized.size, entry.size);
+    assert_eq!(deserialized.entry_type, entry.entry_type);
+    assert_eq!(deserialized.name, entry.name);
+    assert_eq!(deserialized.data, entry.data);
+
+    Ok(())
+}
+
+#[test]
+fn test_index_wire_format_compatibility() -> Result<()> {
+    // Verify memdb_index_t wire format matches C implementation
+    use pmxcfs_memdb::IndexEntry;
+
+    let entries = vec![
+        IndexEntry {
+            inode: 1,
+            digest: [0u8; 32],
+        },
+        IndexEntry {
+            inode: 2,
+            digest: [1u8; 32],
+        },
+    ];
+
+    let index = MemDbIndex::new(
+        100,   // version
+        2,     // last_inode
+        1,     // writer
+        12345, // mtime
+        entries,
+    );
+
+    let serialized = index.serialize();
+
+    // Verify header size (32 bytes)
+    // version(8) + last_inode(8) + writer(4) + mtime(4) + size(4) + bytes(4)
+    let expected_header_size = 8 + 8 + 4 + 4 + 4 + 4;
+    assert_eq!(expected_header_size, 32);
+
+    // Verify entry size (40 bytes each)
+    // inode(8) + digest(32)
+    let expected_entry_size = 8 + 32;
+    assert_eq!(expected_entry_size, 40);
+
+    // Verify total size
+    let expected_total = expected_header_size + 2 * expected_entry_size;
+    assert_eq!(serialized.len(), expected_total);
+    assert_eq!(serialized.len(), index.bytes as usize);
+
+    // Verify deserialization
+    let deserialized = MemDbIndex::deserialize(&serialized)?;
+    assert_eq!(deserialized.version, index.version);
+    assert_eq!(deserialized.last_inode, index.last_inode);
+    assert_eq!(deserialized.writer, index.writer);
+    assert_eq!(deserialized.mtime, index.mtime);
+    assert_eq!(deserialized.size, index.size);
+    assert_eq!(deserialized.bytes, index.bytes);
+    assert_eq!(deserialized.entries.len(), 2);
+
+    Ok(())
+}
+
+#[test]
+fn test_sync_with_conflicts() -> Result<()> {
+    // Test scenario: two nodes modified different files
+    let (memdb1, _temp1, _callbacks1) = create_test_node(1)?;
+    let (memdb2, _temp2, _callbacks2) = create_test_node(2)?;
+
+    // Both start with same base
+    memdb1.create("/base.conf", 0, 0, 1000)?;
+    memdb1.write("/base.conf", 0, 0, 1001, b"shared", false)?;
+
+    memdb2.create("/base.conf", 0, 0, 1000)?;
+    memdb2.write("/base.conf", 0, 0, 1001, b"shared", false)?;
+
+    // Node 1 adds file1
+    memdb1.create("/file1.txt", 0, 0, 2000)?;
+    memdb1.write("/file1.txt", 0, 0, 2001, b"from node 1", false)?;
+
+    // Node 2 adds file2
+    memdb2.create("/file2.txt", 0, 0, 2000)?;
+    memdb2.write("/file2.txt", 0, 0, 2001, b"from node 2", false)?;
+
+    // Generate indices
+    let index1 = memdb1.encode_index()?;
+    let index2 = memdb2.encode_index()?;
+
+    // Find differences
+    let diffs_1_vs_2 = index1.find_differences(&index2);
+    let diffs_2_vs_1 = index2.find_differences(&index1);
+
+    // Node 1 has file1 that node 2 doesn't have
+    assert!(
+        !diffs_1_vs_2.is_empty(),
+        "Node 1 should have entries node 2 doesn't have"
+    );
+
+    // Node 2 has file2 that node 1 doesn't have
+    assert!(
+        !diffs_2_vs_1.is_empty(),
+        "Node 2 should have entries node 1 doesn't have"
+    );
+
+    // Higher version wins - in this case they're both v3 (base + create + write)
+    // so mtime would be tiebreaker
+
+    Ok(())
+}
+
+#[test]
+fn test_large_file_update() -> Result<()> {
+    // Test updating a file with significant data
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // Create a file with 10KB of data
+    let large_data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect();
+
+    leader_db.create("/large.bin", 0, 0, 1000)?;
+    leader_db.write("/large.bin", 0, 0, 1001, &large_data, false)?;
+
+    // Get the entry
+    let entry = leader_db.lookup_path("/large.bin").unwrap();
+
+    // Serialize and send
+    let update_msg = entry.serialize_for_update();
+
+    // Follower receives
+    follower_callbacks.process_update(1, 1000, &update_msg)?;
+
+    // Verify
+    let follower_entry = follower_db.lookup_path("/large.bin").unwrap();
+    assert_eq!(follower_entry.size, large_data.len());
+    assert_eq!(follower_entry.data, large_data);
+
+    Ok(())
+}
+
+#[test]
+fn test_directory_hierarchy_sync() -> Result<()> {
+    // Test syncing nested directory structure
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // Create directory hierarchy on leader
+    leader_db.create("/etc", libc::S_IFDIR, 0, 1000)?;
+    leader_db.create("/etc/pve", libc::S_IFDIR, 0, 1001)?;
+    leader_db.create("/etc/pve/nodes", libc::S_IFDIR, 0, 1002)?;
+    leader_db.create("/etc/pve/nodes/pve1", libc::S_IFDIR, 0, 1003)?;
+    leader_db.create("/etc/pve/nodes/pve1/config", 0, 0, 1004)?;
+    leader_db.write(
+        "/etc/pve/nodes/pve1/config", 0, 0, 1005, b"cpu: 2\nmem: 4096", false,
+    )?;
+
+    // Send all entries to follower
+    let entries = leader_db.get_all_entries()?;
+    for entry in entries {
+        if entry.inode == ROOT_INODE {
+            continue; // Skip root
+        }
+        let update_msg = entry.serialize_for_update();
+        follower_callbacks.process_update(1, 1000, &update_msg)?;
+    }
+
+    // Verify entire hierarchy
+    assert!(follower_db.lookup_path("/etc").is_some());
+    assert!(follower_db.lookup_path("/etc/pve").is_some());
+    assert!(follower_db.lookup_path("/etc/pve/nodes").is_some());
+    assert!(follower_db.lookup_path("/etc/pve/nodes/pve1").is_some());
+
+    let config = follower_db.lookup_path("/etc/pve/nodes/pve1/config");
+    assert!(config.is_some());
+    assert_eq!(config.unwrap().data, b"cpu: 2\nmem: 4096");
+
+    Ok(())
+}
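The leader election exercised by these tests reduces to a lexicographic
comparison on (version, mtime). A minimal sketch of that ordering, written as a
hypothetical helper (not part of the patch; it assumes only MemDbIndex's public
version and mtime fields):

    // Pick the sync leader among (nodeid, pid, index) tuples the same way
    // process_state_update does: highest version wins, mtime breaks ties.
    fn elect_leader(indices: &[(u32, u32, MemDbIndex)]) -> usize {
        let mut leader = 0;
        for i in 1..indices.len() {
            let a = &indices[i].2;
            let b = &indices[leader].2;
            if (a.version, a.mtime) > (b.version, b.mtime) {
                leader = i;
            }
        }
        leader
    }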
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
index 106f5016e..1c9b6cad8 100644
--- a/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
@@ -506,7 +506,7 @@ impl MemDb {
             anyhow::bail!("Database has errors, refusing operation");
         }
 
-        // CRITICAL FIX: Acquire write guard BEFORE any checks to prevent TOCTOU race
+        // Acquire write guard before any checks to prevent TOCTOU race
         // This ensures all validation and mutation happen atomically
         let _guard = self.inner.write_guard.lock();
 
@@ -666,7 +666,7 @@ impl MemDb {
             anyhow::bail!("Database has errors, refusing operation");
         }
 
-        // CRITICAL FIX: Acquire write guard BEFORE any checks to prevent TOCTOU race
+        // Acquire write guard before any checks to prevent TOCTOU race
         // This ensures lookup and mutation happen atomically
         let _guard = self.inner.write_guard.lock();
 
diff --git a/src/pmxcfs-rs/pmxcfs-status/src/status.rs b/src/pmxcfs-rs/pmxcfs-status/src/status.rs
index 58d81b8ed..3a0243a62 100644
--- a/src/pmxcfs-rs/pmxcfs-status/src/status.rs
+++ b/src/pmxcfs-rs/pmxcfs-status/src/status.rs
@@ -695,8 +695,8 @@ impl Status {
     /// This updates the CPG member list and synchronizes the online status
     /// in cluster_info to match current membership.
     ///
-    /// IMPORTANT: Both members and cluster_info are updated atomically under locks
-    /// to prevent TOCTOU where readers could see inconsistent state.
+    /// Both members and cluster_info are updated atomically under locks
+    /// to prevent readers from seeing inconsistent state.
     pub fn update_members(&self, members: Vec) {
         // Acquire both locks before any updates to ensure atomicity
         // (matches C's single mutex protection in status.c)
-- 
2.47.3