From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH pve-cluster 10/15] pmxcfs-rs: add pmxcfs-dfsm crate
Date: Tue, 6 Jan 2026 22:24:34 +0800 [thread overview]
Message-ID: <20260106142440.2368585-11-k.chai@proxmox.com> (raw)
In-Reply-To: <20260106142440.2368585-1-k.chai@proxmox.com>
Add Distributed Finite State Machine for cluster synchronization:
- Dfsm: Core state machine implementation
- ClusterDatabaseService: MemDb sync (pmxcfs_v1 CPG group)
- StatusSyncService: Status sync (pve_kvstore_v1 CPG group)
- Protocol: SyncStart, State, Update, UpdateComplete, VerifyRequest, Verify
- Leader election based on version and mtime
- Incremental updates for efficiency
This integrates pmxcfs-memdb, pmxcfs-services, and rust-corosync
to provide cluster-wide database synchronization. The protocol is
wire-compatible with the C implementation.
Includes unit tests for:
- Index serialization and comparison
- Leader election logic
- Tree entry serialization
- Diff computation between indices
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
src/pmxcfs-rs/Cargo.toml | 1 +
src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml | 45 +
src/pmxcfs-rs/pmxcfs-dfsm/README.md | 340 ++++++
src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs | 52 +
.../src/cluster_database_service.rs | 116 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs | 163 +++
src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs | 728 ++++++++++++
src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs | 185 +++
.../pmxcfs-dfsm/src/kv_store_message.rs | 329 ++++++
src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs | 32 +
src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs | 21 +
.../pmxcfs-dfsm/src/state_machine.rs | 1013 +++++++++++++++++
.../pmxcfs-dfsm/src/status_sync_service.rs | 118 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs | 107 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs | 220 ++++
.../tests/multi_node_sync_tests.rs | 565 +++++++++
16 files changed, 4035 insertions(+)
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index f4497d58..4d18aa93 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -10,6 +10,7 @@ members = [
"pmxcfs-test-utils", # Test utilities and helpers (dev-only)
"pmxcfs-services", # Service framework for automatic retry and lifecycle management
"pmxcfs-ipc", # libqb-compatible IPC server
+ "pmxcfs-dfsm", # Distributed Finite State Machine (owns CPG)
]
resolver = "2"
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
new file mode 100644
index 00000000..12a8e7f6
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
@@ -0,0 +1,45 @@
+[package]
+name = "pmxcfs-dfsm"
+description = "Distributed Finite State Machine for cluster state synchronization"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+# Internal dependencies
+pmxcfs-api-types.workspace = true
+pmxcfs-memdb.workspace = true
+pmxcfs-services.workspace = true
+
+# Corosync integration
+rust-corosync.workspace = true
+
+# Error handling
+anyhow.workspace = true
+thiserror.workspace = true
+
+# Async and concurrency
+parking_lot.workspace = true
+async-trait.workspace = true
+tokio.workspace = true
+
+# Serialization
+serde.workspace = true
+bincode.workspace = true
+bytemuck.workspace = true
+
+# Logging
+tracing.workspace = true
+
+# Utilities
+num_enum.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
+libc.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/README.md b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
new file mode 100644
index 00000000..560827a7
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
@@ -0,0 +1,340 @@
+# pmxcfs-dfsm
+
+**Distributed Finite State Machine** for cluster-wide state synchronization in pmxcfs.
+
+This crate implements the DFSM protocol used to replicate configuration changes and status updates across all nodes in a Proxmox cluster via Corosync CPG (Closed Process Group).
+
+## Overview
+
+The DFSM is the core mechanism for maintaining consistency across cluster nodes. It ensures that:
+
+- All nodes see filesystem operations (writes, creates, deletes) in the same order
+- Database state remains synchronized even after network partitions
+- Status information (VM states, RRD data) is broadcast to all nodes
+- State verification catches inconsistencies
+
+## Architecture
+
+### Key Components
+
+The crate is built around a generic `Dfsm<M>` state machine plus two services that each own
+one CPG group: `ClusterDatabaseService` (`pmxcfs_v1`, MemDb) and `StatusSyncService`
+(`pve_kvstore_v1`, status/RRD). Application layers integrate through the `Callbacks` trait.
+
+### Module Structure
+
+| Module | Purpose | C Equivalent |
+|--------|---------|--------------|
+| `state_machine.rs` | Core DFSM logic, state transitions | `dfsm.c` |
+| `cluster_database_service.rs` | MemDb sync service | `dcdb.c`, `loop.c:service_dcdb` |
+| `status_sync_service.rs` | Status/kvstore sync service | `loop.c:service_status` |
+| `cpg_service.rs` | Corosync CPG integration | `dfsm.c:cpg_callbacks` |
+| `dfsm_message.rs` | Protocol message types | `dfsm.c:dfsm_message_*_header_t` |
+| `message.rs` | Message trait and serialization | (inline in C) |
+| `wire_format.rs` | C-compatible wire format | `dcdb.c:c_fuse_message_header_t` |
+| `broadcast.rs` | Cluster-wide message broadcast | `dcdb.c:dcdb_send_fuse_message` |
+| `types.rs` | Type definitions (modes, epochs) | `dfsm.c:dfsm_mode_t` |
+
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| `dfsm_t` | `Dfsm` | Main state machine |
+| `dfsm_mode_t` | `DfsmMode` | Enum with type safety |
+| `dfsm_node_info_t` | (internal) | Node state tracking |
+| `dfsm_sync_info_t` | (internal) | Sync session info |
+| `dfsm_callbacks_t` | Trait-based callbacks | Type-safe callbacks via traits |
+| `dfsm_message_*_header_t` | `DfsmMessage` | Type-safe enum variants |
+
+### Functions
+
+#### Core DFSM Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dfsm_new()` | `Dfsm::new()` | state_machine.rs |
+| `dfsm_initialize()` | `Dfsm::init_cpg()` | state_machine.rs |
+| `dfsm_join()` | (part of init_cpg) | state_machine.rs |
+| `dfsm_dispatch()` | `Dfsm::dispatch_events()` | state_machine.rs |
+| `dfsm_send_message()` | `Dfsm::send_message()` | state_machine.rs |
+| `dfsm_send_update()` | `Dfsm::send_update()` | state_machine.rs |
+| `dfsm_verify_request()` | `Dfsm::verify_request()` | state_machine.rs |
+| `dfsm_finalize()` | `Dfsm::stop_services()` | state_machine.rs |
+
+#### DCDB (Cluster Database) Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dcdb_new()` | `ClusterDatabaseService::new()` | cluster_database_service.rs |
+| `dcdb_send_fuse_message()` | `broadcast()` | broadcast.rs |
+| `dcdb_send_unlock()` | `FuseMessage::Unlock` + broadcast | broadcast.rs |
+| `service_dcdb()` | `ClusterDatabaseService` | cluster_database_service.rs |
+
+#### Status Sync Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `service_status()` | `StatusSyncService` | status_sync_service.rs |
+| (kvstore CPG group) | `StatusSyncService` | Uses separate CPG group |
+
+### Callback System
+
+**C Implementation:**
+- `dfsm_callbacks_t`: a struct of function pointers (see the mapping table above)
+
+**Rust Implementation:**
+- Uses trait-based callbacks instead of function pointers
+- Callbacks are implemented by `MemDbCallbacks` (memdb integration)
+- Defined in external crates (pmxcfs-memdb)
+
+## Synchronization Protocol
+
+The DFSM ensures all nodes maintain consistent database state through a multi-phase synchronization protocol:
+
+### Protocol Phases
+
+#### Phase 1: Membership Change
+
+When nodes join or leave the cluster:
+
+1. **Corosync CPG** delivers membership change notification
+2. **DFSM invalidates** cached checksums
+3. **Message queues** are cleared
+4. **Epoch counter** is incremented
+
+**CPG Leader** (lowest node ID):
+- Initiates sync by sending `SyncStart` message
+- Sends its own `State` (CPG doesn't loop back messages)
+
+**All Followers**:
+- Respond to `SyncStart` by sending their `State`
+- Wait for other nodes' states
+
+#### Phase 2: State Exchange
+
+Each node collects `State` messages containing serialized **MemDbIndex** (compact state summary using C-compatible wire format).
+
+State digests are computed using SHA-256 hashing to detect differences between nodes.
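+
+A minimal sketch of the digest step (the `sha2` crate here is illustrative; the real
+checksum computation lives behind `Callbacks::compute_checksum`):
+
+```rust
+use sha2::{Digest, Sha256};
+
+/// Hash a serialized state blob down to the 32-byte digest that nodes compare.
+fn state_digest(state: &[u8]) -> [u8; 32] {
+    let mut hasher = Sha256::new();
+    hasher.update(state);
+    hasher.finalize().into()
+}
+```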
+
+#### Phase 3: Leader Election
+
+When all states are collected, `process_state_update()` is called:
+
+1. **Parse indices** from all node states
+2. **Elect data leader** (may differ from CPG leader):
+ - Highest `version` wins
+ - If tied, highest `mtime` wins
+3. **Identify synced nodes**: Nodes whose index matches leader exactly
+4. **Determine own status**:
+ - If we're the data leader → send updates to followers
+ - If we're synced with leader → mark as Synced
+ - Otherwise → enter Update mode and wait
+
+**Leader Election Algorithm** (a minimal sketch of the comparison above; `IndexSummary` is an illustrative stand-in for the fields read from each node's `MemDbIndex`):
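+
+```rust
+/// Illustrative summary of the fields compared during election.
+struct IndexSummary {
+    nodeid: u32,
+    version: u64,
+    mtime: u32,
+}
+
+/// Highest version wins; ties are broken by the newest mtime.
+fn elect_data_leader(states: &[IndexSummary]) -> Option<&IndexSummary> {
+    states
+        .iter()
+        .max_by(|a, b| a.version.cmp(&b.version).then(a.mtime.cmp(&b.mtime)))
+}
+```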
+
+#### Phase 4: Incremental Updates
+
+**Data Leader** (node with highest version):
+
+1. **Compare indices** using `find_differences()` for each follower
+2. **Serialize differing entries** to C-compatible TreeEntry format
+3. **Send Update messages** via CPG
+4. **Send UpdateComplete** when all updates sent
+
+**Followers** (out-of-sync nodes):
+
+1. **Receive Update messages**
+2. **Deserialize TreeEntry** via `TreeEntry::deserialize_from_update()`
+3. **Apply to database** via `MemDb::apply_tree_entry()`:
+ - INSERT OR REPLACE in SQLite
+ - Update in-memory structures
+ - Handle entry moves (parent/name changes)
+4. **On UpdateComplete**: Transition to Synced mode
+
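+A sketch of the leader side of this phase, using this crate's message types (`diffs` and
+`mcast` stand in for the real index-diff and CPG multicast plumbing; the `SyncEpoch`
+re-export is assumed):
+
+```rust
+use pmxcfs_dfsm::{DfsmMessage, FuseMessage, SyncEpoch};
+use pmxcfs_memdb::TreeEntry;
+
+/// Send one Update per differing entry, then signal completion.
+fn send_updates(
+    diffs: Vec<TreeEntry>,
+    sync_epoch: SyncEpoch,
+    mcast: &mut dyn FnMut(&[u8]) -> anyhow::Result<()>,
+) -> anyhow::Result<()> {
+    for entry in diffs {
+        let msg = DfsmMessage::<FuseMessage>::from_tree_entry(entry, sync_epoch);
+        mcast(&msg.serialize())?;
+    }
+    mcast(&DfsmMessage::<FuseMessage>::UpdateComplete { sync_epoch }.serialize())
+}
+```
+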
+#### Phase 5: Normal Operations
+
+When in **Synced** mode:
+
+- FUSE operations are broadcast via `send_fuse_message()`
+- Messages are delivered immediately via `deliver_message()`
+- Leader periodically sends `VerifyRequest` for checksum comparison
+- Nodes respond with `Verify` containing SHA-256 of entire database
+- Mismatches trigger cluster resync
+
+---
+
+## Protocol Details
+
+### State Machine Transitions
+
+The rules below are based on analysis of the C implementation (`dfsm.c`, lines 795-1209). A condensed view of the mode transitions (modes as listed in the table further down):
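+
+```text
+START ──confchg──▶ START_SYNC ──all states received──▶ SYNCED  (already up to date)
+                        │
+                        └────────▶ UPDATE ──UpdateComplete──▶ SYNCED
+any mode ──protocol violation──▶ LEAVE / ERROR
+```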
+
+#### Critical Protocol Rules
+
+1. **Epoch Management**:
+ - Each node creates local epoch during confchg: `(counter++, time, own_nodeid, own_pid)`
+ - **Leader sends SYNC_START with its epoch**
+ - **Followers MUST adopt leader's epoch from SYNC_START** (`dfsm->sync_epoch = header->epoch`)
+ - All STATE messages in sync round use adopted epoch
+ - Epoch mismatch → message discarded (may lead to LEAVE)
+
+2. **Member List Validation**:
+ - Built from `member_list` in confchg callback
+ - Stored in `dfsm->sync_info->nodes[]`
+ - STATE sender MUST be in this list
+ - Non-member STATE → immediate LEAVE
+
+3. **Duplicate Detection**:
+ - Each node sends STATE exactly once per sync round
+ - Tracked via `ni->state` pointer (NULL = not received, non-NULL = received)
+ - Duplicate STATE from same nodeid/pid → immediate LEAVE
+ - **Root cause of current Rust/C sync failure**
+
+4. **Message Ordering** (one sync round):
+ - `SyncStart` from the CPG leader, carrying its epoch
+ - `State` from every member, exactly once each
+ - `Update` messages from the data leader, then `UpdateComplete`
+ - `Normal` messages resume once synced
+
+5. **Leader Selection**:
+ - Determined by `lowest_nodeid` from member list
+ - Set in confchg callback before any messages sent
+ - Used to validate SYNC_START sender (logged but not enforced)
+ - Re-elected during state processing based on DB versions
+
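+A sketch combining rules 1-3 above (the bookkeeping types are illustrative; the crate keeps
+this state inside `Dfsm`, and the `SyncEpoch` re-export is assumed):
+
+```rust
+use std::collections::HashMap;
+
+use pmxcfs_dfsm::SyncEpoch;
+
+/// Per-round bookkeeping; members are keyed by (nodeid, pid).
+struct SyncRound {
+    epoch: SyncEpoch,
+    state_received: HashMap<(u32, u32), bool>,
+}
+
+/// Rules 1-3: wrong epoch is discarded; a non-member or duplicate STATE forces a LEAVE.
+fn accept_state(
+    round: &mut SyncRound,
+    epoch: SyncEpoch,
+    nodeid: u32,
+    pid: u32,
+) -> anyhow::Result<()> {
+    if epoch != round.epoch {
+        anyhow::bail!("epoch mismatch: discard message");
+    }
+    let seen = round
+        .state_received
+        .get_mut(&(nodeid, pid))
+        .ok_or_else(|| anyhow::anyhow!("STATE from non-member: leave"))?;
+    if std::mem::replace(seen, true) {
+        anyhow::bail!("duplicate STATE from ({nodeid}, {pid}): leave");
+    }
+    Ok(())
+}
+```
+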
+### DFSM States (DfsmMode)
+
+| State | Value | Description | C Equivalent |
+|-------|-------|-------------|--------------|
+| `Start` | 0 | Initial connection | `DFSM_MODE_START` |
+| `StartSync` | 1 | Beginning sync | `DFSM_MODE_START_SYNC` |
+| `Synced` | 2 | Fully synchronized | `DFSM_MODE_SYNCED` |
+| `Update` | 3 | Receiving updates | `DFSM_MODE_UPDATE` |
+| `Leave` | 253 | Leaving group | `DFSM_MODE_LEAVE` |
+| `VersionError` | 254 | Protocol mismatch | `DFSM_MODE_VERSION_ERROR` |
+| `Error` | 255 | Error state | `DFSM_MODE_ERROR` |
+
+### Message Types (DfsmMessageType)
+
+| Type | Value | Purpose |
+|------|-------|---------|
+| `Normal` | 0 | Application messages (with header + payload) |
+| `SyncStart` | 1 | Start sync (from leader) |
+| `State` | 2 | Full state data |
+| `Update` | 3 | Incremental update |
+| `UpdateComplete` | 4 | End of updates |
+| `VerifyRequest` | 5 | Request state verification |
+| `Verify` | 6 | State checksum response |
+
+All messages use C-compatible wire format with headers and payloads.
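+
+The header layout, as serialized in `dfsm_message.rs`:
+
+```text
+base header (16 bytes): [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32]
+Normal appends:         [count: u64][application data ...]
+state messages append:  [epoch: u32][time: u32][nodeid: u32][pid: u32][payload ...]
+```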
+
+### Application Message Types
+
+The DFSM can carry two types of application messages:
+
+1. **Fuse Messages** (Filesystem operations)
+ - CPG Group: `pmxcfs_v1` (DCDB)
+ - Message types: `Write`, `Create`, `Delete`, `Mkdir`, `Rename`, `SetMtime`, `Unlock`
+ - Defined in: `pmxcfs-dfsm::fuse_message` (this crate)
+
+2. **KvStore Messages** (Status/RRD sync)
+ - CPG Group: `pve_kvstore_v1`
+ - Message types: `Data` (key-value pairs for status sync)
+ - Defined in: `pmxcfs-dfsm::kv_store_message` (this crate)
+
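+For example, a file write on the DCDB group is a `FuseMessage` wrapped in
+`DfsmMessage::Normal` (a sketch; crate-root re-exports are assumed, and the path and
+content are made up):
+
+```rust
+use pmxcfs_dfsm::{DfsmMessage, FuseMessage};
+
+fn write_on_wire() -> Vec<u8> {
+    let msg = FuseMessage::Write {
+        path: "/qemu-server/100.conf".to_string(),
+        offset: 0,
+        data: b"memory: 2048\n".to_vec(),
+    };
+    // The first argument is the sender's running message counter.
+    DfsmMessage::from_message(1, msg, DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION)
+        .serialize() // 24-byte normal header + C-compatible FUSE payload
+}
+```
+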
+### Wire Format Compatibility
+
+All wire formats are **byte-compatible** with the C implementation; messages carry exactly the headers and payloads defined by the corresponding C structs (see the header layout above).
+
+## Synchronization Flow
+
+### 1. Node Join
+
+confchg → leader sends `SyncStart` → every member sends `State` → data leader elected →
+`Update` × N → `UpdateComplete` → all nodes `Synced`
+
+### 2. Normal Operation
+
+FUSE operation → `send_message()` → CPG total-order delivery → `deliver_message()` applied
+on every node
+
+### 3. State Verification (Periodic)
+
+hourly timer → leader sends `VerifyRequest` → nodes reply `Verify` with a database checksum →
+mismatch triggers resync
+
+## Key Differences from C Implementation
+
+### Event Loop Architecture
+
+**C Version:**
+- Uses libqb's `qb_loop` for event loop
+- CPG fd registered with `qb_loop_poll_add()`
+- Dispatch called from qb_loop when fd is readable
+
+**Rust Version:**
+- Uses tokio async runtime
+- Service trait provides `dispatch()` method
+- ServiceManager polls fd using tokio's async I/O
+- No qb_loop dependency
+
+### CPG Instance Management
+
+**C Version:**
+- Single DFSM struct with callbacks
+- Two different CPG groups created separately
+
+**Rust Version:**
+- Each CPG group gets its own `Dfsm` instance
+- `ClusterDatabaseService` - manages `pmxcfs_v1` CPG group (MemDb)
+- `StatusSyncService` - manages `pve_kvstore_v1` CPG group (Status/RRD)
+- Both use same DFSM protocol but different callbacks
+
+## Error Handling
+
+### Split-Brain Prevention
+
+- Checksum verification detects divergence
+- Automatic resync on mismatch
+- Version monotonicity ensures forward progress
+
+### Network Partition Recovery
+
+- Membership changes trigger sync
+- Highest version always wins
+- Stale data is safely replaced
+
+### Consistency Guarantees
+
+- SQLite transactions ensure atomic updates
+- In-memory structures updated atomically
+- Version increments are monotonic
+- All nodes converge to same state
+
+## Compatibility Matrix
+
+| Feature | C Version | Rust Version | Compatible |
+|---------|-----------|--------------|------------|
+| Wire format | `dfsm_message_*_header_t` | `DfsmMessage::serialize()` | Yes |
+| CPG protocol | libcorosync | rust-corosync | Yes |
+| Message types | 0-6 | `DfsmMessageType` | Yes |
+| State machine | `dfsm_mode_t` | `DfsmMode` | Yes |
+| Protocol version | 1 | 1 | Yes |
+| Group names | `pmxcfs_v1`, `pve_kvstore_v1` | Same | Yes |
+
+## Known Issues / TODOs
+
+### Missing Features
+- [ ] **Sync message batching**: C version can batch updates, Rust sends individually
+- [ ] **Message queue limits**: C has MAX_QUEUE_LEN, Rust unbounded (potential memory issue)
+- [ ] **Detailed error codes**: C returns specific CS_ERR_* codes, Rust uses anyhow errors
+
+### Behavioral Differences (Benign)
+- **Logging**: Rust uses `tracing` instead of `qb_log` (compatible with journald)
+- **Threading**: Rust uses tokio tasks, C uses qb_loop single-threaded model
+- **Timers**: Rust uses tokio timers, C uses qb_loop timers (same timeout values)
+
+### Incompatibilities (None Known)
+No incompatibilities have been identified. The Rust implementation is fully wire-compatible and can operate in a mixed C/Rust cluster.
+
+## References
+
+### C Implementation
+- `src/pmxcfs/dfsm.c` / `dfsm.h` - Core DFSM implementation
+- `src/pmxcfs/dcdb.c` / `dcdb.h` - Distributed database coordination
+- `src/pmxcfs/loop.c` / `loop.h` - Service loop and management
+
+### Related Crates
+- **pmxcfs-memdb**: Database callbacks for DFSM
+- **pmxcfs-status**: Status tracking and kvstore
+- **pmxcfs-api-types**: Message type definitions
+- **pmxcfs-services**: Service framework for lifecycle management
+- **rust-corosync**: CPG bindings (external dependency)
+
+### Corosync Documentation
+- CPG (Closed Process Group) API: https://github.com/corosync/corosync
+- Group communication semantics: Total order, virtual synchrony
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
new file mode 100644
index 00000000..7e35b8d4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
@@ -0,0 +1,52 @@
+/// DFSM application callbacks
+///
+/// This module defines the callback trait that application layers implement
+/// to integrate with the DFSM state machine.
+use crate::NodeSyncInfo;
+
+/// Callback trait for DFSM operations
+///
+/// The application layer implements this to receive DFSM events.
+/// The generic parameter `M` specifies the message type this callback handles:
+/// - `Callbacks<FuseMessage>` for main database operations
+/// - `Callbacks<KvStoreMessage>` for status synchronization
+///
+/// This provides type safety by ensuring each DFSM instance only delivers
+/// the correct message type to its callbacks.
+pub trait Callbacks<M>: Send + Sync {
+ /// Deliver an application message
+ ///
+ /// The message type is determined by the generic parameter:
+ /// - FuseMessage for main database operations
+ /// - KvStoreMessage for status synchronization
+ fn deliver_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ message: M,
+ timestamp: u64,
+ ) -> anyhow::Result<(i32, bool)>;
+
+ /// Compute state checksum for verification
+ fn compute_checksum(&self, output: &mut [u8; 32]) -> anyhow::Result<()>;
+
+ /// Get current state for synchronization
+ ///
+ /// Called when we need to send our state to other nodes during sync.
+ fn get_state(&self) -> anyhow::Result<Vec<u8>>;
+
+ /// Process state update during synchronization
+ fn process_state_update(&self, states: &[NodeSyncInfo]) -> anyhow::Result<bool>;
+
+ /// Process incremental update from leader
+ ///
+ /// The leader sends individual TreeEntry updates during synchronization.
+ /// The data is serialized TreeEntry in C-compatible wire format.
+ fn process_update(&self, nodeid: u32, pid: u32, data: &[u8]) -> anyhow::Result<()>;
+
+ /// Commit synchronized state
+ fn commit_state(&self) -> anyhow::Result<()>;
+
+ /// Called when cluster becomes synced
+ fn on_synced(&self);
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
new file mode 100644
index 00000000..dc85a392
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
@@ -0,0 +1,116 @@
+//! Cluster Database Service
+//!
+//! This service synchronizes the distributed cluster database (pmxcfs-memdb) across
+//! all cluster nodes using DFSM (Distributed Finite State Machine).
+//!
+//! Equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+//! Provides automatic retry, event-driven CPG dispatching, and periodic state verification.
+
+use async_trait::async_trait;
+use pmxcfs_services::{DispatchAction, InitResult, Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{debug, error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message as MessageTrait;
+
+/// Cluster Database Service
+///
+/// Synchronizes the distributed cluster database (pmxcfs-memdb) across all nodes.
+/// Implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for database replication
+/// - Periodic state verification via timer callback
+///
+/// This is equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct ClusterDatabaseService<M> {
+ dfsm: Arc<Dfsm<M>>,
+ fd: Option<i32>,
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> ClusterDatabaseService<M> {
+ /// Create a new cluster database service
+ pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+ Self { dfsm, fd: None }
+ }
+}
+
+#[async_trait]
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Service for ClusterDatabaseService<M> {
+ fn name(&self) -> &str {
+ "cluster-database"
+ }
+
+ async fn initialize(&mut self) -> pmxcfs_services::Result<InitResult> {
+ info!("Initializing cluster database service (dcdb)");
+
+ // Initialize CPG connection (this also joins the group)
+ self.dfsm.init_cpg().map_err(|e| {
+ ServiceError::InitializationFailed(format!("DFSM CPG initialization failed: {e}"))
+ })?;
+
+ // Get file descriptor for event monitoring
+ let fd = self.dfsm.fd_get().map_err(|e| {
+ self.dfsm.stop_services().ok();
+ ServiceError::InitializationFailed(format!("Failed to get DFSM fd: {e}"))
+ })?;
+
+ self.fd = Some(fd);
+
+ info!(
+ "Cluster database service initialized successfully with fd {}",
+ fd
+ );
+ Ok(InitResult::WithFileDescriptor(fd))
+ }
+
+ async fn dispatch(&mut self) -> pmxcfs_services::Result<DispatchAction> {
+ match self.dfsm.dispatch_events() {
+ Ok(_) => Ok(DispatchAction::Continue),
+ Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+ warn!("DFSM connection lost, requesting reinitialization");
+ Ok(DispatchAction::Reinitialize)
+ }
+ Err(e) => {
+ error!("DFSM dispatch failed: {}", e);
+ Err(ServiceError::DispatchFailed(format!(
+ "DFSM dispatch failed: {e}"
+ )))
+ }
+ }
+ }
+
+ async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+ info!("Finalizing cluster database service");
+
+ self.fd = None;
+
+ if let Err(e) = self.dfsm.stop_services() {
+ warn!("Error stopping cluster database services: {}", e);
+ }
+
+ info!("Cluster database service finalized");
+ Ok(())
+ }
+
+ async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+ debug!("Cluster database timer callback: initiating state verification");
+
+ // Request state verification
+ if let Err(e) = self.dfsm.verify_request() {
+ warn!("DFSM state verification request failed: {}", e);
+ }
+
+ Ok(())
+ }
+
+ fn timer_period(&self) -> Option<Duration> {
+ // Match C implementation's DCDB_VERIFY_TIME (60 * 60 seconds)
+ // Periodic state verification happens once per hour
+ Some(Duration::from_secs(3600))
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
new file mode 100644
index 00000000..d7964259
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
@@ -0,0 +1,163 @@
+//! Safe, idiomatic wrapper for Corosync CPG (Closed Process Group)
+//!
+//! This module provides a trait-based abstraction over the Corosync CPG C API,
+//! handling the unsafe FFI boundary and callback lifecycle management internally.
+
+use anyhow::Result;
+use rust_corosync::{NodeId, cpg};
+use std::sync::Arc;
+
+/// Helper to extract CpgHandler from CPG context
+///
+/// # Safety
+/// Assumes context was set to a valid *const Arc<dyn CpgHandler> pointer
+unsafe fn handler_from_context(handle: cpg::Handle) -> &'static dyn CpgHandler {
+ let context = cpg::context_get(handle).expect("BUG: Failed to get CPG context");
+
+ assert_ne!(
+ context, 0,
+ "BUG: CPG context is null - CpgService not properly initialized"
+ );
+
+ // Context points to a leaked Arc<dyn CpgHandler>
+ // We borrow the Arc to get a reference to the handler
+ let arc_ptr = context as *const Arc<dyn CpgHandler>;
+ let arc_ref: &Arc<dyn CpgHandler> = unsafe { &*arc_ptr };
+ arc_ref.as_ref()
+}
+
+/// Trait for handling CPG events in a safe, idiomatic way
+///
+/// Implementors receive callbacks when CPG events occur. The trait handles
+/// all unsafe pointer conversion and context management internally.
+pub trait CpgHandler: Send + Sync + 'static {
+ fn on_deliver(&self, group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]);
+
+ fn on_confchg(
+ &self,
+ group_name: &str,
+ member_list: &[cpg::Address],
+ left_list: &[cpg::Address],
+ joined_list: &[cpg::Address],
+ );
+}
+
+/// Safe wrapper for CPG handle that manages callback lifecycle
+///
+/// This service registers callbacks with the CPG handle and ensures proper
+/// cleanup when dropped. It uses Arc reference counting to safely manage
+/// the handler lifetime across the FFI boundary.
+pub struct CpgService {
+ handle: cpg::Handle,
+ handler: Arc<dyn CpgHandler>,
+}
+
+impl CpgService {
+ pub fn new<T: CpgHandler>(handler: Arc<T>) -> Result<Self> {
+ fn cpg_deliver_callback(
+ handle: &cpg::Handle,
+ group_name: String,
+ nodeid: NodeId,
+ pid: u32,
+ msg: &[u8],
+ _msg_len: usize,
+ ) {
+ unsafe {
+ let handler = handler_from_context(*handle);
+ handler.on_deliver(&group_name, nodeid, pid, msg);
+ }
+ }
+
+ fn cpg_confchg_callback(
+ handle: &cpg::Handle,
+ group_name: &str,
+ member_list: Vec<cpg::Address>,
+ left_list: Vec<cpg::Address>,
+ joined_list: Vec<cpg::Address>,
+ ) {
+ unsafe {
+ let handler = handler_from_context(*handle);
+ handler.on_confchg(group_name, &member_list, &left_list, &joined_list);
+ }
+ }
+
+ let model_data = cpg::ModelData::ModelV1(cpg::Model1Data {
+ flags: cpg::Model1Flags::None,
+ deliver_fn: Some(cpg_deliver_callback),
+ confchg_fn: Some(cpg_confchg_callback),
+ totem_confchg_fn: None,
+ });
+
+ let handle = cpg::initialize(&model_data, 0)?;
+
+ let handler_dyn: Arc<dyn CpgHandler> = handler;
+ let leaked_arc = Box::new(Arc::clone(&handler_dyn));
+ let arc_ptr = Box::into_raw(leaked_arc) as *const _ as u64;
+ cpg::context_set(handle, arc_ptr)?;
+
+ Ok(Self {
+ handle,
+ handler: handler_dyn,
+ })
+ }
+
+ pub fn join(&self, group_name: &str) -> Result<()> {
+ // IMPORTANT: C implementation uses strlen(name) + 1 for CPG name length,
+ // which includes the trailing nul. To ensure compatibility with C nodes,
+ // we must add \0 to the group name.
+ // See src/pmxcfs/dfsm.c: dfsm->cpg_group_name.length = strlen(group_name) + 1;
+ let group_string = format!("{}\0", group_name);
+ tracing::debug!("CPG JOIN: joining group '{}'", group_name);
+ cpg::join(self.handle, &group_string)?;
+ tracing::info!("CPG JOIN: Successfully joined group '{}'", group_name);
+ Ok(())
+ }
+
+ pub fn leave(&self, group_name: &str) -> Result<()> {
+ // Include trailing nul to match C's behavior (see join() comment)
+ let group_string = format!("{}\0", group_name);
+ cpg::leave(self.handle, &group_string)?;
+ Ok(())
+ }
+
+ pub fn mcast(&self, guarantee: cpg::Guarantee, msg: &[u8]) -> Result<()> {
+ cpg::mcast_joined(self.handle, guarantee, msg)?;
+ Ok(())
+ }
+
+ pub fn dispatch(&self) -> Result<(), rust_corosync::CsError> {
+ cpg::dispatch(self.handle, rust_corosync::DispatchFlags::All)
+ }
+
+ pub fn fd(&self) -> Result<i32> {
+ Ok(cpg::fd_get(self.handle)?)
+ }
+
+ pub fn handler(&self) -> &Arc<dyn CpgHandler> {
+ &self.handler
+ }
+
+ pub fn handle(&self) -> cpg::Handle {
+ self.handle
+ }
+}
+
+impl Drop for CpgService {
+ fn drop(&mut self) {
+ if let Ok(context) = cpg::context_get(self.handle)
+ && context != 0
+ {
+ unsafe {
+ let _boxed = Box::from_raw(context as *mut Arc<dyn CpgHandler>);
+ }
+ }
+
+ let _ = cpg::finalize(self.handle);
+ }
+}
+
+unsafe impl Send for CpgService {}
+unsafe impl Sync for CpgService {}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
new file mode 100644
index 00000000..054f06b8
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
@@ -0,0 +1,728 @@
+/// DFSM Protocol Message Types
+///
+/// This module defines the DfsmMessage enum which encapsulates all DFSM protocol messages
+/// with their associated data, providing type-safe serialization and deserialization.
+///
+/// Wire format matches C implementation's dfsm_message_*_header_t structures for compatibility.
+use anyhow::Result;
+use pmxcfs_memdb::TreeEntry;
+
+use super::message::Message as MessageTrait;
+use super::types::{DfsmMessageType, SyncEpoch};
+
+/// DFSM protocol message with typed variants
+///
+/// Each variant corresponds to a message type in the DFSM protocol and carries
+/// the appropriate payload data. The wire format matches the C implementation:
+///
+/// For Normal messages: dfsm_message_normal_header_t (24 bytes) + fuse_data
+/// ```text
+/// [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][fuse_data...]
+/// ```
+///
+/// The generic parameter `M` specifies the application message type and must implement
+/// the `Message` trait for serialization/deserialization:
+/// - `DfsmMessage<FuseMessage>` for database operations
+/// - `DfsmMessage<KvStoreMessage>` for status synchronization
+#[derive(Debug, Clone)]
+pub enum DfsmMessage<M>
+where
+ M: MessageTrait,
+{
+ /// Regular application message
+ ///
+ /// Contains a typed application message (FuseMessage or KvStoreMessage).
+ /// C wire format: dfsm_message_normal_header_t + application_message data
+ Normal {
+ msg_count: u64,
+ timestamp: u32, // Unix timestamp (matches C's u32)
+ protocol_version: u32, // Protocol version
+ message: M, // Typed message (FuseMessage or KvStoreMessage)
+ },
+
+ /// Start synchronization signal from leader (no payload)
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+ SyncStart { sync_epoch: SyncEpoch },
+
+ /// State data from another node during sync
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [state_data: raw bytes]
+ State {
+ sync_epoch: SyncEpoch,
+ data: Vec<u8>,
+ },
+
+ /// State update from leader
+ ///
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + TreeEntry fields
+ /// This is sent by the leader during synchronization to update followers
+ /// with individual database entries that differ from their state.
+ Update {
+ sync_epoch: SyncEpoch,
+ tree_entry: TreeEntry,
+ },
+
+ /// Update complete signal from leader (no payload)
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+ UpdateComplete { sync_epoch: SyncEpoch },
+
+ /// Verification request from leader
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64]
+ VerifyRequest { sync_epoch: SyncEpoch, csum_id: u64 },
+
+ /// Verification response with checksum
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64][checksum: [u8; 32]]
+ Verify {
+ sync_epoch: SyncEpoch,
+ csum_id: u64,
+ checksum: [u8; 32],
+ },
+}
+
+impl<M> DfsmMessage<M>
+where
+ M: MessageTrait,
+{
+ /// Protocol version (should match cluster-wide)
+ pub const DEFAULT_PROTOCOL_VERSION: u32 = 1;
+
+ /// Get the message type discriminant
+ pub fn message_type(&self) -> DfsmMessageType {
+ match self {
+ DfsmMessage::Normal { .. } => DfsmMessageType::Normal,
+ DfsmMessage::SyncStart { .. } => DfsmMessageType::SyncStart,
+ DfsmMessage::State { .. } => DfsmMessageType::State,
+ DfsmMessage::Update { .. } => DfsmMessageType::Update,
+ DfsmMessage::UpdateComplete { .. } => DfsmMessageType::UpdateComplete,
+ DfsmMessage::VerifyRequest { .. } => DfsmMessageType::VerifyRequest,
+ DfsmMessage::Verify { .. } => DfsmMessageType::Verify,
+ }
+ }
+
+ /// Serialize message to C-compatible wire format
+ ///
+ /// For Normal: dfsm_message_normal_header_t (24 bytes) + application_data; all other types use the 32-byte state header
+ /// Format: [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][data...]
+ pub fn serialize(&self) -> Vec<u8> {
+ match self {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ } => self.serialize_normal_message(*msg_count, *timestamp, *protocol_version, message),
+ _ => self.serialize_state_message(),
+ }
+ }
+
+ /// Serialize a Normal message with C-compatible header
+ fn serialize_normal_message(
+ &self,
+ msg_count: u64,
+ timestamp: u32,
+ protocol_version: u32,
+ message: &M,
+ ) -> Vec<u8> {
+ let msg_type = self.message_type() as u16;
+ let subtype = message.message_type();
+ let app_data = message.serialize();
+
+ // C header: type (u16) + subtype (u16) + protocol (u32) + time (u32) + reserved (u32) + count (u64) = 24 bytes
+ let mut buf = Vec::with_capacity(24 + app_data.len());
+
+ // dfsm_message_header_t fields
+ buf.extend_from_slice(&msg_type.to_le_bytes());
+ buf.extend_from_slice(&subtype.to_le_bytes());
+ buf.extend_from_slice(&protocol_version.to_le_bytes());
+ buf.extend_from_slice(&timestamp.to_le_bytes());
+ buf.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // count field
+ buf.extend_from_slice(&msg_count.to_le_bytes());
+
+ // application message data
+ buf.extend_from_slice(&app_data);
+
+ buf
+ }
+
+ /// Serialize state messages (non-Normal) with C-compatible header
+ /// C wire format: dfsm_message_state_header_t (32 bytes) + payload
+ /// Header breakdown: base (16 bytes) + epoch (16 bytes)
+ fn serialize_state_message(&self) -> Vec<u8> {
+ let msg_type = self.message_type() as u16;
+ let (sync_epoch, payload) = self.extract_epoch_and_payload();
+
+ // For state messages: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + payload
+ let mut message = Vec::with_capacity(32 + payload.len());
+
+ // Base header (16 bytes): type, subtype, protocol, time, reserved
+ message.extend_from_slice(&msg_type.to_le_bytes());
+ message.extend_from_slice(&0u16.to_le_bytes()); // subtype (unused)
+ message.extend_from_slice(&Self::DEFAULT_PROTOCOL_VERSION.to_le_bytes());
+
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+ message.extend_from_slice(&timestamp.to_le_bytes());
+ message.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // Epoch header (16 bytes): epoch, time, nodeid, pid
+ message.extend_from_slice(&sync_epoch.serialize());
+
+ // Payload
+ message.extend_from_slice(&payload);
+
+ message
+ }
+
+ /// Extract sync_epoch and payload from state messages
+ fn extract_epoch_and_payload(&self) -> (SyncEpoch, Vec<u8>) {
+ match self {
+ DfsmMessage::Normal { .. } => {
+ unreachable!("Normal messages use serialize_normal_message")
+ }
+ DfsmMessage::SyncStart { sync_epoch } => (*sync_epoch, Vec::new()),
+ DfsmMessage::State { sync_epoch, data } => (*sync_epoch, data.clone()),
+ DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ } => (*sync_epoch, tree_entry.serialize_for_update()),
+ DfsmMessage::UpdateComplete { sync_epoch } => (*sync_epoch, Vec::new()),
+ DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ } => (*sync_epoch, csum_id.to_le_bytes().to_vec()),
+ DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ } => {
+ let mut data = Vec::with_capacity(8 + 32);
+ data.extend_from_slice(&csum_id.to_le_bytes());
+ data.extend_from_slice(checksum);
+ (*sync_epoch, data)
+ }
+ }
+ }
+
+ /// Deserialize message from C-compatible wire format
+ ///
+ /// Normal messages: [base header: 16 bytes][count: u64][app data]
+ /// State messages: [base header: 16 bytes][epoch: 16 bytes][payload]
+ ///
+ /// # Arguments
+ /// * `data` - Raw message bytes from CPG
+ pub fn deserialize(data: &[u8]) -> Result<Self> {
+ if data.len() < 16 {
+ anyhow::bail!(
+ "Message too short: {} bytes (need at least 16 for header)",
+ data.len()
+ );
+ }
+
+ // Parse dfsm_message_header_t (16 bytes)
+ let msg_type = u16::from_le_bytes([data[0], data[1]]);
+ let subtype = u16::from_le_bytes([data[2], data[3]]);
+ let protocol_version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
+ let timestamp = u32::from_le_bytes([data[8], data[9], data[10], data[11]]);
+ let _reserved = u32::from_le_bytes([data[12], data[13], data[14], data[15]]);
+
+ let dfsm_type = DfsmMessageType::try_from(msg_type)?;
+
+ // Normal messages have different structure than state messages
+ if dfsm_type == DfsmMessageType::Normal {
+ // Normal: [base: 16][count: 8][app_data: ...]
+ let payload = &data[16..];
+ Self::deserialize_normal_message(subtype, protocol_version, timestamp, payload)
+ } else {
+ // State messages: [base: 16][epoch: 16][payload: ...]
+ if data.len() < 32 {
+ anyhow::bail!(
+ "State message too short: {} bytes (need at least 32 for state header)",
+ data.len()
+ );
+ }
+ let sync_epoch = SyncEpoch::deserialize(&data[16..32])
+ .map_err(|e| anyhow::anyhow!("Failed to deserialize sync epoch: {e}"))?;
+ let payload = &data[32..];
+ Self::deserialize_state_message(dfsm_type, sync_epoch, payload)
+ }
+ }
+
+ /// Deserialize a Normal message
+ fn deserialize_normal_message(
+ subtype: u16,
+ protocol_version: u32,
+ timestamp: u32,
+ payload: &[u8],
+ ) -> Result<Self> {
+ // Normal messages have count field (u64) after base header
+ if payload.len() < 8 {
+ anyhow::bail!("Normal message too short: need count field");
+ }
+ let msg_count = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ let app_data = &payload[8..];
+
+ // Deserialize using the MessageTrait
+ let message = M::deserialize(subtype, app_data)?;
+
+ Ok(DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ })
+ }
+
+ /// Deserialize a state message (with epoch)
+ fn deserialize_state_message(
+ dfsm_type: DfsmMessageType,
+ sync_epoch: SyncEpoch,
+ payload: &[u8],
+ ) -> Result<Self> {
+ match dfsm_type {
+ DfsmMessageType::Normal => {
+ unreachable!("Normal messages use deserialize_normal_message")
+ }
+ DfsmMessageType::Update => {
+ let tree_entry = TreeEntry::deserialize_from_update(payload)?;
+ Ok(DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ })
+ }
+ DfsmMessageType::SyncStart => Ok(DfsmMessage::SyncStart { sync_epoch }),
+ DfsmMessageType::State => Ok(DfsmMessage::State {
+ sync_epoch,
+ data: payload.to_vec(),
+ }),
+ DfsmMessageType::UpdateComplete => Ok(DfsmMessage::UpdateComplete { sync_epoch }),
+ DfsmMessageType::VerifyRequest => {
+ if payload.len() < 8 {
+ anyhow::bail!("VerifyRequest message too short");
+ }
+ let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ Ok(DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ })
+ }
+ DfsmMessageType::Verify => {
+ if payload.len() < 40 {
+ anyhow::bail!("Verify message too short");
+ }
+ let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ let mut checksum = [0u8; 32];
+ checksum.copy_from_slice(&payload[8..40]);
+ Ok(DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ })
+ }
+ }
+ }
+
+ /// Helper to create a Normal message from an application message
+ pub fn from_message(msg_count: u64, message: M, protocol_version: u32) -> Self {
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ }
+ }
+
+ /// Helper to create an Update message from a TreeEntry
+ ///
+ /// Used by the leader during synchronization to send individual database entries
+ /// to nodes that need to catch up. Matches C's dcdb_send_update_inode().
+ pub fn from_tree_entry(tree_entry: TreeEntry, sync_epoch: SyncEpoch) -> Self {
+ DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::FuseMessage;
+
+ #[test]
+ fn test_sync_start_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 1234567890,
+ nodeid: 1,
+ pid: 1000,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ assert!(
+ matches!(deserialized, DfsmMessage::SyncStart { sync_epoch: e } if e == sync_epoch)
+ );
+ }
+
+ #[test]
+ fn test_normal_roundtrip() {
+ let fuse_msg = FuseMessage::Create {
+ path: "/test/file".to_string(),
+ };
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Normal {
+ msg_count: 42,
+ timestamp: 1234567890,
+ protocol_version: DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION,
+ message: fuse_msg.clone(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ } => {
+ assert_eq!(msg_count, 42);
+ assert_eq!(timestamp, 1234567890);
+ assert_eq!(
+ protocol_version,
+ DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION
+ );
+ assert_eq!(message, fuse_msg);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_verify_request_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 2,
+ time: 1234567891,
+ nodeid: 2,
+ pid: 2000,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: 0x123456789ABCDEF0,
+ };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::VerifyRequest {
+ sync_epoch: e,
+ csum_id,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, 0x123456789ABCDEF0);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_verify_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 3,
+ time: 1234567892,
+ nodeid: 3,
+ pid: 3000,
+ };
+ let checksum = [42u8; 32];
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Verify {
+ sync_epoch,
+ csum_id: 0x1122334455667788,
+ checksum,
+ };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Verify {
+ sync_epoch: e,
+ csum_id,
+ checksum: recv_checksum,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, 0x1122334455667788);
+ assert_eq!(recv_checksum, checksum);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_invalid_message_type() {
+ // A full 16-byte header whose type field (0xAAAA) is not a known DfsmMessageType
+ let mut data = vec![0u8; 16];
+ data[0..2].copy_from_slice(&0xAAAAu16.to_le_bytes());
+ assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+ }
+
+ #[test]
+ fn test_too_short() {
+ let data = vec![0xFF];
+ assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+ }
+
+ // ===== Edge Case Tests =====
+
+ #[test]
+ fn test_state_message_too_short() {
+ // State messages need at least 32 bytes (16 base + 16 epoch)
+ let mut data = vec![0u8; 31]; // One byte short
+ // Set message type to State (2)
+ data[0..2].copy_from_slice(&2u16.to_le_bytes());
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(result.is_err(), "State message with 31 bytes should fail");
+ assert!(result.unwrap_err().to_string().contains("too short"));
+ }
+
+ #[test]
+ fn test_normal_message_missing_count() {
+ // Normal messages need count field (u64) after 16-byte header
+ let mut data = vec![0u8; 20]; // Header + 4 bytes (not enough for u64 count)
+ // Set message type to Normal (0)
+ data[0..2].copy_from_slice(&0u16.to_le_bytes());
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(
+ result.is_err(),
+ "Normal message without full count field should fail"
+ );
+ }
+
+ #[test]
+ fn test_verify_message_truncated_checksum() {
+ // Verify messages need csum_id (8 bytes) + checksum (32 bytes) = 40 bytes payload
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 123,
+ nodeid: 1,
+ pid: 100,
+ };
+ let mut data = Vec::new();
+
+ // Base header (16 bytes)
+ data.extend_from_slice(&6u16.to_le_bytes()); // Verify message type
+ data.extend_from_slice(&0u16.to_le_bytes()); // subtype
+ data.extend_from_slice(&1u32.to_le_bytes()); // protocol
+ data.extend_from_slice(&123u32.to_le_bytes()); // time
+ data.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // Epoch (16 bytes)
+ data.extend_from_slice(&sync_epoch.serialize());
+
+ // Truncated payload (only 39 bytes instead of 40)
+ data.extend_from_slice(&0x12345678u64.to_le_bytes());
+ data.extend_from_slice(&[0u8; 31]); // Only 31 bytes of checksum
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(
+ result.is_err(),
+ "Verify message with truncated checksum should fail"
+ );
+ }
+
+ #[test]
+ fn test_update_message_with_tree_entry() {
+ use pmxcfs_memdb::TreeEntry;
+
+ // Create a valid tree entry with matching size
+ let data = vec![1, 2, 3, 4, 5];
+ let tree_entry = TreeEntry {
+ inode: 42,
+ parent: 0,
+ version: 1,
+ writer: 0,
+ name: "testfile".to_string(),
+ mtime: 1234567890,
+ size: data.len(), // size must match data.len()
+ entry_type: 8, // DT_REG (regular file)
+ data,
+ };
+
+ let sync_epoch = SyncEpoch {
+ epoch: 5,
+ time: 999,
+ nodeid: 2,
+ pid: 200,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Update {
+ sync_epoch,
+ tree_entry: tree_entry.clone(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Update {
+ sync_epoch: e,
+ tree_entry: recv_entry,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(recv_entry.inode, tree_entry.inode);
+ assert_eq!(recv_entry.name, tree_entry.name);
+ assert_eq!(recv_entry.size, tree_entry.size);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_update_complete_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 10,
+ time: 5555,
+ nodeid: 3,
+ pid: 300,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+
+ let serialized = msg.serialize();
+ assert_eq!(
+ serialized.len(),
+ 32,
+ "UpdateComplete should be exactly 32 bytes (header + epoch)"
+ );
+
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ assert!(
+ matches!(deserialized, DfsmMessage::UpdateComplete { sync_epoch: e } if e == sync_epoch)
+ );
+ }
+
+ #[test]
+ fn test_state_message_with_large_payload() {
+ let sync_epoch = SyncEpoch {
+ epoch: 7,
+ time: 7777,
+ nodeid: 4,
+ pid: 400,
+ };
+ // Create a large payload (1MB)
+ let large_data = vec![0xAB; 1024 * 1024];
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::State {
+ sync_epoch,
+ data: large_data.clone(),
+ };
+
+ let serialized = msg.serialize();
+ // Should be 32 bytes header + 1MB data
+ assert_eq!(serialized.len(), 32 + 1024 * 1024);
+
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::State {
+ sync_epoch: e,
+ data,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(data.len(), large_data.len());
+ assert_eq!(data, large_data);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_message_type_detection() {
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 100,
+ nodeid: 1,
+ pid: 50,
+ };
+
+ let sync_start: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+ assert_eq!(sync_start.message_type(), DfsmMessageType::SyncStart);
+
+ let state: DfsmMessage<FuseMessage> = DfsmMessage::State {
+ sync_epoch,
+ data: vec![1, 2, 3],
+ };
+ assert_eq!(state.message_type(), DfsmMessageType::State);
+
+ let update_complete: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+ assert_eq!(
+ update_complete.message_type(),
+ DfsmMessageType::UpdateComplete
+ );
+ }
+
+ #[test]
+ fn test_from_message_helper() {
+ let fuse_msg = FuseMessage::Mkdir {
+ path: "/new/dir".to_string(),
+ };
+ let msg_count = 123;
+ let protocol_version = DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION;
+
+ let dfsm_msg = DfsmMessage::from_message(msg_count, fuse_msg.clone(), protocol_version);
+
+ match dfsm_msg {
+ DfsmMessage::Normal {
+ msg_count: count,
+ timestamp: _,
+ protocol_version: pv,
+ message,
+ } => {
+ assert_eq!(count, msg_count);
+ assert_eq!(pv, protocol_version);
+ assert_eq!(message, fuse_msg);
+ }
+ _ => panic!("from_message should create Normal variant"),
+ }
+ }
+
+ #[test]
+ fn test_verify_request_with_max_csum_id() {
+ let sync_epoch = SyncEpoch {
+ epoch: 99,
+ time: 9999,
+ nodeid: 5,
+ pid: 500,
+ };
+ let max_csum_id = u64::MAX; // Test with maximum value
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: max_csum_id,
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::VerifyRequest {
+ sync_epoch: e,
+ csum_id,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, max_csum_id);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
new file mode 100644
index 00000000..ee5d28f8
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
@@ -0,0 +1,185 @@
+/// FUSE message types for cluster synchronization
+///
+/// These are the high-level operations that get broadcast through the cluster
+/// via the main database DFSM (pmxcfs_v1 CPG group).
+use anyhow::{Context, Result};
+
+use crate::message::Message;
+use crate::wire_format::{CFuseMessage, CMessageType};
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum FuseMessage {
+ /// Create a regular file
+ Create { path: String },
+ /// Create a directory
+ Mkdir { path: String },
+ /// Write data to a file
+ Write {
+ path: String,
+ offset: u64,
+ data: Vec<u8>,
+ },
+ /// Delete a file or directory
+ Delete { path: String },
+ /// Rename/move a file or directory
+ Rename { from: String, to: String },
+ /// Update modification time
+ Mtime { path: String },
+ /// Request unlock (not yet implemented)
+ UnlockRequest { path: String },
+ /// Unlock (not yet implemented)
+ Unlock { path: String },
+}
+
+impl Message for FuseMessage {
+ fn message_type(&self) -> u16 {
+ match self {
+ FuseMessage::Create { .. } => CMessageType::Create as u16,
+ FuseMessage::Mkdir { .. } => CMessageType::Mkdir as u16,
+ FuseMessage::Write { .. } => CMessageType::Write as u16,
+ FuseMessage::Delete { .. } => CMessageType::Delete as u16,
+ FuseMessage::Rename { .. } => CMessageType::Rename as u16,
+ FuseMessage::Mtime { .. } => CMessageType::Mtime as u16,
+ FuseMessage::UnlockRequest { .. } => CMessageType::UnlockRequest as u16,
+ FuseMessage::Unlock { .. } => CMessageType::Unlock as u16,
+ }
+ }
+
+ fn serialize(&self) -> Vec<u8> {
+ let c_msg = match self {
+ FuseMessage::Create { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Mkdir { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Write { path, offset, data } => CFuseMessage {
+ size: data.len() as u32,
+ offset: *offset as u32,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: data.clone(),
+ },
+ FuseMessage::Delete { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Rename { from, to } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: from.clone(),
+ to: Some(to.clone()),
+ data: Vec::new(),
+ },
+ FuseMessage::Mtime { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::UnlockRequest { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Unlock { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ };
+
+ c_msg.serialize()
+ }
+
+ fn deserialize(message_type: u16, data: &[u8]) -> Result<Self> {
+ let c_msg = CFuseMessage::parse(data).context("Failed to parse C FUSE message")?;
+ let msg_type = CMessageType::try_from(message_type).context("Invalid C message type")?;
+
+ Ok(match msg_type {
+ CMessageType::Create => FuseMessage::Create { path: c_msg.path },
+ CMessageType::Mkdir => FuseMessage::Mkdir { path: c_msg.path },
+ CMessageType::Write => FuseMessage::Write {
+ path: c_msg.path,
+ offset: c_msg.offset as u64,
+ data: c_msg.data,
+ },
+ CMessageType::Delete => FuseMessage::Delete { path: c_msg.path },
+ CMessageType::Rename => FuseMessage::Rename {
+ from: c_msg.path,
+ to: c_msg.to.unwrap_or_default(),
+ },
+ CMessageType::Mtime => FuseMessage::Mtime { path: c_msg.path },
+ CMessageType::UnlockRequest => FuseMessage::UnlockRequest { path: c_msg.path },
+ CMessageType::Unlock => FuseMessage::Unlock { path: c_msg.path },
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_fuse_message_create() {
+ let msg = FuseMessage::Create {
+ path: "/test/file".to_string(),
+ };
+ assert_eq!(msg.message_type(), CMessageType::Create as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_fuse_message_write() {
+ let msg = FuseMessage::Write {
+ path: "/test/file".to_string(),
+ offset: 100,
+ data: vec![1, 2, 3, 4, 5],
+ };
+ assert_eq!(msg.message_type(), CMessageType::Write as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_fuse_message_rename() {
+ let msg = FuseMessage::Rename {
+ from: "/old/path".to_string(),
+ to: "/new/path".to_string(),
+ };
+ assert_eq!(msg.message_type(), CMessageType::Rename as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
new file mode 100644
index 00000000..db49a469
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
@@ -0,0 +1,329 @@
+/// KvStore message types for DFSM status synchronization
+///
+/// This module defines the KvStore message types that are delivered through
+/// the status DFSM state machine (pve_kvstore_v1 CPG group).
+use anyhow::Context;
+
+use crate::message::Message;
+
+/// KvStore message type IDs (matches C's kvstore_message_t enum)
+#[derive(
+ Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive,
+)]
+#[repr(u16)]
+enum KvStoreMessageType {
+ Update = 1, // KVSTORE_MESSAGE_UPDATE
+ UpdateComplete = 2, // KVSTORE_MESSAGE_UPDATE_COMPLETE
+ Log = 3, // KVSTORE_MESSAGE_LOG
+}
+
+/// KvStore message types for ephemeral status synchronization
+///
+/// These messages are used by the kvstore DFSM (pve_kvstore_v1 CPG group)
+/// to synchronize ephemeral data like RRD metrics, node IPs, and cluster logs.
+///
+/// Matches C implementation's KVSTORE_MESSAGE_* types in status.c
+#[derive(Debug, Clone, PartialEq)]
+pub enum KvStoreMessage {
+ /// Update key-value data from a node
+ ///
+ /// Wire format: key (256 bytes, null-terminated) + value (variable length)
+ /// Matches C's KVSTORE_MESSAGE_UPDATE
+ Update { key: String, value: Vec<u8> },
+
+ /// Cluster log entry
+ ///
+ /// Wire format: clog_entry_t struct
+ /// Matches C's KVSTORE_MESSAGE_LOG
+ Log {
+ time: u32,
+ priority: u8,
+ node: String,
+ ident: String,
+ tag: String,
+ message: String,
+ },
+
+ /// Update complete signal (not currently used)
+ ///
+ /// Matches C's KVSTORE_MESSAGE_UPDATE_COMPLETE
+ UpdateComplete,
+}
+
+impl KvStoreMessage {
+ /// Get message type ID (matches C's kvstore_message_t enum)
+ pub fn message_type(&self) -> u16 {
+ let msg_type = match self {
+ KvStoreMessage::Update { .. } => KvStoreMessageType::Update,
+ KvStoreMessage::UpdateComplete => KvStoreMessageType::UpdateComplete,
+ KvStoreMessage::Log { .. } => KvStoreMessageType::Log,
+ };
+ msg_type.into()
+ }
+
+ /// Serialize to C-compatible wire format
+ ///
+ /// Update format: key (256 bytes, null-terminated) + value (variable)
+ /// Log format: clog_entry_t struct
+ pub fn serialize(&self) -> Vec<u8> {
+ match self {
+ KvStoreMessage::Update { key, value } => {
+ // C format: char key[256] + data
+ let mut buf = vec![0u8; 256];
+ let key_bytes = key.as_bytes();
+ let copy_len = key_bytes.len().min(255); // Leave room for null terminator
+ buf[..copy_len].copy_from_slice(&key_bytes[..copy_len]);
+ // buf is already zero-filled, so null terminator is automatic
+
+ buf.extend_from_slice(value);
+ buf
+ }
+ KvStoreMessage::Log {
+ time,
+ priority,
+ node,
+ ident,
+ tag,
+ message,
+ } => {
+ // C format: clog_entry_t
+ // struct clog_entry_t {
+ // uint32_t time;
+ // uint8_t priority;
+ // uint8_t padding[3];
+ // uint32_t node_len, ident_len, tag_len, msg_len;
+ // char data[]; // node + ident + tag + message (all null-terminated)
+ // }
+
+ let node_bytes = node.as_bytes();
+ let ident_bytes = ident.as_bytes();
+ let tag_bytes = tag.as_bytes();
+ let msg_bytes = message.as_bytes();
+
+ let node_len = (node_bytes.len() + 1) as u32; // +1 for null
+ let ident_len = (ident_bytes.len() + 1) as u32;
+ let tag_len = (tag_bytes.len() + 1) as u32;
+ let msg_len = (msg_bytes.len() + 1) as u32;
+
+ let total_len = 4 + 1 + 3 + 16 + node_len + ident_len + tag_len + msg_len;
+ let mut buf = Vec::with_capacity(total_len as usize);
+
+ buf.extend_from_slice(&time.to_le_bytes());
+ buf.push(*priority);
+ buf.extend_from_slice(&[0u8; 3]); // padding
+ buf.extend_from_slice(&node_len.to_le_bytes());
+ buf.extend_from_slice(&ident_len.to_le_bytes());
+ buf.extend_from_slice(&tag_len.to_le_bytes());
+ buf.extend_from_slice(&msg_len.to_le_bytes());
+
+ buf.extend_from_slice(node_bytes);
+ buf.push(0); // null terminator
+ buf.extend_from_slice(ident_bytes);
+ buf.push(0);
+ buf.extend_from_slice(tag_bytes);
+ buf.push(0);
+ buf.extend_from_slice(msg_bytes);
+ buf.push(0);
+
+ buf
+ }
+ KvStoreMessage::UpdateComplete => {
+ // No payload
+ Vec::new()
+ }
+ }
+ }
+
+ /// Deserialize from C-compatible wire format
+ pub fn deserialize(msg_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+ use KvStoreMessageType::*;
+
+ let msg_type = KvStoreMessageType::try_from(msg_type)
+ .map_err(|_| anyhow::anyhow!("Unknown kvstore message type: {msg_type}"))?;
+
+ match msg_type {
+ Update => {
+ if data.len() < 256 {
+ anyhow::bail!("UPDATE message too short: {} < 256", data.len());
+ }
+
+ // Find null terminator in first 256 bytes
+ let key_end = data[..256]
+ .iter()
+ .position(|&b| b == 0)
+ .ok_or_else(|| anyhow::anyhow!("UPDATE key not null-terminated"))?;
+
+ let key = std::str::from_utf8(&data[..key_end])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in UPDATE key: {e}"))?
+ .to_string();
+
+ let value = data[256..].to_vec();
+
+ Ok(KvStoreMessage::Update { key, value })
+ }
+ UpdateComplete => Ok(KvStoreMessage::UpdateComplete),
+ Log => {
+                if data.len() < 24 {
+                    // Minimum header: 4 (time) + 1 (priority) + 3 (padding) + 16 (lengths) = 24 bytes
+                    anyhow::bail!("LOG message too short: {} < 24", data.len());
+ }
+
+ let time = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
+ let priority = data[4];
+ // data[5..8] is padding
+
+ let node_len = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
+ let ident_len =
+ u32::from_le_bytes([data[12], data[13], data[14], data[15]]) as usize;
+ let tag_len = u32::from_le_bytes([data[16], data[17], data[18], data[19]]) as usize;
+                let msg_len = u32::from_le_bytes([data[20], data[21], data[22], data[23]]) as usize;
+
+                // Each length includes the trailing NUL, so a zero length is
+                // invalid and would underflow the slice bounds computed below
+                if node_len == 0 || ident_len == 0 || tag_len == 0 || msg_len == 0 {
+                    anyhow::bail!("LOG message contains zero-length field");
+                }
+
+ let expected_len = 24 + node_len + ident_len + tag_len + msg_len;
+ if data.len() != expected_len {
+ anyhow::bail!(
+ "LOG message size mismatch: {} != {}",
+ data.len(),
+ expected_len
+ );
+ }
+
+ let mut offset = 24;
+
+ let node = std::str::from_utf8(&data[offset..offset + node_len - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG node: {e}"))?
+ .to_string();
+ offset += node_len;
+
+ let ident = std::str::from_utf8(&data[offset..offset + ident_len - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG ident: {e}"))?
+ .to_string();
+ offset += ident_len;
+
+ let tag = std::str::from_utf8(&data[offset..offset + tag_len - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG tag: {e}"))?
+ .to_string();
+ offset += tag_len;
+
+ let message = std::str::from_utf8(&data[offset..offset + msg_len - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG message: {e}"))?
+ .to_string();
+
+ Ok(KvStoreMessage::Log {
+ time,
+ priority,
+ node,
+ ident,
+ tag,
+ message,
+ })
+ }
+ }
+ }
+}
+
+impl Message for KvStoreMessage {
+ fn message_type(&self) -> u16 {
+ // Delegate to the existing method
+ KvStoreMessage::message_type(self)
+ }
+
+ fn serialize(&self) -> Vec<u8> {
+ // Delegate to the existing method
+ KvStoreMessage::serialize(self)
+ }
+
+ fn deserialize(message_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+ // Delegate to the existing method
+ KvStoreMessage::deserialize(message_type, data)
+ .context("Failed to deserialize KvStoreMessage")
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_kvstore_message_update_serialization() {
+ let msg = KvStoreMessage::Update {
+ key: "test_key".to_string(),
+ value: vec![1, 2, 3, 4, 5],
+ };
+
+ let serialized = msg.serialize();
+ assert_eq!(serialized.len(), 256 + 5);
+ assert_eq!(&serialized[..8], b"test_key");
+ assert_eq!(serialized[8], 0); // null terminator
+ assert_eq!(&serialized[256..], &[1, 2, 3, 4, 5]);
+
+ let deserialized = KvStoreMessage::deserialize(1, &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
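+
+    #[test]
+    fn test_kvstore_message_update_key_truncation() {
+        // A sketch of the truncation behavior documented in serialize(): keys
+        // longer than 255 bytes are clipped so the trailing NUL always fits
+        // inside the fixed 256-byte key field.
+        let msg = KvStoreMessage::Update {
+            key: "x".repeat(300),
+            value: vec![],
+        };
+
+        let serialized = msg.serialize();
+        assert_eq!(serialized.len(), 256);
+        assert_eq!(serialized[255], 0); // NUL terminator survives truncation
+
+        match KvStoreMessage::deserialize(1, &serialized).unwrap() {
+            KvStoreMessage::Update { key, .. } => assert_eq!(key.len(), 255),
+            other => panic!("unexpected variant: {other:?}"),
+        }
+    }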
+
+ #[test]
+ fn test_kvstore_message_log_serialization() {
+ let msg = KvStoreMessage::Log {
+ time: 1234567890,
+ priority: 5,
+ node: "node1".to_string(),
+ ident: "pmxcfs".to_string(),
+ tag: "info".to_string(),
+ message: "test message".to_string(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = KvStoreMessage::deserialize(3, &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
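+
+    // A byte-level sketch of the clog_entry_t header documented in
+    // serialize(): 4 bytes time + 1 priority + 3 padding + four u32 lengths
+    // = 24 bytes, where each length counts the string's trailing NUL.
+    #[test]
+    fn test_kvstore_message_log_header_layout() {
+        let msg = KvStoreMessage::Log {
+            time: 1,
+            priority: 2,
+            node: "n".to_string(),
+            ident: "i".to_string(),
+            tag: "t".to_string(),
+            message: "m".to_string(),
+        };
+
+        let serialized = msg.serialize();
+        assert_eq!(serialized.len(), 24 + 4 * 2); // header + four 2-byte strings
+        assert_eq!(&serialized[0..4], &1u32.to_le_bytes());
+        assert_eq!(serialized[4], 2);
+        assert_eq!(&serialized[8..12], &2u32.to_le_bytes()); // node_len = "n" + NUL
+    }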
+
+ #[test]
+ fn test_kvstore_message_type() {
+ assert_eq!(
+ KvStoreMessage::Update {
+ key: "".into(),
+ value: vec![]
+ }
+ .message_type(),
+ 1
+ );
+ assert_eq!(KvStoreMessage::UpdateComplete.message_type(), 2);
+ assert_eq!(
+ KvStoreMessage::Log {
+ time: 0,
+ priority: 0,
+ node: "".into(),
+ ident: "".into(),
+ tag: "".into(),
+ message: "".into()
+ }
+ .message_type(),
+ 3
+ );
+ }
+
+ #[test]
+ fn test_kvstore_message_type_roundtrip() {
+ // Test that message_type() and deserialize() are consistent
+ use super::KvStoreMessageType;
+
+ assert_eq!(u16::from(KvStoreMessageType::Update), 1);
+ assert_eq!(u16::from(KvStoreMessageType::UpdateComplete), 2);
+ assert_eq!(u16::from(KvStoreMessageType::Log), 3);
+
+ assert_eq!(
+ KvStoreMessageType::try_from(1).unwrap(),
+ KvStoreMessageType::Update
+ );
+ assert_eq!(
+ KvStoreMessageType::try_from(2).unwrap(),
+ KvStoreMessageType::UpdateComplete
+ );
+ assert_eq!(
+ KvStoreMessageType::try_from(3).unwrap(),
+ KvStoreMessageType::Log
+ );
+
+ assert!(KvStoreMessageType::try_from(0).is_err());
+ assert!(KvStoreMessageType::try_from(4).is_err());
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
new file mode 100644
index 00000000..89240483
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
@@ -0,0 +1,32 @@
+//! Distributed Finite State Machine (DFSM) for cluster state synchronization
+//!
+//! This crate implements the state machine for synchronizing configuration
+//! changes across the cluster nodes using Corosync CPG.
+//!
+//! The DFSM handles:
+//! - State synchronization between nodes
+//! - Message ordering and queuing
+//! - Leader-based state updates
+//! - Split-brain prevention
+//! - Membership change handling
+
+mod callbacks;
+pub mod cluster_database_service;
+mod cpg_service;
+mod dfsm_message;
+mod fuse_message;
+mod kv_store_message;
+mod message;
+mod state_machine;
+pub mod status_sync_service;
+mod types;
+mod wire_format;
+
+// Re-export public API
+pub use callbacks::Callbacks;
+pub use cluster_database_service::ClusterDatabaseService;
+pub use cpg_service::{CpgHandler, CpgService};
+pub use fuse_message::FuseMessage;
+pub use kv_store_message::KvStoreMessage;
+pub use state_machine::{Dfsm, DfsmBroadcast};
+pub use status_sync_service::StatusSyncService;
+pub use types::NodeSyncInfo;
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
new file mode 100644
index 00000000..24e6847b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
@@ -0,0 +1,21 @@
+//! High-level message abstraction for DFSM
+//!
+//! This module provides a Message trait for working with cluster messages
+//! at a higher abstraction level than raw bytes.
+
+use anyhow::Result;
+
+/// Trait for messages that can be sent through DFSM
+pub trait Message: Sized {
+ /// Get the message type identifier
+ fn message_type(&self) -> u16;
+
+ /// Serialize the message to bytes (application message payload only)
+ ///
+ /// This serializes only the application-level payload. The DFSM protocol
+ /// headers (msg_count, timestamp, protocol_version, etc.) are added by
+ /// DfsmMessage::serialize() when wrapping in DfsmMessage::Normal.
+ fn serialize(&self) -> Vec<u8>;
+
+ /// Deserialize from bytes given a message type
+ fn deserialize(message_type: u16, data: &[u8]) -> Result<Self>;
+}
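+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// A minimal sketch of a `Message` implementor; the hypothetical `Ping`
+    /// type exists only to illustrate the trait contract, the real
+    /// implementations live in fuse_message.rs and kv_store_message.rs.
+    #[derive(Debug, Clone, PartialEq)]
+    struct Ping(u64);
+
+    impl Message for Ping {
+        fn message_type(&self) -> u16 {
+            1
+        }
+
+        fn serialize(&self) -> Vec<u8> {
+            self.0.to_le_bytes().to_vec()
+        }
+
+        fn deserialize(_message_type: u16, data: &[u8]) -> Result<Self> {
+            let bytes: [u8; 8] = data.try_into()?;
+            Ok(Ping(u64::from_le_bytes(bytes)))
+        }
+    }
+
+    #[test]
+    fn test_message_roundtrip() {
+        let msg = Ping(42);
+        let bytes = msg.serialize();
+        assert_eq!(Ping::deserialize(msg.message_type(), &bytes).unwrap(), msg);
+    }
+}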
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
new file mode 100644
index 00000000..2c90e4ea
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
@@ -0,0 +1,1013 @@
+//! DFSM state machine implementation
+//!
+//! This module contains the main Dfsm struct and its implementation
+//! for managing distributed state synchronization.
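+//!
+//! A sketch of the normal sync flow after a membership change (the leader is
+//! the member with the lowest nodeid), as implemented by the handlers below:
+//!
+//! 1. the leader broadcasts SyncStart with a fresh epoch and sends its State;
+//!    every other node answers with its own State
+//! 2. once all states arrived, each node runs process_state_update(); nodes
+//!    matching the winning state become Synced, the others enter Update mode
+//! 3. the leader streams Update messages, then UpdateComplete
+//! 4. on UpdateComplete, followers drain the sync queue and become Synced
+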
+use anyhow::{Context, Result};
+use parking_lot::{Mutex, RwLock};
+use pmxcfs_api_types::MemberInfo;
+use rust_corosync::{NodeId, cpg};
+use std::collections::{BTreeMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use super::cpg_service::{CpgHandler, CpgService};
+use super::dfsm_message::DfsmMessage;
+use super::message::Message as MessageTrait;
+use super::types::{DfsmMode, QueuedMessage, SyncEpoch};
+use crate::{Callbacks, FuseMessage, NodeSyncInfo};
+
+/// Extension trait to add a broadcast() method to `Option<Arc<Dfsm<FuseMessage>>>`
+///
+/// This allows calling `.broadcast()` directly on `Option<Arc<Dfsm<FuseMessage>>>`
+/// fields without explicit None checking at call sites.
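+///
+/// A hypothetical call site, assuming a `dfsm: Option<Arc<Dfsm<FuseMessage>>>`
+/// field (ignored doctest, illustration only):
+///
+/// ```ignore
+/// self.dfsm.broadcast(FuseMessage::Write {
+///     path: "/nodes/pve1/lrm_status".to_string(),
+///     offset: 0,
+///     data: b"{}".to_vec(),
+/// });
+/// ```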
+pub trait DfsmBroadcast {
+ fn broadcast(&self, msg: FuseMessage);
+}
+
+impl DfsmBroadcast for Option<Arc<Dfsm<FuseMessage>>> {
+ fn broadcast(&self, msg: FuseMessage) {
+ if let Some(dfsm) = self {
+ let _ = dfsm.broadcast(msg);
+ }
+ }
+}
+
+/// DFSM state machine
+///
+/// The generic parameter `M` specifies the message type this DFSM handles:
+/// - `Dfsm<FuseMessage>` for main database operations
+/// - `Dfsm<KvStoreMessage>` for status synchronization
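+///
+/// A hypothetical construction, assuming an application-provided
+/// `callbacks: Arc<dyn Callbacks<FuseMessage>>` (ignored doctest):
+///
+/// ```ignore
+/// let dfsm = Arc::new(Dfsm::new("pmxcfs_v1".to_string(), callbacks)?);
+/// dfsm.init_cpg()?;
+/// ```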
+pub struct Dfsm<M> {
+ /// CPG service for cluster communication (matching C's dfsm_t->cpg_handle)
+ cpg_service: RwLock<Option<Arc<CpgService>>>,
+
+ /// Cluster group name for CPG
+ cluster_name: String,
+
+ /// Callbacks for application integration
+ callbacks: Arc<dyn Callbacks<M>>,
+
+ /// Current operating mode
+ mode: RwLock<DfsmMode>,
+
+ /// Current sync epoch
+ sync_epoch: RwLock<SyncEpoch>,
+
+ /// Local epoch counter
+ local_epoch_counter: Mutex<u32>,
+
+ /// Node synchronization info
+ sync_nodes: RwLock<Vec<NodeSyncInfo>>,
+
+ /// Message queue (ordered by count)
+ msg_queue: Mutex<BTreeMap<u64, QueuedMessage<M>>>,
+
+ /// Sync queue for messages during update mode
+ sync_queue: Mutex<VecDeque<QueuedMessage<M>>>,
+
+ /// Message counter for ordering (atomic for lock-free increment)
+ msg_counter: AtomicU64,
+
+ /// Lowest node ID in cluster (leader)
+ lowest_nodeid: RwLock<u32>,
+
+ /// Our node ID (set during init_cpg via cpg_local_get)
+ nodeid: AtomicU32,
+
+ /// Our process ID
+ pid: u32,
+
+ /// Protocol version for cluster compatibility
+ protocol_version: u32,
+
+ /// State verification - SHA-256 checksum
+ checksum: Mutex<[u8; 32]>,
+
+ /// Checksum epoch (when it was computed)
+ checksum_epoch: Mutex<SyncEpoch>,
+
+ /// Checksum ID for verification
+ checksum_id: Mutex<u64>,
+
+ /// Checksum counter for verify requests
+ checksum_counter: Mutex<u64>,
+}
+
+impl<M> Dfsm<M>
+where
+ M: MessageTrait,
+{
+ /// Create a new DFSM instance
+ ///
+ /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+ pub fn new(cluster_name: String, callbacks: Arc<dyn Callbacks<M>>) -> Result<Self> {
+        Self::new_with_protocol_version(
+            cluster_name,
+            callbacks,
+            DfsmMessage::<M>::DEFAULT_PROTOCOL_VERSION,
+        )
+ }
+
+ /// Create a new DFSM instance with a specific protocol version
+ ///
+ /// This is used when the DFSM needs to use a non-default protocol version,
+ /// such as the status/kvstore DFSM which uses protocol version 0 for
+ /// compatibility with the C implementation.
+ ///
+ /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+ pub fn new_with_protocol_version(
+ cluster_name: String,
+ callbacks: Arc<dyn Callbacks<M>>,
+ protocol_version: u32,
+ ) -> Result<Self> {
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+ let pid = std::process::id();
+
+ Ok(Self {
+ cpg_service: RwLock::new(None),
+ cluster_name,
+ callbacks,
+ mode: RwLock::new(DfsmMode::Start),
+ sync_epoch: RwLock::new(SyncEpoch {
+ epoch: 0,
+ time: now,
+ nodeid: 0,
+ pid,
+ }),
+ local_epoch_counter: Mutex::new(0),
+ sync_nodes: RwLock::new(Vec::new()),
+ msg_queue: Mutex::new(BTreeMap::new()),
+ sync_queue: Mutex::new(VecDeque::new()),
+ msg_counter: AtomicU64::new(0),
+ lowest_nodeid: RwLock::new(0),
+ nodeid: AtomicU32::new(0), // Will be set by init_cpg() using cpg_local_get()
+ pid,
+ protocol_version,
+ checksum: Mutex::new([0u8; 32]),
+ checksum_epoch: Mutex::new(SyncEpoch {
+ epoch: 0,
+ time: 0,
+ nodeid: 0,
+ pid: 0,
+ }),
+ checksum_id: Mutex::new(0),
+ checksum_counter: Mutex::new(0),
+ })
+ }
+
+ pub fn get_mode(&self) -> DfsmMode {
+ *self.mode.read()
+ }
+
+ pub fn set_mode(&self, new_mode: DfsmMode) {
+ let mut mode = self.mode.write();
+ let old_mode = *mode;
+
+ if old_mode.is_error() && !new_mode.is_error() {
+ return;
+ }
+
+ if old_mode == new_mode {
+ return;
+ }
+
+ *mode = new_mode;
+ drop(mode);
+
+ if new_mode.is_error() {
+ tracing::error!("DFSM: {}", new_mode);
+ } else {
+ tracing::info!("DFSM: {}", new_mode);
+ }
+ }
+
+ pub fn is_leader(&self) -> bool {
+ let lowest = *self.lowest_nodeid.read();
+ lowest > 0 && lowest == self.nodeid.load(Ordering::Relaxed)
+ }
+
+ pub fn get_nodeid(&self) -> u32 {
+ self.nodeid.load(Ordering::Relaxed)
+ }
+
+ pub fn get_pid(&self) -> u32 {
+ self.pid
+ }
+
+ /// Check if DFSM is synced and ready
+ pub fn is_synced(&self) -> bool {
+ self.get_mode() == DfsmMode::Synced
+ }
+
+ /// Check if DFSM encountered an error
+ pub fn is_error(&self) -> bool {
+ self.get_mode().is_error()
+ }
+}
+
+impl<M: MessageTrait + Clone> Dfsm<M> {
+ fn send_sync_start(&self) -> Result<()> {
+ tracing::debug!("DFSM: sending SYNC_START message");
+ let sync_epoch = *self.sync_epoch.read();
+ self.send_dfsm_message(&DfsmMessage::<M>::SyncStart { sync_epoch })
+ }
+
+ fn send_state(&self) -> Result<()> {
+ tracing::debug!("DFSM: generating and sending state");
+
+ let state_data = self
+ .callbacks
+ .get_state()
+ .context("Failed to get state from callbacks")?;
+
+ tracing::info!("DFSM: sending state ({} bytes)", state_data.len());
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::State {
+ sync_epoch,
+ data: state_data,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ pub(super) fn send_dfsm_message(&self, message: &DfsmMessage<M>) -> Result<()> {
+ let serialized = message.serialize();
+
+ if let Some(ref service) = *self.cpg_service.read() {
+ service
+ .mcast(cpg::Guarantee::TypeAgreed, &serialized)
+ .context("Failed to broadcast DFSM message")?;
+ Ok(())
+ } else {
+ anyhow::bail!("CPG not initialized")
+ }
+ }
+
+ pub fn process_state(&self, nodeid: u32, pid: u32, state: &[u8]) -> Result<()> {
+ tracing::debug!(
+ "DFSM: processing state from node {}/{} ({} bytes)",
+ nodeid,
+ pid,
+ state.len()
+ );
+
+ let mut sync_nodes = self.sync_nodes.write();
+
+ if let Some(node) = sync_nodes
+ .iter_mut()
+ .find(|n| n.nodeid == nodeid && n.pid == pid)
+ {
+ node.state = Some(state.to_vec());
+ } else {
+ tracing::warn!("DFSM: received state from unknown node {}/{}", nodeid, pid);
+ return Ok(());
+ }
+
+ let all_received = sync_nodes.iter().all(|n| n.state.is_some());
+ drop(sync_nodes);
+
+ if all_received {
+ tracing::info!("DFSM: received all states, processing synchronization");
+ self.process_state_sync()?;
+ }
+
+ Ok(())
+ }
+
+ fn process_state_sync(&self) -> Result<()> {
+ tracing::info!("DFSM: processing state synchronization");
+
+ let sync_nodes = self.sync_nodes.read().clone();
+
+ match self.callbacks.process_state_update(&sync_nodes) {
+ Ok(synced) => {
+ if synced {
+ tracing::info!("DFSM: state synchronization successful");
+
+ let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+ let mut sync_nodes_write = self.sync_nodes.write();
+ if let Some(node) = sync_nodes_write
+ .iter_mut()
+ .find(|n| n.nodeid == my_nodeid && n.pid == self.pid)
+ {
+ node.synced = true;
+ }
+ drop(sync_nodes_write);
+
+ self.set_mode(DfsmMode::Synced);
+ self.callbacks.on_synced();
+ self.deliver_message_queue()?;
+ } else {
+ tracing::info!("DFSM: entering UPDATE mode, waiting for leader");
+ self.set_mode(DfsmMode::Update);
+ self.deliver_message_queue()?;
+ }
+ }
+ Err(e) => {
+ tracing::error!("DFSM: state synchronization failed: {}", e);
+ self.set_mode(DfsmMode::Error);
+ return Err(e);
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn queue_message(&self, nodeid: u32, pid: u32, msg_count: u64, message: M, timestamp: u64)
+ where
+ M: Clone,
+ {
+ tracing::debug!(
+ "DFSM: queueing message {} from {}/{}",
+ msg_count,
+ nodeid,
+ pid
+ );
+
+ let qm = QueuedMessage {
+ nodeid,
+ pid,
+ _msg_count: msg_count,
+ message,
+ timestamp,
+ };
+
+ let mode = self.get_mode();
+
+ let node_synced = self
+ .sync_nodes
+ .read()
+ .iter()
+ .find(|n| n.nodeid == nodeid && n.pid == pid)
+ .map(|n| n.synced)
+ .unwrap_or(false);
+
+ if mode == DfsmMode::Update && node_synced {
+ self.sync_queue.lock().push_back(qm);
+ } else {
+ self.msg_queue.lock().insert(msg_count, qm);
+ }
+ }
+
+ pub(super) fn deliver_message_queue(&self) -> Result<()>
+ where
+ M: Clone,
+ {
+ let mut queue = self.msg_queue.lock();
+ if queue.is_empty() {
+ return Ok(());
+ }
+
+ tracing::info!("DFSM: delivering {} queued messages", queue.len());
+
+ let mode = self.get_mode();
+ let sync_nodes = self.sync_nodes.read().clone();
+
+ let mut to_remove = Vec::new();
+
+ for (count, qm) in queue.iter() {
+ let node_info = sync_nodes
+ .iter()
+ .find(|n| n.nodeid == qm.nodeid && n.pid == qm.pid);
+
+ let Some(info) = node_info else {
+ tracing::debug!(
+ "DFSM: removing message from non-member {}/{}",
+ qm.nodeid,
+ qm.pid
+ );
+ to_remove.push(*count);
+ continue;
+ };
+
+ if mode == DfsmMode::Synced && info.synced {
+ tracing::debug!("DFSM: delivering message {}", count);
+
+ match self.callbacks.deliver_message(
+ qm.nodeid,
+ qm.pid,
+ qm.message.clone(),
+ qm.timestamp,
+ ) {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: message delivered, result={}, processed={}",
+ result,
+ processed
+ );
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver message: {}", e);
+ }
+ }
+
+ to_remove.push(*count);
+ } else if mode == DfsmMode::Update && info.synced {
+ self.sync_queue.lock().push_back(qm.clone());
+ to_remove.push(*count);
+ }
+ }
+
+ for count in to_remove {
+ queue.remove(&count);
+ }
+
+ Ok(())
+ }
+
+ pub(super) fn deliver_sync_queue(&self) -> Result<()> {
+ let mut sync_queue = self.sync_queue.lock();
+ let queue_len = sync_queue.len();
+
+ if queue_len == 0 {
+ return Ok(());
+ }
+
+ tracing::info!("DFSM: delivering {} sync queue messages", queue_len);
+
+ while let Some(qm) = sync_queue.pop_front() {
+ tracing::debug!(
+ "DFSM: delivering sync message from {}/{}",
+ qm.nodeid,
+ qm.pid
+ );
+
+ match self
+ .callbacks
+ .deliver_message(qm.nodeid, qm.pid, qm.message, qm.timestamp)
+ {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: sync message delivered, result={}, processed={}",
+ result,
+ processed
+ );
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver sync message: {}", e);
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Send a message to the cluster
+ ///
+ /// Creates a properly formatted Normal message with C-compatible headers.
+ pub fn send_message(&self, message: M) -> Result<u64> {
+ let msg_count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1;
+
+ tracing::debug!("DFSM: sending message {}", msg_count);
+
+ let dfsm_msg = DfsmMessage::from_message(msg_count, message, self.protocol_version);
+
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(msg_count)
+ }
+
+ /// Send a TreeEntry update to the cluster (leader only, during synchronization)
+ ///
+ /// This is used by the leader to send individual database entries to followers
+ /// that need to catch up. Matches C's dfsm_send_update().
+ pub fn send_update(&self, tree_entry: pmxcfs_memdb::TreeEntry) -> Result<()> {
+ tracing::debug!("DFSM: sending Update for inode {}", tree_entry.inode);
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::from_tree_entry(tree_entry, sync_epoch);
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Send UpdateComplete signal to cluster (leader only, after sending all updates)
+ ///
+ /// Signals to followers that all Update messages have been sent and they can
+ /// now transition to Synced mode. Matches C's dfsm_send_update_complete().
+ pub fn send_update_complete(&self) -> Result<()> {
+ tracing::info!("DFSM: sending UpdateComplete");
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::UpdateComplete { sync_epoch };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+    /// Request checksum verification (leader only)
+    ///
+    /// This should be called periodically by the leader to verify cluster
+    /// state consistency.
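+    ///
+    /// `checksum_counter` records the last verify round we requested and
+    /// `checksum_id` the last round we answered in handle_verify_request();
+    /// a new request is only issued once the previous round completed
+    /// (counter == id), and invalidate_checksum() re-aligns the two on
+    /// membership changes.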
+ pub fn verify_request(&self) -> Result<()> {
+ // Only leader should send verify requests
+ if !self.is_leader() {
+ return Ok(());
+ }
+
+ // Only verify when synced
+ if self.get_mode() != DfsmMode::Synced {
+ return Ok(());
+ }
+
+ // Check if we need to wait for previous verification to complete
+ let checksum_counter = *self.checksum_counter.lock();
+ let checksum_id = *self.checksum_id.lock();
+
+ if checksum_counter != checksum_id {
+ tracing::debug!(
+ "DFSM: delaying verify request {:016x}",
+ checksum_counter + 1
+ );
+ return Ok(());
+ }
+
+        // Increment counter and send verify request
+        let new_counter = checksum_counter + 1;
+        *self.checksum_counter.lock() = new_counter;
+
+ tracing::debug!("DFSM: sending verify request {:016x}", new_counter);
+
+ // Send VERIFY_REQUEST message with counter
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: new_counter,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Handle verify request from leader
+ pub fn handle_verify_request(&self, message_epoch: SyncEpoch, csum_id: u64) -> Result<()> {
+ tracing::debug!("DFSM: received verify request {:016x}", csum_id);
+
+ // Compute current state checksum
+ let mut checksum = [0u8; 32];
+ self.callbacks.compute_checksum(&mut checksum)?;
+
+ // Save checksum info
+ // Store the epoch FROM THE MESSAGE (matching C: dfsm.c:736)
+ *self.checksum.lock() = checksum;
+ *self.checksum_epoch.lock() = message_epoch;
+ *self.checksum_id.lock() = csum_id;
+
+ // Send the checksum verification response
+ tracing::debug!("DFSM: sending verify response");
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg = DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Handle verify response from a node
+ pub fn handle_verify(
+ &self,
+ message_epoch: SyncEpoch,
+ csum_id: u64,
+ received_checksum: &[u8; 32],
+ ) -> Result<()> {
+ tracing::debug!("DFSM: received verify response");
+
+ let our_checksum_id = *self.checksum_id.lock();
+ let our_checksum_epoch = *self.checksum_epoch.lock();
+
+ // Check if this verification matches our saved checksum
+ // Compare with MESSAGE epoch, not current epoch (matching C: dfsm.c:766-767)
+ if our_checksum_id == csum_id && our_checksum_epoch == message_epoch {
+ let our_checksum = *self.checksum.lock();
+
+ // Compare checksums
+ if our_checksum != *received_checksum {
+ tracing::error!(
+ "DFSM: checksum mismatch! Expected {:016x?}, got {:016x?}",
+ &our_checksum[..8],
+ &received_checksum[..8]
+ );
+ tracing::error!("DFSM: data divergence detected - restarting cluster sync");
+ self.set_mode(DfsmMode::Leave);
+ return Err(anyhow::anyhow!("Checksum verification failed"));
+ } else {
+ tracing::info!("DFSM: data verification successful");
+ }
+ } else {
+ tracing::debug!("DFSM: skipping verification - no checksum saved or epoch mismatch");
+ }
+
+ Ok(())
+ }
+
+ /// Invalidate saved checksum (called on membership changes)
+ pub fn invalidate_checksum(&self) {
+ let counter = *self.checksum_counter.lock();
+ *self.checksum_id.lock() = counter;
+
+ // Reset checksum epoch
+ *self.checksum_epoch.lock() = SyncEpoch {
+ epoch: 0,
+ time: 0,
+ nodeid: 0,
+ pid: 0,
+ };
+
+ tracing::debug!("DFSM: checksum invalidated");
+ }
+}
+
+/// FuseMessage-specific methods
+impl Dfsm<FuseMessage> {
+ /// Broadcast a filesystem operation to the cluster
+ ///
+ /// Checks if the cluster is synced before broadcasting.
+ /// If not synced, the message is silently dropped.
+ pub fn broadcast(&self, msg: FuseMessage) -> Result<()> {
+ if !self.is_synced() {
+ return Ok(());
+ }
+
+ tracing::debug!("Broadcasting {:?}", msg);
+ self.send_message(msg)?;
+ tracing::debug!("Broadcast successful");
+
+ Ok(())
+ }
+}
+
+impl<M: MessageTrait + Clone> Dfsm<M> {
+ /// Handle incoming DFSM message from cluster (called by CpgHandler)
+ fn handle_dfsm_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ message: DfsmMessage<M>,
+ ) -> anyhow::Result<()> {
+ // Validate epoch for state messages (all except Normal and SyncStart)
+ // This matches C implementation's epoch checking in dfsm.c:665-673
+ let should_validate_epoch = !matches!(
+ message,
+ DfsmMessage::Normal { .. } | DfsmMessage::SyncStart { .. }
+ );
+
+ if should_validate_epoch {
+ let current_epoch = *self.sync_epoch.read();
+ let message_epoch = match &message {
+ DfsmMessage::State { sync_epoch, .. }
+ | DfsmMessage::Update { sync_epoch, .. }
+ | DfsmMessage::UpdateComplete { sync_epoch }
+ | DfsmMessage::VerifyRequest { sync_epoch, .. }
+ | DfsmMessage::Verify { sync_epoch, .. } => *sync_epoch,
+ _ => unreachable!(),
+ };
+
+ if message_epoch != current_epoch {
+ tracing::debug!(
+ "DFSM: ignoring message with wrong epoch (expected {:?}, got {:?})",
+ current_epoch,
+ message_epoch
+ );
+ return Ok(());
+ }
+ }
+
+ // Match on typed message variants
+ match message {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version: _,
+ message: app_msg,
+ } => self.handle_normal_message(nodeid, pid, msg_count, timestamp, app_msg),
+ DfsmMessage::SyncStart { sync_epoch } => self.handle_sync_start(nodeid, sync_epoch),
+ DfsmMessage::State {
+ sync_epoch: _,
+ data,
+ } => self.process_state(nodeid, pid, &data),
+ DfsmMessage::Update {
+ sync_epoch: _,
+ tree_entry,
+ } => self.handle_update(nodeid, pid, tree_entry),
+ DfsmMessage::UpdateComplete { sync_epoch: _ } => self.handle_update_complete(),
+ DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ } => self.handle_verify_request(sync_epoch, csum_id),
+ DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ } => self.handle_verify(sync_epoch, csum_id, &checksum),
+ }
+ }
+
+ /// Handle membership change notification (called by CpgHandler)
+ fn handle_membership_change(&self, members: &[MemberInfo]) -> anyhow::Result<()> {
+ tracing::info!(
+ "DFSM: handling membership change ({} members)",
+ members.len()
+ );
+
+ // Invalidate saved checksum
+ self.invalidate_checksum();
+
+ // Update epoch
+ let mut counter = self.local_epoch_counter.lock();
+ *counter += 1;
+
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+
+ let new_epoch = SyncEpoch {
+ epoch: *counter,
+ time: now,
+ nodeid: self.nodeid.load(Ordering::Relaxed),
+ pid: self.pid,
+ };
+
+ *self.sync_epoch.write() = new_epoch;
+ drop(counter);
+
+ // Find lowest node ID (leader)
+ let lowest = members.iter().map(|m| m.node_id).min().unwrap_or(0);
+ *self.lowest_nodeid.write() = lowest;
+
+ // Initialize sync nodes
+ let mut sync_nodes = self.sync_nodes.write();
+ sync_nodes.clear();
+
+ for member in members {
+ sync_nodes.push(NodeSyncInfo {
+ nodeid: member.node_id,
+ pid: member.pid,
+ state: None,
+ synced: false,
+ });
+ }
+ drop(sync_nodes);
+
+ // Clear queues
+ self.sync_queue.lock().clear();
+
+ // Determine next mode
+ if members.len() == 1 {
+ // Single node - already synced
+ tracing::info!("DFSM: single node cluster, marking as synced");
+ self.set_mode(DfsmMode::Synced);
+
+ // Mark ourselves as synced
+ let mut sync_nodes = self.sync_nodes.write();
+ if let Some(node) = sync_nodes.first_mut() {
+ node.synced = true;
+ }
+
+ // Deliver queued messages
+ self.deliver_message_queue()?;
+ } else {
+ // Multi-node - start synchronization
+ tracing::info!("DFSM: multi-node cluster, starting sync");
+ self.set_mode(DfsmMode::StartSync);
+
+ // If we're the leader, initiate sync
+ if self.is_leader() {
+ tracing::info!("DFSM: we are leader, sending sync start");
+ self.send_sync_start()?;
+
+                // Leader also sends its own state right away;
+                // handle_sync_start() skips the send when our own
+                // SyncStart message is delivered back to us
+ self.send_state().context("Failed to send leader state")?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Handle normal application message
+ fn handle_normal_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ msg_count: u64,
+ timestamp: u32,
+ message: M,
+ ) -> Result<()> {
+ // C version: deliver immediately if in Synced mode, otherwise queue
+ if self.get_mode() == DfsmMode::Synced {
+ // Deliver immediately - message is already deserialized
+ match self.callbacks.deliver_message(
+ nodeid,
+ pid,
+ message,
+ timestamp as u64, // Convert back to u64 for callback compatibility
+ ) {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: message delivered immediately, result={}, processed={}",
+ result,
+ processed
+ );
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver message: {}", e);
+ }
+ }
+ } else {
+ // Queue for later delivery - store typed message directly
+ self.queue_message(nodeid, pid, msg_count, message, timestamp as u64);
+ }
+ Ok(())
+ }
+
+ /// Handle SyncStart message from leader
+ fn handle_sync_start(&self, nodeid: u32, new_epoch: SyncEpoch) -> Result<()> {
+ tracing::info!(
+ "DFSM: received SyncStart from node {} with epoch {:?}",
+ nodeid,
+ new_epoch
+ );
+
+ // Adopt the new epoch from the leader (critical for sync protocol!)
+ // This matches C implementation which updates dfsm->sync_epoch
+ *self.sync_epoch.write() = new_epoch;
+ tracing::debug!("DFSM: adopted new sync epoch from leader");
+
+ // Send our state back to the cluster
+ // BUT: don't send if we're the leader (we already sent our state in handle_membership_change)
+ let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+ if nodeid != my_nodeid {
+ self.send_state()
+ .context("Failed to send state in response to SyncStart")?;
+ tracing::debug!("DFSM: sent state in response to SyncStart");
+ } else {
+ tracing::debug!("DFSM: skipping state send (we're the leader who already sent state)");
+ }
+
+ Ok(())
+ }
+
+ /// Handle Update message from leader
+ fn handle_update(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ tree_entry: pmxcfs_memdb::TreeEntry,
+ ) -> Result<()> {
+ // Serialize TreeEntry for callback (process_update expects raw bytes for now)
+ let serialized = tree_entry.serialize_for_update();
+ if let Err(e) = self.callbacks.process_update(nodeid, pid, &serialized) {
+ tracing::error!("DFSM: failed to process update: {}", e);
+ }
+ Ok(())
+ }
+
+ /// Handle UpdateComplete message
+ fn handle_update_complete(&self) -> Result<()> {
+ tracing::info!("DFSM: received UpdateComplete from leader");
+ self.deliver_sync_queue()?;
+ self.set_mode(DfsmMode::Synced);
+ self.callbacks.on_synced();
+ Ok(())
+ }
+}
+
+/// Implementation of CpgHandler trait for DFSM
+///
+/// This allows Dfsm to receive CPG callbacks in an idiomatic Rust way,
+/// with all unsafe pointer handling managed by the CpgService.
+impl<M: MessageTrait + Clone + Send + Sync + 'static> CpgHandler for Dfsm<M> {
+ fn on_deliver(&self, _group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]) {
+ tracing::debug!(
+ "DFSM CPG message from node {} (pid {}): {} bytes",
+ u32::from(nodeid),
+ pid,
+ msg.len()
+ );
+
+ // Deserialize DFSM protocol message
+ match DfsmMessage::<M>::deserialize(msg) {
+ Ok(dfsm_msg) => {
+ if let Err(e) = self.handle_dfsm_message(u32::from(nodeid), pid, dfsm_msg) {
+ tracing::error!("Error handling DFSM message: {}", e);
+ }
+ }
+ Err(e) => {
+ tracing::error!("Failed to deserialize DFSM message: {}", e);
+ }
+ }
+ }
+
+ fn on_confchg(
+ &self,
+ _group_name: &str,
+ member_list: &[cpg::Address],
+ _left_list: &[cpg::Address],
+ _joined_list: &[cpg::Address],
+ ) {
+ tracing::info!("DFSM CPG membership change: {} members", member_list.len());
+
+ // Build MemberInfo list from CPG addresses
+ let members: Vec<MemberInfo> = member_list
+ .iter()
+ .map(|addr| MemberInfo {
+ node_id: u32::from(addr.nodeid),
+ pid: addr.pid,
+ joined_at: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs(),
+ })
+ .collect();
+
+ // Notify DFSM of membership change
+ if let Err(e) = self.handle_membership_change(&members) {
+ tracing::error!("Failed to handle membership change: {}", e);
+ }
+ }
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Dfsm<M> {
+ /// Initialize CPG (Closed Process Group) for cluster communication
+ ///
+ /// Uses the idiomatic CpgService wrapper which handles all unsafe FFI
+ /// and callback management internally.
+ pub fn init_cpg(self: &Arc<Self>) -> Result<()> {
+ tracing::info!("DFSM: Initializing CPG");
+
+ // Create CPG service with this Dfsm as the handler
+ // CpgService handles all callback registration and context management
+ let cpg_service = Arc::new(CpgService::new(Arc::clone(self))?);
+
+ // Get our node ID from CPG (matches C's cpg_local_get)
+ // This MUST be done after cpg_initialize but before joining the group
+ let nodeid = cpg::local_get(cpg_service.handle())?;
+ let nodeid_u32 = u32::from(nodeid);
+ self.nodeid.store(nodeid_u32, Ordering::Relaxed);
+ tracing::info!("DFSM: Got node ID {} from CPG", nodeid_u32);
+
+ // Join the CPG group
+ let group_name = &self.cluster_name;
+ cpg_service
+ .join(group_name)
+ .context("Failed to join CPG group")?;
+
+ tracing::info!("DFSM joined CPG group '{}'", group_name);
+
+ // Store the service
+ *self.cpg_service.write() = Some(cpg_service);
+
+ // Dispatch once to get initial membership
+ if let Some(ref service) = *self.cpg_service.read()
+ && let Err(e) = service.dispatch()
+ {
+ tracing::warn!("Failed to dispatch CPG events: {:?}", e);
+ }
+
+ tracing::info!("DFSM CPG initialized successfully");
+ Ok(())
+ }
+
+ /// Dispatch CPG events (should be called periodically from event loop)
+ /// Matching C's service_dfsm_dispatch
+ pub fn dispatch_events(&self) -> Result<(), rust_corosync::CsError> {
+ if let Some(ref service) = *self.cpg_service.read() {
+ service.dispatch()
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Get CPG file descriptor for event monitoring
+ pub fn fd_get(&self) -> Result<i32> {
+ if let Some(ref service) = *self.cpg_service.read() {
+ service.fd()
+ } else {
+ Err(anyhow::anyhow!("CPG service not initialized"))
+ }
+ }
+
+ /// Stop DFSM services (leave CPG group and finalize)
+ pub fn stop_services(&self) -> Result<()> {
+ tracing::info!("DFSM: Stopping services");
+
+ // Leave the CPG group before dropping the service
+ let group_name = self.cluster_name.clone();
+ if let Some(ref service) = *self.cpg_service.read()
+ && let Err(e) = service.leave(&group_name)
+ {
+ tracing::warn!("Error leaving CPG group: {:?}", e);
+ }
+
+ // Drop the service (CpgService::drop handles finalization)
+ *self.cpg_service.write() = None;
+
+ tracing::info!("DFSM services stopped");
+ Ok(())
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
new file mode 100644
index 00000000..877058a4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
@@ -0,0 +1,118 @@
+//! Status Sync Service
+//!
+//! This service synchronizes ephemeral status data across the cluster using a separate
+//! DFSM instance with the "pve_kvstore_v1" CPG group.
+//!
+//! Equivalent to C implementation's service_status (the kvstore DFSM).
+//! Handles synchronization of:
+//! - RRD data (performance metrics from each node)
+//! - Node IP addresses
+//! - Cluster log entries
+//! - Other ephemeral status key-value data
+
+use async_trait::async_trait;
+use pmxcfs_services::{DispatchAction, InitResult, Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message as MessageTrait;
+
+/// Status Sync Service
+///
+/// Synchronizes ephemeral status data across all nodes using a separate DFSM instance.
+/// Uses CPG group "pve_kvstore_v1" (separate from main config database "pmxcfs_v1").
+///
+/// This implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for status replication
+/// - Separation of status data from config data for better performance
+///
+/// This is equivalent to C implementation's service_status (the kvstore DFSM).
+///
+/// The generic parameter `M` specifies the message type this service handles.
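+///
+/// A hypothetical setup, assuming a kvstore `Dfsm` built via
+/// `new_with_protocol_version()` (ignored doctest, illustration only):
+///
+/// ```ignore
+/// let mut service = StatusSyncService::new(Arc::clone(&kvstore_dfsm));
+/// service.initialize().await?;
+/// ```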
+pub struct StatusSyncService<M> {
+ dfsm: Arc<Dfsm<M>>,
+ fd: Option<i32>,
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> StatusSyncService<M> {
+ /// Create a new status sync service
+ pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+ Self { dfsm, fd: None }
+ }
+}
+
+#[async_trait]
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Service for StatusSyncService<M> {
+ fn name(&self) -> &str {
+ "status-sync"
+ }
+
+ async fn initialize(&mut self) -> pmxcfs_services::Result<InitResult> {
+ info!("Initializing status sync service (kvstore)");
+
+ // Initialize CPG connection for kvstore group
+ self.dfsm.init_cpg().map_err(|e| {
+ ServiceError::InitializationFailed(format!(
+ "Status sync CPG initialization failed: {e}"
+ ))
+ })?;
+
+ // Get file descriptor for event monitoring
+ let fd = self.dfsm.fd_get().map_err(|e| {
+ self.dfsm.stop_services().ok();
+ ServiceError::InitializationFailed(format!("Failed to get status sync fd: {e}"))
+ })?;
+
+ self.fd = Some(fd);
+
+ info!(
+ "Status sync service initialized successfully with fd {}",
+ fd
+ );
+ Ok(InitResult::WithFileDescriptor(fd))
+ }
+
+ async fn dispatch(&mut self) -> pmxcfs_services::Result<DispatchAction> {
+ match self.dfsm.dispatch_events() {
+ Ok(_) => Ok(DispatchAction::Continue),
+ Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+ warn!("Status sync connection lost, requesting reinitialization");
+ Ok(DispatchAction::Reinitialize)
+ }
+ Err(e) => {
+ error!("Status sync dispatch failed: {}", e);
+ Err(ServiceError::DispatchFailed(format!(
+ "Status sync dispatch failed: {e}"
+ )))
+ }
+ }
+ }
+
+ async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+ info!("Finalizing status sync service");
+
+ self.fd = None;
+
+ if let Err(e) = self.dfsm.stop_services() {
+ warn!("Error stopping status sync services: {}", e);
+ }
+
+ info!("Status sync service finalized");
+ Ok(())
+ }
+
+ async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+ // Status sync doesn't need periodic verification like the main database
+ // Status data is ephemeral and doesn't require the same consistency guarantees
+ Ok(())
+ }
+
+ fn timer_period(&self) -> Option<Duration> {
+ // No periodic timer needed for status sync
+ None
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
new file mode 100644
index 00000000..5a2eb964
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
@@ -0,0 +1,107 @@
+//! DFSM type definitions
+//!
+//! This module contains all type definitions used by the DFSM state machine.
+
+/// DFSM operating modes
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum DfsmMode {
+ /// Initial state - starting cluster connection
+ Start = 0,
+
+ /// Starting data synchronization
+ StartSync = 1,
+
+ /// All data is up to date
+ Synced = 2,
+
+ /// Waiting for updates from leader
+ Update = 3,
+
+ /// Error states (>= 128)
+ Leave = 253,
+ VersionError = 254,
+ Error = 255,
+}
+
+impl DfsmMode {
+ /// Check if this is an error mode
+ pub fn is_error(&self) -> bool {
+ (*self as u8) >= 128
+ }
+}
+
+impl std::fmt::Display for DfsmMode {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ DfsmMode::Start => write!(f, "start cluster connection"),
+ DfsmMode::StartSync => write!(f, "starting data synchronization"),
+ DfsmMode::Synced => write!(f, "all data is up to date"),
+ DfsmMode::Update => write!(f, "waiting for updates from leader"),
+ DfsmMode::Leave => write!(f, "leaving cluster"),
+ DfsmMode::VersionError => write!(f, "protocol version mismatch"),
+ DfsmMode::Error => write!(f, "serious internal error"),
+ }
+ }
+}
+
+/// DFSM message types (internal protocol messages)
+/// Matches C's dfsm_message_t enum values
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum DfsmMessageType {
+ Normal = 0,
+ SyncStart = 1,
+ State = 2,
+ Update = 3,
+ UpdateComplete = 4,
+ VerifyRequest = 5,
+ Verify = 6,
+}
+
+/// Sync epoch - identifies a synchronization session
+/// Matches C's dfsm_sync_epoch_t structure (16 bytes total)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct SyncEpoch {
+ pub epoch: u32,
+ pub time: u32,
+ pub nodeid: u32,
+ pub pid: u32,
+}
+
+impl SyncEpoch {
+ /// Serialize to C-compatible wire format (16 bytes)
+ /// Format: [epoch: u32][time: u32][nodeid: u32][pid: u32]
+ pub fn serialize(&self) -> [u8; 16] {
+ let mut bytes = [0u8; 16];
+ bytes[0..4].copy_from_slice(&self.epoch.to_le_bytes());
+ bytes[4..8].copy_from_slice(&self.time.to_le_bytes());
+ bytes[8..12].copy_from_slice(&self.nodeid.to_le_bytes());
+ bytes[12..16].copy_from_slice(&self.pid.to_le_bytes());
+ bytes
+ }
+
+ /// Deserialize from C-compatible wire format (16 bytes)
+ pub fn deserialize(bytes: &[u8]) -> Result<Self, &'static str> {
+ if bytes.len() < 16 {
+ return Err("SyncEpoch requires 16 bytes");
+ }
+ Ok(SyncEpoch {
+ epoch: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
+ time: u32::from_le_bytes(bytes[4..8].try_into().unwrap()),
+ nodeid: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
+ pid: u32::from_le_bytes(bytes[12..16].try_into().unwrap()),
+ })
+ }
+}
+
+/// Queued message awaiting delivery
+#[derive(Debug, Clone)]
+pub(super) struct QueuedMessage<M> {
+ pub nodeid: u32,
+ pub pid: u32,
+ pub _msg_count: u64,
+ pub message: M,
+ pub timestamp: u64,
+}
+
+// Re-export NodeSyncInfo from pmxcfs-api-types for use in Callbacks trait
+pub use pmxcfs_api_types::NodeSyncInfo;
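+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Sanity checks for the wire layout and mode classification documented
+    // above; both rely only on items defined in this module.
+    #[test]
+    fn test_sync_epoch_roundtrip() {
+        let epoch = SyncEpoch {
+            epoch: 1,
+            time: 2,
+            nodeid: 3,
+            pid: 4,
+        };
+
+        let bytes = epoch.serialize();
+        assert_eq!(bytes.len(), 16);
+        assert_eq!(&bytes[0..4], &1u32.to_le_bytes());
+        assert_eq!(&bytes[12..16], &4u32.to_le_bytes());
+        assert_eq!(SyncEpoch::deserialize(&bytes).unwrap(), epoch);
+    }
+
+    #[test]
+    fn test_dfsm_mode_is_error() {
+        assert!(!DfsmMode::Start.is_error());
+        assert!(!DfsmMode::Synced.is_error());
+        assert!(DfsmMode::Leave.is_error());
+        assert!(DfsmMode::VersionError.is_error());
+        assert!(DfsmMode::Error.is_error());
+    }
+}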
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
new file mode 100644
index 00000000..2750b281
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
@@ -0,0 +1,220 @@
+//! C-compatible wire format for cluster communication
+//!
+//! This module implements the exact wire protocol used by the C version of pmxcfs
+//! to ensure compatibility with C-based cluster nodes.
+//!
+//! The C version uses a simple format with iovec arrays containing raw C types.
+
+use anyhow::{Context, Result};
+use bytemuck::{Pod, Zeroable};
+use std::ffi::CStr;
+
+/// C message types (must match dcdb.h)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum CMessageType {
+ Write = 1,
+ Mkdir = 2,
+ Delete = 3,
+ Rename = 4,
+ Create = 5,
+ Mtime = 6,
+ UnlockRequest = 7,
+ Unlock = 8,
+}
+
+/// C-compatible FUSE message header
+/// Layout matches the iovec array from C: [size][offset][pathlen][tolen][flags]
+#[derive(Debug, Clone, Copy, Pod, Zeroable)]
+#[repr(C)]
+struct CFuseMessageHeader {
+ size: u32,
+ offset: u32,
+ pathlen: u32,
+ tolen: u32,
+ flags: u32,
+}
+
+/// Parsed C FUSE message
+#[derive(Debug, Clone)]
+pub struct CFuseMessage {
+ pub size: u32,
+ pub offset: u32,
+ pub flags: u32,
+ pub path: String,
+ pub to: Option<String>,
+ pub data: Vec<u8>,
+}
+
+impl CFuseMessage {
+ /// Parse a C FUSE message from raw bytes
+ pub fn parse(data: &[u8]) -> Result<Self> {
+ if data.len() < std::mem::size_of::<CFuseMessageHeader>() {
+ return Err(anyhow::anyhow!(
+ "Message too short: {} < {}",
+ data.len(),
+ std::mem::size_of::<CFuseMessageHeader>()
+ ));
+ }
+
+ // Parse header manually to avoid alignment issues
+ let header = CFuseMessageHeader {
+ size: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
+ offset: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
+ pathlen: u32::from_le_bytes([data[8], data[9], data[10], data[11]]),
+ tolen: u32::from_le_bytes([data[12], data[13], data[14], data[15]]),
+ flags: u32::from_le_bytes([data[16], data[17], data[18], data[19]]),
+ };
+
+ let mut offset = std::mem::size_of::<CFuseMessageHeader>();
+
+ // Parse path
+ let path = if header.pathlen > 0 {
+ if offset + header.pathlen as usize > data.len() {
+ return Err(anyhow::anyhow!("Invalid path length"));
+ }
+ let path_bytes = &data[offset..offset + header.pathlen as usize];
+ offset += header.pathlen as usize;
+
+ // C strings are null-terminated
+ CStr::from_bytes_until_nul(path_bytes)
+ .context("Invalid path string")?
+ .to_str()
+ .context("Path not valid UTF-8")?
+ .to_string()
+ } else {
+ String::new()
+ };
+
+ // Parse 'to' (for rename operations)
+ let to = if header.tolen > 0 {
+ if offset + header.tolen as usize > data.len() {
+ return Err(anyhow::anyhow!("Invalid tolen"));
+ }
+ let to_bytes = &data[offset..offset + header.tolen as usize];
+ offset += header.tolen as usize;
+
+ Some(
+ CStr::from_bytes_until_nul(to_bytes)
+ .context("Invalid to string")?
+ .to_str()
+ .context("To path not valid UTF-8")?
+ .to_string(),
+ )
+ } else {
+ None
+ };
+
+ // Parse data buffer
+ let buf_data = if header.size > 0 {
+ if offset + header.size as usize > data.len() {
+ return Err(anyhow::anyhow!("Invalid data size"));
+ }
+ data[offset..offset + header.size as usize].to_vec()
+ } else {
+ Vec::new()
+ };
+
+ Ok(CFuseMessage {
+ size: header.size,
+ offset: header.offset,
+ flags: header.flags,
+ path,
+ to,
+ data: buf_data,
+ })
+ }
+
+ /// Serialize to C wire format
+ pub fn serialize(&self) -> Vec<u8> {
+ let path_bytes = self.path.as_bytes();
+ let pathlen = if path_bytes.is_empty() {
+ 0
+ } else {
+ (path_bytes.len() + 1) as u32 // +1 for null terminator
+ };
+
+ let to_bytes = self.to.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
+ let tolen = if to_bytes.is_empty() {
+ 0
+ } else {
+ (to_bytes.len() + 1) as u32
+ };
+
+ let header = CFuseMessageHeader {
+ size: self.size,
+ offset: self.offset,
+ pathlen,
+ tolen,
+ flags: self.flags,
+ };
+
+ let mut result = Vec::new();
+
+ // Serialize header
+ result.extend_from_slice(bytemuck::bytes_of(&header));
+
+ // Serialize path (with null terminator)
+ if pathlen > 0 {
+ result.extend_from_slice(path_bytes);
+ result.push(0); // null terminator
+ }
+
+ // Serialize 'to' (with null terminator)
+ if tolen > 0 {
+ result.extend_from_slice(to_bytes);
+ result.push(0); // null terminator
+ }
+
+ // Serialize data
+ if self.size > 0 {
+ result.extend_from_slice(&self.data);
+ }
+
+ result
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_serialize_deserialize_write() {
+ let msg = CFuseMessage {
+ size: 13,
+ offset: 0,
+ flags: 0,
+ path: "/test.txt".to_string(),
+ to: None,
+ data: b"Hello, World!".to_vec(),
+ };
+
+ let serialized = msg.serialize();
+ let parsed = CFuseMessage::parse(&serialized).unwrap();
+
+ assert_eq!(parsed.size, msg.size);
+ assert_eq!(parsed.offset, msg.offset);
+ assert_eq!(parsed.flags, msg.flags);
+ assert_eq!(parsed.path, msg.path);
+ assert_eq!(parsed.to, msg.to);
+ assert_eq!(parsed.data, msg.data);
+ }
+
+ #[test]
+ fn test_serialize_deserialize_rename() {
+ let msg = CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: "/old.txt".to_string(),
+ to: Some("/new.txt".to_string()),
+ data: Vec::new(),
+ };
+
+ let serialized = msg.serialize();
+ let parsed = CFuseMessage::parse(&serialized).unwrap();
+
+ assert_eq!(parsed.path, msg.path);
+ assert_eq!(parsed.to, msg.to);
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
new file mode 100644
index 00000000..d378f914
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
@@ -0,0 +1,565 @@
+//! Multi-node integration tests for DFSM cluster synchronization
+//!
+//! These tests simulate multi-node clusters to verify the complete synchronization
+//! protocol works correctly with multiple Rust nodes exchanging state.
+
+use anyhow::Result;
+use pmxcfs_dfsm::{Callbacks, FuseMessage, NodeSyncInfo};
+use pmxcfs_memdb::{MemDb, MemDbIndex, ROOT_INODE, TreeEntry};
+use std::sync::{Arc, Mutex};
+use tempfile::TempDir;
+
+/// Mock callbacks for testing DFSM without full pmxcfs integration
+struct MockCallbacks {
+ memdb: MemDb,
+ states_received: Arc<Mutex<Vec<NodeSyncInfo>>>,
+ updates_received: Arc<Mutex<Vec<TreeEntry>>>,
+ synced_count: Arc<Mutex<usize>>,
+}
+
+impl MockCallbacks {
+ fn new(memdb: MemDb) -> Self {
+ Self {
+ memdb,
+ states_received: Arc::new(Mutex::new(Vec::new())),
+ updates_received: Arc::new(Mutex::new(Vec::new())),
+ synced_count: Arc::new(Mutex::new(0)),
+ }
+ }
+
+ #[allow(dead_code)]
+ fn get_states(&self) -> Vec<NodeSyncInfo> {
+ self.states_received.lock().unwrap().clone()
+ }
+
+ #[allow(dead_code)]
+ fn get_updates(&self) -> Vec<TreeEntry> {
+ self.updates_received.lock().unwrap().clone()
+ }
+
+ #[allow(dead_code)]
+ fn get_synced_count(&self) -> usize {
+ *self.synced_count.lock().unwrap()
+ }
+}
+
+impl Callbacks<FuseMessage> for MockCallbacks {
+ fn deliver_message(
+ &self,
+ _nodeid: u32,
+ _pid: u32,
+ _message: FuseMessage,
+ _timestamp: u64,
+ ) -> Result<(i32, bool)> {
+ Ok((0, true))
+ }
+
+ fn compute_checksum(&self, output: &mut [u8; 32]) -> Result<()> {
+ let checksum = self.memdb.compute_database_checksum()?;
+ output.copy_from_slice(&checksum);
+ Ok(())
+ }
+
+ fn get_state(&self) -> Result<Vec<u8>> {
+ let index = self.memdb.encode_index()?;
+ Ok(index.serialize())
+ }
+
+ fn process_state_update(&self, states: &[NodeSyncInfo]) -> Result<bool> {
+ // Store received states for verification
+ *self.states_received.lock().unwrap() = states.to_vec();
+
+ // Parse indices from states
+ let mut indices: Vec<(u32, u32, MemDbIndex)> = Vec::new();
+ for node in states {
+ if let Some(state_data) = &node.state {
+ match MemDbIndex::deserialize(state_data) {
+ Ok(index) => indices.push((node.nodeid, node.pid, index)),
+ Err(_) => continue,
+ }
+ }
+ }
+
+ if indices.is_empty() {
+ return Ok(true);
+ }
+
+ // Find leader (highest version, or if tie, highest mtime)
+ let mut leader_idx = 0;
+ for i in 1..indices.len() {
+ let (_, _, current_index) = &indices[i];
+ let (_, _, leader_index) = &indices[leader_idx];
+ if current_index > leader_index {
+ leader_idx = i;
+ }
+ }
+
+ let (_leader_nodeid, _leader_pid, leader_index) = &indices[leader_idx];
+
+ // Check if WE are synced with leader
+ let our_index = self.memdb.encode_index()?;
+ let we_are_synced = our_index.version == leader_index.version
+ && our_index.mtime == leader_index.mtime
+ && our_index.size == leader_index.size
+ && our_index.entries.len() == leader_index.entries.len()
+ && our_index
+ .entries
+ .iter()
+ .zip(leader_index.entries.iter())
+ .all(|(a, b)| a.inode == b.inode && a.digest == b.digest);
+
+ Ok(we_are_synced)
+ }
+
+ fn process_update(&self, _nodeid: u32, _pid: u32, data: &[u8]) -> Result<()> {
+ // Deserialize and store update
+ let tree_entry = TreeEntry::deserialize_from_update(data)?;
+ self.updates_received
+ .lock()
+ .unwrap()
+ .push(tree_entry.clone());
+
+ // Apply to database
+ self.memdb.apply_tree_entry(tree_entry)?;
+ Ok(())
+ }
+
+ fn commit_state(&self) -> Result<()> {
+ Ok(())
+ }
+
+ fn on_synced(&self) {
+ *self.synced_count.lock().unwrap() += 1;
+ }
+}
+
+fn create_test_node(node_id: u32) -> Result<(MemDb, TempDir, Arc<MockCallbacks>)> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join(format!("node{node_id}.db"));
+ let memdb = MemDb::open(&db_path, true)?;
+ // Note: Local operations always use writer=0 (matching C implementation)
+ // Remote DFSM updates use the writer field from the incoming TreeEntry
+
+ let callbacks = Arc::new(MockCallbacks::new(memdb.clone()));
+ Ok((memdb, temp_dir, callbacks))
+}
+
+#[test]
+fn test_two_node_empty_sync() -> Result<()> {
+ // Create two nodes with empty databases
+ let (_memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+ // Generate states from both nodes
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+
+ // Simulate state exchange
+ let states = vec![
+ NodeSyncInfo {
+ nodeid: 1,
+ pid: 1000,
+ state: Some(state1),
+ synced: false,
+ },
+ NodeSyncInfo {
+ nodeid: 2,
+ pid: 2000,
+ state: Some(state2),
+ synced: false,
+ },
+ ];
+
+ // Both nodes process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+
+ // Both should be synced (empty databases are identical)
+ assert!(synced1, "Node 1 should be synced");
+ assert!(synced2, "Node 2 should be synced");
+
+ Ok(())
+}
+
+#[test]
+fn test_two_node_leader_election() -> Result<()> {
+ // Create two nodes
+ let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+ // Node 1 has more data (higher version)
+ memdb1.create("/file1.txt", 0, 1000)?;
+ memdb1.write("/file1.txt", 0, 1001, b"data from node 1", 0)?;
+
+ // Generate states
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+
+ // Parse to check versions
+ let index1 = MemDbIndex::deserialize(&state1)?;
+ let index2 = MemDbIndex::deserialize(&state2)?;
+
+ // Node 1 should have higher version
+ assert!(
+ index1.version > index2.version,
+ "Node 1 version {} should be > Node 2 version {}",
+ index1.version,
+ index2.version
+ );
+
+ // Simulate state exchange
+ let states = vec![
+ NodeSyncInfo {
+ nodeid: 1,
+ pid: 1000,
+ state: Some(state1),
+ synced: false,
+ },
+ NodeSyncInfo {
+ nodeid: 2,
+ pid: 2000,
+ state: Some(state2),
+ synced: false,
+ },
+ ];
+
+ // Process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+
+ // Node 1 (leader) should be synced, Node 2 (follower) should not
+ assert!(synced1, "Node 1 (leader) should be synced");
+ assert!(!synced2, "Node 2 (follower) should not be synced");
+
+ Ok(())
+}
+
+#[test]
+fn test_incremental_update_transfer() -> Result<()> {
+ // Create leader and follower
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Leader has data
+ leader_db.create("/config", libc::S_IFDIR, 1000)?;
+ leader_db.create("/config/node.conf", 0, 1001)?;
+ leader_db.write("/config/node.conf", 0, 1002, b"hostname=pve1", 0)?;
+
+ // Get entries from leader
+ let leader_entries = leader_db.get_all_entries()?;
+
+ // Simulate sending updates to follower
+ for entry in leader_entries {
+ if entry.inode == ROOT_INODE {
+ continue; // Skip root (both have it)
+ }
+
+ // Serialize as update message
+ let update_msg = entry.serialize_for_update();
+
+ // Follower receives and processes update
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+ }
+
+ // Verify follower has the data
+ let config_dir = follower_db.lookup_path("/config");
+ assert!(
+ config_dir.is_some(),
+ "Follower should have /config directory"
+ );
+ assert!(config_dir.unwrap().is_dir());
+
+ let config_file = follower_db.lookup_path("/config/node.conf");
+ assert!(
+ config_file.is_some(),
+ "Follower should have /config/node.conf"
+ );
+
+ let config_data = follower_db.read("/config/node.conf", 0, 1024)?;
+ assert_eq!(
+ config_data, b"hostname=pve1",
+ "Follower should have correct data"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_three_node_sync() -> Result<()> {
+ // Create three nodes
+ let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (memdb2, _temp2, callbacks2) = create_test_node(2)?;
+ let (_memdb3, _temp3, callbacks3) = create_test_node(3)?;
+
+ // Node 1 has the most recent data
+ memdb1.create("/cluster.conf", 0, 5000)?;
+ memdb1.write("/cluster.conf", 0, 5001, b"version=3", 0)?;
+
+ // Node 2 has older data
+ memdb2.create("/cluster.conf", 0, 4000)?;
+ memdb2.write("/cluster.conf", 0, 4001, b"version=2", 0)?;
+
+ // Node 3 is empty (new node joining)
+
+ // Generate states
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+ let state3 = callbacks3.get_state()?;
+
+ let states = vec![
+ NodeSyncInfo {
+ nodeid: 1,
+ pid: 1000,
+ state: Some(state1.clone()),
+ synced: false,
+ },
+ NodeSyncInfo {
+ nodeid: 2,
+ pid: 2000,
+ state: Some(state2.clone()),
+ synced: false,
+ },
+ NodeSyncInfo {
+ nodeid: 3,
+ pid: 3000,
+ state: Some(state3.clone()),
+ synced: false,
+ },
+ ];
+
+ // All nodes process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+ let synced3 = callbacks3.process_state_update(&states)?;
+
+ // Node 1 (leader) should be synced
+ assert!(synced1, "Node 1 (leader) should be synced");
+
+ // Nodes 2 and 3 need updates
+ assert!(!synced2, "Node 2 should need updates");
+ assert!(!synced3, "Node 3 should need updates");
+
+ // Verify leader has highest version
+ let index1 = MemDbIndex::deserialize(&state1)?;
+ let index2 = MemDbIndex::deserialize(&state2)?;
+ let index3 = MemDbIndex::deserialize(&state3)?;
+
+ assert!(index1.version >= index2.version);
+ assert!(index1.version >= index3.version);
+
+ Ok(())
+}
+
+#[test]
+fn test_update_message_wire_format_compatibility() -> Result<()> {
+    // Verify our wire format matches the C implementation exactly
+ let entry = TreeEntry {
+ inode: 42,
+ parent: 1,
+ version: 100,
+ writer: 2,
+ mtime: 12345,
+ size: 11,
+ entry_type: 8, // DT_REG
+ name: "test.conf".to_string(),
+ data: b"hello world".to_vec(),
+ };
+
+ let serialized = entry.serialize_for_update();
+
+ // Verify header size (41 bytes)
+ // parent(8) + inode(8) + version(8) + writer(4) + mtime(4) + size(4) + namelen(4) + type(1)
+ let expected_header_size = 8 + 8 + 8 + 4 + 4 + 4 + 4 + 1;
+ assert_eq!(expected_header_size, 41);
+
+ // Verify total size
+ let namelen = "test.conf".len() + 1; // Include null terminator
+ let expected_total = expected_header_size + namelen + 11;
+ assert_eq!(serialized.len(), expected_total);
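+    // Worked out for this entry: 41 (header) + 10 ("test.conf" + NUL) + 11 (data) = 62 bytes.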
+
+ // Verify we can deserialize it back
+ let deserialized = TreeEntry::deserialize_from_update(&serialized)?;
+ assert_eq!(deserialized.inode, entry.inode);
+ assert_eq!(deserialized.parent, entry.parent);
+ assert_eq!(deserialized.version, entry.version);
+ assert_eq!(deserialized.writer, entry.writer);
+ assert_eq!(deserialized.mtime, entry.mtime);
+ assert_eq!(deserialized.size, entry.size);
+ assert_eq!(deserialized.entry_type, entry.entry_type);
+ assert_eq!(deserialized.name, entry.name);
+ assert_eq!(deserialized.data, entry.data);
+
+ Ok(())
+}
+
+#[test]
+fn test_index_wire_format_compatibility() -> Result<()> {
+    // Verify the memdb_index_t wire format matches the C implementation
+ use pmxcfs_memdb::IndexEntry;
+
+ let entries = vec![
+ IndexEntry {
+ inode: 1,
+ digest: [0u8; 32],
+ },
+ IndexEntry {
+ inode: 2,
+ digest: [1u8; 32],
+ },
+ ];
+
+ let index = MemDbIndex::new(
+ 100, // version
+ 2, // last_inode
+ 1, // writer
+ 12345, // mtime
+ entries,
+ );
+
+ let serialized = index.serialize();
+
+ // Verify header size (32 bytes)
+ // version(8) + last_inode(8) + writer(4) + mtime(4) + size(4) + bytes(4)
+ let expected_header_size = 8 + 8 + 4 + 4 + 4 + 4;
+ assert_eq!(expected_header_size, 32);
+
+ // Verify entry size (40 bytes each)
+ // inode(8) + digest(32)
+ let expected_entry_size = 8 + 32;
+ assert_eq!(expected_entry_size, 40);
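+    // Worked out: 32 (header) + 2 * 40 (entries) = 112 bytes total.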
+
+ // Verify total size
+ let expected_total = expected_header_size + 2 * expected_entry_size;
+ assert_eq!(serialized.len(), expected_total);
+ assert_eq!(serialized.len(), index.bytes as usize);
+
+ // Verify deserialization
+ let deserialized = MemDbIndex::deserialize(&serialized)?;
+ assert_eq!(deserialized.version, index.version);
+ assert_eq!(deserialized.last_inode, index.last_inode);
+ assert_eq!(deserialized.writer, index.writer);
+ assert_eq!(deserialized.mtime, index.mtime);
+ assert_eq!(deserialized.size, index.size);
+ assert_eq!(deserialized.bytes, index.bytes);
+ assert_eq!(deserialized.entries.len(), 2);
+
+ Ok(())
+}
+
+#[test]
+fn test_sync_with_conflicts() -> Result<()> {
+ // Test scenario: two nodes modified different files
+ let (memdb1, _temp1, _callbacks1) = create_test_node(1)?;
+ let (memdb2, _temp2, _callbacks2) = create_test_node(2)?;
+
+ // Both start with same base
+ memdb1.create("/base.conf", 0, 1000)?;
+ memdb1.write("/base.conf", 0, 1001, b"shared", 0)?;
+
+ memdb2.create("/base.conf", 0, 1000)?;
+ memdb2.write("/base.conf", 0, 1001, b"shared", 0)?;
+
+ // Node 1 adds file1
+ memdb1.create("/file1.txt", 0, 2000)?;
+ memdb1.write("/file1.txt", 0, 2001, b"from node 1", 0)?;
+
+ // Node 2 adds file2
+ memdb2.create("/file2.txt", 0, 2000)?;
+ memdb2.write("/file2.txt", 0, 2001, b"from node 2", 0)?;
+
+ // Generate indices
+ let index1 = memdb1.encode_index()?;
+ let index2 = memdb2.encode_index()?;
+
+ // Find differences
+ let diffs_1_vs_2 = index1.find_differences(&index2);
+ let diffs_2_vs_1 = index2.find_differences(&index1);
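+    // find_differences() compares the per-inode digests, so each direction
+    // should surface at least the entry the other side lacks.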
+
+ // Node 1 has file1 that node 2 doesn't have
+ assert!(
+ !diffs_1_vs_2.is_empty(),
+ "Node 1 should have entries node 2 doesn't have"
+ );
+
+ // Node 2 has file2 that node 1 doesn't have
+ assert!(
+ !diffs_2_vs_1.is_empty(),
+ "Node 2 should have entries node 1 doesn't have"
+ );
+
+    // Higher version wins during resolution; here both nodes performed the
+    // same sequence of operations (create + write, twice), so their versions
+    // are equal and mtime would be the tiebreaker.
+
+ Ok(())
+}
+
+#[test]
+fn test_large_file_update() -> Result<()> {
+ // Test updating a file with significant data
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Create a file with 10KB of data
+ let large_data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect();
+
+ leader_db.create("/large.bin", 0, 1000)?;
+ leader_db.write("/large.bin", 0, 1001, &large_data, 0)?;
+
+ // Get the entry
+ let entry = leader_db.lookup_path("/large.bin").unwrap();
+
+ // Serialize and send
+ let update_msg = entry.serialize_for_update();
+
+ // Follower receives
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+
+ // Verify
+ let follower_entry = follower_db.lookup_path("/large.bin").unwrap();
+ assert_eq!(follower_entry.size, large_data.len());
+ assert_eq!(follower_entry.data, large_data);
+
+ Ok(())
+}
+
+#[test]
+fn test_directory_hierarchy_sync() -> Result<()> {
+ // Test syncing nested directory structure
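+    // Assumption: get_all_entries() yields parents before children (entries
+    // were created with ascending inode numbers), so the follower can attach
+    // each update to an already-existing parent.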
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Create directory hierarchy on leader
+ leader_db.create("/etc", libc::S_IFDIR, 1000)?;
+ leader_db.create("/etc/pve", libc::S_IFDIR, 1001)?;
+ leader_db.create("/etc/pve/nodes", libc::S_IFDIR, 1002)?;
+ leader_db.create("/etc/pve/nodes/pve1", libc::S_IFDIR, 1003)?;
+ leader_db.create("/etc/pve/nodes/pve1/config", 0, 1004)?;
+ leader_db.write(
+ "/etc/pve/nodes/pve1/config",
+ 0,
+ 1005,
+ b"cpu: 2\nmem: 4096",
+ 0,
+ )?;
+
+ // Send all entries to follower
+ let entries = leader_db.get_all_entries()?;
+ for entry in entries {
+ if entry.inode == ROOT_INODE {
+ continue; // Skip root
+ }
+ let update_msg = entry.serialize_for_update();
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+ }
+
+ // Verify entire hierarchy
+ assert!(follower_db.lookup_path("/etc").is_some());
+ assert!(follower_db.lookup_path("/etc/pve").is_some());
+ assert!(follower_db.lookup_path("/etc/pve/nodes").is_some());
+ assert!(follower_db.lookup_path("/etc/pve/nodes/pve1").is_some());
+
+ let config = follower_db.lookup_path("/etc/pve/nodes/pve1/config");
+ assert!(config.is_some());
+ assert_eq!(config.unwrap().data, b"cpu: 2\nmem: 4096");
+
+ Ok(())
+}
--
2.47.3