From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH pve-cluster 10/15] pmxcfs-rs: add pmxcfs-dfsm crate
Date: Tue, 6 Jan 2026 22:24:34 +0800 [thread overview]
Message-ID: <20260106142440.2368585-11-k.chai@proxmox.com> (raw)
In-Reply-To: <20260106142440.2368585-1-k.chai@proxmox.com>
Add Distributed Finite State Machine for cluster synchronization:
- Dfsm: Core state machine implementation
- ClusterDatabaseService: MemDb sync (pmxcfs_v1 CPG group)
- StatusSyncService: Status sync (pve_kvstore_v1 CPG group)
- Protocol: SyncStart, State, Update, UpdateComplete, Verify
- Leader election based on version and mtime
- Incremental updates for efficiency
This integrates pmxcfs-memdb, pmxcfs-services, and rust-corosync
to provide cluster-wide database synchronization. It implements
the wire-compatible protocol used by the C version.
Includes unit tests for:
- Index serialization and comparison
- Leader election logic
- Tree entry serialization
- Diff computation between indices
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
src/pmxcfs-rs/Cargo.toml | 1 +
src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml | 45 +
src/pmxcfs-rs/pmxcfs-dfsm/README.md | 340 ++++++
src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs | 52 +
.../src/cluster_database_service.rs | 116 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs | 163 +++
src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs | 728 ++++++++++++
src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs | 185 +++
.../pmxcfs-dfsm/src/kv_store_message.rs | 329 ++++++
src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs | 32 +
src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs | 21 +
.../pmxcfs-dfsm/src/state_machine.rs | 1013 +++++++++++++++++
.../pmxcfs-dfsm/src/status_sync_service.rs | 118 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs | 107 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs | 220 ++++
.../tests/multi_node_sync_tests.rs | 565 +++++++++
16 files changed, 4035 insertions(+)
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index f4497d58..4d18aa93 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -10,6 +10,7 @@ members = [
"pmxcfs-test-utils", # Test utilities and helpers (dev-only)
"pmxcfs-services", # Service framework for automatic retry and lifecycle management
"pmxcfs-ipc", # libqb-compatible IPC server
+ "pmxcfs-dfsm", # Distributed Finite State Machine (owns CPG)
]
resolver = "2"
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
new file mode 100644
index 00000000..12a8e7f6
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
@@ -0,0 +1,45 @@
+[package]
+name = "pmxcfs-dfsm"
+description = "Distributed Finite State Machine for cluster state synchronization"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+# Internal dependencies
+pmxcfs-api-types.workspace = true
+pmxcfs-memdb.workspace = true
+pmxcfs-services.workspace = true
+
+# Corosync integration
+rust-corosync.workspace = true
+
+# Error handling
+anyhow.workspace = true
+thiserror.workspace = true
+
+# Async and concurrency
+parking_lot.workspace = true
+async-trait.workspace = true
+tokio.workspace = true
+
+# Serialization
+serde.workspace = true
+bincode.workspace = true
+bytemuck.workspace = true
+
+# Logging
+tracing.workspace = true
+
+# Utilities
+num_enum.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
+libc.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/README.md b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
new file mode 100644
index 00000000..560827a7
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
@@ -0,0 +1,340 @@
+# pmxcfs-dfsm
+
+**Distributed Finite State Machine** for cluster-wide state synchronization in pmxcfs.
+
+This crate implements the DFSM protocol used to replicate configuration changes and status updates across all nodes in a Proxmox cluster via Corosync CPG (Closed Process Group).
+
+## Overview
+
+The DFSM is the core mechanism for maintaining consistency across cluster nodes. It ensures that:
+
+- All nodes see filesystem operations (writes, creates, deletes) in the same order
+- Database state remains synchronized even after network partitions
+- Status information (VM states, RRD data) is broadcast to all nodes
+- State verification catches inconsistencies
+
+## Architecture
+
+### Module Structure
+
+| Module | Purpose | C Equivalent |
+|--------|---------|--------------|
+| `state_machine.rs` | Core DFSM logic, state transitions | `dfsm.c` |
+| `cluster_database_service.rs` | MemDb sync service | `dcdb.c`, `loop.c:service_dcdb` |
+| `status_sync_service.rs` | Status/kvstore sync service | `loop.c:service_status` |
+| `cpg_service.rs` | Corosync CPG integration | `dfsm.c:cpg_callbacks` |
+| `dfsm_message.rs` | Protocol message types | `dfsm.c:dfsm_message_*_header_t` |
+| `message.rs` | Message trait and serialization | (inline in C) |
+| `wire_format.rs` | C-compatible wire format | `dcdb.c:c_fuse_message_header_t` |
+| `broadcast.rs` | Cluster-wide message broadcast | `dcdb.c:dcdb_send_fuse_message` |
+| `types.rs` | Type definitions (modes, epochs) | `dfsm.c:dfsm_mode_t` |
+
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| `dfsm_t` | `Dfsm` | Main state machine |
+| `dfsm_mode_t` | `DfsmMode` | Enum with type safety |
+| `dfsm_node_info_t` | (internal) | Node state tracking |
+| `dfsm_sync_info_t` | (internal) | Sync session info |
+| `dfsm_callbacks_t` | Trait-based callbacks | Type-safe callbacks via traits |
+| `dfsm_message_*_header_t` | `DfsmMessage` | Type-safe enum variants |
+
+### Functions
+
+#### Core DFSM Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dfsm_new()` | `Dfsm::new()` | state_machine.rs |
+| `dfsm_initialize()` | `Dfsm::init_cpg()` | state_machine.rs |
+| `dfsm_join()` | (part of init_cpg) | state_machine.rs |
+| `dfsm_dispatch()` | `Dfsm::dispatch_events()` | state_machine.rs |
+| `dfsm_send_message()` | `Dfsm::send_message()` | state_machine.rs |
+| `dfsm_send_update()` | `Dfsm::send_update()` | state_machine.rs |
+| `dfsm_verify_request()` | `Dfsm::verify_request()` | state_machine.rs |
+| `dfsm_finalize()` | `Dfsm::stop_services()` | state_machine.rs |
+
+#### DCDB (Cluster Database) Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dcdb_new()` | `ClusterDatabaseService::new()` | cluster_database_service.rs |
+| `dcdb_send_fuse_message()` | `broadcast()` | broadcast.rs |
+| `dcdb_send_unlock()` | `FuseMessage::Unlock` + broadcast | broadcast.rs |
+| `service_dcdb()` | `ClusterDatabaseService` | cluster_database_service.rs |
+
+#### Status Sync Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `service_status()` | `StatusSyncService` | status_sync_service.rs |
+| (kvstore CPG group) | `StatusSyncService` | Uses separate CPG group |
+
+### Callback System
+
+**C Implementation:**
+- Uses a `dfsm_callbacks_t` struct of function pointers (see the data-structures table above)
+
+**Rust Implementation:**
+- Uses trait-based callbacks instead of function pointers
+- Callbacks are implemented by `MemDbCallbacks` (memdb integration)
+- Defined in external crates (pmxcfs-memdb)
+
+## Synchronization Protocol
+
+The DFSM ensures all nodes maintain consistent database state through a multi-phase synchronization protocol:
+
+### Protocol Phases
+
+#### Phase 1: Membership Change
+
+When nodes join or leave the cluster:
+
+1. **Corosync CPG** delivers membership change notification
+2. **DFSM invalidates** cached checksums
+3. **Message queues** are cleared
+4. **Epoch counter** is incremented
+
+**CPG Leader** (lowest node ID):
+- Initiates sync by sending `SyncStart` message
+- Sends its own `State` (CPG doesn't loop back messages)
+
+**All Followers**:
+- Respond to `SyncStart` by sending their `State`
+- Wait for other nodes' states
+
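The leader/follower split above hinges on one rule: the CPG leader is simply the member with the lowest node ID. A minimal sketch of that selection (the `cpg_leader` helper and plain `u32` IDs are illustrative stand-ins, not the crate's API, which derives this from the confchg callback's member list):

```rust
/// Pick the CPG leader from a membership list: the lowest node ID wins.
/// Sketch only -- the real code works off the confchg member_list.
fn cpg_leader(member_ids: &[u32]) -> Option<u32> {
    member_ids.iter().copied().min()
}

fn main() {
    // After a membership change with nodes 3, 1 and 2, node 1 leads.
    assert_eq!(cpg_leader(&[3, 1, 2]), Some(1));
    // An empty membership (not yet joined) has no leader.
    assert_eq!(cpg_leader(&[]), None);
}
```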
+#### Phase 2: State Exchange
+
+Each node collects `State` messages containing serialized **MemDbIndex** (compact state summary using C-compatible wire format).
+
+State digests are computed using SHA-256 hashing to detect differences between nodes.
+
+#### Phase 3: Leader Election
+
+When all states are collected, `process_state_update()` is called:
+
+1. **Parse indices** from all node states
+2. **Elect data leader** (may differ from CPG leader):
+ - Highest `version` wins
+ - If tied, highest `mtime` wins
+3. **Identify synced nodes**: Nodes whose index matches leader exactly
+4. **Determine own status**:
+ - If we're the data leader → send updates to followers
+ - If we're synced with leader → mark as Synced
+ - Otherwise → enter Update mode and wait
+
+**Leader Election Algorithm**:
+
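The election rules above (highest `version` wins, ties broken by highest `mtime`) can be sketched as follows; `NodeState` and `elect_data_leader` are illustrative stand-ins for the crate's index parsing, not its actual API:

```rust
/// Per-node state summary used for data-leader election (sketch; the
/// real MemDbIndex carries more fields and is parsed from STATE messages).
#[derive(Clone, Copy)]
struct NodeState {
    nodeid: u32,
    version: u64,
    mtime: u32,
}

/// Elect the data leader: highest version wins, ties broken by mtime.
fn elect_data_leader(states: &[NodeState]) -> Option<u32> {
    states
        .iter()
        .max_by_key(|s| (s.version, s.mtime))
        .map(|s| s.nodeid)
}

fn main() {
    let states = [
        NodeState { nodeid: 1, version: 10, mtime: 100 },
        NodeState { nodeid: 2, version: 12, mtime: 90 },
        NodeState { nodeid: 3, version: 12, mtime: 95 },
    ];
    // Nodes 2 and 3 tie on version; node 3 wins on the newer mtime.
    assert_eq!(elect_data_leader(&states), Some(3));
}
```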
+#### Phase 4: Incremental Updates
+
+**Data Leader** (node with highest version):
+
+1. **Compare indices** using `find_differences()` for each follower
+2. **Serialize differing entries** to C-compatible TreeEntry format
+3. **Send Update messages** via CPG
+4. **Send UpdateComplete** when all updates sent
+
+**Followers** (out-of-sync nodes):
+
+1. **Receive Update messages**
+2. **Deserialize TreeEntry** via `TreeEntry::deserialize_from_update()`
+3. **Apply to database** via `MemDb::apply_tree_entry()`:
+ - INSERT OR REPLACE in SQLite
+ - Update in-memory structures
+ - Handle entry moves (parent/name changes)
+4. **On UpdateComplete**: Transition to Synced mode
+
+#### Phase 5: Normal Operations
+
+When in **Synced** mode:
+
+- FUSE operations are broadcast via `send_fuse_message()`
+- Messages are delivered immediately via `deliver_message()`
+- Leader periodically sends `VerifyRequest` for checksum comparison
+- Nodes respond with `Verify` containing SHA-256 of entire database
+- Mismatches trigger cluster resync
+
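The verification step boils down to "hash identical content in identical order, then compare digests". A sketch using `DefaultHasher` as a stand-in for the protocol's SHA-256 (the `db_digest` helper and its inputs are illustrative, not the crate's API):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in digest over the whole database. The real protocol hashes the
/// serialized state with SHA-256; this only illustrates that every node
/// must hash identical content in identical order to agree.
fn db_digest(entries: &[(&str, &[u8])]) -> u64 {
    let mut h = DefaultHasher::new();
    for (path, data) in entries {
        path.hash(&mut h);
        data.hash(&mut h);
    }
    h.finish()
}

fn main() {
    let a: [(&str, &[u8]); 2] = [("nodes/pve1.conf", b"x"), ("storage.cfg", b"y")];
    let b: [(&str, &[u8]); 2] = [("nodes/pve1.conf", b"x"), ("storage.cfg", b"z")];
    assert_eq!(db_digest(&a), db_digest(&a)); // identical databases agree
    assert_ne!(db_digest(&a), db_digest(&b)); // divergence changes the digest -> resync
}
```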
+---
+
+## Protocol Details
+
+### State Machine Transitions
+
+Based on analysis of C implementation (`dfsm.c` lines 795-1209):
+
+#### Critical Protocol Rules
+
+1. **Epoch Management**:
+ - Each node creates local epoch during confchg: `(counter++, time, own_nodeid, own_pid)`
+ - **Leader sends SYNC_START with its epoch**
+ - **Followers MUST adopt leader's epoch from SYNC_START** (`dfsm->sync_epoch = header->epoch`)
+ - All STATE messages in sync round use adopted epoch
+ - Epoch mismatch → message discarded (may lead to LEAVE)
+
+2. **Member List Validation**:
+ - Built from `member_list` in confchg callback
+ - Stored in `dfsm->sync_info->nodes[]`
+ - STATE sender MUST be in this list
+ - Non-member STATE → immediate LEAVE
+
+3. **Duplicate Detection**:
+ - Each node sends STATE exactly once per sync round
+ - Tracked via `ni->state` pointer (NULL = not received, non-NULL = received)
+ - Duplicate STATE from same nodeid/pid → immediate LEAVE
+   - **A duplicate STATE was the root cause of an observed Rust/C sync failure**
+
+4. **Message Ordering** (one sync round):
+   - Leader sends SYNC_START, then its own STATE (CPG does not loop messages back)
+   - Every other member replies with exactly one STATE
+   - Data leader streams UPDATE messages, then UPDATE_COMPLETE
+   - Normal message delivery resumes only once the node is Synced
+
+5. **Leader Selection**:
+ - Determined by `lowest_nodeid` from member list
+ - Set in confchg callback before any messages sent
+ - Used to validate SYNC_START sender (logged but not enforced)
+ - Re-elected during state processing based on DB versions
+
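Assuming the epoch tuple from rule 1 maps onto four little-endian `u32` fields (which matches the 16-byte epoch portion of the 32-byte state header), it can be pictured as the following sketch; the field names are illustrative, the crate's own type lives in `types.rs`:

```rust
/// Sync epoch as carried in the state header (sketch): four u32 fields,
/// 16 bytes on the wire, matching "(counter++, time, own_nodeid, own_pid)".
#[repr(C)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SyncEpoch {
    epoch: u32,  // local counter, bumped on every confchg
    time: u32,   // wall-clock seconds when the epoch was created
    nodeid: u32, // node that created the epoch
    pid: u32,    // pid of that node's pmxcfs process
}

fn main() {
    // The wire header reserves exactly 16 bytes for the epoch.
    assert_eq!(std::mem::size_of::<SyncEpoch>(), 16);

    // Followers adopt the leader's epoch verbatim from SYNC_START
    // (dfsm->sync_epoch = header->epoch in the C code).
    let leader = SyncEpoch { epoch: 7, time: 1_700_000_000, nodeid: 1, pid: 4242 };
    let adopted = leader;
    assert_eq!(adopted, leader);
}
```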
+### DFSM States (DfsmMode)
+
+| State | Value | Description | C Equivalent |
+|-------|-------|-------------|--------------|
+| `Start` | 0 | Initial connection | `DFSM_MODE_START` |
+| `StartSync` | 1 | Beginning sync | `DFSM_MODE_START_SYNC` |
+| `Synced` | 2 | Fully synchronized | `DFSM_MODE_SYNCED` |
+| `Update` | 3 | Receiving updates | `DFSM_MODE_UPDATE` |
+| `Leave` | 253 | Leaving group | `DFSM_MODE_LEAVE` |
+| `VersionError` | 254 | Protocol mismatch | `DFSM_MODE_VERSION_ERROR` |
+| `Error` | 255 | Error state | `DFSM_MODE_ERROR` |
+
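The mode table translates directly into a `repr(u8)` enum. This sketch mirrors the table above; the crate's real `DfsmMode` lives in `types.rs`:

```rust
/// DFSM modes with the same discriminants as the C dfsm_mode_t (sketch).
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Mode {
    Start = 0,
    StartSync = 1,
    Synced = 2,
    Update = 3,
    Leave = 253,
    VersionError = 254,
    Error = 255,
}

fn main() {
    assert_eq!(Mode::Update as u8, 3);
    assert_eq!(Mode::Leave as u8, 253);
    // Error states sit at the top of the u8 range, leaving room for
    // additional normal modes below them.
    assert!(Mode::VersionError as u8 > Mode::Synced as u8);
}
```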
+### Message Types (DfsmMessageType)
+
+| Type | Value | Purpose |
+|------|-------|---------|
+| `Normal` | 0 | Application messages (with header + payload) |
+| `SyncStart` | 1 | Start sync (from leader) |
+| `State` | 2 | Full state data |
+| `Update` | 3 | Incremental update |
+| `UpdateComplete` | 4 | End of updates |
+| `VerifyRequest` | 5 | Request state verification |
+| `Verify` | 6 | State checksum response |
+
+All messages use C-compatible wire format with headers and payloads.
+
+### Application Message Types
+
+The DFSM can carry two types of application messages:
+
+1. **Fuse Messages** (Filesystem operations)
+ - CPG Group: `pmxcfs_v1` (DCDB)
+ - Message types: `Write`, `Create`, `Delete`, `Mkdir`, `Rename`, `SetMtime`, `Unlock`
+ - Defined in: `pmxcfs-api-types::FuseMessage`
+
+2. **KvStore Messages** (Status/RRD sync)
+ - CPG Group: `pve_kvstore_v1`
+ - Message types: `Data` (key-value pairs for status sync)
+ - Defined in: `pmxcfs-api-types::KvStoreMessage`
+
+### Wire Format Compatibility
+
+All wire formats are **byte-compatible** with the C implementation. Messages include appropriate headers and payloads as defined in the C protocol.
+
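For the Normal-message case, the 24-byte header layout used by `DfsmMessage::serialize_normal_message` (see `dfsm_message.rs` in this crate) can be packed by hand as follows; this is a standalone sketch, not the crate's serializer:

```rust
/// Pack the 24-byte normal-message header:
/// [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64],
/// all little-endian, followed on the wire by the application payload.
fn pack_normal_header(msg_type: u16, subtype: u16, protocol: u32, time: u32, count: u64) -> Vec<u8> {
    let mut buf = Vec::with_capacity(24);
    buf.extend_from_slice(&msg_type.to_le_bytes());
    buf.extend_from_slice(&subtype.to_le_bytes());
    buf.extend_from_slice(&protocol.to_le_bytes());
    buf.extend_from_slice(&time.to_le_bytes());
    buf.extend_from_slice(&0u32.to_le_bytes()); // reserved, always zero
    buf.extend_from_slice(&count.to_le_bytes());
    buf
}

fn main() {
    let hdr = pack_normal_header(0, 1, 1, 1_700_000_000, 42);
    assert_eq!(hdr.len(), 24);
    // The count field sits at byte offset 16.
    assert_eq!(u64::from_le_bytes(hdr[16..24].try_into().unwrap()), 42);
}
```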
+## Synchronization Flow
+
+### 1. Node Join
+
+Membership change → `SyncStart` → `State` exchange → leader election → `Update` stream → `UpdateComplete` → Synced (Phases 1-4 above).
+
+### 2. Normal Operation
+
+In Synced mode, FUSE operations are broadcast as Normal messages and applied in the same total order on every node (Phase 5 above).
+
+### 3. State Verification (Periodic)
+
+The leader periodically sends `VerifyRequest`; each node replies with a `Verify` message carrying a SHA-256 digest of its database, and any mismatch triggers a resync.
+
+## Key Differences from C Implementation
+
+### Event Loop Architecture
+
+**C Version:**
+- Uses libqb's `qb_loop` for event loop
+- CPG fd registered with `qb_loop_poll_add()`
+- Dispatch called from qb_loop when fd is readable
+
+**Rust Version:**
+- Uses tokio async runtime
+- Service trait provides `dispatch()` method
+- ServiceManager polls fd using tokio's async I/O
+- No qb_loop dependency
+
+### CPG Instance Management
+
+**C Version:**
+- Single DFSM struct with callbacks
+- Two different CPG groups created separately
+
+**Rust Version:**
+- Each CPG group gets its own `Dfsm` instance
+- `ClusterDatabaseService` - manages `pmxcfs_v1` CPG group (MemDb)
+- `StatusSyncService` - manages `pve_kvstore_v1` CPG group (Status/RRD)
+- Both use same DFSM protocol but different callbacks
+
+## Error Handling
+
+### Split-Brain Prevention
+
+- Checksum verification detects divergence
+- Automatic resync on mismatch
+- Version monotonicity ensures forward progress
+
+### Network Partition Recovery
+
+- Membership changes trigger sync
+- Highest version always wins
+- Stale data is safely replaced
+
+### Consistency Guarantees
+
+- SQLite transactions ensure atomic updates
+- In-memory structures updated atomically
+- Version increments are monotonic
+- All nodes converge to same state
+
+## Compatibility Matrix
+
+| Feature | C Version | Rust Version | Compatible |
+|---------|-----------|--------------|------------|
+| Wire format | `dfsm_message_*_header_t` | `DfsmMessage::serialize()` | Yes |
+| CPG protocol | libcorosync | rust-corosync | Yes |
+| Message types | 0-6 | `DfsmMessageType` | Yes |
+| State machine | `dfsm_mode_t` | `DfsmMode` | Yes |
+| Protocol version | 1 | 1 | Yes |
+| Group names | `pmxcfs_v1`, `pve_kvstore_v1` | Same | Yes |
+
+## Known Issues / TODOs
+
+### Missing Features
+- [ ] **Sync message batching**: C version can batch updates, Rust sends individually
+- [ ] **Message queue limits**: C has MAX_QUEUE_LEN, Rust unbounded (potential memory issue)
+- [ ] **Detailed error codes**: C returns specific CS_ERR_* codes, Rust uses anyhow errors
+
+### Behavioral Differences (Benign)
+- **Logging**: Rust uses `tracing` instead of `qb_log` (compatible with journald)
+- **Threading**: Rust uses tokio tasks, C uses qb_loop single-threaded model
+- **Timers**: Rust uses tokio timers, C uses qb_loop timers (same timeout values)
+
+### Incompatibilities (None Known)
+No incompatibilities have been identified. The Rust implementation is fully wire-compatible and can operate in a mixed C/Rust cluster.
+
+## References
+
+### C Implementation
+- `src/pmxcfs/dfsm.c` / `dfsm.h` - Core DFSM implementation
+- `src/pmxcfs/dcdb.c` / `dcdb.h` - Distributed database coordination
+- `src/pmxcfs/loop.c` / `loop.h` - Service loop and management
+
+### Related Crates
+- **pmxcfs-memdb**: Database callbacks for DFSM
+- **pmxcfs-status**: Status tracking and kvstore
+- **pmxcfs-api-types**: Message type definitions
+- **pmxcfs-services**: Service framework for lifecycle management
+- **rust-corosync**: CPG bindings (external dependency)
+
+### Corosync Documentation
+- CPG (Closed Process Group) API: https://github.com/corosync/corosync
+- Group communication semantics: Total order, virtual synchrony
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
new file mode 100644
index 00000000..7e35b8d4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
@@ -0,0 +1,52 @@
+/// DFSM application callbacks
+///
+/// This module defines the callback trait that application layers implement
+/// to integrate with the DFSM state machine.
+use crate::NodeSyncInfo;
+
+/// Callback trait for DFSM operations
+///
+/// The application layer implements this to receive DFSM events.
+/// The generic parameter `M` specifies the message type this callback handles:
+/// - `Callbacks<FuseMessage>` for main database operations
+/// - `Callbacks<KvStoreMessage>` for status synchronization
+///
+/// This provides type safety by ensuring each DFSM instance only delivers
+/// the correct message type to its callbacks.
+pub trait Callbacks<M>: Send + Sync {
+ /// Deliver an application message
+ ///
+ /// The message type is determined by the generic parameter:
+ /// - FuseMessage for main database operations
+ /// - KvStoreMessage for status synchronization
+ fn deliver_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ message: M,
+ timestamp: u64,
+ ) -> anyhow::Result<(i32, bool)>;
+
+ /// Compute state checksum for verification
+ fn compute_checksum(&self, output: &mut [u8; 32]) -> anyhow::Result<()>;
+
+ /// Get current state for synchronization
+ ///
+ /// Called when we need to send our state to other nodes during sync.
+ fn get_state(&self) -> anyhow::Result<Vec<u8>>;
+
+ /// Process state update during synchronization
+ fn process_state_update(&self, states: &[NodeSyncInfo]) -> anyhow::Result<bool>;
+
+ /// Process incremental update from leader
+ ///
+ /// The leader sends individual TreeEntry updates during synchronization.
+ /// The data is serialized TreeEntry in C-compatible wire format.
+ fn process_update(&self, nodeid: u32, pid: u32, data: &[u8]) -> anyhow::Result<()>;
+
+ /// Commit synchronized state
+ fn commit_state(&self) -> anyhow::Result<()>;
+
+ /// Called when cluster becomes synced
+ fn on_synced(&self);
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
new file mode 100644
index 00000000..dc85a392
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
@@ -0,0 +1,116 @@
+//! Cluster Database Service
+//!
+//! This service synchronizes the distributed cluster database (pmxcfs-memdb) across
+//! all cluster nodes using DFSM (Distributed Finite State Machine).
+//!
+//! Equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+//! Provides automatic retry, event-driven CPG dispatching, and periodic state verification.
+
+use async_trait::async_trait;
+use pmxcfs_services::{DispatchAction, InitResult, Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{debug, error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message as MessageTrait;
+
+/// Cluster Database Service
+///
+/// Synchronizes the distributed cluster database (pmxcfs-memdb) across all nodes.
+/// Implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for database replication
+/// - Periodic state verification via timer callback
+///
+/// This is equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct ClusterDatabaseService<M> {
+ dfsm: Arc<Dfsm<M>>,
+ fd: Option<i32>,
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> ClusterDatabaseService<M> {
+ /// Create a new cluster database service
+ pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+ Self { dfsm, fd: None }
+ }
+}
+
+#[async_trait]
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Service for ClusterDatabaseService<M> {
+ fn name(&self) -> &str {
+ "cluster-database"
+ }
+
+ async fn initialize(&mut self) -> pmxcfs_services::Result<InitResult> {
+ info!("Initializing cluster database service (dcdb)");
+
+ // Initialize CPG connection (this also joins the group)
+ self.dfsm.init_cpg().map_err(|e| {
+ ServiceError::InitializationFailed(format!("DFSM CPG initialization failed: {e}"))
+ })?;
+
+ // Get file descriptor for event monitoring
+ let fd = self.dfsm.fd_get().map_err(|e| {
+ self.dfsm.stop_services().ok();
+ ServiceError::InitializationFailed(format!("Failed to get DFSM fd: {e}"))
+ })?;
+
+ self.fd = Some(fd);
+
+ info!(
+ "Cluster database service initialized successfully with fd {}",
+ fd
+ );
+ Ok(InitResult::WithFileDescriptor(fd))
+ }
+
+ async fn dispatch(&mut self) -> pmxcfs_services::Result<DispatchAction> {
+ match self.dfsm.dispatch_events() {
+ Ok(_) => Ok(DispatchAction::Continue),
+ Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+ warn!("DFSM connection lost, requesting reinitialization");
+ Ok(DispatchAction::Reinitialize)
+ }
+ Err(e) => {
+ error!("DFSM dispatch failed: {}", e);
+ Err(ServiceError::DispatchFailed(format!(
+ "DFSM dispatch failed: {e}"
+ )))
+ }
+ }
+ }
+
+ async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+ info!("Finalizing cluster database service");
+
+ self.fd = None;
+
+ if let Err(e) = self.dfsm.stop_services() {
+ warn!("Error stopping cluster database services: {}", e);
+ }
+
+ info!("Cluster database service finalized");
+ Ok(())
+ }
+
+ async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+ debug!("Cluster database timer callback: initiating state verification");
+
+ // Request state verification
+ if let Err(e) = self.dfsm.verify_request() {
+ warn!("DFSM state verification request failed: {}", e);
+ }
+
+ Ok(())
+ }
+
+ fn timer_period(&self) -> Option<Duration> {
+ // Match C implementation's DCDB_VERIFY_TIME (60 * 60 seconds)
+ // Periodic state verification happens once per hour
+ Some(Duration::from_secs(3600))
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
new file mode 100644
index 00000000..d7964259
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
@@ -0,0 +1,163 @@
+//! Safe, idiomatic wrapper for Corosync CPG (Closed Process Group)
+//!
+//! This module provides a trait-based abstraction over the Corosync CPG C API,
+//! handling the unsafe FFI boundary and callback lifecycle management internally.
+
+use anyhow::Result;
+use rust_corosync::{NodeId, cpg};
+use std::sync::Arc;
+
+/// Helper to extract CpgHandler from CPG context
+///
+/// # Safety
+/// Assumes context was set to a valid *const Arc<dyn CpgHandler> pointer
+unsafe fn handler_from_context(handle: cpg::Handle) -> &'static dyn CpgHandler {
+ let context = cpg::context_get(handle).expect("BUG: Failed to get CPG context");
+
+ assert_ne!(
+ context, 0,
+ "BUG: CPG context is null - CpgService not properly initialized"
+ );
+
+ // Context points to a leaked Arc<dyn CpgHandler>
+ // We borrow the Arc to get a reference to the handler
+ let arc_ptr = context as *const Arc<dyn CpgHandler>;
+ let arc_ref: &Arc<dyn CpgHandler> = unsafe { &*arc_ptr };
+ arc_ref.as_ref()
+}
+
+/// Trait for handling CPG events in a safe, idiomatic way
+///
+/// Implementors receive callbacks when CPG events occur. The trait handles
+/// all unsafe pointer conversion and context management internally.
+pub trait CpgHandler: Send + Sync + 'static {
+ fn on_deliver(&self, group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]);
+
+ fn on_confchg(
+ &self,
+ group_name: &str,
+ member_list: &[cpg::Address],
+ left_list: &[cpg::Address],
+ joined_list: &[cpg::Address],
+ );
+}
+
+/// Safe wrapper for CPG handle that manages callback lifecycle
+///
+/// This service registers callbacks with the CPG handle and ensures proper
+/// cleanup when dropped. It uses Arc reference counting to safely manage
+/// the handler lifetime across the FFI boundary.
+pub struct CpgService {
+ handle: cpg::Handle,
+ handler: Arc<dyn CpgHandler>,
+}
+
+impl CpgService {
+ pub fn new<T: CpgHandler>(handler: Arc<T>) -> Result<Self> {
+ fn cpg_deliver_callback(
+ handle: &cpg::Handle,
+ group_name: String,
+ nodeid: NodeId,
+ pid: u32,
+ msg: &[u8],
+ _msg_len: usize,
+ ) {
+ unsafe {
+ let handler = handler_from_context(*handle);
+ handler.on_deliver(&group_name, nodeid, pid, msg);
+ }
+ }
+
+ fn cpg_confchg_callback(
+ handle: &cpg::Handle,
+ group_name: &str,
+ member_list: Vec<cpg::Address>,
+ left_list: Vec<cpg::Address>,
+ joined_list: Vec<cpg::Address>,
+ ) {
+ unsafe {
+ let handler = handler_from_context(*handle);
+ handler.on_confchg(group_name, &member_list, &left_list, &joined_list);
+ }
+ }
+
+ let model_data = cpg::ModelData::ModelV1(cpg::Model1Data {
+ flags: cpg::Model1Flags::None,
+ deliver_fn: Some(cpg_deliver_callback),
+ confchg_fn: Some(cpg_confchg_callback),
+ totem_confchg_fn: None,
+ });
+
+ let handle = cpg::initialize(&model_data, 0)?;
+
+ let handler_dyn: Arc<dyn CpgHandler> = handler;
+ let leaked_arc = Box::new(Arc::clone(&handler_dyn));
+ let arc_ptr = Box::into_raw(leaked_arc) as *const _ as u64;
+ cpg::context_set(handle, arc_ptr)?;
+
+ Ok(Self {
+ handle,
+ handler: handler_dyn,
+ })
+ }
+
+ pub fn join(&self, group_name: &str) -> Result<()> {
+ // IMPORTANT: C implementation uses strlen(name) + 1 for CPG name length,
+ // which includes the trailing nul. To ensure compatibility with C nodes,
+ // we must add \0 to the group name.
+ // See src/pmxcfs/dfsm.c: dfsm->cpg_group_name.length = strlen(group_name) + 1;
+ let group_string = format!("{}\0", group_name);
+ tracing::warn!(
+ "CPG JOIN: Joining group '{}' (verify matches C's DCDB_CPG_GROUP_NAME='pve_dcdb_v1')",
+ group_name
+ );
+ cpg::join(self.handle, &group_string)?;
+ tracing::info!("CPG JOIN: Successfully joined group '{}'", group_name);
+ Ok(())
+ }
+
+ pub fn leave(&self, group_name: &str) -> Result<()> {
+ // Include trailing nul to match C's behavior (see join() comment)
+ let group_string = format!("{}\0", group_name);
+ cpg::leave(self.handle, &group_string)?;
+ Ok(())
+ }
+
+ pub fn mcast(&self, guarantee: cpg::Guarantee, msg: &[u8]) -> Result<()> {
+ cpg::mcast_joined(self.handle, guarantee, msg)?;
+ Ok(())
+ }
+
+ pub fn dispatch(&self) -> Result<(), rust_corosync::CsError> {
+ cpg::dispatch(self.handle, rust_corosync::DispatchFlags::All)
+ }
+
+ pub fn fd(&self) -> Result<i32> {
+ Ok(cpg::fd_get(self.handle)?)
+ }
+
+ pub fn handler(&self) -> &Arc<dyn CpgHandler> {
+ &self.handler
+ }
+
+ pub fn handle(&self) -> cpg::Handle {
+ self.handle
+ }
+}
+
+impl Drop for CpgService {
+ fn drop(&mut self) {
+ if let Ok(context) = cpg::context_get(self.handle)
+ && context != 0
+ {
+ unsafe {
+ let _boxed = Box::from_raw(context as *mut Arc<dyn CpgHandler>);
+ }
+ }
+
+ let _ = cpg::finalize(self.handle);
+ }
+}
+
+unsafe impl Send for CpgService {}
+unsafe impl Sync for CpgService {}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
new file mode 100644
index 00000000..054f06b8
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
@@ -0,0 +1,728 @@
+/// DFSM Protocol Message Types
+///
+/// This module defines the DfsmMessage enum which encapsulates all DFSM protocol messages
+/// with their associated data, providing type-safe serialization and deserialization.
+///
+/// Wire format matches C implementation's dfsm_message_*_header_t structures for compatibility.
+use anyhow::Result;
+use pmxcfs_memdb::TreeEntry;
+
+use super::message::Message as MessageTrait;
+use super::types::{DfsmMessageType, SyncEpoch};
+
+/// DFSM protocol message with typed variants
+///
+/// Each variant corresponds to a message type in the DFSM protocol and carries
+/// the appropriate payload data. The wire format matches the C implementation:
+///
+/// For Normal messages: dfsm_message_normal_header_t (24 bytes) + fuse_data
+/// ```text
+/// [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][fuse_data...]
+/// ```
+///
+/// The generic parameter `M` specifies the application message type and must implement
+/// the `Message` trait for serialization/deserialization:
+/// - `DfsmMessage<FuseMessage>` for database operations
+/// - `DfsmMessage<KvStoreMessage>` for status synchronization
+#[derive(Debug, Clone)]
+pub enum DfsmMessage<M>
+where
+ M: MessageTrait,
+{
+ /// Regular application message
+ ///
+ /// Contains a typed application message (FuseMessage or KvStoreMessage).
+ /// C wire format: dfsm_message_normal_header_t + application_message data
+ Normal {
+ msg_count: u64,
+ timestamp: u32, // Unix timestamp (matches C's u32)
+ protocol_version: u32, // Protocol version
+ message: M, // Typed message (FuseMessage or KvStoreMessage)
+ },
+
+ /// Start synchronization signal from leader (no payload)
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+ SyncStart { sync_epoch: SyncEpoch },
+
+ /// State data from another node during sync
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [state_data: raw bytes]
+ State {
+ sync_epoch: SyncEpoch,
+ data: Vec<u8>,
+ },
+
+ /// State update from leader
+ ///
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + TreeEntry fields
+ /// This is sent by the leader during synchronization to update followers
+ /// with individual database entries that differ from their state.
+ Update {
+ sync_epoch: SyncEpoch,
+ tree_entry: TreeEntry,
+ },
+
+ /// Update complete signal from leader (no payload)
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+ UpdateComplete { sync_epoch: SyncEpoch },
+
+ /// Verification request from leader
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64]
+ VerifyRequest { sync_epoch: SyncEpoch, csum_id: u64 },
+
+ /// Verification response with checksum
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64][checksum: [u8; 32]]
+ Verify {
+ sync_epoch: SyncEpoch,
+ csum_id: u64,
+ checksum: [u8; 32],
+ },
+}
+
+impl<M> DfsmMessage<M>
+where
+ M: MessageTrait,
+{
+ /// Protocol version (should match cluster-wide)
+ pub const DEFAULT_PROTOCOL_VERSION: u32 = 1;
+
+ /// Get the message type discriminant
+ pub fn message_type(&self) -> DfsmMessageType {
+ match self {
+ DfsmMessage::Normal { .. } => DfsmMessageType::Normal,
+ DfsmMessage::SyncStart { .. } => DfsmMessageType::SyncStart,
+ DfsmMessage::State { .. } => DfsmMessageType::State,
+ DfsmMessage::Update { .. } => DfsmMessageType::Update,
+ DfsmMessage::UpdateComplete { .. } => DfsmMessageType::UpdateComplete,
+ DfsmMessage::VerifyRequest { .. } => DfsmMessageType::VerifyRequest,
+ DfsmMessage::Verify { .. } => DfsmMessageType::Verify,
+ }
+ }
+
+ /// Serialize message to C-compatible wire format
+ ///
+ /// Normal messages: dfsm_message_normal_header_t (24 bytes) + application data
+ /// Format: [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][data...]
+ /// All other variants are state messages and use the 32-byte state header.
+ pub fn serialize(&self) -> Vec<u8> {
+ match self {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ } => self.serialize_normal_message(*msg_count, *timestamp, *protocol_version, message),
+ _ => self.serialize_state_message(),
+ }
+ }
+
+ /// Serialize a Normal message with C-compatible header
+ fn serialize_normal_message(
+ &self,
+ msg_count: u64,
+ timestamp: u32,
+ protocol_version: u32,
+ message: &M,
+ ) -> Vec<u8> {
+ let msg_type = self.message_type() as u16;
+ let subtype = message.message_type();
+ let app_data = message.serialize();
+
+ // C header: type (u16) + subtype (u16) + protocol (u32) + time (u32) + reserved (u32) + count (u64) = 24 bytes
+ let mut buf = Vec::with_capacity(24 + app_data.len());
+
+ // dfsm_message_header_t fields
+ buf.extend_from_slice(&msg_type.to_le_bytes());
+ buf.extend_from_slice(&subtype.to_le_bytes());
+ buf.extend_from_slice(&protocol_version.to_le_bytes());
+ buf.extend_from_slice(&timestamp.to_le_bytes());
+ buf.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // count field
+ buf.extend_from_slice(&msg_count.to_le_bytes());
+
+ // application message data
+ buf.extend_from_slice(&app_data);
+
+ buf
+ }
+
+ /// Serialize state messages (non-Normal) with C-compatible header
+ /// C wire format: dfsm_message_state_header_t (32 bytes) + payload
+ /// Header breakdown: base (16 bytes) + epoch (16 bytes)
+ fn serialize_state_message(&self) -> Vec<u8> {
+ let msg_type = self.message_type() as u16;
+ let (sync_epoch, payload) = self.extract_epoch_and_payload();
+
+ // For state messages: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + payload
+ let mut message = Vec::with_capacity(32 + payload.len());
+
+ // Base header (16 bytes): type, subtype, protocol, time, reserved
+ message.extend_from_slice(&msg_type.to_le_bytes());
+ message.extend_from_slice(&0u16.to_le_bytes()); // subtype (unused)
+ message.extend_from_slice(&Self::DEFAULT_PROTOCOL_VERSION.to_le_bytes());
+
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+ message.extend_from_slice(&timestamp.to_le_bytes());
+ message.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // Epoch header (16 bytes): epoch, time, nodeid, pid
+ message.extend_from_slice(&sync_epoch.serialize());
+
+ // Payload
+ message.extend_from_slice(&payload);
+
+ message
+ }
+
+ /// Extract sync_epoch and payload from state messages
+ fn extract_epoch_and_payload(&self) -> (SyncEpoch, Vec<u8>) {
+ match self {
+ DfsmMessage::Normal { .. } => {
+ unreachable!("Normal messages use serialize_normal_message")
+ }
+ DfsmMessage::SyncStart { sync_epoch } => (*sync_epoch, Vec::new()),
+ DfsmMessage::State { sync_epoch, data } => (*sync_epoch, data.clone()),
+ DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ } => (*sync_epoch, tree_entry.serialize_for_update()),
+ DfsmMessage::UpdateComplete { sync_epoch } => (*sync_epoch, Vec::new()),
+ DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ } => (*sync_epoch, csum_id.to_le_bytes().to_vec()),
+ DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ } => {
+ let mut data = Vec::with_capacity(8 + 32);
+ data.extend_from_slice(&csum_id.to_le_bytes());
+ data.extend_from_slice(checksum);
+ (*sync_epoch, data)
+ }
+ }
+ }
+
+ /// Deserialize message from C-compatible wire format
+ ///
+ /// Normal messages: [base header: 16 bytes][count: u64][app data]
+ /// State messages: [base header: 16 bytes][epoch: 16 bytes][payload]
+ ///
+ /// # Arguments
+ /// * `data` - Raw message bytes from CPG
+ pub fn deserialize(data: &[u8]) -> Result<Self> {
+ if data.len() < 16 {
+ anyhow::bail!(
+ "Message too short: {} bytes (need at least 16 for header)",
+ data.len()
+ );
+ }
+
+ // Parse dfsm_message_header_t (16 bytes)
+ let msg_type = u16::from_le_bytes([data[0], data[1]]);
+ let subtype = u16::from_le_bytes([data[2], data[3]]);
+ let protocol_version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
+ let timestamp = u32::from_le_bytes([data[8], data[9], data[10], data[11]]);
+ let _reserved = u32::from_le_bytes([data[12], data[13], data[14], data[15]]);
+
+ let dfsm_type = DfsmMessageType::try_from(msg_type)?;
+
+ // Normal messages have different structure than state messages
+ if dfsm_type == DfsmMessageType::Normal {
+ // Normal: [base: 16][count: 8][app_data: ...]
+ let payload = &data[16..];
+ Self::deserialize_normal_message(subtype, protocol_version, timestamp, payload)
+ } else {
+ // State messages: [base: 16][epoch: 16][payload: ...]
+ if data.len() < 32 {
+ anyhow::bail!(
+ "State message too short: {} bytes (need at least 32 for state header)",
+ data.len()
+ );
+ }
+ let sync_epoch = SyncEpoch::deserialize(&data[16..32])
+ .map_err(|e| anyhow::anyhow!("Failed to deserialize sync epoch: {e}"))?;
+ let payload = &data[32..];
+ Self::deserialize_state_message(dfsm_type, sync_epoch, payload)
+ }
+ }
+
+ /// Deserialize a Normal message
+ fn deserialize_normal_message(
+ subtype: u16,
+ protocol_version: u32,
+ timestamp: u32,
+ payload: &[u8],
+ ) -> Result<Self> {
+ // Normal messages have count field (u64) after base header
+ if payload.len() < 8 {
+ anyhow::bail!("Normal message too short: need count field");
+ }
+ let msg_count = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ let app_data = &payload[8..];
+
+ // Deserialize using the MessageTrait
+ let message = M::deserialize(subtype, app_data)?;
+
+ Ok(DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ })
+ }
+
+ /// Deserialize a state message (with epoch)
+ fn deserialize_state_message(
+ dfsm_type: DfsmMessageType,
+ sync_epoch: SyncEpoch,
+ payload: &[u8],
+ ) -> Result<Self> {
+ match dfsm_type {
+ DfsmMessageType::Normal => {
+ unreachable!("Normal messages use deserialize_normal_message")
+ }
+ DfsmMessageType::Update => {
+ let tree_entry = TreeEntry::deserialize_from_update(payload)?;
+ Ok(DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ })
+ }
+ DfsmMessageType::SyncStart => Ok(DfsmMessage::SyncStart { sync_epoch }),
+ DfsmMessageType::State => Ok(DfsmMessage::State {
+ sync_epoch,
+ data: payload.to_vec(),
+ }),
+ DfsmMessageType::UpdateComplete => Ok(DfsmMessage::UpdateComplete { sync_epoch }),
+ DfsmMessageType::VerifyRequest => {
+ if payload.len() < 8 {
+ anyhow::bail!("VerifyRequest message too short");
+ }
+ let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ Ok(DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ })
+ }
+ DfsmMessageType::Verify => {
+ if payload.len() < 40 {
+ anyhow::bail!("Verify message too short");
+ }
+ let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ let mut checksum = [0u8; 32];
+ checksum.copy_from_slice(&payload[8..40]);
+ Ok(DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ })
+ }
+ }
+ }
+
+ /// Helper to create a Normal message from an application message
+ pub fn from_message(msg_count: u64, message: M, protocol_version: u32) -> Self {
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ }
+ }
+
+ /// Helper to create an Update message from a TreeEntry
+ ///
+ /// Used by the leader during synchronization to send individual database entries
+ /// to nodes that need to catch up. Matches C's dcdb_send_update_inode().
+ pub fn from_tree_entry(tree_entry: TreeEntry, sync_epoch: SyncEpoch) -> Self {
+ DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::FuseMessage;
+
+ #[test]
+ fn test_sync_start_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 1234567890,
+ nodeid: 1,
+ pid: 1000,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ assert!(
+ matches!(deserialized, DfsmMessage::SyncStart { sync_epoch: e } if e == sync_epoch)
+ );
+ }
+
+ #[test]
+ fn test_normal_roundtrip() {
+ let fuse_msg = FuseMessage::Create {
+ path: "/test/file".to_string(),
+ };
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Normal {
+ msg_count: 42,
+ timestamp: 1234567890,
+ protocol_version: DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION,
+ message: fuse_msg.clone(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ } => {
+ assert_eq!(msg_count, 42);
+ assert_eq!(timestamp, 1234567890);
+ assert_eq!(
+ protocol_version,
+ DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION
+ );
+ assert_eq!(message, fuse_msg);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_verify_request_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 2,
+ time: 1234567891,
+ nodeid: 2,
+ pid: 2000,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: 0x123456789ABCDEF0,
+ };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::VerifyRequest {
+ sync_epoch: e,
+ csum_id,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, 0x123456789ABCDEF0);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_verify_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 3,
+ time: 1234567892,
+ nodeid: 3,
+ pid: 3000,
+ };
+ let checksum = [42u8; 32];
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Verify {
+ sync_epoch,
+ csum_id: 0x1122334455667788,
+ checksum,
+ };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Verify {
+ sync_epoch: e,
+ csum_id,
+ checksum: recv_checksum,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, 0x1122334455667788);
+ assert_eq!(recv_checksum, checksum);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_invalid_message_type() {
+ // A full 16-byte header whose type field maps to no DfsmMessageType variant
+ let mut data = vec![0u8; 16];
+ data[0..2].copy_from_slice(&0xAAAAu16.to_le_bytes());
+ assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+ }
+
+ #[test]
+ fn test_too_short() {
+ let data = vec![0xFF];
+ assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+ }
+
+ // ===== Edge Case Tests =====
+
+ #[test]
+ fn test_state_message_too_short() {
+ // State messages need at least 32 bytes (16 base + 16 epoch)
+ let mut data = vec![0u8; 31]; // One byte short
+ // Set message type to State (2)
+ data[0..2].copy_from_slice(&2u16.to_le_bytes());
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(result.is_err(), "State message with 31 bytes should fail");
+ assert!(result.unwrap_err().to_string().contains("too short"));
+ }
+
+ #[test]
+ fn test_normal_message_missing_count() {
+ // Normal messages need count field (u64) after 16-byte header
+ let mut data = vec![0u8; 20]; // Header + 4 bytes (not enough for u64 count)
+ // Set message type to Normal (0)
+ data[0..2].copy_from_slice(&0u16.to_le_bytes());
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(
+ result.is_err(),
+ "Normal message without full count field should fail"
+ );
+ }
+
+ #[test]
+ fn test_verify_message_truncated_checksum() {
+ // Verify messages need csum_id (8 bytes) + checksum (32 bytes) = 40 bytes payload
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 123,
+ nodeid: 1,
+ pid: 100,
+ };
+ let mut data = Vec::new();
+
+ // Base header (16 bytes)
+ data.extend_from_slice(&6u16.to_le_bytes()); // Verify message type
+ data.extend_from_slice(&0u16.to_le_bytes()); // subtype
+ data.extend_from_slice(&1u32.to_le_bytes()); // protocol
+ data.extend_from_slice(&123u32.to_le_bytes()); // time
+ data.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // Epoch (16 bytes)
+ data.extend_from_slice(&sync_epoch.serialize());
+
+ // Truncated payload (only 39 bytes instead of 40)
+ data.extend_from_slice(&0x12345678u64.to_le_bytes());
+ data.extend_from_slice(&[0u8; 31]); // Only 31 bytes of checksum
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(
+ result.is_err(),
+ "Verify message with truncated checksum should fail"
+ );
+ }
+
+ #[test]
+ fn test_update_message_with_tree_entry() {
+ use pmxcfs_memdb::TreeEntry;
+
+ // Create a valid tree entry with matching size
+ let data = vec![1, 2, 3, 4, 5];
+ let tree_entry = TreeEntry {
+ inode: 42,
+ parent: 0,
+ version: 1,
+ writer: 0,
+ name: "testfile".to_string(),
+ mtime: 1234567890,
+ size: data.len(), // size must match data.len()
+ entry_type: 8, // DT_REG (regular file)
+ data,
+ };
+
+ let sync_epoch = SyncEpoch {
+ epoch: 5,
+ time: 999,
+ nodeid: 2,
+ pid: 200,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Update {
+ sync_epoch,
+ tree_entry: tree_entry.clone(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Update {
+ sync_epoch: e,
+ tree_entry: recv_entry,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(recv_entry.inode, tree_entry.inode);
+ assert_eq!(recv_entry.name, tree_entry.name);
+ assert_eq!(recv_entry.size, tree_entry.size);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_update_complete_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 10,
+ time: 5555,
+ nodeid: 3,
+ pid: 300,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+
+ let serialized = msg.serialize();
+ assert_eq!(
+ serialized.len(),
+ 32,
+ "UpdateComplete should be exactly 32 bytes (header + epoch)"
+ );
+
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ assert!(
+ matches!(deserialized, DfsmMessage::UpdateComplete { sync_epoch: e } if e == sync_epoch)
+ );
+ }
+
+ #[test]
+ fn test_state_message_with_large_payload() {
+ let sync_epoch = SyncEpoch {
+ epoch: 7,
+ time: 7777,
+ nodeid: 4,
+ pid: 400,
+ };
+ // Create a large payload (1MB)
+ let large_data = vec![0xAB; 1024 * 1024];
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::State {
+ sync_epoch,
+ data: large_data.clone(),
+ };
+
+ let serialized = msg.serialize();
+ // Should be 32 bytes header + 1MB data
+ assert_eq!(serialized.len(), 32 + 1024 * 1024);
+
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::State {
+ sync_epoch: e,
+ data,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(data.len(), large_data.len());
+ assert_eq!(data, large_data);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_message_type_detection() {
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 100,
+ nodeid: 1,
+ pid: 50,
+ };
+
+ let sync_start: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+ assert_eq!(sync_start.message_type(), DfsmMessageType::SyncStart);
+
+ let state: DfsmMessage<FuseMessage> = DfsmMessage::State {
+ sync_epoch,
+ data: vec![1, 2, 3],
+ };
+ assert_eq!(state.message_type(), DfsmMessageType::State);
+
+ let update_complete: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+ assert_eq!(
+ update_complete.message_type(),
+ DfsmMessageType::UpdateComplete
+ );
+ }
+
+ #[test]
+ fn test_from_message_helper() {
+ let fuse_msg = FuseMessage::Mkdir {
+ path: "/new/dir".to_string(),
+ };
+ let msg_count = 123;
+ let protocol_version = DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION;
+
+ let dfsm_msg = DfsmMessage::from_message(msg_count, fuse_msg.clone(), protocol_version);
+
+ match dfsm_msg {
+ DfsmMessage::Normal {
+ msg_count: count,
+ timestamp: _,
+ protocol_version: pv,
+ message,
+ } => {
+ assert_eq!(count, msg_count);
+ assert_eq!(pv, protocol_version);
+ assert_eq!(message, fuse_msg);
+ }
+ _ => panic!("from_message should create Normal variant"),
+ }
+ }
+
+ #[test]
+ fn test_verify_request_with_max_csum_id() {
+ let sync_epoch = SyncEpoch {
+ epoch: 99,
+ time: 9999,
+ nodeid: 5,
+ pid: 500,
+ };
+ let max_csum_id = u64::MAX; // Test with maximum value
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: max_csum_id,
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::VerifyRequest {
+ sync_epoch: e,
+ csum_id,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, max_csum_id);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+}
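
As a quick cross-check of the wire format above, the 24-byte normal-message header can be built and inspected with a few lines of standalone Rust. The helper name below is illustrative only, not part of the crate:

```rust
// Sketch of the C-compatible dfsm_message_header_t layout for Normal
// messages: [type: u16][subtype: u16][protocol: u32][time: u32]
// [reserved: u32][count: u64], all little-endian.
fn build_normal_header(msg_type: u16, subtype: u16, protocol: u32, time: u32, count: u64) -> Vec<u8> {
    let mut buf = Vec::with_capacity(24);
    buf.extend_from_slice(&msg_type.to_le_bytes());
    buf.extend_from_slice(&subtype.to_le_bytes());
    buf.extend_from_slice(&protocol.to_le_bytes());
    buf.extend_from_slice(&time.to_le_bytes());
    buf.extend_from_slice(&0u32.to_le_bytes()); // reserved
    buf.extend_from_slice(&count.to_le_bytes());
    buf
}

fn main() {
    let hdr = build_normal_header(0, 3, 1, 1_234_567_890, 42);
    assert_eq!(hdr.len(), 24);
    // the count field occupies the trailing 8 bytes
    assert_eq!(u64::from_le_bytes(hdr[16..24].try_into().unwrap()), 42);
    println!("ok");
}
```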
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
new file mode 100644
index 00000000..ee5d28f8
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
@@ -0,0 +1,185 @@
+//! FUSE message types for cluster synchronization
+//!
+//! These are the high-level operations that get broadcast through the cluster
+//! via the main database DFSM (pmxcfs_v1 CPG group).
+use anyhow::{Context, Result};
+
+use crate::message::Message;
+use crate::wire_format::{CFuseMessage, CMessageType};
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum FuseMessage {
+ /// Create a regular file
+ Create { path: String },
+ /// Create a directory
+ Mkdir { path: String },
+ /// Write data to a file
+ Write {
+ path: String,
+ offset: u64,
+ data: Vec<u8>,
+ },
+ /// Delete a file or directory
+ Delete { path: String },
+ /// Rename/move a file or directory
+ Rename { from: String, to: String },
+ /// Update modification time
+ Mtime { path: String },
+ /// Request unlock (not yet implemented)
+ UnlockRequest { path: String },
+ /// Unlock (not yet implemented)
+ Unlock { path: String },
+}
+
+impl Message for FuseMessage {
+ fn message_type(&self) -> u16 {
+ match self {
+ FuseMessage::Create { .. } => CMessageType::Create as u16,
+ FuseMessage::Mkdir { .. } => CMessageType::Mkdir as u16,
+ FuseMessage::Write { .. } => CMessageType::Write as u16,
+ FuseMessage::Delete { .. } => CMessageType::Delete as u16,
+ FuseMessage::Rename { .. } => CMessageType::Rename as u16,
+ FuseMessage::Mtime { .. } => CMessageType::Mtime as u16,
+ FuseMessage::UnlockRequest { .. } => CMessageType::UnlockRequest as u16,
+ FuseMessage::Unlock { .. } => CMessageType::Unlock as u16,
+ }
+ }
+
+ fn serialize(&self) -> Vec<u8> {
+ let c_msg = match self {
+ FuseMessage::Create { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Mkdir { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Write { path, offset, data } => CFuseMessage {
+ size: data.len() as u32,
+ // the C wire format carries offsets as u32, so values above u32::MAX truncate
+ offset: *offset as u32,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: data.clone(),
+ },
+ FuseMessage::Delete { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Rename { from, to } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: from.clone(),
+ to: Some(to.clone()),
+ data: Vec::new(),
+ },
+ FuseMessage::Mtime { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::UnlockRequest { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Unlock { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ };
+
+ c_msg.serialize()
+ }
+
+ fn deserialize(message_type: u16, data: &[u8]) -> Result<Self> {
+ let c_msg = CFuseMessage::parse(data).context("Failed to parse C FUSE message")?;
+ let msg_type = CMessageType::try_from(message_type).context("Invalid C message type")?;
+
+ Ok(match msg_type {
+ CMessageType::Create => FuseMessage::Create { path: c_msg.path },
+ CMessageType::Mkdir => FuseMessage::Mkdir { path: c_msg.path },
+ CMessageType::Write => FuseMessage::Write {
+ path: c_msg.path,
+ offset: c_msg.offset as u64,
+ data: c_msg.data,
+ },
+ CMessageType::Delete => FuseMessage::Delete { path: c_msg.path },
+ CMessageType::Rename => FuseMessage::Rename {
+ from: c_msg.path,
+ to: c_msg.to.unwrap_or_default(),
+ },
+ CMessageType::Mtime => FuseMessage::Mtime { path: c_msg.path },
+ CMessageType::UnlockRequest => FuseMessage::UnlockRequest { path: c_msg.path },
+ CMessageType::Unlock => FuseMessage::Unlock { path: c_msg.path },
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_fuse_message_create() {
+ let msg = FuseMessage::Create {
+ path: "/test/file".to_string(),
+ };
+ assert_eq!(msg.message_type(), CMessageType::Create as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_fuse_message_write() {
+ let msg = FuseMessage::Write {
+ path: "/test/file".to_string(),
+ offset: 100,
+ data: vec![1, 2, 3, 4, 5],
+ };
+ assert_eq!(msg.message_type(), CMessageType::Write as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_fuse_message_rename() {
+ let msg = FuseMessage::Rename {
+ from: "/old/path".to_string(),
+ to: "/new/path".to_string(),
+ };
+ assert_eq!(msg.message_type(), CMessageType::Rename as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
new file mode 100644
index 00000000..db49a469
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
@@ -0,0 +1,329 @@
+//! KvStore message types for DFSM status synchronization
+//!
+//! This module defines the KvStore message types that are delivered through
+//! the status DFSM state machine (pve_kvstore_v1 CPG group).
+use anyhow::Context;
+
+use crate::message::Message;
+
+/// KvStore message type IDs (matches C's kvstore_message_t enum)
+#[derive(
+ Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive,
+)]
+#[repr(u16)]
+enum KvStoreMessageType {
+ Update = 1, // KVSTORE_MESSAGE_UPDATE
+ UpdateComplete = 2, // KVSTORE_MESSAGE_UPDATE_COMPLETE
+ Log = 3, // KVSTORE_MESSAGE_LOG
+}
+
+/// KvStore message types for ephemeral status synchronization
+///
+/// These messages are used by the kvstore DFSM (pve_kvstore_v1 CPG group)
+/// to synchronize ephemeral data like RRD metrics, node IPs, and cluster logs.
+///
+/// Matches C implementation's KVSTORE_MESSAGE_* types in status.c
+#[derive(Debug, Clone, PartialEq)]
+pub enum KvStoreMessage {
+ /// Update key-value data from a node
+ ///
+ /// Wire format: key (256 bytes, null-terminated) + value (variable length)
+ /// Matches C's KVSTORE_MESSAGE_UPDATE
+ Update { key: String, value: Vec<u8> },
+
+ /// Cluster log entry
+ ///
+ /// Wire format: clog_entry_t struct
+ /// Matches C's KVSTORE_MESSAGE_LOG
+ Log {
+ time: u32,
+ priority: u8,
+ node: String,
+ ident: String,
+ tag: String,
+ message: String,
+ },
+
+ /// Update complete signal (not currently used)
+ ///
+ /// Matches C's KVSTORE_MESSAGE_UPDATE_COMPLETE
+ UpdateComplete,
+}
+
+impl KvStoreMessage {
+ /// Get message type ID (matches C's kvstore_message_t enum)
+ pub fn message_type(&self) -> u16 {
+ let msg_type = match self {
+ KvStoreMessage::Update { .. } => KvStoreMessageType::Update,
+ KvStoreMessage::UpdateComplete => KvStoreMessageType::UpdateComplete,
+ KvStoreMessage::Log { .. } => KvStoreMessageType::Log,
+ };
+ msg_type.into()
+ }
+
+ /// Serialize to C-compatible wire format
+ ///
+ /// Update format: key (256 bytes, null-terminated) + value (variable)
+ /// Log format: clog_entry_t struct
+ pub fn serialize(&self) -> Vec<u8> {
+ match self {
+ KvStoreMessage::Update { key, value } => {
+ // C format: char key[256] + data
+ let mut buf = vec![0u8; 256];
+ let key_bytes = key.as_bytes();
+ let copy_len = key_bytes.len().min(255); // Leave room for null terminator
+ buf[..copy_len].copy_from_slice(&key_bytes[..copy_len]);
+ // buf is already zero-filled, so null terminator is automatic
+
+ buf.extend_from_slice(value);
+ buf
+ }
+ KvStoreMessage::Log {
+ time,
+ priority,
+ node,
+ ident,
+ tag,
+ message,
+ } => {
+ // C format: clog_entry_t
+ // struct clog_entry_t {
+ // uint32_t time;
+ // uint8_t priority;
+ // uint8_t padding[3];
+ // uint32_t node_len, ident_len, tag_len, msg_len;
+ // char data[]; // node + ident + tag + message (all null-terminated)
+ // }
+
+ let node_bytes = node.as_bytes();
+ let ident_bytes = ident.as_bytes();
+ let tag_bytes = tag.as_bytes();
+ let msg_bytes = message.as_bytes();
+
+ let node_len = (node_bytes.len() + 1) as u32; // +1 for null
+ let ident_len = (ident_bytes.len() + 1) as u32;
+ let tag_len = (tag_bytes.len() + 1) as u32;
+ let msg_len = (msg_bytes.len() + 1) as u32;
+
+ let total_len = 4 + 1 + 3 + 16 + node_len + ident_len + tag_len + msg_len;
+ let mut buf = Vec::with_capacity(total_len as usize);
+
+ buf.extend_from_slice(&time.to_le_bytes());
+ buf.push(*priority);
+ buf.extend_from_slice(&[0u8; 3]); // padding
+ buf.extend_from_slice(&node_len.to_le_bytes());
+ buf.extend_from_slice(&ident_len.to_le_bytes());
+ buf.extend_from_slice(&tag_len.to_le_bytes());
+ buf.extend_from_slice(&msg_len.to_le_bytes());
+
+ buf.extend_from_slice(node_bytes);
+ buf.push(0); // null terminator
+ buf.extend_from_slice(ident_bytes);
+ buf.push(0);
+ buf.extend_from_slice(tag_bytes);
+ buf.push(0);
+ buf.extend_from_slice(msg_bytes);
+ buf.push(0);
+
+ buf
+ }
+ KvStoreMessage::UpdateComplete => {
+ // No payload
+ Vec::new()
+ }
+ }
+ }
+
+ /// Deserialize from C-compatible wire format
+ pub fn deserialize(msg_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+ use KvStoreMessageType::*;
+
+ let msg_type = KvStoreMessageType::try_from(msg_type)
+ .map_err(|_| anyhow::anyhow!("Unknown kvstore message type: {msg_type}"))?;
+
+ match msg_type {
+ Update => {
+ if data.len() < 256 {
+ anyhow::bail!("UPDATE message too short: {} < 256", data.len());
+ }
+
+ // Find null terminator in first 256 bytes
+ let key_end = data[..256]
+ .iter()
+ .position(|&b| b == 0)
+ .ok_or_else(|| anyhow::anyhow!("UPDATE key not null-terminated"))?;
+
+ let key = std::str::from_utf8(&data[..key_end])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in UPDATE key: {e}"))?
+ .to_string();
+
+ let value = data[256..].to_vec();
+
+ Ok(KvStoreMessage::Update { key, value })
+ }
+ UpdateComplete => Ok(KvStoreMessage::UpdateComplete),
+ Log => {
+ // Minimum header: 4 (time) + 1 (priority) + 3 (padding) + 4x4 (lengths) = 24 bytes
+ if data.len() < 24 {
+ anyhow::bail!("LOG message too short");
+ }
+
+ let time = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
+ let priority = data[4];
+ // data[5..8] is padding
+
+ let node_len = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
+ let ident_len =
+ u32::from_le_bytes([data[12], data[13], data[14], data[15]]) as usize;
+ let tag_len = u32::from_le_bytes([data[16], data[17], data[18], data[19]]) as usize;
+ let msg_len = u32::from_le_bytes([data[20], data[21], data[22], data[23]]) as usize;
+
+ let expected_len = 24 + node_len + ident_len + tag_len + msg_len;
+ if data.len() != expected_len {
+ anyhow::bail!(
+ "LOG message size mismatch: {} != {}",
+ data.len(),
+ expected_len
+ );
+ }
+ // each length includes the null terminator, so zero is invalid
+ // (and would underflow the `len - 1` slicing below)
+ if node_len == 0 || ident_len == 0 || tag_len == 0 || msg_len == 0 {
+ anyhow::bail!("LOG message field length cannot be zero");
+ }
+
+ let mut offset = 24;
+
+ let node = std::str::from_utf8(&data[offset..offset + node_len - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG node: {e}"))?
+ .to_string();
+ offset += node_len;
+
+ let ident = std::str::from_utf8(&data[offset..offset + ident_len - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG ident: {e}"))?
+ .to_string();
+ offset += ident_len;
+
+ let tag = std::str::from_utf8(&data[offset..offset + tag_len - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG tag: {e}"))?
+ .to_string();
+ offset += tag_len;
+
+ let message = std::str::from_utf8(&data[offset..offset + msg_len - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG message: {e}"))?
+ .to_string();
+
+ Ok(KvStoreMessage::Log {
+ time,
+ priority,
+ node,
+ ident,
+ tag,
+ message,
+ })
+ }
+ }
+ }
+}
+
+impl Message for KvStoreMessage {
+ fn message_type(&self) -> u16 {
+ // Delegate to the existing method
+ KvStoreMessage::message_type(self)
+ }
+
+ fn serialize(&self) -> Vec<u8> {
+ // Delegate to the existing method
+ KvStoreMessage::serialize(self)
+ }
+
+ fn deserialize(message_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+ // Delegate to the existing method
+ KvStoreMessage::deserialize(message_type, data)
+ .context("Failed to deserialize KvStoreMessage")
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_kvstore_message_update_serialization() {
+ let msg = KvStoreMessage::Update {
+ key: "test_key".to_string(),
+ value: vec![1, 2, 3, 4, 5],
+ };
+
+ let serialized = msg.serialize();
+ assert_eq!(serialized.len(), 256 + 5);
+ assert_eq!(&serialized[..8], b"test_key");
+ assert_eq!(serialized[8], 0); // null terminator
+ assert_eq!(&serialized[256..], &[1, 2, 3, 4, 5]);
+
+ let deserialized = KvStoreMessage::deserialize(1, &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_kvstore_message_log_serialization() {
+ let msg = KvStoreMessage::Log {
+ time: 1234567890,
+ priority: 5,
+ node: "node1".to_string(),
+ ident: "pmxcfs".to_string(),
+ tag: "info".to_string(),
+ message: "test message".to_string(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = KvStoreMessage::deserialize(3, &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_kvstore_message_type() {
+ assert_eq!(
+ KvStoreMessage::Update {
+ key: "".into(),
+ value: vec![]
+ }
+ .message_type(),
+ 1
+ );
+ assert_eq!(KvStoreMessage::UpdateComplete.message_type(), 2);
+ assert_eq!(
+ KvStoreMessage::Log {
+ time: 0,
+ priority: 0,
+ node: "".into(),
+ ident: "".into(),
+ tag: "".into(),
+ message: "".into()
+ }
+ .message_type(),
+ 3
+ );
+ }
+
+ #[test]
+ fn test_kvstore_message_type_roundtrip() {
+ // Check that KvStoreMessageType discriminants round-trip through u16
+ use super::KvStoreMessageType;
+
+ assert_eq!(u16::from(KvStoreMessageType::Update), 1);
+ assert_eq!(u16::from(KvStoreMessageType::UpdateComplete), 2);
+ assert_eq!(u16::from(KvStoreMessageType::Log), 3);
+
+ assert_eq!(
+ KvStoreMessageType::try_from(1).unwrap(),
+ KvStoreMessageType::Update
+ );
+ assert_eq!(
+ KvStoreMessageType::try_from(2).unwrap(),
+ KvStoreMessageType::UpdateComplete
+ );
+ assert_eq!(
+ KvStoreMessageType::try_from(3).unwrap(),
+ KvStoreMessageType::Log
+ );
+
+ assert!(KvStoreMessageType::try_from(0).is_err());
+ assert!(KvStoreMessageType::try_from(4).is_err());
+ }
+}
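
The clog_entry_t framing used by LOG messages (24-byte header followed by four null-terminated strings) can be sketched standalone. `encode_log` below is an illustrative helper, not the crate's API; it packs the four length fields from an array rather than named fields:

```rust
// Sketch of the clog_entry_t framing:
// [time: u32][priority: u8][padding: 3][node_len][ident_len][tag_len][msg_len]
// followed by node, ident, tag, message, each null-terminated.
fn encode_log(time: u32, priority: u8, fields: [&str; 4]) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&time.to_le_bytes());
    buf.push(priority);
    buf.extend_from_slice(&[0u8; 3]); // padding to align the length fields
    for f in &fields {
        // each length includes the trailing NUL byte
        buf.extend_from_slice(&((f.len() + 1) as u32).to_le_bytes());
    }
    for f in &fields {
        buf.extend_from_slice(f.as_bytes());
        buf.push(0); // null terminator
    }
    buf
}

fn main() {
    let buf = encode_log(1_700_000_000, 5, ["node1", "pmxcfs", "info", "hello"]);
    // 24-byte header + (5+1) + (6+1) + (4+1) + (5+1) string bytes
    assert_eq!(buf.len(), 48);
    println!("ok");
}
```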
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
new file mode 100644
index 00000000..89240483
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
@@ -0,0 +1,32 @@
+//! Distributed Finite State Machine (DFSM) for cluster state synchronization
+//!
+//! This crate implements the state machine for synchronizing configuration
+//! changes across cluster nodes using Corosync CPG.
+//!
+//! The DFSM handles:
+//! - State synchronization between nodes
+//! - Message ordering and queuing
+//! - Leader-based state updates
+//! - Split-brain prevention
+//! - Membership change handling
+mod callbacks;
+pub mod cluster_database_service;
+mod cpg_service;
+mod dfsm_message;
+mod fuse_message;
+mod kv_store_message;
+mod message;
+mod state_machine;
+pub mod status_sync_service;
+mod types;
+mod wire_format;
+
+// Re-export public API
+pub use callbacks::Callbacks;
+pub use cluster_database_service::ClusterDatabaseService;
+pub use cpg_service::{CpgHandler, CpgService};
+pub use fuse_message::FuseMessage;
+pub use kv_store_message::KvStoreMessage;
+pub use state_machine::{Dfsm, DfsmBroadcast};
+pub use status_sync_service::StatusSyncService;
+pub use types::NodeSyncInfo;
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
new file mode 100644
index 00000000..24e6847b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
@@ -0,0 +1,21 @@
+//! High-level message abstraction for DFSM
+//!
+//! This module provides a Message trait for working with cluster messages
+//! at a higher abstraction level than raw bytes.
+use anyhow::Result;
+
+/// Trait for messages that can be sent through DFSM
+pub trait Message: Sized {
+ /// Get the message type identifier
+ fn message_type(&self) -> u16;
+
+ /// Serialize the message to bytes (application message payload only)
+ ///
+ /// This serializes only the application-level payload. The DFSM protocol
+ /// headers (msg_count, timestamp, protocol_version, etc.) are added by
+ /// DfsmMessage::serialize() when wrapping in DfsmMessage::Normal.
+ fn serialize(&self) -> Vec<u8>;
+
+ /// Deserialize from bytes given a message type
+ fn deserialize(message_type: u16, data: &[u8]) -> Result<Self>;
+}
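To make the trait's contract concrete, here is a minimal implementer for a hypothetical heartbeat message. The payload layout (a single little-endian u64) is invented for illustration, and the trait is copied locally with `String` errors in place of `anyhow::Result` so the sketch compiles without dependencies:

```rust
// Standalone copy of the Message trait, with String errors instead of
// anyhow::Result so the sketch has no external dependencies.
trait Message: Sized {
    fn message_type(&self) -> u16;
    fn serialize(&self) -> Vec<u8>;
    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self, String>;
}

// Hypothetical heartbeat payload: a single little-endian u64 counter.
#[derive(Debug, PartialEq)]
struct Heartbeat {
    counter: u64,
}

impl Message for Heartbeat {
    fn message_type(&self) -> u16 {
        1
    }

    fn serialize(&self) -> Vec<u8> {
        // Payload only; in the real crate the DFSM headers are added when
        // the message is wrapped in DfsmMessage::Normal.
        self.counter.to_le_bytes().to_vec()
    }

    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self, String> {
        if message_type != 1 {
            return Err(format!("unexpected message type {message_type}"));
        }
        let bytes: [u8; 8] = data
            .try_into()
            .map_err(|_| format!("bad payload length {}", data.len()))?;
        Ok(Heartbeat {
            counter: u64::from_le_bytes(bytes),
        })
    }
}

fn main() {
    let msg = Heartbeat { counter: 42 };
    let wire = msg.serialize();
    let back = Heartbeat::deserialize(msg.message_type(), &wire).unwrap();
    assert_eq!(back, msg);
}
```

The important property is that `serialize()` and `deserialize()` are inverses for every `message_type()` a type claims to handle; the deserializer rejects tags and lengths it does not understand instead of guessing.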
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
new file mode 100644
index 00000000..2c90e4ea
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
@@ -0,0 +1,1013 @@
+//! DFSM state machine implementation
+//!
+//! This module contains the main Dfsm struct and its implementation
+//! for managing distributed state synchronization.
+use anyhow::{Context, Result};
+use parking_lot::{Mutex, RwLock};
+use pmxcfs_api_types::MemberInfo;
+use rust_corosync::{NodeId, cpg};
+use std::collections::{BTreeMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use super::cpg_service::{CpgHandler, CpgService};
+use super::dfsm_message::DfsmMessage;
+use super::message::Message as MessageTrait;
+use super::types::{DfsmMode, QueuedMessage, SyncEpoch};
+use crate::{Callbacks, FuseMessage, NodeSyncInfo};
+
+/// Extension trait to add broadcast() method to Option<Arc<Dfsm<FuseMessage>>>
+///
+/// This allows calling `.broadcast()` directly on Option<Arc<Dfsm<FuseMessage>>> fields
+/// without explicit None checking at call sites.
+pub trait DfsmBroadcast {
+ fn broadcast(&self, msg: FuseMessage);
+}
+
+impl DfsmBroadcast for Option<Arc<Dfsm<FuseMessage>>> {
+ fn broadcast(&self, msg: FuseMessage) {
+ if let Some(dfsm) = self {
+ let _ = dfsm.broadcast(msg);
+ }
+ }
+}
+
+/// DFSM state machine
+///
+/// The generic parameter `M` specifies the message type this DFSM handles:
+/// - `Dfsm<FuseMessage>` for main database operations
+/// - `Dfsm<KvStoreMessage>` for status synchronization
+pub struct Dfsm<M> {
+ /// CPG service for cluster communication (matching C's dfsm_t->cpg_handle)
+ cpg_service: RwLock<Option<Arc<CpgService>>>,
+
+ /// Cluster group name for CPG
+ cluster_name: String,
+
+ /// Callbacks for application integration
+ callbacks: Arc<dyn Callbacks<M>>,
+
+ /// Current operating mode
+ mode: RwLock<DfsmMode>,
+
+ /// Current sync epoch
+ sync_epoch: RwLock<SyncEpoch>,
+
+ /// Local epoch counter
+ local_epoch_counter: Mutex<u32>,
+
+ /// Node synchronization info
+ sync_nodes: RwLock<Vec<NodeSyncInfo>>,
+
+ /// Message queue (ordered by count)
+ msg_queue: Mutex<BTreeMap<u64, QueuedMessage<M>>>,
+
+ /// Sync queue for messages during update mode
+ sync_queue: Mutex<VecDeque<QueuedMessage<M>>>,
+
+ /// Message counter for ordering (atomic for lock-free increment)
+ msg_counter: AtomicU64,
+
+ /// Lowest node ID in cluster (leader)
+ lowest_nodeid: RwLock<u32>,
+
+ /// Our node ID (set during init_cpg via cpg_local_get)
+ nodeid: AtomicU32,
+
+ /// Our process ID
+ pid: u32,
+
+ /// Protocol version for cluster compatibility
+ protocol_version: u32,
+
+ /// State verification - SHA-256 checksum
+ checksum: Mutex<[u8; 32]>,
+
+ /// Checksum epoch (when it was computed)
+ checksum_epoch: Mutex<SyncEpoch>,
+
+ /// Checksum ID for verification
+ checksum_id: Mutex<u64>,
+
+ /// Checksum counter for verify requests
+ checksum_counter: Mutex<u64>,
+}
+
+impl<M> Dfsm<M>
+where
+ M: MessageTrait,
+{
+ /// Create a new DFSM instance
+ ///
+ /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+ pub fn new(cluster_name: String, callbacks: Arc<dyn Callbacks<M>>) -> Result<Self> {
+ Self::new_with_protocol_version(cluster_name, callbacks, DfsmMessage::<M>::DEFAULT_PROTOCOL_VERSION)
+ }
+
+ /// Create a new DFSM instance with a specific protocol version
+ ///
+ /// This is used when the DFSM needs to use a non-default protocol version,
+ /// such as the status/kvstore DFSM which uses protocol version 0 for
+ /// compatibility with the C implementation.
+ ///
+ /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+ pub fn new_with_protocol_version(
+ cluster_name: String,
+ callbacks: Arc<dyn Callbacks<M>>,
+ protocol_version: u32,
+ ) -> Result<Self> {
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+ let pid = std::process::id();
+
+ Ok(Self {
+ cpg_service: RwLock::new(None),
+ cluster_name,
+ callbacks,
+ mode: RwLock::new(DfsmMode::Start),
+ sync_epoch: RwLock::new(SyncEpoch {
+ epoch: 0,
+ time: now,
+ nodeid: 0,
+ pid,
+ }),
+ local_epoch_counter: Mutex::new(0),
+ sync_nodes: RwLock::new(Vec::new()),
+ msg_queue: Mutex::new(BTreeMap::new()),
+ sync_queue: Mutex::new(VecDeque::new()),
+ msg_counter: AtomicU64::new(0),
+ lowest_nodeid: RwLock::new(0),
+ nodeid: AtomicU32::new(0), // Will be set by init_cpg() using cpg_local_get()
+ pid,
+ protocol_version,
+ checksum: Mutex::new([0u8; 32]),
+ checksum_epoch: Mutex::new(SyncEpoch {
+ epoch: 0,
+ time: 0,
+ nodeid: 0,
+ pid: 0,
+ }),
+ checksum_id: Mutex::new(0),
+ checksum_counter: Mutex::new(0),
+ })
+ }
+
+ pub fn get_mode(&self) -> DfsmMode {
+ *self.mode.read()
+ }
+
+ pub fn set_mode(&self, new_mode: DfsmMode) {
+ let mut mode = self.mode.write();
+ let old_mode = *mode;
+
+ if old_mode.is_error() && !new_mode.is_error() {
+ return;
+ }
+
+ if old_mode == new_mode {
+ return;
+ }
+
+ *mode = new_mode;
+ drop(mode);
+
+ if new_mode.is_error() {
+ tracing::error!("DFSM: {}", new_mode);
+ } else {
+ tracing::info!("DFSM: {}", new_mode);
+ }
+ }
+
+ pub fn is_leader(&self) -> bool {
+ let lowest = *self.lowest_nodeid.read();
+ lowest > 0 && lowest == self.nodeid.load(Ordering::Relaxed)
+ }
+
+ pub fn get_nodeid(&self) -> u32 {
+ self.nodeid.load(Ordering::Relaxed)
+ }
+
+ pub fn get_pid(&self) -> u32 {
+ self.pid
+ }
+
+ /// Check if DFSM is synced and ready
+ pub fn is_synced(&self) -> bool {
+ self.get_mode() == DfsmMode::Synced
+ }
+
+ /// Check if DFSM encountered an error
+ pub fn is_error(&self) -> bool {
+ self.get_mode().is_error()
+ }
+}
+
+impl<M: MessageTrait + Clone> Dfsm<M> {
+ fn send_sync_start(&self) -> Result<()> {
+ tracing::debug!("DFSM: sending SYNC_START message");
+ let sync_epoch = *self.sync_epoch.read();
+ self.send_dfsm_message(&DfsmMessage::<M>::SyncStart { sync_epoch })
+ }
+
+ fn send_state(&self) -> Result<()> {
+ tracing::debug!("DFSM: generating and sending state");
+
+ let state_data = self
+ .callbacks
+ .get_state()
+ .context("Failed to get state from callbacks")?;
+
+ tracing::info!("DFSM: sending state ({} bytes)", state_data.len());
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::State {
+ sync_epoch,
+ data: state_data,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ pub(super) fn send_dfsm_message(&self, message: &DfsmMessage<M>) -> Result<()> {
+ let serialized = message.serialize();
+
+ if let Some(ref service) = *self.cpg_service.read() {
+ service
+ .mcast(cpg::Guarantee::TypeAgreed, &serialized)
+ .context("Failed to broadcast DFSM message")?;
+ Ok(())
+ } else {
+ anyhow::bail!("CPG not initialized")
+ }
+ }
+
+ pub fn process_state(&self, nodeid: u32, pid: u32, state: &[u8]) -> Result<()> {
+ tracing::debug!(
+ "DFSM: processing state from node {}/{} ({} bytes)",
+ nodeid,
+ pid,
+ state.len()
+ );
+
+ let mut sync_nodes = self.sync_nodes.write();
+
+ if let Some(node) = sync_nodes
+ .iter_mut()
+ .find(|n| n.nodeid == nodeid && n.pid == pid)
+ {
+ node.state = Some(state.to_vec());
+ } else {
+ tracing::warn!("DFSM: received state from unknown node {}/{}", nodeid, pid);
+ return Ok(());
+ }
+
+ let all_received = sync_nodes.iter().all(|n| n.state.is_some());
+ drop(sync_nodes);
+
+ if all_received {
+ tracing::info!("DFSM: received all states, processing synchronization");
+ self.process_state_sync()?;
+ }
+
+ Ok(())
+ }
+
+ fn process_state_sync(&self) -> Result<()> {
+ tracing::info!("DFSM: processing state synchronization");
+
+ let sync_nodes = self.sync_nodes.read().clone();
+
+ match self.callbacks.process_state_update(&sync_nodes) {
+ Ok(synced) => {
+ if synced {
+ tracing::info!("DFSM: state synchronization successful");
+
+ let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+ let mut sync_nodes_write = self.sync_nodes.write();
+ if let Some(node) = sync_nodes_write
+ .iter_mut()
+ .find(|n| n.nodeid == my_nodeid && n.pid == self.pid)
+ {
+ node.synced = true;
+ }
+ drop(sync_nodes_write);
+
+ self.set_mode(DfsmMode::Synced);
+ self.callbacks.on_synced();
+ self.deliver_message_queue()?;
+ } else {
+ tracing::info!("DFSM: entering UPDATE mode, waiting for leader");
+ self.set_mode(DfsmMode::Update);
+ self.deliver_message_queue()?;
+ }
+ }
+ Err(e) => {
+ tracing::error!("DFSM: state synchronization failed: {}", e);
+ self.set_mode(DfsmMode::Error);
+ return Err(e);
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn queue_message(&self, nodeid: u32, pid: u32, msg_count: u64, message: M, timestamp: u64)
+ where
+ M: Clone,
+ {
+ tracing::debug!(
+ "DFSM: queueing message {} from {}/{}",
+ msg_count,
+ nodeid,
+ pid
+ );
+
+ let qm = QueuedMessage {
+ nodeid,
+ pid,
+ _msg_count: msg_count,
+ message,
+ timestamp,
+ };
+
+ let mode = self.get_mode();
+
+ let node_synced = self
+ .sync_nodes
+ .read()
+ .iter()
+ .find(|n| n.nodeid == nodeid && n.pid == pid)
+ .map(|n| n.synced)
+ .unwrap_or(false);
+
+ if mode == DfsmMode::Update && node_synced {
+ self.sync_queue.lock().push_back(qm);
+ } else {
+ self.msg_queue.lock().insert(msg_count, qm);
+ }
+ }
+
+ pub(super) fn deliver_message_queue(&self) -> Result<()>
+ where
+ M: Clone,
+ {
+ let mut queue = self.msg_queue.lock();
+ if queue.is_empty() {
+ return Ok(());
+ }
+
+ tracing::info!("DFSM: delivering {} queued messages", queue.len());
+
+ let mode = self.get_mode();
+ let sync_nodes = self.sync_nodes.read().clone();
+
+ let mut to_remove = Vec::new();
+
+ for (count, qm) in queue.iter() {
+ let node_info = sync_nodes
+ .iter()
+ .find(|n| n.nodeid == qm.nodeid && n.pid == qm.pid);
+
+ let Some(info) = node_info else {
+ tracing::debug!(
+ "DFSM: removing message from non-member {}/{}",
+ qm.nodeid,
+ qm.pid
+ );
+ to_remove.push(*count);
+ continue;
+ };
+
+ if mode == DfsmMode::Synced && info.synced {
+ tracing::debug!("DFSM: delivering message {}", count);
+
+ match self.callbacks.deliver_message(
+ qm.nodeid,
+ qm.pid,
+ qm.message.clone(),
+ qm.timestamp,
+ ) {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: message delivered, result={}, processed={}",
+ result,
+ processed
+ );
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver message: {}", e);
+ }
+ }
+
+ to_remove.push(*count);
+ } else if mode == DfsmMode::Update && info.synced {
+ self.sync_queue.lock().push_back(qm.clone());
+ to_remove.push(*count);
+ }
+ }
+
+ for count in to_remove {
+ queue.remove(&count);
+ }
+
+ Ok(())
+ }
+
+ pub(super) fn deliver_sync_queue(&self) -> Result<()> {
+ let mut sync_queue = self.sync_queue.lock();
+ let queue_len = sync_queue.len();
+
+ if queue_len == 0 {
+ return Ok(());
+ }
+
+ tracing::info!("DFSM: delivering {} sync queue messages", queue_len);
+
+ while let Some(qm) = sync_queue.pop_front() {
+ tracing::debug!(
+ "DFSM: delivering sync message from {}/{}",
+ qm.nodeid,
+ qm.pid
+ );
+
+ match self
+ .callbacks
+ .deliver_message(qm.nodeid, qm.pid, qm.message, qm.timestamp)
+ {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: sync message delivered, result={}, processed={}",
+ result,
+ processed
+ );
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver sync message: {}", e);
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Send a message to the cluster
+ ///
+ /// Creates a properly formatted Normal message with C-compatible headers.
+ pub fn send_message(&self, message: M) -> Result<u64> {
+ let msg_count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1;
+
+ tracing::debug!("DFSM: sending message {}", msg_count);
+
+ let dfsm_msg = DfsmMessage::from_message(msg_count, message, self.protocol_version);
+
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(msg_count)
+ }
+
+ /// Send a TreeEntry update to the cluster (leader only, during synchronization)
+ ///
+ /// This is used by the leader to send individual database entries to followers
+ /// that need to catch up. Matches C's dfsm_send_update().
+ pub fn send_update(&self, tree_entry: pmxcfs_memdb::TreeEntry) -> Result<()> {
+ tracing::debug!("DFSM: sending Update for inode {}", tree_entry.inode);
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::from_tree_entry(tree_entry, sync_epoch);
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Send UpdateComplete signal to cluster (leader only, after sending all updates)
+ ///
+ /// Signals to followers that all Update messages have been sent and they can
+ /// now transition to Synced mode. Matches C's dfsm_send_update_complete().
+ pub fn send_update_complete(&self) -> Result<()> {
+ tracing::info!("DFSM: sending UpdateComplete");
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::UpdateComplete { sync_epoch };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Request checksum verification (leader only)
+ /// This should be called periodically by the leader to verify cluster state consistency
+ pub fn verify_request(&self) -> Result<()> {
+ // Only leader should send verify requests
+ if !self.is_leader() {
+ return Ok(());
+ }
+
+ // Only verify when synced
+ if self.get_mode() != DfsmMode::Synced {
+ return Ok(());
+ }
+
+ // Check if we need to wait for previous verification to complete
+ let checksum_counter = *self.checksum_counter.lock();
+ let checksum_id = *self.checksum_id.lock();
+
+ if checksum_counter != checksum_id {
+ tracing::debug!(
+ "DFSM: delaying verify request {:016x}",
+ checksum_counter + 1
+ );
+ return Ok(());
+ }
+
+ // Increment counter and send verify request
+ *self.checksum_counter.lock() = checksum_counter + 1;
+ let new_counter = checksum_counter + 1;
+
+ tracing::debug!("DFSM: sending verify request {:016x}", new_counter);
+
+ // Send VERIFY_REQUEST message with counter
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: new_counter,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Handle verify request from leader
+ pub fn handle_verify_request(&self, message_epoch: SyncEpoch, csum_id: u64) -> Result<()> {
+ tracing::debug!("DFSM: received verify request {:016x}", csum_id);
+
+ // Compute current state checksum
+ let mut checksum = [0u8; 32];
+ self.callbacks.compute_checksum(&mut checksum)?;
+
+ // Save checksum info
+ // Store the epoch FROM THE MESSAGE (matching C: dfsm.c:736)
+ *self.checksum.lock() = checksum;
+ *self.checksum_epoch.lock() = message_epoch;
+ *self.checksum_id.lock() = csum_id;
+
+ // Send the checksum verification response
+ tracing::debug!("DFSM: sending verify response");
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg = DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Handle verify response from a node
+ pub fn handle_verify(
+ &self,
+ message_epoch: SyncEpoch,
+ csum_id: u64,
+ received_checksum: &[u8; 32],
+ ) -> Result<()> {
+ tracing::debug!("DFSM: received verify response");
+
+ let our_checksum_id = *self.checksum_id.lock();
+ let our_checksum_epoch = *self.checksum_epoch.lock();
+
+ // Check if this verification matches our saved checksum
+ // Compare with MESSAGE epoch, not current epoch (matching C: dfsm.c:766-767)
+ if our_checksum_id == csum_id && our_checksum_epoch == message_epoch {
+ let our_checksum = *self.checksum.lock();
+
+ // Compare checksums
+ if our_checksum != *received_checksum {
+ tracing::error!(
+ "DFSM: checksum mismatch! Expected {:02x?}, got {:02x?}",
+ &our_checksum[..8],
+ &received_checksum[..8]
+ );
+ tracing::error!("DFSM: data divergence detected - restarting cluster sync");
+ self.set_mode(DfsmMode::Leave);
+ return Err(anyhow::anyhow!("Checksum verification failed"));
+ } else {
+ tracing::info!("DFSM: data verification successful");
+ }
+ } else {
+ tracing::debug!("DFSM: skipping verification - no checksum saved or epoch mismatch");
+ }
+
+ Ok(())
+ }
+
+ /// Invalidate saved checksum (called on membership changes)
+ pub fn invalidate_checksum(&self) {
+ let counter = *self.checksum_counter.lock();
+ *self.checksum_id.lock() = counter;
+
+ // Reset checksum epoch
+ *self.checksum_epoch.lock() = SyncEpoch {
+ epoch: 0,
+ time: 0,
+ nodeid: 0,
+ pid: 0,
+ };
+
+ tracing::debug!("DFSM: checksum invalidated");
+ }
+}
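The counter/id gating in verify_request() means a new verify request is only issued once the previous one has been processed (the id catches up when the node handles the looped-back VerifyRequest). That pacing rule can be sketched in isolation (names are illustrative, not the crate's API):

```rust
// Verify-request pacing: `counter` counts requests issued, `id` tracks the
// last request this node has processed; a new request goes out only when
// the two match (illustrative mirror of verify_request/handle_verify_request).
struct VerifyState {
    counter: u64,
    id: u64,
}

impl VerifyState {
    /// Returns the new request id, or None if a request is still in flight.
    fn try_request(&mut self) -> Option<u64> {
        if self.counter != self.id {
            // Previous verification not yet processed; delay.
            return None;
        }
        self.counter += 1;
        Some(self.counter)
    }

    /// Called when the verify request is processed (CPG loops it back).
    fn on_request_processed(&mut self, csum_id: u64) {
        self.id = csum_id;
    }
}

fn main() {
    let mut v = VerifyState { counter: 0, id: 0 };
    assert_eq!(v.try_request(), Some(1));
    // A second request is delayed until the first has been processed.
    assert_eq!(v.try_request(), None);
    v.on_request_processed(1);
    assert_eq!(v.try_request(), Some(2));
}
```

On membership changes, invalidate_checksum() performs the equivalent of setting `id = counter`, which unblocks the next request while discarding any in-flight verification.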
+
+/// FuseMessage-specific methods
+impl Dfsm<FuseMessage> {
+ /// Broadcast a filesystem operation to the cluster
+ ///
+ /// Checks if the cluster is synced before broadcasting.
+ /// If not synced, the message is silently dropped.
+ pub fn broadcast(&self, msg: FuseMessage) -> Result<()> {
+ if !self.is_synced() {
+ return Ok(());
+ }
+
+ tracing::debug!("Broadcasting {:?}", msg);
+ self.send_message(msg)?;
+ tracing::debug!("Broadcast successful");
+
+ Ok(())
+ }
+}
+
+impl<M: MessageTrait + Clone> Dfsm<M> {
+ /// Handle incoming DFSM message from cluster (called by CpgHandler)
+ fn handle_dfsm_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ message: DfsmMessage<M>,
+ ) -> anyhow::Result<()> {
+ // Validate epoch for state messages (all except Normal and SyncStart)
+ // This matches C implementation's epoch checking in dfsm.c:665-673
+ let should_validate_epoch = !matches!(
+ message,
+ DfsmMessage::Normal { .. } | DfsmMessage::SyncStart { .. }
+ );
+
+ if should_validate_epoch {
+ let current_epoch = *self.sync_epoch.read();
+ let message_epoch = match &message {
+ DfsmMessage::State { sync_epoch, .. }
+ | DfsmMessage::Update { sync_epoch, .. }
+ | DfsmMessage::UpdateComplete { sync_epoch }
+ | DfsmMessage::VerifyRequest { sync_epoch, .. }
+ | DfsmMessage::Verify { sync_epoch, .. } => *sync_epoch,
+ _ => unreachable!(),
+ };
+
+ if message_epoch != current_epoch {
+ tracing::debug!(
+ "DFSM: ignoring message with wrong epoch (expected {:?}, got {:?})",
+ current_epoch,
+ message_epoch
+ );
+ return Ok(());
+ }
+ }
+
+ // Match on typed message variants
+ match message {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version: _,
+ message: app_msg,
+ } => self.handle_normal_message(nodeid, pid, msg_count, timestamp, app_msg),
+ DfsmMessage::SyncStart { sync_epoch } => self.handle_sync_start(nodeid, sync_epoch),
+ DfsmMessage::State {
+ sync_epoch: _,
+ data,
+ } => self.process_state(nodeid, pid, &data),
+ DfsmMessage::Update {
+ sync_epoch: _,
+ tree_entry,
+ } => self.handle_update(nodeid, pid, tree_entry),
+ DfsmMessage::UpdateComplete { sync_epoch: _ } => self.handle_update_complete(),
+ DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ } => self.handle_verify_request(sync_epoch, csum_id),
+ DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ } => self.handle_verify(sync_epoch, csum_id, &checksum),
+ }
+ }
+
+ /// Handle membership change notification (called by CpgHandler)
+ fn handle_membership_change(&self, members: &[MemberInfo]) -> anyhow::Result<()> {
+ tracing::info!(
+ "DFSM: handling membership change ({} members)",
+ members.len()
+ );
+
+ // Invalidate saved checksum
+ self.invalidate_checksum();
+
+ // Update epoch
+ let mut counter = self.local_epoch_counter.lock();
+ *counter += 1;
+
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+
+ let new_epoch = SyncEpoch {
+ epoch: *counter,
+ time: now,
+ nodeid: self.nodeid.load(Ordering::Relaxed),
+ pid: self.pid,
+ };
+
+ *self.sync_epoch.write() = new_epoch;
+ drop(counter);
+
+ // Find lowest node ID (leader)
+ let lowest = members.iter().map(|m| m.node_id).min().unwrap_or(0);
+ *self.lowest_nodeid.write() = lowest;
+
+ // Initialize sync nodes
+ let mut sync_nodes = self.sync_nodes.write();
+ sync_nodes.clear();
+
+ for member in members {
+ sync_nodes.push(NodeSyncInfo {
+ nodeid: member.node_id,
+ pid: member.pid,
+ state: None,
+ synced: false,
+ });
+ }
+ drop(sync_nodes);
+
+ // Clear queues
+ self.sync_queue.lock().clear();
+
+ // Determine next mode
+ if members.len() == 1 {
+ // Single node - already synced
+ tracing::info!("DFSM: single node cluster, marking as synced");
+ self.set_mode(DfsmMode::Synced);
+
+ // Mark ourselves as synced
+ let mut sync_nodes = self.sync_nodes.write();
+ if let Some(node) = sync_nodes.first_mut() {
+ node.synced = true;
+ }
+
+ // Deliver queued messages
+ self.deliver_message_queue()?;
+ } else {
+ // Multi-node - start synchronization
+ tracing::info!("DFSM: multi-node cluster, starting sync");
+ self.set_mode(DfsmMode::StartSync);
+
+ // If we're the leader, initiate sync
+ if self.is_leader() {
+ tracing::info!("DFSM: we are leader, sending sync start");
+ self.send_sync_start()?;
+
+ // Leader also sends its own state right away; handle_sync_start()
+ // skips the state send when our own SyncStart is delivered back to us
+ self.send_state().context("Failed to send leader state")?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Handle normal application message
+ fn handle_normal_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ msg_count: u64,
+ timestamp: u32,
+ message: M,
+ ) -> Result<()> {
+ // C version: deliver immediately if in Synced mode, otherwise queue
+ if self.get_mode() == DfsmMode::Synced {
+ // Deliver immediately - message is already deserialized
+ match self.callbacks.deliver_message(
+ nodeid,
+ pid,
+ message,
+ timestamp as u64, // Convert back to u64 for callback compatibility
+ ) {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: message delivered immediately, result={}, processed={}",
+ result,
+ processed
+ );
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver message: {}", e);
+ }
+ }
+ } else {
+ // Queue for later delivery - store typed message directly
+ self.queue_message(nodeid, pid, msg_count, message, timestamp as u64);
+ }
+ Ok(())
+ }
+
+ /// Handle SyncStart message from leader
+ fn handle_sync_start(&self, nodeid: u32, new_epoch: SyncEpoch) -> Result<()> {
+ tracing::info!(
+ "DFSM: received SyncStart from node {} with epoch {:?}",
+ nodeid,
+ new_epoch
+ );
+
+ // Adopt the new epoch from the leader (critical for sync protocol!)
+ // This matches C implementation which updates dfsm->sync_epoch
+ *self.sync_epoch.write() = new_epoch;
+ tracing::debug!("DFSM: adopted new sync epoch from leader");
+
+ // Send our state back to the cluster
+ // BUT: don't send if we're the leader (we already sent our state in handle_membership_change)
+ let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+ if nodeid != my_nodeid {
+ self.send_state()
+ .context("Failed to send state in response to SyncStart")?;
+ tracing::debug!("DFSM: sent state in response to SyncStart");
+ } else {
+ tracing::debug!("DFSM: skipping state send (we're the leader who already sent state)");
+ }
+
+ Ok(())
+ }
+
+ /// Handle Update message from leader
+ fn handle_update(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ tree_entry: pmxcfs_memdb::TreeEntry,
+ ) -> Result<()> {
+ // Serialize TreeEntry for callback (process_update expects raw bytes for now)
+ let serialized = tree_entry.serialize_for_update();
+ if let Err(e) = self.callbacks.process_update(nodeid, pid, &serialized) {
+ tracing::error!("DFSM: failed to process update: {}", e);
+ }
+ Ok(())
+ }
+
+ /// Handle UpdateComplete message
+ fn handle_update_complete(&self) -> Result<()> {
+ tracing::info!("DFSM: received UpdateComplete from leader");
+ self.deliver_sync_queue()?;
+ self.set_mode(DfsmMode::Synced);
+ self.callbacks.on_synced();
+ Ok(())
+ }
+}
+
+/// Implementation of CpgHandler trait for DFSM
+///
+/// This allows Dfsm to receive CPG callbacks in an idiomatic Rust way,
+/// with all unsafe pointer handling managed by the CpgService.
+impl<M: MessageTrait + Clone + Send + Sync + 'static> CpgHandler for Dfsm<M> {
+ fn on_deliver(&self, _group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]) {
+ tracing::debug!(
+ "DFSM CPG message from node {} (pid {}): {} bytes",
+ u32::from(nodeid),
+ pid,
+ msg.len()
+ );
+
+ // Deserialize DFSM protocol message
+ match DfsmMessage::<M>::deserialize(msg) {
+ Ok(dfsm_msg) => {
+ if let Err(e) = self.handle_dfsm_message(u32::from(nodeid), pid, dfsm_msg) {
+ tracing::error!("Error handling DFSM message: {}", e);
+ }
+ }
+ Err(e) => {
+ tracing::error!("Failed to deserialize DFSM message: {}", e);
+ }
+ }
+ }
+
+ fn on_confchg(
+ &self,
+ _group_name: &str,
+ member_list: &[cpg::Address],
+ _left_list: &[cpg::Address],
+ _joined_list: &[cpg::Address],
+ ) {
+ tracing::info!("DFSM CPG membership change: {} members", member_list.len());
+
+ // Build MemberInfo list from CPG addresses
+ let members: Vec<MemberInfo> = member_list
+ .iter()
+ .map(|addr| MemberInfo {
+ node_id: u32::from(addr.nodeid),
+ pid: addr.pid,
+ joined_at: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs(),
+ })
+ .collect();
+
+ // Notify DFSM of membership change
+ if let Err(e) = self.handle_membership_change(&members) {
+ tracing::error!("Failed to handle membership change: {}", e);
+ }
+ }
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Dfsm<M> {
+ /// Initialize CPG (Closed Process Group) for cluster communication
+ ///
+ /// Uses the idiomatic CpgService wrapper which handles all unsafe FFI
+ /// and callback management internally.
+ pub fn init_cpg(self: &Arc<Self>) -> Result<()> {
+ tracing::info!("DFSM: Initializing CPG");
+
+ // Create CPG service with this Dfsm as the handler
+ // CpgService handles all callback registration and context management
+ let cpg_service = Arc::new(CpgService::new(Arc::clone(self))?);
+
+ // Get our node ID from CPG (matches C's cpg_local_get)
+ // This MUST be done after cpg_initialize but before joining the group
+ let nodeid = cpg::local_get(cpg_service.handle())?;
+ let nodeid_u32 = u32::from(nodeid);
+ self.nodeid.store(nodeid_u32, Ordering::Relaxed);
+ tracing::info!("DFSM: Got node ID {} from CPG", nodeid_u32);
+
+ // Join the CPG group
+ let group_name = &self.cluster_name;
+ cpg_service
+ .join(group_name)
+ .context("Failed to join CPG group")?;
+
+ tracing::info!("DFSM joined CPG group '{}'", group_name);
+
+ // Store the service
+ *self.cpg_service.write() = Some(cpg_service);
+
+ // Dispatch once to get initial membership
+ if let Some(ref service) = *self.cpg_service.read()
+ && let Err(e) = service.dispatch()
+ {
+ tracing::warn!("Failed to dispatch CPG events: {:?}", e);
+ }
+
+ tracing::info!("DFSM CPG initialized successfully");
+ Ok(())
+ }
+
+ /// Dispatch CPG events (should be called periodically from event loop)
+ /// Matching C's service_dfsm_dispatch
+ pub fn dispatch_events(&self) -> Result<(), rust_corosync::CsError> {
+ if let Some(ref service) = *self.cpg_service.read() {
+ service.dispatch()
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Get CPG file descriptor for event monitoring
+ pub fn fd_get(&self) -> Result<i32> {
+ if let Some(ref service) = *self.cpg_service.read() {
+ service.fd()
+ } else {
+ Err(anyhow::anyhow!("CPG service not initialized"))
+ }
+ }
+
+ /// Stop DFSM services (leave CPG group and finalize)
+ pub fn stop_services(&self) -> Result<()> {
+ tracing::info!("DFSM: Stopping services");
+
+ // Leave the CPG group before dropping the service
+ let group_name = self.cluster_name.clone();
+ if let Some(ref service) = *self.cpg_service.read()
+ && let Err(e) = service.leave(&group_name)
+ {
+ tracing::warn!("Error leaving CPG group: {:?}", e);
+ }
+
+ // Drop the service (CpgService::drop handles finalization)
+ *self.cpg_service.write() = None;
+
+ tracing::info!("DFSM services stopped");
+ Ok(())
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
new file mode 100644
index 00000000..877058a4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
@@ -0,0 +1,118 @@
+//! Status Sync Service
+//!
+//! This service synchronizes ephemeral status data across the cluster using a separate
+//! DFSM instance with the "pve_kvstore_v1" CPG group.
+//!
+//! Equivalent to C implementation's service_status (the kvstore DFSM).
+//! Handles synchronization of:
+//! - RRD data (performance metrics from each node)
+//! - Node IP addresses
+//! - Cluster log entries
+//! - Other ephemeral status key-value data
+
+use async_trait::async_trait;
+use pmxcfs_services::{DispatchAction, InitResult, Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message as MessageTrait;
+
+/// Status Sync Service
+///
+/// Synchronizes ephemeral status data across all nodes using a separate DFSM instance.
+/// Uses CPG group "pve_kvstore_v1" (separate from main config database "pmxcfs_v1").
+///
+/// This implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for status replication
+/// - Separation of status data from config data for better performance
+///
+/// This is equivalent to C implementation's service_status (the kvstore DFSM).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct StatusSyncService<M> {
+ dfsm: Arc<Dfsm<M>>,
+ fd: Option<i32>,
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> StatusSyncService<M> {
+ /// Create a new status sync service
+ pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+ Self { dfsm, fd: None }
+ }
+}
+
+#[async_trait]
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Service for StatusSyncService<M> {
+ fn name(&self) -> &str {
+ "status-sync"
+ }
+
+ async fn initialize(&mut self) -> pmxcfs_services::Result<InitResult> {
+ info!("Initializing status sync service (kvstore)");
+
+ // Initialize CPG connection for kvstore group
+ self.dfsm.init_cpg().map_err(|e| {
+ ServiceError::InitializationFailed(format!(
+ "Status sync CPG initialization failed: {e}"
+ ))
+ })?;
+
+ // Get file descriptor for event monitoring
+ let fd = self.dfsm.fd_get().map_err(|e| {
+ self.dfsm.stop_services().ok();
+ ServiceError::InitializationFailed(format!("Failed to get status sync fd: {e}"))
+ })?;
+
+ self.fd = Some(fd);
+
+ info!(
+ "Status sync service initialized successfully with fd {}",
+ fd
+ );
+ Ok(InitResult::WithFileDescriptor(fd))
+ }
+
+ async fn dispatch(&mut self) -> pmxcfs_services::Result<DispatchAction> {
+ match self.dfsm.dispatch_events() {
+ Ok(_) => Ok(DispatchAction::Continue),
+ Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+ warn!("Status sync connection lost, requesting reinitialization");
+ Ok(DispatchAction::Reinitialize)
+ }
+ Err(e) => {
+ error!("Status sync dispatch failed: {}", e);
+ Err(ServiceError::DispatchFailed(format!(
+ "Status sync dispatch failed: {e}"
+ )))
+ }
+ }
+ }
+
+ async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+ info!("Finalizing status sync service");
+
+ self.fd = None;
+
+ if let Err(e) = self.dfsm.stop_services() {
+ warn!("Error stopping status sync services: {}", e);
+ }
+
+ info!("Status sync service finalized");
+ Ok(())
+ }
+
+ async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+ // Status sync doesn't need periodic verification like the main database
+ // Status data is ephemeral and doesn't require the same consistency guarantees
+ Ok(())
+ }
+
+ fn timer_period(&self) -> Option<Duration> {
+ // No periodic timer needed for status sync
+ None
+ }
+}
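The dispatch error mapping above (transient CPG connection errors trigger reinitialization, anything else is fatal) can be sketched standalone. `CsError` and `DispatchAction` below are simplified stand-ins for the rust-corosync and pmxcfs-services types, not the real definitions:

```rust
// Sketch of the dispatch error handling: connection-level errors are
// recoverable via reinitialization, other errors propagate as failures.

#[derive(Debug, PartialEq)]
enum CsError {
    Library,   // stand-in for CsError::CsErrLibrary
    BadHandle, // stand-in for CsError::CsErrBadHandle
    TryAgain,  // any other corosync error
}

#[derive(Debug, PartialEq)]
enum DispatchAction {
    Continue,
    Reinitialize,
}

fn map_dispatch(result: Result<(), CsError>) -> Result<DispatchAction, String> {
    match result {
        // Events dispatched successfully: keep going.
        Ok(()) => Ok(DispatchAction::Continue),
        // Connection lost: ask the service runner to reinitialize.
        Err(CsError::Library) | Err(CsError::BadHandle) => Ok(DispatchAction::Reinitialize),
        // Everything else is a hard dispatch failure.
        Err(e) => Err(format!("dispatch failed: {e:?}")),
    }
}

fn main() {
    assert_eq!(map_dispatch(Ok(())), Ok(DispatchAction::Continue));
    assert_eq!(
        map_dispatch(Err(CsError::BadHandle)),
        Ok(DispatchAction::Reinitialize)
    );
    assert!(map_dispatch(Err(CsError::TryAgain)).is_err());
}
```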
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
new file mode 100644
index 00000000..5a2eb964
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
@@ -0,0 +1,107 @@
+//! DFSM type definitions
+//!
+//! This module contains all type definitions used by the DFSM state machine.
+/// DFSM operating modes
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum DfsmMode {
+ /// Initial state - starting cluster connection
+ Start = 0,
+
+ /// Starting data synchronization
+ StartSync = 1,
+
+ /// All data is up to date
+ Synced = 2,
+
+ /// Waiting for updates from leader
+ Update = 3,
+
+ /// Error states (>= 128)
+ Leave = 253,
+ VersionError = 254,
+ Error = 255,
+}
+
+impl DfsmMode {
+ /// Check if this is an error mode
+ pub fn is_error(&self) -> bool {
+ (*self as u8) >= 128
+ }
+}
+
+impl std::fmt::Display for DfsmMode {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ DfsmMode::Start => write!(f, "start cluster connection"),
+ DfsmMode::StartSync => write!(f, "starting data synchronization"),
+ DfsmMode::Synced => write!(f, "all data is up to date"),
+ DfsmMode::Update => write!(f, "waiting for updates from leader"),
+ DfsmMode::Leave => write!(f, "leaving cluster"),
+ DfsmMode::VersionError => write!(f, "protocol version mismatch"),
+ DfsmMode::Error => write!(f, "serious internal error"),
+ }
+ }
+}
+
+/// DFSM message types (internal protocol messages)
+/// Matches C's dfsm_message_t enum values
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum DfsmMessageType {
+ Normal = 0,
+ SyncStart = 1,
+ State = 2,
+ Update = 3,
+ UpdateComplete = 4,
+ VerifyRequest = 5,
+ Verify = 6,
+}
+
+/// Sync epoch - identifies a synchronization session
+/// Matches C's dfsm_sync_epoch_t structure (16 bytes total)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct SyncEpoch {
+ pub epoch: u32,
+ pub time: u32,
+ pub nodeid: u32,
+ pub pid: u32,
+}
+
+impl SyncEpoch {
+ /// Serialize to C-compatible wire format (16 bytes)
+ /// Format: [epoch: u32][time: u32][nodeid: u32][pid: u32]
+ pub fn serialize(&self) -> [u8; 16] {
+ let mut bytes = [0u8; 16];
+ bytes[0..4].copy_from_slice(&self.epoch.to_le_bytes());
+ bytes[4..8].copy_from_slice(&self.time.to_le_bytes());
+ bytes[8..12].copy_from_slice(&self.nodeid.to_le_bytes());
+ bytes[12..16].copy_from_slice(&self.pid.to_le_bytes());
+ bytes
+ }
+
+ /// Deserialize from C-compatible wire format (16 bytes)
+ pub fn deserialize(bytes: &[u8]) -> Result<Self, &'static str> {
+ if bytes.len() < 16 {
+ return Err("SyncEpoch requires 16 bytes");
+ }
+ Ok(SyncEpoch {
+ epoch: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
+ time: u32::from_le_bytes(bytes[4..8].try_into().unwrap()),
+ nodeid: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
+ pid: u32::from_le_bytes(bytes[12..16].try_into().unwrap()),
+ })
+ }
+}
+
+/// Queued message awaiting delivery
+#[derive(Debug, Clone)]
+pub(super) struct QueuedMessage<M> {
+ pub nodeid: u32,
+ pub pid: u32,
+ pub _msg_count: u64,
+ pub message: M,
+ pub timestamp: u64,
+}
+
+// Re-export NodeSyncInfo from pmxcfs-api-types for use in Callbacks trait
+pub use pmxcfs_api_types::NodeSyncInfo;
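The 16-byte `SyncEpoch` wire layout above (four little-endian u32 fields) can be exercised standalone. The `pack`/`unpack` helpers below are illustrative only, not part of the crate:

```rust
// Standalone sketch of the SyncEpoch wire format:
// [epoch: u32][time: u32][nodeid: u32][pid: u32], all little-endian.

fn pack(epoch: u32, time: u32, nodeid: u32, pid: u32) -> [u8; 16] {
    let mut b = [0u8; 16];
    b[0..4].copy_from_slice(&epoch.to_le_bytes());
    b[4..8].copy_from_slice(&time.to_le_bytes());
    b[8..12].copy_from_slice(&nodeid.to_le_bytes());
    b[12..16].copy_from_slice(&pid.to_le_bytes());
    b
}

fn unpack(b: &[u8; 16]) -> (u32, u32, u32, u32) {
    let f = |i: usize| u32::from_le_bytes(b[i..i + 4].try_into().unwrap());
    (f(0), f(4), f(8), f(12))
}

fn main() {
    let wire = pack(7, 1_736_160_000, 1, 4242);
    // Round-trips through the 16-byte representation.
    assert_eq!(unpack(&wire), (7, 1_736_160_000, 1, 4242));
    // The epoch occupies bytes 0..4, least-significant byte first.
    assert_eq!(&wire[0..4], &[7, 0, 0, 0]);
}
```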
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
new file mode 100644
index 00000000..2750b281
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
@@ -0,0 +1,220 @@
+//! C-compatible wire format for cluster communication
+//!
+//! This module implements the exact wire protocol used by the C version of pmxcfs
+//! to ensure compatibility with C-based cluster nodes.
+//!
+//! The C version uses a simple format: iovec arrays containing raw C types.
+use anyhow::{Context, Result};
+use bytemuck::{Pod, Zeroable};
+use std::ffi::CStr;
+
+/// C message types (must match dcdb.h)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum CMessageType {
+ Write = 1,
+ Mkdir = 2,
+ Delete = 3,
+ Rename = 4,
+ Create = 5,
+ Mtime = 6,
+ UnlockRequest = 7,
+ Unlock = 8,
+}
+
+/// C-compatible FUSE message header
+/// Layout matches the iovec array from C: [size][offset][pathlen][tolen][flags]
+#[derive(Debug, Clone, Copy, Pod, Zeroable)]
+#[repr(C)]
+struct CFuseMessageHeader {
+ size: u32,
+ offset: u32,
+ pathlen: u32,
+ tolen: u32,
+ flags: u32,
+}
+
+/// Parsed C FUSE message
+#[derive(Debug, Clone)]
+pub struct CFuseMessage {
+ pub size: u32,
+ pub offset: u32,
+ pub flags: u32,
+ pub path: String,
+ pub to: Option<String>,
+ pub data: Vec<u8>,
+}
+
+impl CFuseMessage {
+ /// Parse a C FUSE message from raw bytes
+ pub fn parse(data: &[u8]) -> Result<Self> {
+ if data.len() < std::mem::size_of::<CFuseMessageHeader>() {
+ return Err(anyhow::anyhow!(
+ "Message too short: {} < {}",
+ data.len(),
+ std::mem::size_of::<CFuseMessageHeader>()
+ ));
+ }
+
+ // Parse header manually to avoid alignment issues
+ let header = CFuseMessageHeader {
+ size: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
+ offset: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
+ pathlen: u32::from_le_bytes([data[8], data[9], data[10], data[11]]),
+ tolen: u32::from_le_bytes([data[12], data[13], data[14], data[15]]),
+ flags: u32::from_le_bytes([data[16], data[17], data[18], data[19]]),
+ };
+
+ let mut offset = std::mem::size_of::<CFuseMessageHeader>();
+
+ // Parse path
+ let path = if header.pathlen > 0 {
+ if offset + header.pathlen as usize > data.len() {
+ return Err(anyhow::anyhow!("Invalid path length"));
+ }
+ let path_bytes = &data[offset..offset + header.pathlen as usize];
+ offset += header.pathlen as usize;
+
+ // C strings are null-terminated
+ CStr::from_bytes_until_nul(path_bytes)
+ .context("Invalid path string")?
+ .to_str()
+ .context("Path not valid UTF-8")?
+ .to_string()
+ } else {
+ String::new()
+ };
+
+ // Parse 'to' (for rename operations)
+ let to = if header.tolen > 0 {
+ if offset + header.tolen as usize > data.len() {
+ return Err(anyhow::anyhow!("Invalid tolen"));
+ }
+ let to_bytes = &data[offset..offset + header.tolen as usize];
+ offset += header.tolen as usize;
+
+ Some(
+ CStr::from_bytes_until_nul(to_bytes)
+ .context("Invalid to string")?
+ .to_str()
+ .context("To path not valid UTF-8")?
+ .to_string(),
+ )
+ } else {
+ None
+ };
+
+ // Parse data buffer
+ let buf_data = if header.size > 0 {
+ if offset + header.size as usize > data.len() {
+ return Err(anyhow::anyhow!("Invalid data size"));
+ }
+ data[offset..offset + header.size as usize].to_vec()
+ } else {
+ Vec::new()
+ };
+
+ Ok(CFuseMessage {
+ size: header.size,
+ offset: header.offset,
+ flags: header.flags,
+ path,
+ to,
+ data: buf_data,
+ })
+ }
+
+ /// Serialize to C wire format
+ pub fn serialize(&self) -> Vec<u8> {
+ let path_bytes = self.path.as_bytes();
+ let pathlen = if path_bytes.is_empty() {
+ 0
+ } else {
+ (path_bytes.len() + 1) as u32 // +1 for null terminator
+ };
+
+ let to_bytes = self.to.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
+ let tolen = if to_bytes.is_empty() {
+ 0
+ } else {
+ (to_bytes.len() + 1) as u32
+ };
+
+ let header = CFuseMessageHeader {
+ size: self.size,
+ offset: self.offset,
+ pathlen,
+ tolen,
+ flags: self.flags,
+ };
+
+ let mut result = Vec::new();
+
+ // Serialize header
+ result.extend_from_slice(bytemuck::bytes_of(&header));
+
+ // Serialize path (with null terminator)
+ if pathlen > 0 {
+ result.extend_from_slice(path_bytes);
+ result.push(0); // null terminator
+ }
+
+ // Serialize 'to' (with null terminator)
+ if tolen > 0 {
+ result.extend_from_slice(to_bytes);
+ result.push(0); // null terminator
+ }
+
+ // Serialize data
+ if self.size > 0 {
+ result.extend_from_slice(&self.data);
+ }
+
+ result
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_serialize_deserialize_write() {
+ let msg = CFuseMessage {
+ size: 13,
+ offset: 0,
+ flags: 0,
+ path: "/test.txt".to_string(),
+ to: None,
+ data: b"Hello, World!".to_vec(),
+ };
+
+ let serialized = msg.serialize();
+ let parsed = CFuseMessage::parse(&serialized).unwrap();
+
+ assert_eq!(parsed.size, msg.size);
+ assert_eq!(parsed.offset, msg.offset);
+ assert_eq!(parsed.flags, msg.flags);
+ assert_eq!(parsed.path, msg.path);
+ assert_eq!(parsed.to, msg.to);
+ assert_eq!(parsed.data, msg.data);
+ }
+
+ #[test]
+ fn test_serialize_deserialize_rename() {
+ let msg = CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: "/old.txt".to_string(),
+ to: Some("/new.txt".to_string()),
+ data: Vec::new(),
+ };
+
+ let serialized = msg.serialize();
+ let parsed = CFuseMessage::parse(&serialized).unwrap();
+
+ assert_eq!(parsed.path, msg.path);
+ assert_eq!(parsed.to, msg.to);
+ }
+}
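The fixed 20-byte header parsed by `CFuseMessage::parse` above can also be sketched in isolation. The `Header` struct and `parse_header` function below are illustrative stand-ins, not the crate's API:

```rust
// Sketch of the C FUSE message header layout:
// [size: u32][offset: u32][pathlen: u32][tolen: u32][flags: u32],
// all little-endian, read field-by-field to avoid alignment issues.

#[derive(Debug, PartialEq)]
struct Header {
    size: u32,
    offset: u32,
    pathlen: u32,
    tolen: u32,
    flags: u32,
}

fn parse_header(data: &[u8]) -> Option<Header> {
    if data.len() < 20 {
        return None; // header is always 20 bytes
    }
    let f = |i: usize| u32::from_le_bytes(data[i..i + 4].try_into().unwrap());
    Some(Header {
        size: f(0),
        offset: f(4),
        pathlen: f(8),
        tolen: f(12),
        flags: f(16),
    })
}

fn main() {
    // Header for a 5-byte write with a 10-byte path (incl. NUL), no rename target.
    let mut wire = Vec::new();
    for v in [5u32, 0, 10, 0, 0] {
        wire.extend_from_slice(&v.to_le_bytes());
    }
    let h = parse_header(&wire).unwrap();
    assert_eq!(h.size, 5);
    assert_eq!(h.pathlen, 10);
    assert_eq!(h.tolen, 0);
    // Truncated input is rejected.
    assert!(parse_header(&wire[..19]).is_none());
}
```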
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
new file mode 100644
index 00000000..d378f914
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
@@ -0,0 +1,565 @@
+//! Multi-node integration tests for DFSM cluster synchronization
+//!
+//! These tests simulate multi-node clusters to verify that the complete
+//! synchronization protocol works with multiple Rust nodes exchanging state.
+use anyhow::Result;
+use pmxcfs_dfsm::{Callbacks, FuseMessage, NodeSyncInfo};
+use pmxcfs_memdb::{MemDb, MemDbIndex, ROOT_INODE, TreeEntry};
+use std::sync::{Arc, Mutex};
+use tempfile::TempDir;
+
+/// Mock callbacks for testing DFSM without full pmxcfs integration
+struct MockCallbacks {
+ memdb: MemDb,
+ states_received: Arc<Mutex<Vec<NodeSyncInfo>>>,
+ updates_received: Arc<Mutex<Vec<TreeEntry>>>,
+ synced_count: Arc<Mutex<usize>>,
+}
+
+impl MockCallbacks {
+ fn new(memdb: MemDb) -> Self {
+ Self {
+ memdb,
+ states_received: Arc::new(Mutex::new(Vec::new())),
+ updates_received: Arc::new(Mutex::new(Vec::new())),
+ synced_count: Arc::new(Mutex::new(0)),
+ }
+ }
+
+ #[allow(dead_code)]
+ fn get_states(&self) -> Vec<NodeSyncInfo> {
+ self.states_received.lock().unwrap().clone()
+ }
+
+ #[allow(dead_code)]
+ fn get_updates(&self) -> Vec<TreeEntry> {
+ self.updates_received.lock().unwrap().clone()
+ }
+
+ #[allow(dead_code)]
+ fn get_synced_count(&self) -> usize {
+ *self.synced_count.lock().unwrap()
+ }
+}
+
+impl Callbacks<FuseMessage> for MockCallbacks {
+ fn deliver_message(
+ &self,
+ _nodeid: u32,
+ _pid: u32,
+ _message: FuseMessage,
+ _timestamp: u64,
+ ) -> Result<(i32, bool)> {
+ Ok((0, true))
+ }
+
+ fn compute_checksum(&self, output: &mut [u8; 32]) -> Result<()> {
+ let checksum = self.memdb.compute_database_checksum()?;
+ output.copy_from_slice(&checksum);
+ Ok(())
+ }
+
+ fn get_state(&self) -> Result<Vec<u8>> {
+ let index = self.memdb.encode_index()?;
+ Ok(index.serialize())
+ }
+
+ fn process_state_update(&self, states: &[NodeSyncInfo]) -> Result<bool> {
+ // Store received states for verification
+ *self.states_received.lock().unwrap() = states.to_vec();
+
+ // Parse indices from states
+ let mut indices: Vec<(u32, u32, MemDbIndex)> = Vec::new();
+ for node in states {
+ if let Some(state_data) = &node.state {
+ match MemDbIndex::deserialize(state_data) {
+ Ok(index) => indices.push((node.nodeid, node.pid, index)),
+ Err(_) => continue,
+ }
+ }
+ }
+
+ if indices.is_empty() {
+ return Ok(true);
+ }
+
+ // Find leader (highest version, or if tie, highest mtime)
+ let mut leader_idx = 0;
+ for i in 1..indices.len() {
+ let (_, _, current_index) = &indices[i];
+ let (_, _, leader_index) = &indices[leader_idx];
+ if current_index > leader_index {
+ leader_idx = i;
+ }
+ }
+
+ let (_leader_nodeid, _leader_pid, leader_index) = &indices[leader_idx];
+
+ // Check if WE are synced with leader
+ let our_index = self.memdb.encode_index()?;
+ let we_are_synced = our_index.version == leader_index.version
+ && our_index.mtime == leader_index.mtime
+ && our_index.size == leader_index.size
+ && our_index.entries.len() == leader_index.entries.len()
+ && our_index
+ .entries
+ .iter()
+ .zip(leader_index.entries.iter())
+ .all(|(a, b)| a.inode == b.inode && a.digest == b.digest);
+
+ Ok(we_are_synced)
+ }
+
+ fn process_update(&self, _nodeid: u32, _pid: u32, data: &[u8]) -> Result<()> {
+ // Deserialize and store update
+ let tree_entry = TreeEntry::deserialize_from_update(data)?;
+ self.updates_received
+ .lock()
+ .unwrap()
+ .push(tree_entry.clone());
+
+ // Apply to database
+ self.memdb.apply_tree_entry(tree_entry)?;
+ Ok(())
+ }
+
+ fn commit_state(&self) -> Result<()> {
+ Ok(())
+ }
+
+ fn on_synced(&self) {
+ *self.synced_count.lock().unwrap() += 1;
+ }
+}
+
+fn create_test_node(node_id: u32) -> Result<(MemDb, TempDir, Arc<MockCallbacks>)> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join(format!("node{node_id}.db"));
+ let memdb = MemDb::open(&db_path, true)?;
+ // Note: Local operations always use writer=0 (matching C implementation)
+ // Remote DFSM updates use the writer field from the incoming TreeEntry
+
+ let callbacks = Arc::new(MockCallbacks::new(memdb.clone()));
+ Ok((memdb, temp_dir, callbacks))
+}
+
+#[test]
+fn test_two_node_empty_sync() -> Result<()> {
+ // Create two nodes with empty databases
+ let (_memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+ // Generate states from both nodes
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+
+ // Simulate state exchange
+ let states = vec![
+ NodeSyncInfo {
+ nodeid: 1,
+ pid: 1000,
+ state: Some(state1),
+ synced: false,
+ },
+ NodeSyncInfo {
+ nodeid: 2,
+ pid: 2000,
+ state: Some(state2),
+ synced: false,
+ },
+ ];
+
+ // Both nodes process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+
+ // Both should be synced (empty databases are identical)
+ assert!(synced1, "Node 1 should be synced");
+ assert!(synced2, "Node 2 should be synced");
+
+ Ok(())
+}
+
+#[test]
+fn test_two_node_leader_election() -> Result<()> {
+ // Create two nodes
+ let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+ // Node 1 has more data (higher version)
+ memdb1.create("/file1.txt", 0, 1000)?;
+ memdb1.write("/file1.txt", 0, 1001, b"data from node 1", 0)?;
+
+ // Generate states
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+
+ // Parse to check versions
+ let index1 = MemDbIndex::deserialize(&state1)?;
+ let index2 = MemDbIndex::deserialize(&state2)?;
+
+ // Node 1 should have higher version
+ assert!(
+ index1.version > index2.version,
+ "Node 1 version {} should be > Node 2 version {}",
+ index1.version,
+ index2.version
+ );
+
+ // Simulate state exchange
+ let states = vec![
+ NodeSyncInfo {
+ nodeid: 1,
+ pid: 1000,
+ state: Some(state1),
+ synced: false,
+ },
+ NodeSyncInfo {
+ nodeid: 2,
+ pid: 2000,
+ state: Some(state2),
+ synced: false,
+ },
+ ];
+
+ // Process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+
+ // Node 1 (leader) should be synced, Node 2 (follower) should not
+ assert!(synced1, "Node 1 (leader) should be synced");
+ assert!(!synced2, "Node 2 (follower) should not be synced");
+
+ Ok(())
+}
+
+#[test]
+fn test_incremental_update_transfer() -> Result<()> {
+ // Create leader and follower
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Leader has data
+ leader_db.create("/config", libc::S_IFDIR, 1000)?;
+ leader_db.create("/config/node.conf", 0, 1001)?;
+ leader_db.write("/config/node.conf", 0, 1002, b"hostname=pve1", 0)?;
+
+ // Get entries from leader
+ let leader_entries = leader_db.get_all_entries()?;
+
+ // Simulate sending updates to follower
+ for entry in leader_entries {
+ if entry.inode == ROOT_INODE {
+ continue; // Skip root (both have it)
+ }
+
+ // Serialize as update message
+ let update_msg = entry.serialize_for_update();
+
+ // Follower receives and processes update
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+ }
+
+ // Verify follower has the data
+ let config_dir = follower_db.lookup_path("/config");
+ assert!(
+ config_dir.is_some(),
+ "Follower should have /config directory"
+ );
+ assert!(config_dir.unwrap().is_dir());
+
+ let config_file = follower_db.lookup_path("/config/node.conf");
+ assert!(
+ config_file.is_some(),
+ "Follower should have /config/node.conf"
+ );
+
+ let config_data = follower_db.read("/config/node.conf", 0, 1024)?;
+ assert_eq!(
+ config_data, b"hostname=pve1",
+ "Follower should have correct data"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_three_node_sync() -> Result<()> {
+ // Create three nodes
+ let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (memdb2, _temp2, callbacks2) = create_test_node(2)?;
+ let (_memdb3, _temp3, callbacks3) = create_test_node(3)?;
+
+ // Node 1 has the most recent data
+ memdb1.create("/cluster.conf", 0, 5000)?;
+ memdb1.write("/cluster.conf", 0, 5001, b"version=3", 0)?;
+
+ // Node 2 has older data
+ memdb2.create("/cluster.conf", 0, 4000)?;
+ memdb2.write("/cluster.conf", 0, 4001, b"version=2", 0)?;
+
+ // Node 3 is empty (new node joining)
+
+ // Generate states
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+ let state3 = callbacks3.get_state()?;
+
+ let states = vec![
+ NodeSyncInfo {
+ nodeid: 1,
+ pid: 1000,
+ state: Some(state1.clone()),
+ synced: false,
+ },
+ NodeSyncInfo {
+ nodeid: 2,
+ pid: 2000,
+ state: Some(state2.clone()),
+ synced: false,
+ },
+ NodeSyncInfo {
+ nodeid: 3,
+ pid: 3000,
+ state: Some(state3.clone()),
+ synced: false,
+ },
+ ];
+
+ // All nodes process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+ let synced3 = callbacks3.process_state_update(&states)?;
+
+ // Node 1 (leader) should be synced
+ assert!(synced1, "Node 1 (leader) should be synced");
+
+ // Nodes 2 and 3 need updates
+ assert!(!synced2, "Node 2 should need updates");
+ assert!(!synced3, "Node 3 should need updates");
+
+ // Verify leader has highest version
+ let index1 = MemDbIndex::deserialize(&state1)?;
+ let index2 = MemDbIndex::deserialize(&state2)?;
+ let index3 = MemDbIndex::deserialize(&state3)?;
+
+ assert!(index1.version >= index2.version);
+ assert!(index1.version >= index3.version);
+
+ Ok(())
+}
+
+#[test]
+fn test_update_message_wire_format_compatibility() -> Result<()> {
+ // Verify our wire format matches C implementation exactly
+ let entry = TreeEntry {
+ inode: 42,
+ parent: 1,
+ version: 100,
+ writer: 2,
+ mtime: 12345,
+ size: 11,
+ entry_type: 8, // DT_REG
+ name: "test.conf".to_string(),
+ data: b"hello world".to_vec(),
+ };
+
+ let serialized = entry.serialize_for_update();
+
+ // Verify header size (41 bytes)
+ // parent(8) + inode(8) + version(8) + writer(4) + mtime(4) + size(4) + namelen(4) + type(1)
+ let expected_header_size = 8 + 8 + 8 + 4 + 4 + 4 + 4 + 1;
+ assert_eq!(expected_header_size, 41);
+
+ // Verify total size
+ let namelen = "test.conf".len() + 1; // Include null terminator
+ let expected_total = expected_header_size + namelen + 11;
+ assert_eq!(serialized.len(), expected_total);
+
+ // Verify we can deserialize it back
+ let deserialized = TreeEntry::deserialize_from_update(&serialized)?;
+ assert_eq!(deserialized.inode, entry.inode);
+ assert_eq!(deserialized.parent, entry.parent);
+ assert_eq!(deserialized.version, entry.version);
+ assert_eq!(deserialized.writer, entry.writer);
+ assert_eq!(deserialized.mtime, entry.mtime);
+ assert_eq!(deserialized.size, entry.size);
+ assert_eq!(deserialized.entry_type, entry.entry_type);
+ assert_eq!(deserialized.name, entry.name);
+ assert_eq!(deserialized.data, entry.data);
+
+ Ok(())
+}
+
+#[test]
+fn test_index_wire_format_compatibility() -> Result<()> {
+ // Verify memdb_index_t wire format matches C implementation
+ use pmxcfs_memdb::IndexEntry;
+
+ let entries = vec![
+ IndexEntry {
+ inode: 1,
+ digest: [0u8; 32],
+ },
+ IndexEntry {
+ inode: 2,
+ digest: [1u8; 32],
+ },
+ ];
+
+ let index = MemDbIndex::new(
+ 100, // version
+ 2, // last_inode
+ 1, // writer
+ 12345, // mtime
+ entries,
+ );
+
+ let serialized = index.serialize();
+
+ // Verify header size (32 bytes)
+ // version(8) + last_inode(8) + writer(4) + mtime(4) + size(4) + bytes(4)
+ let expected_header_size = 8 + 8 + 4 + 4 + 4 + 4;
+ assert_eq!(expected_header_size, 32);
+
+ // Verify entry size (40 bytes each)
+ // inode(8) + digest(32)
+ let expected_entry_size = 8 + 32;
+ assert_eq!(expected_entry_size, 40);
+
+ // Verify total size
+ let expected_total = expected_header_size + 2 * expected_entry_size;
+ assert_eq!(serialized.len(), expected_total);
+ assert_eq!(serialized.len(), index.bytes as usize);
+
+ // Verify deserialization
+ let deserialized = MemDbIndex::deserialize(&serialized)?;
+ assert_eq!(deserialized.version, index.version);
+ assert_eq!(deserialized.last_inode, index.last_inode);
+ assert_eq!(deserialized.writer, index.writer);
+ assert_eq!(deserialized.mtime, index.mtime);
+ assert_eq!(deserialized.size, index.size);
+ assert_eq!(deserialized.bytes, index.bytes);
+ assert_eq!(deserialized.entries.len(), 2);
+
+ Ok(())
+}
+
+#[test]
+fn test_sync_with_conflicts() -> Result<()> {
+ // Test scenario: two nodes modified different files
+ let (memdb1, _temp1, _callbacks1) = create_test_node(1)?;
+ let (memdb2, _temp2, _callbacks2) = create_test_node(2)?;
+
+ // Both start with same base
+ memdb1.create("/base.conf", 0, 1000)?;
+ memdb1.write("/base.conf", 0, 1001, b"shared", 0)?;
+
+ memdb2.create("/base.conf", 0, 1000)?;
+ memdb2.write("/base.conf", 0, 1001, b"shared", 0)?;
+
+ // Node 1 adds file1
+ memdb1.create("/file1.txt", 0, 2000)?;
+ memdb1.write("/file1.txt", 0, 2001, b"from node 1", 0)?;
+
+ // Node 2 adds file2
+ memdb2.create("/file2.txt", 0, 2000)?;
+ memdb2.write("/file2.txt", 0, 2001, b"from node 2", 0)?;
+
+ // Generate indices
+ let index1 = memdb1.encode_index()?;
+ let index2 = memdb2.encode_index()?;
+
+ // Find differences
+ let diffs_1_vs_2 = index1.find_differences(&index2);
+ let diffs_2_vs_1 = index2.find_differences(&index1);
+
+ // Node 1 has file1 that node 2 doesn't have
+ assert!(
+ !diffs_1_vs_2.is_empty(),
+ "Node 1 should have entries node 2 doesn't have"
+ );
+
+ // Node 2 has file2 that node 1 doesn't have
+ assert!(
+ !diffs_2_vs_1.is_empty(),
+ "Node 2 should have entries node 1 doesn't have"
+ );
+
+    // Higher version wins - here both nodes performed the same sequence of
+    // operations, so their versions are equal and mtime would be the tiebreaker
+
+ Ok(())
+}
+
+#[test]
+fn test_large_file_update() -> Result<()> {
+ // Test updating a file with significant data
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Create a file with 10KB of data
+ let large_data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect();
+
+ leader_db.create("/large.bin", 0, 1000)?;
+ leader_db.write("/large.bin", 0, 1001, &large_data, 0)?;
+
+ // Get the entry
+ let entry = leader_db.lookup_path("/large.bin").unwrap();
+
+ // Serialize and send
+ let update_msg = entry.serialize_for_update();
+
+ // Follower receives
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+
+ // Verify
+ let follower_entry = follower_db.lookup_path("/large.bin").unwrap();
+ assert_eq!(follower_entry.size, large_data.len());
+ assert_eq!(follower_entry.data, large_data);
+
+ Ok(())
+}
+
+#[test]
+fn test_directory_hierarchy_sync() -> Result<()> {
+ // Test syncing nested directory structure
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Create directory hierarchy on leader
+ leader_db.create("/etc", libc::S_IFDIR, 1000)?;
+ leader_db.create("/etc/pve", libc::S_IFDIR, 1001)?;
+ leader_db.create("/etc/pve/nodes", libc::S_IFDIR, 1002)?;
+ leader_db.create("/etc/pve/nodes/pve1", libc::S_IFDIR, 1003)?;
+ leader_db.create("/etc/pve/nodes/pve1/config", 0, 1004)?;
+ leader_db.write(
+ "/etc/pve/nodes/pve1/config",
+ 0,
+ 1005,
+ b"cpu: 2\nmem: 4096",
+ 0,
+ )?;
+
+ // Send all entries to follower
+ let entries = leader_db.get_all_entries()?;
+ for entry in entries {
+ if entry.inode == ROOT_INODE {
+ continue; // Skip root
+ }
+ let update_msg = entry.serialize_for_update();
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+ }
+
+ // Verify entire hierarchy
+ assert!(follower_db.lookup_path("/etc").is_some());
+ assert!(follower_db.lookup_path("/etc/pve").is_some());
+ assert!(follower_db.lookup_path("/etc/pve/nodes").is_some());
+ assert!(follower_db.lookup_path("/etc/pve/nodes/pve1").is_some());
+
+ let config = follower_db.lookup_path("/etc/pve/nodes/pve1/config");
+ assert!(config.is_some());
+ assert_eq!(config.unwrap().data, b"cpu: 2\nmem: 4096");
+
+ Ok(())
+}
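The leader election rule exercised by the tests above (highest index version wins, mtime breaks ties) can be sketched standalone. `Idx` below is a hypothetical stand-in for `MemDbIndex`, reduced to the two fields that decide the election:

```rust
// Sketch of leader election: derived Ord compares fields in declaration
// order, so version takes precedence and mtime is the tiebreaker.

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Idx {
    version: u64,
    mtime: u32,
}

/// Return the nodeid of the elected leader, if any states were received.
fn elect(indices: &[(u32, Idx)]) -> Option<u32> {
    indices
        .iter()
        .max_by_key(|(_, idx)| *idx)
        .map(|(nodeid, _)| *nodeid)
}

fn main() {
    let nodes = [
        (1, Idx { version: 9, mtime: 100 }),
        (2, Idx { version: 9, mtime: 200 }), // same version, newer mtime: wins
        (3, Idx { version: 4, mtime: 999 }), // older version loses despite mtime
    ];
    assert_eq!(elect(&nodes), Some(2));
    assert_eq!(elect(&[]), None);
}
```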
--
2.47.3