public inbox for pve-devel@lists.proxmox.com
From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Cc: Kefu Chai <tchaikov@gmail.com>
Subject: [PATCH pve-cluster v3 09/13] pmxcfs-rs: add pmxcfs-dfsm crate
Date: Mon, 23 Mar 2026 19:32:24 +0800
Message-ID: <20260323113239.942866-10-k.chai@proxmox.com>
In-Reply-To: <20260323113239.942866-1-k.chai@proxmox.com>

Add Distributed Finite State Machine for cluster synchronization:
- Dfsm: Core state machine implementation
- ClusterDatabaseService: MemDb sync (pmxcfs_v1 CPG group)
- StatusSyncService: Status sync (pve_kvstore_v1 CPG group)
- Protocol: SyncStart, State, Update, UpdateComplete, Verify
- Leader election based on version and mtime
- Incremental updates for efficiency

This integrates pmxcfs-memdb, pmxcfs-services, and rust-corosync
to provide cluster-wide database synchronization. It implements
the wire-compatible protocol used by the C version.

Includes unit tests for:
- Index serialization and comparison
- Leader election logic
- Tree entry serialization
- Diff computation between indices

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
 src/pmxcfs-rs/Cargo.toml                      |    9 +
 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml          |   47 +
 src/pmxcfs-rs/pmxcfs-dfsm/README.md           |  340 +++
 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs    |   80 +
 .../src/cluster_database_service.rs           |  111 +
 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs  |  256 +++
 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs |  725 +++++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs |  233 +++
 .../pmxcfs-dfsm/src/kv_store_message.rs       |  560 +++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs          |   32 +
 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs      |   21 +
 .../pmxcfs-dfsm/src/state_machine.rs          | 1815 +++++++++++++++++
 .../pmxcfs-dfsm/src/status_sync_service.rs    |  113 +
 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs        |  107 +
 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs  |  283 +++
 .../tests/multi_node_sync_tests.rs            |  568 ++++++
 src/pmxcfs-rs/pmxcfs-memdb/src/database.rs    |    4 +-
 17 files changed, 5302 insertions(+), 2 deletions(-)
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/README.md
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 9a4d3a51b..ef1fefb4a 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -10,6 +10,7 @@ members = [
     "pmxcfs-test-utils", # Test utilities and helpers (dev-only)
     "pmxcfs-services",   # Service framework for automatic retry and lifecycle management
     "pmxcfs-ipc",        # libqb-compatible IPC server
+    "pmxcfs-dfsm",       # Distributed Finite State Machine
 ]
 resolver = "2"
 
@@ -32,6 +33,10 @@ pmxcfs-status = { path = "pmxcfs-status" }
 pmxcfs-test-utils = { path = "pmxcfs-test-utils" }
 pmxcfs-services = { path = "pmxcfs-services" }
 pmxcfs-ipc = { path = "pmxcfs-ipc" }
+pmxcfs-dfsm = { path = "pmxcfs-dfsm" }
+
+# Corosync integration
+rust-corosync = "0.1"
 
 # Core async runtime
 tokio = { version = "1.35", features = ["full"] }
@@ -51,6 +56,7 @@ async-trait = "0.1"
 # Serialization
 serde = { version = "1.0", features = ["derive"] }
 bincode = "1.3"
+bytemuck = { version = "1.14", features = ["derive"] }
 
 # Network and cluster
 bytes = "1.5"
@@ -63,6 +69,9 @@ parking_lot = "0.12"
 libc = "0.2"
 nix = { version = "0.29", features = ["socket", "poll"] }
 
+# Utilities
+num_enum = "0.5"
+
 # Development dependencies
 tempfile = "3.8"
 
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
new file mode 100644
index 000000000..1e66c047e
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
@@ -0,0 +1,47 @@
+[package]
+name = "pmxcfs-dfsm"
+description = "Distributed Finite State Machine for cluster state synchronization"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+# Internal dependencies
+pmxcfs-api-types.workspace = true
+pmxcfs-logger.workspace = true
+pmxcfs-memdb.workspace = true
+pmxcfs-services.workspace = true
+
+# Corosync integration
+rust-corosync.workspace = true
+
+# Error handling
+anyhow.workspace = true
+thiserror.workspace = true
+
+# Async and concurrency
+parking_lot.workspace = true
+async-trait.workspace = true
+tokio.workspace = true
+
+# Serialization
+serde.workspace = true
+bincode.workspace = true
+bytemuck.workspace = true
+
+# Logging
+tracing.workspace = true
+
+# Utilities
+num_enum.workspace = true
+libc.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
+libc.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/README.md b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
new file mode 100644
index 000000000..a8412f1b0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
@@ -0,0 +1,340 @@
+# pmxcfs-dfsm
+
+**Distributed Finite State Machine** for cluster-wide state synchronization in pmxcfs.
+
+This crate implements the DFSM protocol used to replicate configuration changes and status updates across all nodes in a Proxmox cluster via Corosync CPG (Closed Process Group).
+
+## Overview
+
+The DFSM is the core mechanism for maintaining consistency across cluster nodes. It ensures that:
+
+- All nodes see filesystem operations (writes, creates, deletes) in the same order
+- Database state remains synchronized even after network partitions
+- Status information (VM states, RRD data) is broadcast to all nodes
+- State verification catches inconsistencies
+
+## Architecture
+
+### Module Structure
+
+| Module | Purpose | C Equivalent |
+|--------|---------|--------------|
+| `state_machine.rs` | Core DFSM logic, state transitions | `dfsm.c` |
+| `cluster_database_service.rs` | MemDb sync service | `dcdb.c`, `loop.c:service_dcdb` |
+| `status_sync_service.rs` | Status/kvstore sync service | `loop.c:service_status` |
+| `cpg_service.rs` | Corosync CPG integration | `dfsm.c:cpg_callbacks` |
+| `dfsm_message.rs` | Protocol message types | `dfsm.c:dfsm_message_*_header_t` |
+| `message.rs` | Message trait and serialization | (inline in C) |
+| `wire_format.rs` | C-compatible wire format | `dcdb.c:c_fuse_message_header_t` |
+| `broadcast.rs` | Cluster-wide message broadcast | `dcdb.c:dcdb_send_fuse_message` |
+| `types.rs` | Type definitions (modes, epochs) | `dfsm.c:dfsm_mode_t` |
+
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| `dfsm_t` | `Dfsm` | Main state machine |
+| `dfsm_mode_t` | `DfsmMode` | Enum with type safety |
+| `dfsm_node_info_t` | (internal) | Node state tracking |
+| `dfsm_sync_info_t` | (internal) | Sync session info |
+| `dfsm_callbacks_t` | Trait-based callbacks | Type-safe callbacks via traits |
+| `dfsm_message_*_header_t` | `DfsmMessage` | Type-safe enum variants |
+
+### Functions
+
+#### Core DFSM Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dfsm_new()` | `Dfsm::new()` | state_machine.rs |
+| `dfsm_initialize()` | `Dfsm::init_cpg()` | state_machine.rs |
+| `dfsm_join()` | (part of init_cpg) | state_machine.rs |
+| `dfsm_dispatch()` | `Dfsm::dispatch_events()` | state_machine.rs |
+| `dfsm_send_message()` | `Dfsm::send_message()` | state_machine.rs |
+| `dfsm_send_update()` | `Dfsm::send_update()` | state_machine.rs |
+| `dfsm_verify_request()` | `Dfsm::verify_request()` | state_machine.rs |
+| `dfsm_finalize()` | `Dfsm::stop_services()` | state_machine.rs |
+
+#### DCDB (Cluster Database) Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dcdb_new()` | `ClusterDatabaseService::new()` | cluster_database_service.rs |
+| `dcdb_send_fuse_message()` | `broadcast()` | broadcast.rs |
+| `dcdb_send_unlock()` | `FuseMessage::Unlock` + broadcast | broadcast.rs |
+| `service_dcdb()` | `ClusterDatabaseService` | cluster_database_service.rs |
+
+#### Status Sync Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `service_status()` | `StatusSyncService` | status_sync_service.rs |
+| (kvstore CPG group) | `StatusSyncService` | Uses separate CPG group |
+
+### Callback System
+
+**C Implementation:**
+- Registers a `dfsm_callbacks_t` struct of function pointers (see `dfsm.h`)
+
+**Rust Implementation:**
+- Uses trait-based callbacks instead of function pointers
+- Callbacks are implemented by `MemDbCallbacks` (memdb integration)
+- Defined in external crates (pmxcfs-memdb)
+
+## Synchronization Protocol
+
+The DFSM ensures all nodes maintain consistent database state through a multi-phase synchronization protocol:
+
+### Protocol Phases
+
+#### Phase 1: Membership Change
+
+When nodes join or leave the cluster:
+
+1. **Corosync CPG** delivers membership change notification
+2. **DFSM invalidates** cached checksums
+3. **Message queues** are cleared
+4. **Epoch counter** is incremented
+
+**CPG Leader** (lowest node ID):
+- Initiates sync by sending `SyncStart` message
+- Sends its own `State` (CPG doesn't loop back messages)
+
+**All Followers**:
+- Respond to `SyncStart` by sending their `State`
+- Wait for other nodes' states
+
+#### Phase 2: State Exchange
+
+Each node collects `State` messages containing serialized **MemDbIndex** (compact state summary using C-compatible wire format).
+
+State digests are computed using SHA-256 hashing to detect differences between nodes.
+
+#### Phase 3: Leader Election
+
+When all states are collected, `process_state_update()` is called:
+
+1. **Parse indices** from all node states
+2. **Elect data leader** (may differ from CPG leader):
+   - Highest `version` wins
+   - If tied, highest `mtime` wins
+3. **Identify synced nodes**: Nodes whose index matches leader exactly
+4. **Determine own status**:
+   - If we're the data leader → send updates to followers
+   - If we're synced with leader → mark as Synced
+   - Otherwise → enter Update mode and wait
+
+**Leader Election Algorithm**:
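
The rule above (highest `version`, ties broken by highest `mtime`) can be sketched as follows. This is a minimal model: `Index` is a hypothetical stand-in for the real `MemDbIndex`, and the final lowest-nodeid tiebreak is an assumption added here for determinism, not something this README specifies.

```rust
// Minimal model of data-leader election: highest version wins,
// ties broken by highest mtime. The lowest-nodeid tiebreak is an
// illustrative assumption for determinism.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Index {
    version: u64,
    mtime: u32,
}

fn elect_leader(states: &[(u32 /* nodeid */, Index)]) -> Option<u32> {
    states
        .iter()
        .max_by_key(|(nodeid, idx)| (idx.version, idx.mtime, std::cmp::Reverse(*nodeid)))
        .map(|(nodeid, _)| *nodeid)
}

fn main() {
    let states = [
        (1, Index { version: 10, mtime: 100 }),
        (2, Index { version: 12, mtime: 90 }), // highest version -> data leader
        (3, Index { version: 12, mtime: 80 }),
    ];
    assert_eq!(elect_leader(&states), Some(2));
}
```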
+
+#### Phase 4: Incremental Updates
+
+**Data Leader** (node with highest version):
+
+1. **Compare indices** using `find_differences()` for each follower
+2. **Serialize differing entries** to C-compatible TreeEntry format
+3. **Send Update messages** via CPG
+4. **Send UpdateComplete** when all updates sent
+
+**Followers** (out-of-sync nodes):
+
+1. **Receive Update messages**
+2. **Deserialize TreeEntry** via `TreeEntry::deserialize_from_update()`
+3. **Apply to database** via `MemDb::apply_tree_entry()`:
+   - INSERT OR REPLACE in SQLite
+   - Update in-memory structures
+   - Handle entry moves (parent/name changes)
+4. **On UpdateComplete**: Transition to Synced mode
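
The leader-side diffing can be modeled like this. It is a simplified sketch: the real `find_differences()` compares `MemDbIndex` structures in C-compatible wire format; the inode-to-version map here is a hypothetical reduction of that.

```rust
use std::collections::HashMap;

// Simplified model of index diffing: the leader sends every entry whose
// version differs from, or is missing in, the follower's index.
fn find_differences(
    leader: &HashMap<u64, u64>,   // inode -> entry version
    follower: &HashMap<u64, u64>,
) -> Vec<u64> {
    let mut to_send: Vec<u64> = leader
        .iter()
        .filter(|&(inode, version)| follower.get(inode) != Some(version))
        .map(|(&inode, _)| inode)
        .collect();
    to_send.sort_unstable();
    to_send
}

fn main() {
    let leader = HashMap::from([(1u64, 5u64), (2, 7), (3, 1)]);
    let follower = HashMap::from([(1, 5), (2, 6)]);
    // Entry 2 is stale and entry 3 is missing on the follower.
    assert_eq!(find_differences(&leader, &follower), vec![2, 3]);
}
```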
+
+#### Phase 5: Normal Operations
+
+When in **Synced** mode:
+
+- FUSE operations are broadcast via `send_fuse_message()`
+- Messages are delivered immediately via `deliver_message()`
+- Leader periodically sends `VerifyRequest` for checksum comparison
+- Nodes respond with `Verify` containing SHA-256 of entire database
+- Mismatches trigger cluster resync
+
+---
+
+## Protocol Details
+
+### State Machine Transitions
+
+Based on analysis of C implementation (`dfsm.c` lines 795-1209):
+
+#### Critical Protocol Rules
+
+1. **Epoch Management**:
+   - Each node creates local epoch during confchg: `(counter++, time, own_nodeid, own_pid)`
+   - **Leader sends SYNC_START with its epoch**
+   - **Followers MUST adopt leader's epoch from SYNC_START** (`dfsm->sync_epoch = header->epoch`)
+   - All STATE messages in sync round use adopted epoch
+   - Epoch mismatch → message discarded (may lead to LEAVE)
+
+2. **Member List Validation**:
+   - Built from `member_list` in confchg callback
+   - Stored in `dfsm->sync_info->nodes[]`
+   - STATE sender MUST be in this list
+   - Non-member STATE → immediate LEAVE
+
+3. **Duplicate Detection**:
+   - Each node sends STATE exactly once per sync round
+   - Tracked via `ni->state` pointer (NULL = not received, non-NULL = received)
+   - Duplicate STATE from same nodeid/pid → immediate LEAVE
+   - ✅ **FIXED**: Rust implementation now matches C (see commit c321869cc)
+
+4. **Message Ordering** (one sync round):
+   - Leader sends `SyncStart` (carrying its epoch), then its own `State`
+   - Every member sends exactly one `State` for the adopted epoch
+   - Data leader sends zero or more `Update` messages, then `UpdateComplete`
+5. **Leader Selection**:
+   - Determined by `lowest_nodeid` from member list
+   - Set in confchg callback before any messages sent
+   - Used to validate SYNC_START sender (logged but not enforced)
+   - Re-elected during state processing based on DB versions
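
The epoch rules in point 1 can be modeled as below. The field layout follows the `(counter, time, nodeid, pid)` tuple created in the confchg callback; the type and function names are illustrative.

```rust
// Model of sync-epoch handling: followers adopt the leader's epoch from
// SYNC_START; any later message whose epoch differs from the adopted one
// is discarded.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SyncEpoch {
    counter: u32,
    time: u32,
    nodeid: u32,
    pid: u32,
}

fn on_sync_start(local: &mut SyncEpoch, leader_epoch: SyncEpoch) {
    *local = leader_epoch; // followers MUST adopt the leader's epoch
}

fn accept(local: &SyncEpoch, msg_epoch: &SyncEpoch) -> bool {
    local == msg_epoch // mismatch -> message discarded
}

fn main() {
    let mut local = SyncEpoch { counter: 3, time: 1000, nodeid: 2, pid: 42 };
    let leader = SyncEpoch { counter: 5, time: 1001, nodeid: 1, pid: 7 };

    assert!(!accept(&local, &leader)); // local epoch differs: would discard
    on_sync_start(&mut local, leader); // SYNC_START: adopt leader's epoch
    assert!(accept(&local, &leader));  // subsequent STATE messages accepted
}
```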
+
+### DFSM States (DfsmMode)
+
+| State | Value | Description | C Equivalent |
+|-------|-------|-------------|--------------|
+| `Start` | 0 | Initial connection | `DFSM_MODE_START` |
+| `StartSync` | 1 | Beginning sync | `DFSM_MODE_START_SYNC` |
+| `Synced` | 2 | Fully synchronized | `DFSM_MODE_SYNCED` |
+| `Update` | 3 | Receiving updates | `DFSM_MODE_UPDATE` |
+| `Leave` | 253 | Leaving group | `DFSM_MODE_LEAVE` |
+| `VersionError` | 254 | Protocol mismatch | `DFSM_MODE_VERSION_ERROR` |
+| `Error` | 255 | Error state | `DFSM_MODE_ERROR` |
+
+### Message Types (DfsmMessageType)
+
+| Type | Value | Purpose |
+|------|-------|---------|
+| `Normal` | 0 | Application messages (with header + payload) |
+| `SyncStart` | 1 | Start sync (from leader) |
+| `State` | 2 | Full state data |
+| `Update` | 3 | Incremental update |
+| `UpdateComplete` | 4 | End of updates |
+| `VerifyRequest` | 5 | Request state verification |
+| `Verify` | 6 | State checksum response |
+
+All messages use C-compatible wire format with headers and payloads.
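
The discriminants in the table can be decoded defensively; the crate uses `num_enum` for this, but a dependency-free sketch of the same mapping looks like:

```rust
// Message-type decoding per the table above (values 0-6). An unknown
// discriminant is rejected rather than mapped to a default, since it may
// indicate a protocol-version mismatch.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum DfsmMessageType {
    Normal = 0,
    SyncStart = 1,
    State = 2,
    Update = 3,
    UpdateComplete = 4,
    VerifyRequest = 5,
    Verify = 6,
}

impl TryFrom<u8> for DfsmMessageType {
    type Error = u8;
    fn try_from(v: u8) -> Result<Self, u8> {
        use DfsmMessageType::*;
        Ok(match v {
            0 => Normal,
            1 => SyncStart,
            2 => State,
            3 => Update,
            4 => UpdateComplete,
            5 => VerifyRequest,
            6 => Verify,
            other => return Err(other),
        })
    }
}

fn main() {
    assert_eq!(DfsmMessageType::try_from(1), Ok(DfsmMessageType::SyncStart));
    assert_eq!(DfsmMessageType::try_from(9), Err(9));
}
```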
+
+### Application Message Types
+
+The DFSM can carry two types of application messages:
+
+1. **Fuse Messages** (Filesystem operations)
+   - CPG Group: `pmxcfs_v1` (DCDB)
+   - Message types: `Write`, `Create`, `Delete`, `Mkdir`, `Rename`, `SetMtime`, `Unlock`
+   - Defined in: `pmxcfs-api-types::FuseMessage`
+
+2. **KvStore Messages** (Status/RRD sync)
+   - CPG Group: `pve_kvstore_v1`
+   - Message types: `Data` (key-value pairs for status sync)
+   - Defined in: `pmxcfs-api-types::KvStoreMessage`
+
+### Wire Format Compatibility
+
+All wire formats are **byte-compatible** with the C implementation: message headers and payload layouts match the structs defined in `dfsm.c` and `dcdb.c`.
+
+## Synchronization Flow
+
+### 1. Node Join
+
+Membership change → leader sends `SyncStart` → all nodes exchange `State` →
+data leader elected → out-of-sync nodes receive `Update` messages →
+`UpdateComplete` → all nodes Synced.
+
+### 2. Normal Operation
+
+In Synced mode, FUSE operations are broadcast as `Normal` messages and
+applied on delivery in CPG total order on every node.
+
+### 3. State Verification (Periodic)
+
+The leader periodically sends `VerifyRequest`; each node replies with
+`Verify` carrying a SHA-256 checksum of its database. A mismatch triggers
+a cluster resync.
+
+## Key Differences from C Implementation
+
+### Event Loop Architecture
+
+**C Version:**
+- Uses libqb's `qb_loop` for event loop
+- CPG fd registered with `qb_loop_poll_add()`
+- Dispatch called from qb_loop when fd is readable
+
+**Rust Version:**
+- Uses tokio async runtime
+- Service trait provides `dispatch()` method
+- ServiceManager polls fd using tokio's async I/O
+- No qb_loop dependency
+
+### CPG Instance Management
+
+**C Version:**
+- Single DFSM struct with callbacks
+- Two different CPG groups created separately
+
+**Rust Version:**
+- Each CPG group gets its own `Dfsm` instance
+- `ClusterDatabaseService` - manages `pmxcfs_v1` CPG group (MemDb)
+- `StatusSyncService` - manages `pve_kvstore_v1` CPG group (Status/RRD)
+- Both use same DFSM protocol but different callbacks
+
+## Error Handling
+
+### Split-Brain Prevention
+
+- Checksum verification detects divergence
+- Automatic resync on mismatch
+- Version monotonicity ensures forward progress
+
+### Network Partition Recovery
+
+- Membership changes trigger sync
+- Highest version always wins
+- Stale data is safely replaced
+
+### Consistency Guarantees
+
+- SQLite transactions ensure atomic updates
+- In-memory structures updated atomically
+- Version increments are monotonic
+- All nodes converge to same state
+
+## Compatibility Matrix
+
+| Feature | C Version | Rust Version | Compatible |
+|---------|-----------|--------------|------------|
+| Wire format | `dfsm_message_*_header_t` | `DfsmMessage::serialize()` | Yes |
+| CPG protocol | libcorosync | rust-corosync | Yes |
+| Message types | 0-6 | `DfsmMessageType` | Yes |
+| State machine | `dfsm_mode_t` | `DfsmMode` | Yes |
+| Protocol version | 1 | 1 | Yes |
+| Group names | `pmxcfs_v1`, `pve_kvstore_v1` | Same | Yes |
+
+## Known Issues / TODOs
+
+### Missing Features
+- [ ] **Sync message batching**: C version can batch updates, Rust sends individually
+- [ ] **Message queue limits**: C has MAX_QUEUE_LEN, Rust unbounded (potential memory issue)
+- [ ] **Detailed error codes**: C returns specific CS_ERR_* codes, Rust uses anyhow errors
+
+### Behavioral Differences (Benign)
+- **Logging**: Rust uses `tracing` instead of `qb_log` (compatible with journald)
+- **Threading**: Rust uses tokio tasks, C uses qb_loop single-threaded model
+- **Timers**: Rust uses tokio timers, C uses qb_loop timers (same timeout values)
+
+### Incompatibilities (None Known)
+No incompatibilities have been identified. The Rust implementation is fully wire-compatible and can operate in a mixed C/Rust cluster.
+
+## References
+
+### C Implementation
+- `src/pmxcfs/dfsm.c` / `dfsm.h` - Core DFSM implementation
+- `src/pmxcfs/dcdb.c` / `dcdb.h` - Distributed database coordination
+- `src/pmxcfs/loop.c` / `loop.h` - Service loop and management
+
+### Related Crates
+- **pmxcfs-memdb**: Database callbacks for DFSM
+- **pmxcfs-status**: Status tracking and kvstore
+- **pmxcfs-api-types**: Message type definitions
+- **pmxcfs-services**: Service framework for lifecycle management
+- **rust-corosync**: CPG bindings (external dependency)
+
+### Corosync Documentation
+- CPG (Closed Process Group) API: https://github.com/corosync/corosync
+- Group communication semantics: Total order, virtual synchrony
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
new file mode 100644
index 000000000..8490ca701
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
@@ -0,0 +1,80 @@
+/// DFSM application callbacks
+///
+/// This module defines the callback trait that application layers implement
+/// to integrate with the DFSM state machine.
+use crate::NodeSyncInfo;
+
+/// Callback trait for DFSM operations
+///
+/// The application layer implements this to receive DFSM events.
+/// The associated type `Message` specifies the message type this callback handles:
+/// - `FuseMessage` for main database operations
+/// - `KvStoreMessage` for status synchronization
+///
+/// This provides type safety by ensuring each DFSM instance only delivers
+/// the correct message type to its callbacks.
+pub trait Callbacks: Send + Sync {
+    /// The message type this callback handles
+    type Message: crate::message::Message;
+
+    /// Deliver an application message
+    ///
+    /// The message type is determined by the associated type:
+    /// - FuseMessage for main database operations
+    /// - KvStoreMessage for status synchronization
+    fn deliver_message(
+        &self,
+        nodeid: u32,
+        pid: u32,
+        message: Self::Message,
+        timestamp: u64,
+    ) -> std::io::Result<(i32, bool)>;
+
+    /// Compute state checksum for verification
+    fn compute_checksum(&self, output: &mut [u8; 32]) -> anyhow::Result<()>;
+
+    /// Get current state for synchronization
+    ///
+    /// Called when we need to send our state to other nodes during sync.
+    fn get_state(&self) -> anyhow::Result<Vec<u8>>;
+
+    /// Process state update during synchronization
+    fn process_state_update(&self, states: &[NodeSyncInfo]) -> anyhow::Result<bool>;
+
+    /// Process incremental update from leader
+    ///
+    /// The leader sends individual TreeEntry updates during synchronization.
+    /// The data is serialized TreeEntry in C-compatible wire format.
+    fn process_update(&self, nodeid: u32, pid: u32, data: &[u8]) -> anyhow::Result<()>;
+
+    /// Commit synchronized state
+    fn commit_state(&self) -> anyhow::Result<()>;
+
+    /// Called when cluster becomes synced
+    fn on_synced(&self);
+
+    /// Clean up sync resources (matches C's dfsm_cleanup_fn)
+    ///
+    /// Called to release resources allocated during state synchronization.
+    /// This is called when sync resources are being released, typically during
+    /// membership changes or when transitioning out of sync mode.
+    ///
+    /// Default implementation does nothing (Rust's RAII handles most cleanup).
+    fn cleanup_sync_resources(&self, _states: &[NodeSyncInfo]) {
+        // Default: no-op, Rust's Drop trait handles cleanup
+    }
+
+    /// Called on membership changes (matches C's dfsm_confchg_fn)
+    ///
+    /// Notifies the application layer when cluster membership changes.
+    /// This can be used for logging, monitoring, or application-specific
+    /// membership tracking.
+    ///
+    /// # Arguments
+    /// * `member_list` - Current list of cluster members after the change
+    ///
+    /// Default implementation does nothing (membership handled internally).
+    fn on_membership_change(&self, _member_list: &[pmxcfs_api_types::MemberInfo]) {
+        // Default: no-op, membership changes handled internally
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
new file mode 100644
index 000000000..24ac255e0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
@@ -0,0 +1,111 @@
+//! Cluster Database Service
+//!
+//! This service synchronizes the distributed cluster database (pmxcfs-memdb) across
+//! all cluster nodes using DFSM (Distributed Finite State Machine).
+//!
+//! Equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+//! Provides automatic retry, event-driven CPG dispatching, and periodic state verification.
+
+use async_trait::async_trait;
+use pmxcfs_services::{Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{debug, error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message;
+
+/// Cluster Database Service
+///
+/// Synchronizes the distributed cluster database (pmxcfs-memdb) across all nodes.
+/// Implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for database replication
+/// - Periodic state verification via timer callback
+///
+/// This is equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct ClusterDatabaseService<M> {
+    dfsm: Arc<Dfsm<M>>,
+}
+
+impl<M: Message> ClusterDatabaseService<M> {
+    /// Create a new cluster database service
+    pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+        Self { dfsm }
+    }
+}
+
+#[async_trait]
+impl<M: Message> Service for ClusterDatabaseService<M> {
+    fn name(&self) -> &str {
+        "cluster-database"
+    }
+
+    async fn initialize(&mut self) -> pmxcfs_services::Result<std::os::unix::io::RawFd> {
+        info!("Initializing cluster database service (dcdb)");
+
+        // Initialize CPG connection (this also joins the group)
+        self.dfsm.init_cpg().map_err(|e| {
+            ServiceError::InitializationFailed(format!("DFSM CPG initialization failed: {e}"))
+        })?;
+
+        // Get file descriptor for event monitoring
+        let fd = self.dfsm.fd_get().map_err(|e| {
+            self.dfsm.stop_services().ok();
+            ServiceError::InitializationFailed(format!("Failed to get DFSM fd: {e}"))
+        })?;
+
+        info!(
+            "Cluster database service initialized successfully with fd {}",
+            fd
+        );
+        Ok(fd)
+    }
+
+    async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> {
+        match self.dfsm.dispatch_events() {
+            Ok(_) => Ok(true),
+            Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+                warn!("DFSM connection lost, requesting reinitialization");
+                Ok(false)
+            }
+            Err(e) => {
+                error!("DFSM dispatch failed: {}", e);
+                Err(ServiceError::DispatchFailed(format!(
+                    "DFSM dispatch failed: {e}"
+                )))
+            }
+        }
+    }
+
+    async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+        info!("Finalizing cluster database service");
+
+        if let Err(e) = self.dfsm.stop_services() {
+            warn!("Error stopping cluster database services: {}", e);
+        }
+
+        info!("Cluster database service finalized");
+        Ok(())
+    }
+
+    async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+        debug!("Cluster database timer callback: initiating state verification");
+
+        // Request state verification
+        if let Err(e) = self.dfsm.verify_request() {
+            warn!("DFSM state verification request failed: {}", e);
+        }
+
+        Ok(())
+    }
+
+    fn timer_period(&self) -> Option<Duration> {
+        // Match C implementation's DCDB_VERIFY_TIME (60 * 60 seconds)
+        // Periodic state verification happens once per hour
+        Some(Duration::from_secs(3600))
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
new file mode 100644
index 000000000..45a78ab27
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
@@ -0,0 +1,256 @@
+//! Safe, idiomatic wrapper for Corosync CPG (Closed Process Group)
+//!
+//! This module provides a trait-based abstraction over the Corosync CPG C API,
+//! handling the unsafe FFI boundary and callback lifecycle management internally.
+
+use anyhow::Result;
+use rust_corosync::{CsError, NodeId, cpg};
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Helper to extract CpgHandler from CPG context
+///
+/// # Safety
+/// - Context must point to a valid Arc<dyn CpgHandler> leaked via Box::into_raw()
+/// - Handler must still be alive (CpgService not dropped)
+/// - Pointer must be properly aligned for Arc<dyn CpgHandler>
+///
+/// # Errors
+/// Returns error if context is invalid, null, or misaligned
+unsafe fn handler_from_context<'a>(handle: cpg::Handle) -> Result<&'a dyn CpgHandler> {
+    let context = cpg::context_get(handle)
+        .map_err(|e| anyhow::anyhow!("Failed to get CPG context: {e:?}"))?;
+
+    if context == 0 {
+        return Err(anyhow::anyhow!("CPG context is null - not initialized"));
+    }
+
+    // Validate pointer alignment
+    if context % std::mem::align_of::<Arc<dyn CpgHandler>>() as u64 != 0 {
+        return Err(anyhow::anyhow!("CPG context pointer misaligned"));
+    }
+
+    // Context points to a leaked Arc<dyn CpgHandler>
+    // We borrow the Arc to get a reference to the handler
+    let arc_ptr = context as *const Arc<dyn CpgHandler>;
+    let arc_ref: &Arc<dyn CpgHandler> = unsafe { &*arc_ptr };
+    Ok(arc_ref.as_ref())
+}
+
+/// Trait for handling CPG events in a safe, idiomatic way
+///
+/// Implementors receive callbacks when CPG events occur. The trait handles
+/// all unsafe pointer conversion and context management internally.
+pub trait CpgHandler: Send + Sync + 'static {
+    fn on_deliver(&self, group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]);
+
+    fn on_confchg(
+        &self,
+        group_name: &str,
+        member_list: &[cpg::Address],
+        left_list: &[cpg::Address],
+        joined_list: &[cpg::Address],
+    );
+}
+
+/// Safe wrapper for CPG handle that manages callback lifecycle
+///
+/// This service registers callbacks with the CPG handle and ensures proper
+/// cleanup when dropped. It uses Arc reference counting to safely manage
+/// the handler lifetime across the FFI boundary.
+pub struct CpgService {
+    handle: cpg::Handle,
+    handler: Arc<dyn CpgHandler>,
+}
+
+impl CpgService {
+    pub fn new<T: CpgHandler>(handler: Arc<T>) -> Result<Self> {
+        fn cpg_deliver_callback(
+            handle: &cpg::Handle,
+            group_name: String,
+            nodeid: NodeId,
+            pid: u32,
+            msg: &[u8],
+            _msg_len: usize,
+        ) {
+            // Catch panics to prevent unwinding through C code (UB)
+            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+                match unsafe { handler_from_context(*handle) } {
+                    Ok(handler) => handler.on_deliver(&group_name, nodeid, pid, msg),
+                    Err(e) => {
+                        tracing::error!("CPG deliver callback error: {}", e);
+                    }
+                }
+            }));
+
+            if let Err(panic) = result {
+                tracing::error!("PANIC in CPG deliver callback: {:?}", panic);
+            }
+        }
+
+        fn cpg_confchg_callback(
+            handle: &cpg::Handle,
+            group_name: &str,
+            member_list: Vec<cpg::Address>,
+            left_list: Vec<cpg::Address>,
+            joined_list: Vec<cpg::Address>,
+        ) {
+            // Catch panics to prevent unwinding through C code (UB)
+            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+                match unsafe { handler_from_context(*handle) } {
+                    Ok(handler) => {
+                        handler.on_confchg(group_name, &member_list, &left_list, &joined_list)
+                    }
+                    Err(e) => {
+                        tracing::error!("CPG confchg callback error: {}", e);
+                    }
+                }
+            }));
+
+            if let Err(panic) = result {
+                tracing::error!("PANIC in CPG confchg callback: {:?}", panic);
+            }
+        }
+
+        let model_data = cpg::ModelData::ModelV1(cpg::Model1Data {
+            flags: cpg::Model1Flags::None,
+            deliver_fn: Some(cpg_deliver_callback),
+            confchg_fn: Some(cpg_confchg_callback),
+            totem_confchg_fn: None,
+        });
+
+        let handle = cpg::initialize(&model_data, 0)?;
+
+        let handler_dyn: Arc<dyn CpgHandler> = handler;
+        let leaked_arc = Box::new(Arc::clone(&handler_dyn));
+        let arc_ptr = Box::into_raw(leaked_arc) as u64;
+
+        // Set context with error handling to prevent Arc leak
+        if let Err(e) = cpg::context_set(handle, arc_ptr) {
+            // Recover the leaked Arc on error
+            unsafe {
+                let _ = Box::from_raw(arc_ptr as *mut Arc<dyn CpgHandler>);
+            }
+            // Finalize CPG handle
+            let _ = cpg::finalize(handle);
+            return Err(e.into());
+        }
+
+        Ok(Self {
+            handle,
+            handler: handler_dyn,
+        })
+    }
+
+    pub fn join(&self, group_name: &str) -> Result<()> {
+        // Group names are hardcoded in the application, so assert they don't contain nulls
+        debug_assert!(
+            !group_name.contains('\0'),
+            "Group name cannot contain null bytes"
+        );
+
+        // IMPORTANT: C implementation uses strlen(name) + 1 for CPG name length,
+        // which includes the trailing nul. To ensure compatibility with C nodes,
+        // we must add \0 to the group name.
+        // See src/pmxcfs/dfsm.c: dfsm->cpg_group_name.length = strlen(group_name) + 1;
+        let group_string = format!("{group_name}\0");
+        tracing::debug!("CPG JOIN: joining group '{}'", group_name);
+        cpg::join(self.handle, &group_string)?;
+        tracing::info!("CPG JOIN: Successfully joined group '{}'", group_name);
+        Ok(())
+    }
+
+    pub fn leave(&self, group_name: &str) -> Result<()> {
+        // Group names are hardcoded in the application, so assert they don't contain nulls
+        debug_assert!(
+            !group_name.contains('\0'),
+            "Group name cannot contain null bytes"
+        );
+
+        // Include trailing nul to match C's behavior (see join() comment)
+        let group_string = format!("{group_name}\0");
+        cpg::leave(self.handle, &group_string)?;
+        Ok(())
+    }
+
+    pub fn mcast(&self, guarantee: cpg::Guarantee, msg: &[u8]) -> Result<()> {
+        // cpg_mcast_joined() returns CS_ERR_TRY_AGAIN when called from within a
+        // cpg_configuration_change_fn callback. The C implementation retries up to
+        // 100 times with 100ms sleep (dfsm_send_message_full in dfsm.c). We match
+        // that behavior here so that on_confchg can broadcast SYNC_START — required
+        // for backward compatibility with C nodes that always enter START_SYNC on
+        // any membership change (see state_machine.rs: handle_membership_change).
+        for attempt in 0..100u32 {
+            match cpg::mcast_joined(self.handle, guarantee, msg) {
+                Ok(()) => return Ok(()),
+                Err(CsError::CsErrTryAgain) if attempt < 99 => {
+                    std::thread::sleep(Duration::from_millis(100));
+                }
+                Err(e) => return Err(e.into()),
+            }
+        }
+        anyhow::bail!("cpg_mcast_joined returned CS_ERR_TRY_AGAIN after 100 retries")
+    }
+
+    pub fn dispatch(&self) -> Result<(), rust_corosync::CsError> {
+        cpg::dispatch(self.handle, rust_corosync::DispatchFlags::All)
+    }
+
+    pub fn fd(&self) -> Result<i32> {
+        Ok(cpg::fd_get(self.handle)?)
+    }
+
+    pub fn handler(&self) -> &Arc<dyn CpgHandler> {
+        &self.handler
+    }
+
+    pub fn handle(&self) -> cpg::Handle {
+        self.handle
+    }
+}
+
+impl Drop for CpgService {
+    fn drop(&mut self) {
+        // Fetch the context pointer while the handle is still valid; after
+        // finalize() the handle is invalid and context_get() would fail,
+        // leaking the Arc that new() boxed into the context.
+        let context = cpg::context_get(self.handle).unwrap_or_else(|e| {
+            tracing::error!("Failed to get CPG context during drop: {:?}", e);
+            0
+        });
+
+        // CRITICAL: Finalize BEFORE deallocating the Arc to prevent a race
+        // where callbacks could fire while we're dropping it.
+        if let Err(e) = cpg::finalize(self.handle) {
+            tracing::error!("Failed to finalize CPG handle: {:?}", e);
+        }
+
+        // Now safe to recover the Arc - no more callbacks can fire
+        if context != 0 {
+            unsafe {
+                let _boxed = Box::from_raw(context as *mut Arc<dyn CpgHandler>);
+            }
+        } else {
+            tracing::warn!("CPG context was null during drop");
+        }
+    }
+}
+
+/// SAFETY: CpgService is thread-safe with the following guarantees:
+///
+/// 1. cpg::Handle is thread-safe per Corosync 2.x/3.x library design
+///    - CPG library uses internal mutex for concurrent access protection
+///
+/// 2. Handler is protected by Arc reference counting
+///    - Multiple threads can safely hold references to the handler
+///
+/// 3. CpgHandler trait requires Send + Sync
+///    - Implementations must handle concurrent callbacks safely
+///
+/// 4. Methods join/leave/mcast are safe to call concurrently from multiple threads
+///
+/// LIMITATIONS:
+/// - Do NOT call dispatch() concurrently from multiple threads on the same handle
+///   (CPG library dispatch is not reentrant)
+unsafe impl Send for CpgService {}
+unsafe impl Sync for CpgService {}
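
The bounded-retry loop in `mcast()` above is the one piece of control flow here that is easy to get subtly wrong (the `attempt < 99` guard makes the final `CS_ERR_TRY_AGAIN` surface as an error instead of another sleep). Below is a standalone sketch of the same pattern with the corosync call replaced by a hypothetical injectable closure (`send_with_retry` and the local `CsError` enum are illustrative stand-ins, not the crate's API), so the behavior can be checked without a cluster:

```rust
use std::time::Duration;

// Illustrative stand-in for rust_corosync::CsError.
#[derive(Debug, PartialEq)]
enum CsError {
    TryAgain,
}

// Retry a fallible send up to 100 times, sleeping between attempts on
// TryAgain; any other error (or TryAgain on the last attempt) is returned.
fn send_with_retry<F>(mut attempt_send: F) -> Result<(), String>
where
    F: FnMut() -> Result<(), CsError>,
{
    for attempt in 0..100u32 {
        match attempt_send() {
            Ok(()) => return Ok(()),
            Err(CsError::TryAgain) if attempt < 99 => {
                // The real code sleeps 100ms; shortened here.
                std::thread::sleep(Duration::from_millis(1));
            }
            Err(e) => return Err(format!("send failed: {e:?}")),
        }
    }
    Err("still TryAgain after 100 retries".into())
}

fn main() {
    // Succeeds on the third attempt, mirroring transient CS_ERR_TRY_AGAIN
    // returned while inside a configuration-change callback.
    let mut calls = 0;
    let result = send_with_retry(|| {
        calls += 1;
        if calls < 3 { Err(CsError::TryAgain) } else { Ok(()) }
    });
    assert!(result.is_ok());
    assert_eq!(calls, 3);
    println!("ok after {calls} attempts");
}
```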
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
new file mode 100644
index 000000000..da825ca45
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
@@ -0,0 +1,725 @@
+//! DFSM Protocol Message Types
+//!
+//! This module defines the DfsmMessage enum, which encapsulates all DFSM protocol messages
+//! with their associated data, providing type-safe serialization and deserialization.
+//!
+//! The wire format matches the C implementation's dfsm_message_*_header_t structures for compatibility.
+use anyhow::Result;
+use pmxcfs_memdb::TreeEntry;
+
+use super::message::Message;
+use super::types::{DfsmMessageType, SyncEpoch};
+
+/// DFSM protocol message with typed variants
+///
+/// Each variant corresponds to a message type in the DFSM protocol and carries
+/// the appropriate payload data. The wire format matches the C implementation:
+///
+/// For Normal messages: dfsm_message_normal_header_t (24 bytes) + fuse_data
+/// ```text
+/// [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][fuse_data...]
+/// ```
+///
+/// The generic parameter `M` specifies the application message type and must implement
+/// the `Message` trait for serialization/deserialization:
+/// - `DfsmMessage<FuseMessage>` for database operations
+/// - `DfsmMessage<KvStoreMessage>` for status synchronization
+#[derive(Debug, Clone)]
+pub enum DfsmMessage<M: Message> {
+    /// Regular application message
+    ///
+    /// Contains a typed application message (FuseMessage or KvStoreMessage).
+    /// C wire format: dfsm_message_normal_header_t + application_message data
+    Normal {
+        msg_count: u64,
+        timestamp: u32,        // Unix timestamp (matches C's u32)
+        protocol_version: u32, // Protocol version
+        message: M,            // Typed message (FuseMessage or KvStoreMessage)
+    },
+
+    /// Start synchronization signal from leader (no payload)
+    /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+    SyncStart { sync_epoch: SyncEpoch },
+
+    /// State data from another node during sync
+    ///
+    /// Wire format: dfsm_message_state_header_t (32 bytes) + [state_data: raw bytes]
+    State {
+        sync_epoch: SyncEpoch,
+        data: Vec<u8>,
+    },
+
+    /// State update from leader
+    ///
+    /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + TreeEntry fields
+    /// This is sent by the leader during synchronization to update followers
+    /// with individual database entries that differ from their state.
+    Update {
+        sync_epoch: SyncEpoch,
+        tree_entry: TreeEntry,
+    },
+
+    /// Update complete signal from leader (no payload)
+    /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+    UpdateComplete { sync_epoch: SyncEpoch },
+
+    /// Verification request from leader
+    ///
+    /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64]
+    VerifyRequest { sync_epoch: SyncEpoch, csum_id: u64 },
+
+    /// Verification response with checksum
+    ///
+    /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64][checksum: [u8; 32]]
+    Verify {
+        sync_epoch: SyncEpoch,
+        csum_id: u64,
+        checksum: [u8; 32],
+    },
+}
+
+impl<M: Message> DfsmMessage<M> {
+    /// Protocol version (should match cluster-wide)
+    pub const DEFAULT_PROTOCOL_VERSION: u32 = 1;
+
+    /// Get the message type discriminant
+    pub fn message_type(&self) -> DfsmMessageType {
+        match self {
+            DfsmMessage::Normal { .. } => DfsmMessageType::Normal,
+            DfsmMessage::SyncStart { .. } => DfsmMessageType::SyncStart,
+            DfsmMessage::State { .. } => DfsmMessageType::State,
+            DfsmMessage::Update { .. } => DfsmMessageType::Update,
+            DfsmMessage::UpdateComplete { .. } => DfsmMessageType::UpdateComplete,
+            DfsmMessage::VerifyRequest { .. } => DfsmMessageType::VerifyRequest,
+            DfsmMessage::Verify { .. } => DfsmMessageType::Verify,
+        }
+    }
+
+    /// Serialize message to C-compatible wire format
+    ///
+    /// For Normal/Update: dfsm_message_normal_header_t (24 bytes) + application_data
+    /// Format: [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][data...]
+    pub fn serialize(&self) -> Vec<u8> {
+        match self {
+            DfsmMessage::Normal {
+                msg_count,
+                timestamp,
+                protocol_version,
+                message,
+            } => self.serialize_normal_message(*msg_count, *timestamp, *protocol_version, message),
+            _ => self.serialize_state_message(),
+        }
+    }
+
+    /// Serialize a Normal message with C-compatible header
+    fn serialize_normal_message(
+        &self,
+        msg_count: u64,
+        timestamp: u32,
+        protocol_version: u32,
+        message: &M,
+    ) -> Vec<u8> {
+        let msg_type = self.message_type() as u16;
+        let subtype = message.message_type();
+        let app_data = message.serialize();
+
+        // C header: type (u16) + subtype (u16) + protocol (u32) + time (u32) + reserved (u32) + count (u64) = 24 bytes
+        let mut buf = Vec::with_capacity(24 + app_data.len());
+
+        // dfsm_message_header_t fields
+        buf.extend_from_slice(&msg_type.to_le_bytes());
+        buf.extend_from_slice(&subtype.to_le_bytes());
+        buf.extend_from_slice(&protocol_version.to_le_bytes());
+        buf.extend_from_slice(&timestamp.to_le_bytes());
+        buf.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+        // count field
+        buf.extend_from_slice(&msg_count.to_le_bytes());
+
+        // application message data
+        buf.extend_from_slice(&app_data);
+
+        buf
+    }
+
+    /// Serialize state messages (non-Normal) with C-compatible header
+    /// C wire format: dfsm_message_state_header_t (32 bytes) + payload
+    /// Header breakdown: base (16 bytes) + epoch (16 bytes)
+    fn serialize_state_message(&self) -> Vec<u8> {
+        let msg_type = self.message_type() as u16;
+        let (sync_epoch, payload) = self.extract_epoch_and_payload();
+
+        // For state messages: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + payload
+        let mut message = Vec::with_capacity(32 + payload.len());
+
+        // Base header (16 bytes): type, subtype, protocol, time, reserved
+        message.extend_from_slice(&msg_type.to_le_bytes());
+        message.extend_from_slice(&0u16.to_le_bytes()); // subtype (unused)
+        message.extend_from_slice(&Self::DEFAULT_PROTOCOL_VERSION.to_le_bytes());
+
+        message.extend_from_slice(&pmxcfs_api_types::unix_now_secs().to_le_bytes());
+        message.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+        // Epoch header (16 bytes): epoch, time, nodeid, pid
+        message.extend_from_slice(&sync_epoch.serialize());
+
+        // Payload
+        message.extend_from_slice(&payload);
+
+        message
+    }
+
+    /// Extract sync_epoch and payload from state messages
+    fn extract_epoch_and_payload(&self) -> (SyncEpoch, Vec<u8>) {
+        match self {
+            DfsmMessage::Normal { .. } => {
+                unreachable!("Normal messages use serialize_normal_message")
+            }
+            DfsmMessage::SyncStart { sync_epoch } => (*sync_epoch, Vec::new()),
+            DfsmMessage::State { sync_epoch, data } => (*sync_epoch, data.clone()),
+            DfsmMessage::Update {
+                sync_epoch,
+                tree_entry,
+            } => (*sync_epoch, tree_entry.serialize_for_update()),
+            DfsmMessage::UpdateComplete { sync_epoch } => (*sync_epoch, Vec::new()),
+            DfsmMessage::VerifyRequest {
+                sync_epoch,
+                csum_id,
+            } => (*sync_epoch, csum_id.to_le_bytes().to_vec()),
+            DfsmMessage::Verify {
+                sync_epoch,
+                csum_id,
+                checksum,
+            } => {
+                let mut data = Vec::with_capacity(8 + 32);
+                data.extend_from_slice(&csum_id.to_le_bytes());
+                data.extend_from_slice(checksum);
+                (*sync_epoch, data)
+            }
+        }
+    }
+
+    /// Deserialize message from C-compatible wire format
+    ///
+    /// Normal messages: [base header: 16 bytes][count: u64][app data]
+    /// State messages:  [base header: 16 bytes][epoch: 16 bytes][payload]
+    ///
+    /// # Arguments
+    /// * `data` - Raw message bytes from CPG
+    pub fn deserialize(data: &[u8]) -> Result<Self> {
+        if data.len() < 16 {
+            anyhow::bail!(
+                "Message too short: {} bytes (need at least 16 for header)",
+                data.len()
+            );
+        }
+
+        // Parse dfsm_message_header_t (16 bytes)
+        let msg_type = u16::from_le_bytes([data[0], data[1]]);
+        let subtype = u16::from_le_bytes([data[2], data[3]]);
+        let protocol_version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
+        let timestamp = u32::from_le_bytes([data[8], data[9], data[10], data[11]]);
+        let _reserved = u32::from_le_bytes([data[12], data[13], data[14], data[15]]);
+
+        let dfsm_type = DfsmMessageType::try_from(msg_type)?;
+
+        // Normal messages have different structure than state messages
+        if dfsm_type == DfsmMessageType::Normal {
+            // Normal: [base: 16][count: 8][app_data: ...]
+            let payload = &data[16..];
+            Self::deserialize_normal_message(subtype, protocol_version, timestamp, payload)
+        } else {
+            // State messages: [base: 16][epoch: 16][payload: ...]
+            if data.len() < 32 {
+                anyhow::bail!(
+                    "State message too short: {} bytes (need at least 32 for state header)",
+                    data.len()
+                );
+            }
+            let sync_epoch = SyncEpoch::deserialize(&data[16..32])
+                .map_err(|e| anyhow::anyhow!("Failed to deserialize sync epoch: {e}"))?;
+            let payload = &data[32..];
+            Self::deserialize_state_message(dfsm_type, sync_epoch, payload)
+        }
+    }
+
+    /// Deserialize a Normal message
+    fn deserialize_normal_message(
+        subtype: u16,
+        protocol_version: u32,
+        timestamp: u32,
+        payload: &[u8],
+    ) -> Result<Self> {
+        // Normal messages have count field (u64) after base header
+        if payload.len() < 8 {
+            anyhow::bail!("Normal message too short: need count field");
+        }
+        let msg_count = u64::from_le_bytes(
+            payload[0..8]
+                .try_into()
+                .map_err(|_| anyhow::anyhow!("Failed to extract msg_count bytes"))?,
+        );
+        let app_data = &payload[8..];
+
+        // Deserialize using the Message trait
+        let message = M::deserialize(subtype, app_data)?;
+
+        Ok(DfsmMessage::Normal {
+            msg_count,
+            timestamp,
+            protocol_version,
+            message,
+        })
+    }
+
+    /// Deserialize a state message (with epoch)
+    fn deserialize_state_message(
+        dfsm_type: DfsmMessageType,
+        sync_epoch: SyncEpoch,
+        payload: &[u8],
+    ) -> Result<Self> {
+        match dfsm_type {
+            DfsmMessageType::Normal => {
+                unreachable!("Normal messages use deserialize_normal_message")
+            }
+            DfsmMessageType::Update => {
+                let tree_entry = TreeEntry::deserialize_from_update(payload)?;
+                Ok(DfsmMessage::Update {
+                    sync_epoch,
+                    tree_entry,
+                })
+            }
+            DfsmMessageType::SyncStart => Ok(DfsmMessage::SyncStart { sync_epoch }),
+            DfsmMessageType::State => Ok(DfsmMessage::State {
+                sync_epoch,
+                data: payload.to_vec(),
+            }),
+            DfsmMessageType::UpdateComplete => Ok(DfsmMessage::UpdateComplete { sync_epoch }),
+            DfsmMessageType::VerifyRequest => {
+                if payload.len() < 8 {
+                    anyhow::bail!("VerifyRequest message too short");
+                }
+                let csum_id = u64::from_le_bytes(
+                    payload[0..8]
+                        .try_into()
+                        .map_err(|_| anyhow::anyhow!("Failed to extract csum_id bytes"))?,
+                );
+                Ok(DfsmMessage::VerifyRequest {
+                    sync_epoch,
+                    csum_id,
+                })
+            }
+            DfsmMessageType::Verify => {
+                if payload.len() < 40 {
+                    anyhow::bail!("Verify message too short");
+                }
+                let csum_id = u64::from_le_bytes(
+                    payload[0..8]
+                        .try_into()
+                        .map_err(|_| anyhow::anyhow!("Failed to extract csum_id bytes"))?,
+                );
+                let mut checksum = [0u8; 32];
+                checksum.copy_from_slice(&payload[8..40]);
+                Ok(DfsmMessage::Verify {
+                    sync_epoch,
+                    csum_id,
+                    checksum,
+                })
+            }
+        }
+    }
+
+    /// Helper to create a Normal message from an application message
+    pub fn from_message(msg_count: u64, message: M, protocol_version: u32) -> Self {
+        DfsmMessage::Normal {
+            msg_count,
+            timestamp: pmxcfs_api_types::unix_now_secs(),
+            protocol_version,
+            message,
+        }
+    }
+
+    /// Helper to create an Update message from a TreeEntry
+    ///
+    /// Used by the leader during synchronization to send individual database entries
+    /// to nodes that need to catch up. Matches C's dcdb_send_update_inode().
+    pub fn from_tree_entry(tree_entry: TreeEntry, sync_epoch: SyncEpoch) -> Self {
+        DfsmMessage::Update {
+            sync_epoch,
+            tree_entry,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::FuseMessage;
+
+    #[test]
+    fn test_sync_start_roundtrip() {
+        let sync_epoch = SyncEpoch {
+            epoch: 1,
+            time: 1234567890,
+            nodeid: 1,
+            pid: 1000,
+        };
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        assert!(
+            matches!(deserialized, DfsmMessage::SyncStart { sync_epoch: e } if e == sync_epoch)
+        );
+    }
+
+    #[test]
+    fn test_normal_roundtrip() {
+        let fuse_msg = FuseMessage::Create {
+            path: "/test/file".to_string(),
+        };
+
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::Normal {
+            msg_count: 42,
+            timestamp: 1234567890,
+            protocol_version: DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION,
+            message: fuse_msg.clone(),
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::Normal {
+                msg_count,
+                timestamp,
+                protocol_version,
+                message,
+            } => {
+                assert_eq!(msg_count, 42);
+                assert_eq!(timestamp, 1234567890);
+                assert_eq!(
+                    protocol_version,
+                    DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION
+                );
+                assert_eq!(message, fuse_msg);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_verify_request_roundtrip() {
+        let sync_epoch = SyncEpoch {
+            epoch: 2,
+            time: 1234567891,
+            nodeid: 2,
+            pid: 2000,
+        };
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+            sync_epoch,
+            csum_id: 0x123456789ABCDEF0,
+        };
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::VerifyRequest {
+                sync_epoch: e,
+                csum_id,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(csum_id, 0x123456789ABCDEF0);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_verify_roundtrip() {
+        let sync_epoch = SyncEpoch {
+            epoch: 3,
+            time: 1234567892,
+            nodeid: 3,
+            pid: 3000,
+        };
+        let checksum = [42u8; 32];
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::Verify {
+            sync_epoch,
+            csum_id: 0x1122334455667788,
+            checksum,
+        };
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::Verify {
+                sync_epoch: e,
+                csum_id,
+                checksum: recv_checksum,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(csum_id, 0x1122334455667788);
+                assert_eq!(recv_checksum, checksum);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_invalid_message_type() {
+        // A full 16-byte header whose type field (0x00aa) is not a valid
+        // DfsmMessageType; deserialization must fail on the type check
+        // rather than on the length check (covered by test_too_short).
+        let mut data = vec![0u8; 16];
+        data[0..2].copy_from_slice(&0x00aau16.to_le_bytes());
+        assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+    }
+
+    #[test]
+    fn test_too_short() {
+        let data = vec![0xFF];
+        assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+    }
+
+    // ===== Edge Case Tests =====
+
+    #[test]
+    fn test_state_message_too_short() {
+        // State messages need at least 32 bytes (16 base + 16 epoch)
+        let mut data = vec![0u8; 31]; // One byte short
+        // Set message type to State (2)
+        data[0..2].copy_from_slice(&2u16.to_le_bytes());
+
+        let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+        assert!(result.is_err(), "State message with 31 bytes should fail");
+        assert!(result.unwrap_err().to_string().contains("too short"));
+    }
+
+    #[test]
+    fn test_normal_message_missing_count() {
+        // Normal messages need count field (u64) after 16-byte header
+        let mut data = vec![0u8; 20]; // Header + 4 bytes (not enough for u64 count)
+        // Set message type to Normal (0)
+        data[0..2].copy_from_slice(&0u16.to_le_bytes());
+
+        let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+        assert!(
+            result.is_err(),
+            "Normal message without full count field should fail"
+        );
+    }
+
+    #[test]
+    fn test_verify_message_truncated_checksum() {
+        // Verify messages need csum_id (8 bytes) + checksum (32 bytes) = 40 bytes payload
+        let sync_epoch = SyncEpoch {
+            epoch: 1,
+            time: 123,
+            nodeid: 1,
+            pid: 100,
+        };
+        let mut data = Vec::new();
+
+        // Base header (16 bytes)
+        data.extend_from_slice(&6u16.to_le_bytes()); // Verify message type
+        data.extend_from_slice(&0u16.to_le_bytes()); // subtype
+        data.extend_from_slice(&1u32.to_le_bytes()); // protocol
+        data.extend_from_slice(&123u32.to_le_bytes()); // time
+        data.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+        // Epoch (16 bytes)
+        data.extend_from_slice(&sync_epoch.serialize());
+
+        // Truncated payload (only 39 bytes instead of 40)
+        data.extend_from_slice(&0x12345678u64.to_le_bytes());
+        data.extend_from_slice(&[0u8; 31]); // Only 31 bytes of checksum
+
+        let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+        assert!(
+            result.is_err(),
+            "Verify message with truncated checksum should fail"
+        );
+    }
+
+    #[test]
+    fn test_update_message_with_tree_entry() {
+        use pmxcfs_memdb::TreeEntry;
+
+        // Create a valid tree entry with matching size
+        let data = vec![1, 2, 3, 4, 5];
+        let tree_entry = TreeEntry {
+            inode: 42,
+            parent: 0,
+            version: 1,
+            writer: 0,
+            name: "testfile".to_string(),
+            mtime: 1234567890,
+            size: data.len(), // size must match data.len()
+            entry_type: 8,    // DT_REG (regular file)
+            data,
+        };
+
+        let sync_epoch = SyncEpoch {
+            epoch: 5,
+            time: 999,
+            nodeid: 2,
+            pid: 200,
+        };
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::Update {
+            sync_epoch,
+            tree_entry: tree_entry.clone(),
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::Update {
+                sync_epoch: e,
+                tree_entry: recv_entry,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(recv_entry.inode, tree_entry.inode);
+                assert_eq!(recv_entry.name, tree_entry.name);
+                assert_eq!(recv_entry.size, tree_entry.size);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_update_complete_roundtrip() {
+        let sync_epoch = SyncEpoch {
+            epoch: 10,
+            time: 5555,
+            nodeid: 3,
+            pid: 300,
+        };
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+
+        let serialized = msg.serialize();
+        assert_eq!(
+            serialized.len(),
+            32,
+            "UpdateComplete should be exactly 32 bytes (header + epoch)"
+        );
+
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        assert!(
+            matches!(deserialized, DfsmMessage::UpdateComplete { sync_epoch: e } if e == sync_epoch)
+        );
+    }
+
+    #[test]
+    fn test_state_message_with_large_payload() {
+        let sync_epoch = SyncEpoch {
+            epoch: 7,
+            time: 7777,
+            nodeid: 4,
+            pid: 400,
+        };
+        // Create a large payload (1MB)
+        let large_data = vec![0xAB; 1024 * 1024];
+
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::State {
+            sync_epoch,
+            data: large_data.clone(),
+        };
+
+        let serialized = msg.serialize();
+        // Should be 32 bytes header + 1MB data
+        assert_eq!(serialized.len(), 32 + 1024 * 1024);
+
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::State {
+                sync_epoch: e,
+                data,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(data.len(), large_data.len());
+                assert_eq!(data, large_data);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_message_type_detection() {
+        let sync_epoch = SyncEpoch {
+            epoch: 1,
+            time: 100,
+            nodeid: 1,
+            pid: 50,
+        };
+
+        let sync_start: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+        assert_eq!(sync_start.message_type(), DfsmMessageType::SyncStart);
+
+        let state: DfsmMessage<FuseMessage> = DfsmMessage::State {
+            sync_epoch,
+            data: vec![1, 2, 3],
+        };
+        assert_eq!(state.message_type(), DfsmMessageType::State);
+
+        let update_complete: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+        assert_eq!(
+            update_complete.message_type(),
+            DfsmMessageType::UpdateComplete
+        );
+    }
+
+    #[test]
+    fn test_from_message_helper() {
+        let fuse_msg = FuseMessage::Mkdir {
+            path: "/new/dir".to_string(),
+        };
+        let msg_count = 123;
+        let protocol_version = DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION;
+
+        let dfsm_msg = DfsmMessage::from_message(msg_count, fuse_msg.clone(), protocol_version);
+
+        match dfsm_msg {
+            DfsmMessage::Normal {
+                msg_count: count,
+                timestamp: _,
+                protocol_version: pv,
+                message,
+            } => {
+                assert_eq!(count, msg_count);
+                assert_eq!(pv, protocol_version);
+                assert_eq!(message, fuse_msg);
+            }
+            _ => panic!("from_message should create Normal variant"),
+        }
+    }
+
+    #[test]
+    fn test_verify_request_with_max_csum_id() {
+        let sync_epoch = SyncEpoch {
+            epoch: 99,
+            time: 9999,
+            nodeid: 5,
+            pid: 500,
+        };
+        let max_csum_id = u64::MAX; // Test with maximum value
+
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+            sync_epoch,
+            csum_id: max_csum_id,
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::VerifyRequest {
+                sync_epoch: e,
+                csum_id,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(csum_id, max_csum_id);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+}
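
For reviewers cross-checking the layout against C's dfsm_message_normal_header_t, a minimal standalone sketch that builds the 24-byte normal header field by field (little-endian, as in `serialize_normal_message()`) and parses it back the way `deserialize()` does; the field values are arbitrary and the code uses nothing from this crate:

```rust
fn main() {
    // [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64]
    let (msg_type, subtype, protocol, time, count) =
        (0u16, 3u16, 1u32, 1_234_567_890u32, 42u64);

    let mut hdr = Vec::with_capacity(24);
    hdr.extend_from_slice(&msg_type.to_le_bytes());
    hdr.extend_from_slice(&subtype.to_le_bytes());
    hdr.extend_from_slice(&protocol.to_le_bytes());
    hdr.extend_from_slice(&time.to_le_bytes());
    hdr.extend_from_slice(&0u32.to_le_bytes()); // reserved
    hdr.extend_from_slice(&count.to_le_bytes());

    // Normal header is exactly 24 bytes: 16-byte base + 8-byte count.
    assert_eq!(hdr.len(), 24);

    // Parse it back with the same fixed offsets the deserializer uses.
    assert_eq!(u16::from_le_bytes([hdr[0], hdr[1]]), msg_type);
    assert_eq!(u16::from_le_bytes([hdr[2], hdr[3]]), subtype);
    assert_eq!(u32::from_le_bytes([hdr[4], hdr[5], hdr[6], hdr[7]]), protocol);
    assert_eq!(u32::from_le_bytes([hdr[8], hdr[9], hdr[10], hdr[11]]), time);
    assert_eq!(u64::from_le_bytes(hdr[16..24].try_into().unwrap()), count);

    println!("normal header = {hdr:02x?}");
}
```

State messages differ only in what follows the 16-byte base: a 16-byte epoch header instead of the 8-byte count, for 32 bytes total before the payload.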
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
new file mode 100644
index 000000000..73c954108
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
@@ -0,0 +1,233 @@
+//! FUSE message types for cluster synchronization
+//!
+//! These are the high-level operations that get broadcast through the cluster
+//! via the main database DFSM (pmxcfs_v1 CPG group).
+use anyhow::{Context, Result};
+use std::ffi::CStr;
+
+use crate::message::Message;
+use crate::wire_format::{CFuseMessage, CMessageType};
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum FuseMessage {
+    /// Create a regular file
+    Create { path: String },
+    /// Create a directory
+    Mkdir { path: String },
+    /// Write data to a file
+    Write {
+        path: String,
+        offset: u64,
+        truncate: bool,
+        data: Vec<u8>,
+    },
+    /// Delete a file or directory
+    Delete { path: String },
+    /// Rename/move a file or directory
+    Rename { from: String, to: String },
+    /// Update modification time
+    ///
+    /// Note: mtime is sent via offset field in CFuseMessage (C: dcdb.c:900)
+    ///
+    /// WARNING: mtime is limited to u32 (matching the C implementation).
+    /// As an unsigned value it wraps in 2106, and any consumer treating it
+    /// as a signed 32-bit time_t breaks in 2038 (Year 2038 problem).
+    /// Consider migrating to u64 timestamps in a future protocol version.
+    Mtime { path: String, mtime: u32 },
+    /// Request lock expiry processing using the current lock checksum.
+    UnlockRequest { path: String, checksum: [u8; 32] },
+    /// Delete an expired lock if its checksum still matches.
+    Unlock { path: String, checksum: [u8; 32] },
+}
+
+impl Message for FuseMessage {
+    fn message_type(&self) -> u16 {
+        match self {
+            FuseMessage::Create { .. } => CMessageType::Create as u16,
+            FuseMessage::Mkdir { .. } => CMessageType::Mkdir as u16,
+            FuseMessage::Write { .. } => CMessageType::Write as u16,
+            FuseMessage::Delete { .. } => CMessageType::Delete as u16,
+            FuseMessage::Rename { .. } => CMessageType::Rename as u16,
+            FuseMessage::Mtime { .. } => CMessageType::Mtime as u16,
+            FuseMessage::UnlockRequest { .. } => CMessageType::UnlockRequest as u16,
+            FuseMessage::Unlock { .. } => CMessageType::Unlock as u16,
+        }
+    }
+
+    fn serialize(&self) -> Vec<u8> {
+        match self {
+            FuseMessage::UnlockRequest { path, checksum }
+            | FuseMessage::Unlock { path, checksum } => {
+                let mut payload = Vec::with_capacity(32 + path.len() + 1);
+                payload.extend_from_slice(checksum);
+                payload.extend_from_slice(path.as_bytes());
+                payload.push(0);
+                payload
+            }
+            FuseMessage::Create { path }
+            | FuseMessage::Mkdir { path }
+            | FuseMessage::Delete { path } => CFuseMessage {
+                size: 0,
+                offset: 0,
+                flags: 0,
+                path: path.clone(),
+                to: None,
+                data: Vec::new(),
+            }
+            .serialize(),
+            FuseMessage::Write {
+                path,
+                offset,
+                truncate,
+                data,
+            } => CFuseMessage {
+                size: data.len() as u32,
+                offset: *offset as u32,
+                flags: u32::from(*truncate),
+                path: path.clone(),
+                to: None,
+                data: data.clone(),
+            }
+            .serialize(),
+            FuseMessage::Rename { from, to } => CFuseMessage {
+                size: 0,
+                offset: 0,
+                flags: 0,
+                path: from.clone(),
+                to: Some(to.clone()),
+                data: Vec::new(),
+            }
+            .serialize(),
+            FuseMessage::Mtime { path, mtime } => CFuseMessage {
+                size: 0,
+                offset: *mtime, // mtime is sent via offset field (C: dcdb.c:900)
+                flags: 0,
+                path: path.clone(),
+                to: None,
+                data: Vec::new(),
+            }
+            .serialize(),
+        }
+    }
+
+    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self> {
+        let msg_type = CMessageType::try_from(message_type).context("Invalid C message type")?;
+
+        if matches!(msg_type, CMessageType::UnlockRequest | CMessageType::Unlock) {
+            if data.len() < 33 {
+                anyhow::bail!("Unlock payload too short: {} < 33", data.len());
+            }
+
+            let mut checksum = [0u8; 32];
+            checksum.copy_from_slice(&data[..32]);
+
+            let path_bytes = &data[32..];
+            let path = CStr::from_bytes_with_nul(path_bytes)
+                .context("Invalid unlock path string")?
+                .to_str()
+                .context("Unlock path not valid UTF-8")?
+                .to_string();
+
+            return Ok(match msg_type {
+                CMessageType::UnlockRequest => FuseMessage::UnlockRequest { path, checksum },
+                CMessageType::Unlock => FuseMessage::Unlock { path, checksum },
+                _ => unreachable!(),
+            });
+        }
+
+        let c_msg = CFuseMessage::parse(data).context("Failed to parse C FUSE message")?;
+
+        Ok(match msg_type {
+            CMessageType::Create => FuseMessage::Create { path: c_msg.path },
+            CMessageType::Mkdir => FuseMessage::Mkdir { path: c_msg.path },
+            CMessageType::Write => FuseMessage::Write {
+                path: c_msg.path,
+                offset: c_msg.offset as u64,
+                truncate: c_msg.flags != 0,
+                data: c_msg.data,
+            },
+            CMessageType::Delete => FuseMessage::Delete { path: c_msg.path },
+            CMessageType::Rename => FuseMessage::Rename {
+                from: c_msg.path,
+                to: c_msg.to.unwrap_or_default(),
+            },
+            CMessageType::Mtime => FuseMessage::Mtime {
+                path: c_msg.path,
+                mtime: c_msg.offset, // mtime is sent via offset field (C: dcdb.c:900)
+            },
+            CMessageType::UnlockRequest | CMessageType::Unlock => unreachable!(),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_fuse_message_create() {
+        let msg = FuseMessage::Create {
+            path: "/test/file".to_string(),
+        };
+        assert_eq!(msg.message_type(), CMessageType::Create as u16);
+
+        let serialized = msg.serialize();
+        let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+
+    #[test]
+    fn test_fuse_message_write() {
+        let msg = FuseMessage::Write {
+            path: "/test/file".to_string(),
+            offset: 100,
+            truncate: false,
+            data: vec![1, 2, 3, 4, 5],
+        };
+        assert_eq!(msg.message_type(), CMessageType::Write as u16);
+
+        let serialized = msg.serialize();
+        let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+
+    #[test]
+    fn test_fuse_message_truncate_roundtrip() {
+        let msg = FuseMessage::Write {
+            path: "/test/file".to_string(),
+            offset: 7,
+            truncate: true,
+            data: Vec::new(),
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+
+    #[test]
+    fn test_fuse_message_rename() {
+        let msg = FuseMessage::Rename {
+            from: "/old/path".to_string(),
+            to: "/new/path".to_string(),
+        };
+        assert_eq!(msg.message_type(), CMessageType::Rename as u16);
+
+        let serialized = msg.serialize();
+        let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+
+    #[test]
+    fn test_fuse_message_unlock_roundtrip() {
+        let msg = FuseMessage::UnlockRequest {
+            path: "/priv/lock/test-lock".to_string(),
+            checksum: [0x42; 32],
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+        assert_eq!(&serialized[..32], &[0x42; 32]);
+        assert_eq!(serialized.last(), Some(&0));
+    }
+}
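The unlock payload layout used above (a 32-byte checksum followed by the NUL-terminated path) can be sketched standalone. This is a simplified illustration independent of the crate's types; the function names are made up for the example, and the decoder skips the interior-NUL check that `CStr::from_bytes_with_nul` performs in the real code:

```rust
// Illustrative encoder/decoder for the unlock wire layout:
// [32-byte checksum][path bytes][NUL]
fn encode_unlock(path: &str, checksum: &[u8; 32]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(32 + path.len() + 1);
    buf.extend_from_slice(checksum);
    buf.extend_from_slice(path.as_bytes());
    buf.push(0); // NUL terminator
    buf
}

fn decode_unlock(data: &[u8]) -> Option<([u8; 32], String)> {
    // Minimum size: 32-byte checksum + NUL (empty path)
    if data.len() < 33 || *data.last()? != 0 {
        return None;
    }
    let mut checksum = [0u8; 32];
    checksum.copy_from_slice(&data[..32]);
    // Drop the trailing NUL before the UTF-8 check
    let path = std::str::from_utf8(&data[32..data.len() - 1])
        .ok()?
        .to_string();
    Some((checksum, path))
}

fn main() {
    let wire = encode_unlock("/priv/lock/demo", &[0x42; 32]);
    assert_eq!(wire.len(), 32 + "/priv/lock/demo".len() + 1);
    let (csum, path) = decode_unlock(&wire).unwrap();
    assert_eq!(csum, [0x42; 32]);
    assert_eq!(path, "/priv/lock/demo");
}
```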
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
new file mode 100644
index 000000000..d07335074
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
@@ -0,0 +1,560 @@
+//! KvStore message types for DFSM status synchronization
+//!
+//! This module defines the KvStore message types that are delivered through
+//! the status DFSM state machine (pve_kvstore_v1 CPG group).
+use anyhow::Context;
+use pmxcfs_logger::{CLOG_ENTRY_HEADER_SIZE, fnv_64a_str, next_uid};
+
+use crate::message::Message;
+
+/// Key size in kvstore update messages (matches C's hardcoded 256 byte buffer)
+const KVSTORE_KEY_SIZE: usize = 256;
+
+/// C's sizeof(clog_entry_t) = 48 (struct alignment is 8 due to uint64_t fields,
+/// so 4 bytes of trailing padding are added to make the struct size a multiple of 8).
+///
+/// C's clog_entry_size() = sizeof(clog_entry_t) + node_len + ident_len + tag_len + msg_len
+///                        = 48 + strings_len
+///
+/// Wire format: [44-byte header][strings_len bytes of strings][4 bytes trailing padding]
+/// Total = 48 + strings_len bytes
+const CLOG_ENTRY_SIZEOF: usize = 48;
+
+/// Byte offsets of fields within the clog_entry_t header
+mod header_offset {
+    pub const PREV: usize = 0;
+    pub const NEXT: usize = 4;
+    pub const UID: usize = 8;
+    pub const TIME: usize = 12;
+    pub const NODE_DIGEST: usize = 16;
+    pub const IDENT_DIGEST: usize = 24;
+    pub const PID: usize = 32;
+    pub const PRIORITY: usize = 36;
+    pub const NODE_LEN: usize = 37;
+    pub const IDENT_LEN: usize = 38;
+    pub const TAG_LEN: usize = 39;
+    pub const MSG_LEN: usize = 40;
+}
+
+/// Serialize a clog_entry_t header into a 44-byte buffer, matching C's wire format exactly.
+///
+/// All multi-byte integers are little-endian (x86 native, matching C).
+#[allow(clippy::too_many_arguments)]
+fn serialize_clog_header(
+    prev: u32,
+    next: u32,
+    uid: u32,
+    time: u32,
+    node_digest: u64,
+    ident_digest: u64,
+    pid: u32,
+    priority: u8,
+    node_len: u8,
+    ident_len: u8,
+    tag_len: u8,
+    msg_len: u32,
+) -> [u8; CLOG_ENTRY_HEADER_SIZE] {
+    let mut buf = [0u8; CLOG_ENTRY_HEADER_SIZE];
+    buf[header_offset::PREV..header_offset::PREV + 4].copy_from_slice(&prev.to_le_bytes());
+    buf[header_offset::NEXT..header_offset::NEXT + 4].copy_from_slice(&next.to_le_bytes());
+    buf[header_offset::UID..header_offset::UID + 4].copy_from_slice(&uid.to_le_bytes());
+    buf[header_offset::TIME..header_offset::TIME + 4].copy_from_slice(&time.to_le_bytes());
+    buf[header_offset::NODE_DIGEST..header_offset::NODE_DIGEST + 8]
+        .copy_from_slice(&node_digest.to_le_bytes());
+    buf[header_offset::IDENT_DIGEST..header_offset::IDENT_DIGEST + 8]
+        .copy_from_slice(&ident_digest.to_le_bytes());
+    buf[header_offset::PID..header_offset::PID + 4].copy_from_slice(&pid.to_le_bytes());
+    buf[header_offset::PRIORITY] = priority;
+    buf[header_offset::NODE_LEN] = node_len;
+    buf[header_offset::IDENT_LEN] = ident_len;
+    buf[header_offset::TAG_LEN] = tag_len;
+    buf[header_offset::MSG_LEN..header_offset::MSG_LEN + 4].copy_from_slice(&msg_len.to_le_bytes());
+    buf
+}
+
+struct ParsedClogHeader {
+    time: u32,
+    priority: u8,
+    node_len: usize,
+    ident_len: usize,
+    tag_len: usize,
+    msg_len: usize,
+}
+
+/// Parse a clog_entry_t header from the start of a byte slice.
+fn parse_clog_header(data: &[u8]) -> anyhow::Result<ParsedClogHeader> {
+    if data.len() < CLOG_ENTRY_HEADER_SIZE {
+        anyhow::bail!(
+            "LOG message too short: {} < {}",
+            data.len(),
+            CLOG_ENTRY_HEADER_SIZE
+        );
+    }
+
+    let time = u32::from_le_bytes(data[header_offset::TIME..header_offset::TIME + 4].try_into()?);
+    let priority = data[header_offset::PRIORITY];
+    let node_len = data[header_offset::NODE_LEN] as usize;
+    let ident_len = data[header_offset::IDENT_LEN] as usize;
+    let tag_len = data[header_offset::TAG_LEN] as usize;
+    let msg_len =
+        u32::from_le_bytes(data[header_offset::MSG_LEN..header_offset::MSG_LEN + 4].try_into()?)
+            as usize;
+
+    Ok(ParsedClogHeader {
+        time,
+        priority,
+        node_len,
+        ident_len,
+        tag_len,
+        msg_len,
+    })
+}
+
+/// KvStore message type IDs (matches C's kvstore_message_t enum)
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive,
+)]
+#[repr(u16)]
+enum KvStoreMessageType {
+    Update = 1,         // KVSTORE_MESSAGE_UPDATE
+    UpdateComplete = 2, // KVSTORE_MESSAGE_UPDATE_COMPLETE
+    Log = 3,            // KVSTORE_MESSAGE_LOG
+}
+
+/// KvStore message types for ephemeral status synchronization
+///
+/// These messages are used by the kvstore DFSM (pve_kvstore_v1 CPG group)
+/// to synchronize ephemeral data like RRD metrics, node IPs, and cluster logs.
+///
+/// Matches C implementation's KVSTORE_MESSAGE_* types in status.c
+#[derive(Debug, Clone, PartialEq)]
+pub enum KvStoreMessage {
+    /// Update key-value data from a node
+    ///
+    /// Wire format: key (256 bytes, null-terminated) + value (variable length)
+    /// Matches C's KVSTORE_MESSAGE_UPDATE
+    Update { key: String, value: Vec<u8> },
+
+    /// Cluster log entry
+    ///
+    /// Wire format: clog_entry_t struct (44-byte header + null-terminated
+    /// strings + 4 bytes trailing padding)
+    /// Matches C's KVSTORE_MESSAGE_LOG
+    Log {
+        time: u32,
+        priority: u8,
+        node: String,
+        ident: String,
+        tag: String,
+        message: String,
+    },
+
+    /// Update complete signal (not currently used)
+    ///
+    /// Matches C's KVSTORE_MESSAGE_UPDATE_COMPLETE
+    UpdateComplete,
+}
+
+impl KvStoreMessage {
+    /// Serialize to C-compatible wire format
+    ///
+    /// Update format: key (256 bytes, null-terminated) + value (variable)
+    /// Log format: clog_entry_t struct (44-byte header + null-terminated
+    /// strings + 4 bytes trailing padding)
+    fn serialize_to_wire(&self) -> Vec<u8> {
+        match self {
+            KvStoreMessage::Update { key, value } => {
+                // C format: char key[KVSTORE_KEY_SIZE] + data
+                let mut buf = vec![0u8; KVSTORE_KEY_SIZE];
+                let key_bytes = key.as_bytes();
+                let copy_len = key_bytes.len().min(KVSTORE_KEY_SIZE - 1); // Leave room for null terminator
+                buf[..copy_len].copy_from_slice(&key_bytes[..copy_len]);
+                // buf is already zero-filled, so null terminator is automatic
+
+                buf.extend_from_slice(value);
+                buf
+            }
+            KvStoreMessage::Log {
+                time,
+                priority,
+                node,
+                ident,
+                tag,
+                message,
+            } => {
+                let node_bytes = node.as_bytes();
+                let ident_bytes = ident.as_bytes();
+                let tag_bytes = tag.as_bytes();
+                let msg_bytes = message.as_bytes();
+
+                // Cap u8 lengths at 255 to match C's MIN(strlen+1, 255)
+                let node_len = (node_bytes.len() + 1).min(255) as u8;
+                let ident_len = (ident_bytes.len() + 1).min(255) as u8;
+                let tag_len = (tag_bytes.len() + 1).min(255) as u8;
+                let msg_len = (msg_bytes.len() + 1) as u32;
+
+                let node_digest = fnv_64a_str(node);
+                let ident_digest = fnv_64a_str(ident);
+                let pid = std::process::id();
+
+                // Total wire size = sizeof(clog_entry_t) + strings = CLOG_ENTRY_SIZEOF + strings
+                // (CLOG_ENTRY_SIZEOF = 48 = 44-byte fields + 4 bytes C struct trailing padding)
+                let total_len = CLOG_ENTRY_SIZEOF
+                    + node_len as usize
+                    + ident_len as usize
+                    + tag_len as usize
+                    + msg_len as usize;
+                let mut buf = Vec::with_capacity(total_len);
+
+                // Assign a monotonically increasing uid (matching C's ++uid_counter).
+                // The C dedup logic rejects an entry if uid <= last_uid from the same node
+                // (when time is equal), so each message must have a unique, increasing uid.
+                // Drawing from the shared pmxcfs_logger counter ensures no (time, nodeid, uid)
+                // collisions between the FUSE-path logger and this kvstore path.
+                let uid = next_uid();
+
+                // Serialize header matching C's clog_entry_t wire layout.
+                // String data (entry->data) starts at offset 44 = CLOG_ENTRY_HEADER_SIZE.
+                let header = serialize_clog_header(
+                    0, // prev (not used for wire transmission)
+                    0, // next (not used for wire transmission)
+                    uid,
+                    *time,
+                    node_digest,
+                    ident_digest,
+                    pid,
+                    *priority,
+                    node_len,
+                    ident_len,
+                    tag_len,
+                    msg_len,
+                );
+                buf.extend_from_slice(&header);
+
+                // Append string data (all null-terminated).
+                // Truncate to (len - 1) bytes before the NUL so the body size
+                // exactly matches the header length fields (which are capped).
+                buf.extend_from_slice(&node_bytes[..node_len as usize - 1]);
+                buf.push(0u8); // NUL terminator
+                buf.extend_from_slice(&ident_bytes[..ident_len as usize - 1]);
+                buf.push(0u8);
+                buf.extend_from_slice(&tag_bytes[..tag_len as usize - 1]);
+                buf.push(0u8);
+                // msg uses u32 so truncation is unrealistic in practice, but
+                // keep it consistent: write exactly msg_len bytes (str + NUL).
+                buf.extend_from_slice(&msg_bytes[..msg_len as usize - 1]);
+                buf.push(0u8);
+
+                // 4 bytes of trailing padding: C's clog_entry_size() = sizeof(clog_entry_t) +
+                // strings_len = 48 + strings_len, but data starts at offset 44.
+                // The 4 extra bytes (sizeof - offsetof(data) = 4) come at the END of the wire
+                // message. C's kvstore_parse_log_message checks: msg_len == sizeof + strings.
+                buf.extend_from_slice(&[0u8; 4]);
+
+                buf
+            }
+            KvStoreMessage::UpdateComplete => {
+                // No payload
+                Vec::new()
+            }
+        }
+    }
+
+    /// Deserialize from C-compatible wire format
+    fn deserialize_from_wire(msg_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+        use KvStoreMessageType::*;
+
+        let msg_type = KvStoreMessageType::try_from(msg_type)
+            .map_err(|_| anyhow::anyhow!("Unknown kvstore message type: {msg_type}"))?;
+
+        match msg_type {
+            Update => {
+                if data.len() < KVSTORE_KEY_SIZE {
+                    anyhow::bail!(
+                        "UPDATE message too short: {} < {}",
+                        data.len(),
+                        KVSTORE_KEY_SIZE
+                    );
+                }
+
+                // Find null terminator in first KVSTORE_KEY_SIZE bytes
+                let key_end = data[..KVSTORE_KEY_SIZE]
+                    .iter()
+                    .position(|&b| b == 0)
+                    .ok_or_else(|| anyhow::anyhow!("UPDATE key not null-terminated"))?;
+
+                let key = std::str::from_utf8(&data[..key_end])
+                    .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in UPDATE key: {e}"))?
+                    .to_string();
+
+                let value = data[KVSTORE_KEY_SIZE..].to_vec();
+
+                Ok(KvStoreMessage::Update { key, value })
+            }
+            UpdateComplete => Ok(KvStoreMessage::UpdateComplete),
+            Log => {
+                // Parse the 44-byte clog_entry_t header
+                let h = parse_clog_header(data)?;
+
+                // Validate individual field lengths are non-zero (strings must have at least null terminator)
+                if h.node_len == 0 || h.ident_len == 0 || h.tag_len == 0 || h.msg_len == 0 {
+                    anyhow::bail!("LOG message contains zero-length field");
+                }
+
+                // Check for integer overflow in total size calculation.
+                // Wire size = sizeof(clog_entry_t) + strings_len = CLOG_ENTRY_SIZEOF + strings.
+                // (C's kvstore_parse_log_message checks: msg_len == sizeof + strings_len)
+                let expected_len = CLOG_ENTRY_SIZEOF
+                    .checked_add(h.node_len)
+                    .and_then(|s| s.checked_add(h.ident_len))
+                    .and_then(|s| s.checked_add(h.tag_len))
+                    .and_then(|s| s.checked_add(h.msg_len))
+                    .ok_or_else(|| anyhow::anyhow!("LOG message size overflow"))?;
+
+                if data.len() != expected_len {
+                    anyhow::bail!(
+                        "LOG message size mismatch: {} != {}",
+                        data.len(),
+                        expected_len
+                    );
+                }
+
+                let mut offset = CLOG_ENTRY_HEADER_SIZE;
+
+                // Safe string extraction with bounds checking
+                let extract_string =
+                    |offset: &mut usize, len: usize| -> Result<String, anyhow::Error> {
+                        let end = offset
+                            .checked_add(len)
+                            .ok_or_else(|| anyhow::anyhow!("String offset overflow"))?;
+
+                        if end > data.len() {
+                            return Err(anyhow::anyhow!("String exceeds buffer"));
+                        }
+
+                        // len includes null terminator, so read len-1 bytes
+                        let s = std::str::from_utf8(&data[*offset..end - 1])
+                            .map_err(|e| anyhow::anyhow!("Invalid UTF-8: {e}"))?
+                            .to_string();
+
+                        *offset = end;
+                        Ok(s)
+                    };
+
+                let node = extract_string(&mut offset, h.node_len)?;
+                let ident = extract_string(&mut offset, h.ident_len)?;
+                let tag = extract_string(&mut offset, h.tag_len)?;
+                let message = extract_string(&mut offset, h.msg_len)?;
+
+                Ok(KvStoreMessage::Log {
+                    time: h.time,
+                    priority: h.priority,
+                    node,
+                    ident,
+                    tag,
+                    message,
+                })
+            }
+        }
+    }
+}
+
+impl Message for KvStoreMessage {
+    fn message_type(&self) -> u16 {
+        let msg_type = match self {
+            KvStoreMessage::Update { .. } => KvStoreMessageType::Update,
+            KvStoreMessage::UpdateComplete => KvStoreMessageType::UpdateComplete,
+            KvStoreMessage::Log { .. } => KvStoreMessageType::Log,
+        };
+        msg_type.into()
+    }
+
+    fn serialize(&self) -> Vec<u8> {
+        self.serialize_to_wire()
+    }
+
+    fn deserialize(message_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+        KvStoreMessage::deserialize_from_wire(message_type, data)
+            .context("Failed to deserialize KvStoreMessage")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_clog_entry_header_size() {
+        assert_eq!(
+            CLOG_ENTRY_HEADER_SIZE, 44,
+            "CLOG_ENTRY_HEADER_SIZE is 44 bytes (offsetof(clog_entry_t, data))"
+        );
+        assert_eq!(
+            CLOG_ENTRY_SIZEOF, 48,
+            "CLOG_ENTRY_SIZEOF must be 48 to match C's sizeof(clog_entry_t)"
+        );
+    }
+
+    #[test]
+    fn test_clog_entry_header_offsets() {
+        assert_eq!(header_offset::PREV, 0);
+        assert_eq!(header_offset::NEXT, 4);
+        assert_eq!(header_offset::UID, 8);
+        assert_eq!(header_offset::TIME, 12);
+        assert_eq!(header_offset::NODE_DIGEST, 16);
+        assert_eq!(header_offset::IDENT_DIGEST, 24);
+        assert_eq!(header_offset::PID, 32);
+        assert_eq!(header_offset::PRIORITY, 36);
+        assert_eq!(header_offset::NODE_LEN, 37);
+        assert_eq!(header_offset::IDENT_LEN, 38);
+        assert_eq!(header_offset::TAG_LEN, 39);
+        assert_eq!(header_offset::MSG_LEN, 40);
+        // Field data ends at byte 44 (= CLOG_ENTRY_HEADER_SIZE = offsetof(data))
+        assert_eq!(header_offset::MSG_LEN + 4, CLOG_ENTRY_HEADER_SIZE);
+    }
+
+    #[test]
+    fn test_kvstore_message_update_serialization() {
+        let msg = KvStoreMessage::Update {
+            key: "test_key".to_string(),
+            value: vec![1, 2, 3, 4, 5],
+        };
+
+        let serialized = msg.serialize();
+        assert_eq!(serialized.len(), KVSTORE_KEY_SIZE + 5);
+        assert_eq!(&serialized[..8], b"test_key");
+        assert_eq!(serialized[8], 0); // null terminator
+        assert_eq!(&serialized[KVSTORE_KEY_SIZE..], &[1, 2, 3, 4, 5]);
+
+        let deserialized = <KvStoreMessage as Message>::deserialize(1, &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+
+    #[test]
+    fn test_kvstore_message_log_serialization() {
+        let msg = KvStoreMessage::Log {
+            time: 1234567890,
+            priority: 5,
+            node: "node1".to_string(),
+            ident: "pmxcfs".to_string(),
+            tag: "info".to_string(),
+            message: "test message".to_string(),
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = <KvStoreMessage as Message>::deserialize(3, &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+
+    #[test]
+    fn test_kvstore_message_log_header_layout() {
+        let msg = KvStoreMessage::Log {
+            time: 0xDEADBEEF,
+            priority: 6,
+            node: "pve1".to_string(),
+            ident: "root".to_string(),
+            tag: "cluster".to_string(),
+            message: "hello".to_string(),
+        };
+
+        let serialized = msg.serialize();
+
+        // Header must be exactly 44 bytes before string data
+        assert!(
+            serialized.len() >= CLOG_ENTRY_HEADER_SIZE,
+            "serialized len {} < {}",
+            serialized.len(),
+            CLOG_ENTRY_HEADER_SIZE
+        );
+
+        // prev (offset 0) and next (offset 4) are 0
+        assert_eq!(
+            &serialized[header_offset::PREV..header_offset::PREV + 4],
+            &0u32.to_le_bytes(),
+            "prev should be 0"
+        );
+        assert_eq!(
+            &serialized[header_offset::NEXT..header_offset::NEXT + 4],
+            &0u32.to_le_bytes(),
+            "next should be 0"
+        );
+
+        // time at offset 12
+        assert_eq!(
+            &serialized[header_offset::TIME..header_offset::TIME + 4],
+            &0xDEADBEEFu32.to_le_bytes(),
+            "time mismatch"
+        );
+
+        // priority at offset 36
+        assert_eq!(serialized[header_offset::PRIORITY], 6, "priority mismatch");
+
+        // node_len at offset 37: "pve1" = 4 bytes + 1 null = 5
+        assert_eq!(serialized[header_offset::NODE_LEN], 5, "node_len mismatch");
+        // ident_len at offset 38: "root" = 4 + 1 = 5
+        assert_eq!(
+            serialized[header_offset::IDENT_LEN],
+            5,
+            "ident_len mismatch"
+        );
+        // tag_len at offset 39: "cluster" = 7 + 1 = 8
+        assert_eq!(serialized[header_offset::TAG_LEN], 8, "tag_len mismatch");
+
+        // msg_len at offset 40 (u32 LE): "hello" = 5 + 1 = 6
+        let msg_len = u32::from_le_bytes(
+            serialized[header_offset::MSG_LEN..header_offset::MSG_LEN + 4]
+                .try_into()
+                .unwrap(),
+        );
+        assert_eq!(msg_len, 6, "msg_len mismatch");
+    }
+
+    #[test]
+    fn test_kvstore_message_type() {
+        assert_eq!(
+            KvStoreMessage::Update {
+                key: "".into(),
+                value: vec![]
+            }
+            .message_type(),
+            1
+        );
+        assert_eq!(KvStoreMessage::UpdateComplete.message_type(), 2);
+        assert_eq!(
+            KvStoreMessage::Log {
+                time: 0,
+                priority: 0,
+                node: "".into(),
+                ident: "".into(),
+                tag: "".into(),
+                message: "".into()
+            }
+            .message_type(),
+            3
+        );
+    }
+
+    #[test]
+    fn test_kvstore_message_type_roundtrip() {
+        // Test that message_type() and deserialize() are consistent
+        use super::KvStoreMessageType;
+
+        assert_eq!(u16::from(KvStoreMessageType::Update), 1);
+        assert_eq!(u16::from(KvStoreMessageType::UpdateComplete), 2);
+        assert_eq!(u16::from(KvStoreMessageType::Log), 3);
+
+        assert_eq!(
+            KvStoreMessageType::try_from(1).unwrap(),
+            KvStoreMessageType::Update
+        );
+        assert_eq!(
+            KvStoreMessageType::try_from(2).unwrap(),
+            KvStoreMessageType::UpdateComplete
+        );
+        assert_eq!(
+            KvStoreMessageType::try_from(3).unwrap(),
+            KvStoreMessageType::Log
+        );
+
+        assert!(KvStoreMessageType::try_from(0).is_err());
+        assert!(KvStoreMessageType::try_from(4).is_err());
+    }
+}
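The clog_entry_t size arithmetic above (48-byte sizeof plus one NUL-terminated length per string, node/ident/tag capped at 255) can be checked with a small standalone sketch; `clog_wire_size` is a hypothetical helper mirroring C's `clog_entry_size()`, not part of the patch:

```rust
// sizeof(clog_entry_t) = 48: 44 bytes of fields + 4 bytes trailing padding
const CLOG_ENTRY_SIZEOF: usize = 48;

// Total wire size of a LOG message for the given strings, mirroring
// C's clog_entry_size(). Each length includes the NUL terminator;
// node/ident/tag lengths are u8 fields capped at 255, msg_len is u32.
fn clog_wire_size(node: &str, ident: &str, tag: &str, msg: &str) -> usize {
    let cap = |s: &str| (s.len() + 1).min(255);
    CLOG_ENTRY_SIZEOF + cap(node) + cap(ident) + cap(tag) + (msg.len() + 1)
}

fn main() {
    // Matches the test_kvstore_message_log_header_layout strings:
    // "pve1"(5) + "root"(5) + "cluster"(8) + "hello"(6) = 24; 48 + 24 = 72
    assert_eq!(clog_wire_size("pve1", "root", "cluster", "hello"), 72);
    // All-empty strings: four 1-byte NULs on top of the struct size
    assert_eq!(clog_wire_size("", "", "", ""), 52);
}
```

This is the same `expected_len` that `deserialize_from_wire` validates against `data.len()` before extracting the strings.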
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
new file mode 100644
index 000000000..892404833
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
@@ -0,0 +1,32 @@
+//! Distributed Finite State Machine (DFSM) for cluster state synchronization
+//!
+//! This crate implements the state machine for synchronizing configuration
+//! changes across the cluster nodes using Corosync CPG.
+//!
+//! The DFSM handles:
+//! - State synchronization between nodes
+//! - Message ordering and queuing
+//! - Leader-based state updates
+//! - Split-brain prevention
+//! - Membership change handling
+mod callbacks;
+pub mod cluster_database_service;
+mod cpg_service;
+mod dfsm_message;
+mod fuse_message;
+mod kv_store_message;
+mod message;
+mod state_machine;
+pub mod status_sync_service;
+mod types;
+mod wire_format;
+
+// Re-export public API
+pub use callbacks::Callbacks;
+pub use cluster_database_service::ClusterDatabaseService;
+pub use cpg_service::{CpgHandler, CpgService};
+pub use fuse_message::FuseMessage;
+pub use kv_store_message::KvStoreMessage;
+pub use state_machine::{Dfsm, DfsmBroadcast};
+pub use status_sync_service::StatusSyncService;
+pub use types::NodeSyncInfo;
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
new file mode 100644
index 000000000..a2401d030
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
@@ -0,0 +1,21 @@
+//! High-level message abstraction for DFSM
+//!
+//! This module provides a Message trait for working with cluster messages
+//! at a higher abstraction level than raw bytes.
+use anyhow::Result;
+
+/// Trait for messages that can be sent through DFSM
+pub trait Message: Clone + std::fmt::Debug + Send + Sync + Sized + 'static {
+    /// Get the message type identifier
+    fn message_type(&self) -> u16;
+
+    /// Serialize the message to bytes (application message payload only)
+    ///
+    /// This serializes only the application-level payload. The DFSM protocol
+    /// headers (msg_count, timestamp, protocol_version, etc.) are added by
+    /// DfsmMessage::serialize() when wrapping in DfsmMessage::Normal.
+    fn serialize(&self) -> Vec<u8>;
+
+    /// Deserialize from bytes given a message type
+    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self>;
+}
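A minimal implementor of this trait pattern can be sketched as follows. To keep the sketch std-only it uses `Result<Self, String>` where the real trait uses `anyhow::Result`; the `Ping` type and its wire type id 7 are invented for illustration:

```rust
// Simplified variant of the Message trait (String error instead of anyhow).
pub trait Message: Clone + std::fmt::Debug + Send + Sync + Sized + 'static {
    fn message_type(&self) -> u16;
    fn serialize(&self) -> Vec<u8>;
    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self, String>;
}

// Toy message: a single u32 counter with a made-up wire type id of 7.
#[derive(Debug, Clone, PartialEq)]
struct Ping(u32);

impl Message for Ping {
    fn message_type(&self) -> u16 {
        7
    }

    // Payload only: the DFSM headers are added by the wrapping layer.
    fn serialize(&self) -> Vec<u8> {
        self.0.to_le_bytes().to_vec()
    }

    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self, String> {
        if message_type != 7 {
            return Err(format!("unexpected message type {message_type}"));
        }
        let bytes: [u8; 4] = data.try_into().map_err(|_| "bad payload length".to_string())?;
        Ok(Ping(u32::from_le_bytes(bytes)))
    }
}

fn main() {
    let msg = Ping(42);
    let rt = Ping::deserialize(msg.message_type(), &msg.serialize()).unwrap();
    assert_eq!(msg, rt);
}
```

The `message_type`/`serialize`/`deserialize` split mirrors how `FuseMessage` and `KvStoreMessage` dispatch on the C wire type id before parsing the payload.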
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
new file mode 100644
index 000000000..008d22e31
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
@@ -0,0 +1,1815 @@
+//! DFSM state machine implementation
+//!
+//! This module contains the main Dfsm struct and its implementation
+//! for managing distributed state synchronization.
+use anyhow::{Context, Result};
+use parking_lot::{Mutex as ParkingMutex, RwLock};
+use pmxcfs_api_types::MemberInfo;
+use rust_corosync::{NodeId, cpg};
+use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tokio::sync::oneshot;
+
+use super::cpg_service::{CpgHandler, CpgService};
+use super::dfsm_message::DfsmMessage;
+use super::message::Message;
+use super::types::{DfsmMode, QueuedMessage, SyncEpoch};
+use crate::{Callbacks, NodeSyncInfo};
+
+/// Maximum queue length to prevent memory exhaustion
+/// C implementation uses unbounded GSequence/GList, but we add a limit for safety
+/// This value should be tuned based on production workload
+const MAX_QUEUE_LEN: usize = 500;
+
+/// Byte offset of the protocol_version field in a serialized DfsmMessage.
+/// Layout: [msg_type: u32][protocol_version: u32][...rest]
+const PROTOCOL_VERSION_OFFSET: usize = 4;
+
+/// Result of a synchronous message send
+/// Matches C's dfsm_result_t structure
+#[derive(Debug, Clone)]
+pub struct MessageResult {
+    /// Message count for tracking
+    pub msgcount: u64,
+    /// Result code from deliver callback (0 = success, negative = errno)
+    pub result: i32,
+    /// Whether the message was processed successfully
+    pub processed: bool,
+}
+
+/// Extension trait to add a broadcast() method to `Option<Arc<Dfsm<M>>>`
+///
+/// This allows calling `.broadcast()` directly on `Option<Arc<Dfsm<M>>>` fields
+/// without explicit None checking at call sites.
+pub trait DfsmBroadcast<M: Message> {
+    fn broadcast(&self, msg: M);
+}
+
+impl<M: Message> DfsmBroadcast<M> for Option<Arc<Dfsm<M>>> {
+    fn broadcast(&self, msg: M) {
+        if let Some(dfsm) = self {
+            let _ = dfsm.broadcast(msg);
+        }
+    }
+}
+
+/// DFSM state machine
+///
+/// The generic parameter `M` specifies the message type this DFSM handles:
+/// - `Dfsm<FuseMessage>` for main database operations
+/// - `Dfsm<KvStoreMessage>` for status synchronization
+pub struct Dfsm<M> {
+    /// CPG service for cluster communication (matching C's dfsm_t->cpg_handle)
+    cpg_service: RwLock<Option<Arc<CpgService>>>,
+
+    /// Cluster group name for CPG
+    cluster_name: String,
+
+    /// Callbacks for application integration
+    callbacks: Arc<dyn Callbacks<Message = M>>,
+
+    /// Current operating mode
+    mode: RwLock<DfsmMode>,
+
+    /// Current sync epoch
+    sync_epoch: RwLock<SyncEpoch>,
+
+    /// Local epoch counter
+    local_epoch_counter: ParkingMutex<u32>,
+
+    /// Node synchronization info
+    sync_nodes: RwLock<Vec<NodeSyncInfo>>,
+
+    /// Message queue (ordered by count)
+    msg_queue: ParkingMutex<BTreeMap<u64, QueuedMessage<M>>>,
+
+    /// Sync queue for messages during update mode
+    sync_queue: ParkingMutex<VecDeque<QueuedMessage<M>>>,
+
+    /// Message counter for ordering (atomic for lock-free increment)
+    msg_counter: AtomicU64,
+
+    /// Lowest node ID in cluster (leader)
+    lowest_nodeid: RwLock<u32>,
+
+    /// Our node ID (set during init_cpg via cpg_local_get)
+    nodeid: AtomicU32,
+
+    /// Our process ID
+    pid: u32,
+
+    /// Protocol version for cluster compatibility
+    protocol_version: u32,
+
+    /// State verification - SHA-256 checksum
+    checksum: ParkingMutex<[u8; 32]>,
+
+    /// Checksum epoch (when it was computed)
+    checksum_epoch: ParkingMutex<SyncEpoch>,
+
+    /// Checksum ID for verification
+    checksum_id: ParkingMutex<u64>,
+
+    /// Checksum counter for verify requests
+    checksum_counter: ParkingMutex<u64>,
+
+    /// Message count received (for synchronous send tracking)
+    /// Matches C's dfsm->msgcount_rcvd
+    msgcount_rcvd: AtomicU64,
+
+    /// Pending message results for synchronous sends
+    /// Matches C's dfsm->results (GHashTable)
+    /// Maps msgcount -> oneshot sender for result delivery
+    /// Uses tokio oneshot channels - the idiomatic pattern for one-time async notifications
+    message_results: ParkingMutex<HashMap<u64, oneshot::Sender<MessageResult>>>,
+}
+
+impl<M: Message> Dfsm<M> {
+    /// Create a new DFSM instance
+    ///
+    /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+    pub fn new(cluster_name: String, callbacks: Arc<dyn Callbacks<Message = M>>) -> Result<Self> {
+        Self::new_with_protocol_version(
+            cluster_name,
+            callbacks,
+            DfsmMessage::<M>::DEFAULT_PROTOCOL_VERSION,
+        )
+    }
+
+    /// Create a new DFSM instance with a specific protocol version
+    ///
+    /// This is used when the DFSM needs to use a non-default protocol version,
+    /// such as the status/kvstore DFSM which uses protocol version 0 for
+    /// compatibility with the C implementation.
+    ///
+    /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+    pub fn new_with_protocol_version(
+        cluster_name: String,
+        callbacks: Arc<dyn Callbacks<Message = M>>,
+        protocol_version: u32,
+    ) -> Result<Self> {
+        let now = pmxcfs_api_types::unix_now_secs();
+        let pid = std::process::id();
+
+        Ok(Self {
+            cpg_service: RwLock::new(None),
+            cluster_name,
+            callbacks,
+            mode: RwLock::new(DfsmMode::Start),
+            sync_epoch: RwLock::new(SyncEpoch {
+                epoch: 0,
+                time: now,
+                nodeid: 0,
+                pid,
+            }),
+            local_epoch_counter: ParkingMutex::new(0),
+            sync_nodes: RwLock::new(Vec::new()),
+            msg_queue: ParkingMutex::new(BTreeMap::new()),
+            sync_queue: ParkingMutex::new(VecDeque::new()),
+            msg_counter: AtomicU64::new(0),
+            lowest_nodeid: RwLock::new(0),
+            nodeid: AtomicU32::new(0), // Will be set by init_cpg() using cpg_local_get()
+            pid,
+            protocol_version,
+            checksum: ParkingMutex::new([0u8; 32]),
+            checksum_epoch: ParkingMutex::new(SyncEpoch {
+                epoch: 0,
+                time: 0,
+                nodeid: 0,
+                pid: 0,
+            }),
+            checksum_id: ParkingMutex::new(0),
+            checksum_counter: ParkingMutex::new(0),
+            msgcount_rcvd: AtomicU64::new(0),
+            message_results: ParkingMutex::new(HashMap::new()),
+        })
+    }
+
+    pub fn get_mode(&self) -> DfsmMode {
+        *self.mode.read()
+    }
+
+    pub fn set_mode(&self, new_mode: DfsmMode) {
+        let mut mode = self.mode.write();
+        let old_mode = *mode;
+
+        // Match C's dfsm_set_mode logic (dfsm.c:450-456):
+        // Allow transition if:
+        // 1. new_mode < DFSM_ERROR_MODE_START (normal modes), OR
+        // 2. (old_mode < DFSM_ERROR_MODE_START OR new_mode >= old_mode)
+        //    - If already in error mode, only allow transitions to higher error codes
+        if old_mode != new_mode {
+            let allow_transition =
+                !new_mode.is_error() || (!old_mode.is_error() || new_mode >= old_mode);
+
+            if !allow_transition {
+                tracing::debug!(
+                    "DFSM: blocking transition from {:?} to {:?} (error mode can only go to higher codes)",
+                    old_mode,
+                    new_mode
+                );
+                return;
+            }
+        } else {
+            // No-op transition
+            return;
+        }
+
+        *mode = new_mode;
+        drop(mode);
+
+        if new_mode.is_error() {
+            tracing::error!("DFSM: {}", new_mode);
+        } else {
+            tracing::info!("DFSM: {}", new_mode);
+        }
+    }
+
+    pub fn is_leader(&self) -> bool {
+        let lowest = *self.lowest_nodeid.read();
+        lowest > 0 && lowest == self.nodeid.load(Ordering::Relaxed)
+    }
+
+    pub fn get_nodeid(&self) -> u32 {
+        self.nodeid.load(Ordering::Relaxed)
+    }
+
+    pub fn get_pid(&self) -> u32 {
+        self.pid
+    }
+
+    /// Check if DFSM is synced and ready
+    pub fn is_synced(&self) -> bool {
+        self.get_mode() == DfsmMode::Synced
+    }
+
+    /// Check if DFSM encountered an error
+    pub fn is_error(&self) -> bool {
+        self.get_mode().is_error()
+    }
+}
+
+impl<M: Message> Dfsm<M> {
+    fn release_sync_resources(&self) {
+        let old_sync_nodes = {
+            let mut sync_nodes = self.sync_nodes.write();
+            // Only clone (and call cleanup) when at least one node has state to release.
+            if !sync_nodes.iter().any(|node| node.state.is_some()) {
+                return;
+            }
+            let snapshot = sync_nodes.clone();
+            for node in sync_nodes.iter_mut() {
+                node.state = None;
+            }
+            snapshot
+        };
+
+        self.callbacks.cleanup_sync_resources(&old_sync_nodes);
+    }
+
+    fn send_sync_start(&self) -> Result<()> {
+        tracing::debug!("DFSM: sending SYNC_START message");
+        let sync_epoch = *self.sync_epoch.read();
+        self.send_dfsm_message(&DfsmMessage::<M>::SyncStart { sync_epoch })
+    }
+
+    fn send_state(&self) -> Result<()> {
+        tracing::debug!("DFSM: generating and sending state");
+
+        let state_data = self
+            .callbacks
+            .get_state()
+            .context("Failed to get state from callbacks")?;
+
+        tracing::info!("DFSM: sending state ({} bytes)", state_data.len());
+
+        let sync_epoch = *self.sync_epoch.read();
+        let dfsm_msg: DfsmMessage<M> = DfsmMessage::State {
+            sync_epoch,
+            data: state_data,
+        };
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        Ok(())
+    }
+
+    pub(super) fn send_dfsm_message(&self, message: &DfsmMessage<M>) -> Result<()> {
+        let mut serialized = message.serialize();
+
+        // State messages (SyncStart, State, Update, etc.) use DEFAULT_PROTOCOL_VERSION=1
+        // as a constant, but this DFSM may be configured with a different protocol version
+        // (kvstore uses 0 to match C).  Patch bytes [4..8] which hold protocol_version
+        // in both the normal and state message headers.  Normal messages already carry the
+        // correct version in their DfsmMessage::Normal variant; state messages do not, so
+        // we overwrite here.
+        let is_normal = matches!(message, DfsmMessage::Normal { .. });
+        if !is_normal && serialized.len() >= PROTOCOL_VERSION_OFFSET + 4 {
+            serialized[PROTOCOL_VERSION_OFFSET..PROTOCOL_VERSION_OFFSET + 4]
+                .copy_from_slice(&self.protocol_version.to_le_bytes());
+        }
+
+        if let Some(ref service) = *self.cpg_service.read() {
+            service
+                .mcast(cpg::Guarantee::TypeAgreed, &serialized)
+                .context("Failed to broadcast DFSM message")?;
+            Ok(())
+        } else {
+            anyhow::bail!("CPG not initialized")
+        }
+    }
+
+    fn resend_queued_messages_with<F>(&self, mut send: F) -> Result<usize>
+    where
+        F: FnMut(&DfsmMessage<M>) -> Result<()>,
+    {
+        let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+        let my_pid = self.pid;
+        // Filter to our own messages before cloning to avoid copying messages
+        // that will be skipped anyway (e.g. entries from other nodes in the queue).
+        let queued_messages: Vec<_> = {
+            let queue = self.msg_queue.lock();
+            queue
+                .values()
+                .filter(|qm| qm.nodeid == my_nodeid && qm.pid == my_pid)
+                .cloned()
+                .collect()
+        };
+
+        if queued_messages.is_empty() {
+            return Ok(0);
+        }
+
+        let mut resent = 0;
+        let send_result: Result<()> = (|| {
+            for qm in &queued_messages {
+                let message = DfsmMessage::Normal {
+                    msg_count: qm.msg_count,
+                    timestamp: qm.timestamp as u32,
+                    protocol_version: self.protocol_version,
+                    message: qm.message.clone(),
+                };
+                send(&message)?;
+                resent += 1;
+            }
+
+            Ok(())
+        })();
+
+        // Only clear the queue if all sends succeeded; on failure, leave the
+        // queue intact so callers can retry or inspect the pending messages.
+        if send_result.is_ok() {
+            self.msg_queue.lock().clear();
+        }
+        send_result?;
+
+        Ok(resent)
+    }
+
+    pub fn process_state(&self, nodeid: u32, pid: u32, state: &[u8]) -> Result<()> {
+        // Only collect state when actively syncing. In Synced mode we may receive
+        // State messages from a sync round initiated by the leader on behalf of C
+        // peers (backward-compat departure path); we are already up to date and
+        // must not re-trigger process_state_sync / on_synced().
+        if self.get_mode() != DfsmMode::StartSync {
+            tracing::debug!(
+                "DFSM: ignoring State from {}/{} (not in StartSync, mode={:?})",
+                nodeid,
+                pid,
+                self.get_mode()
+            );
+            return Ok(());
+        }
+
+        tracing::debug!(
+            "DFSM: processing state from node {}/{} ({} bytes)",
+            nodeid,
+            pid,
+            state.len()
+        );
+
+        let mut sync_nodes = self.sync_nodes.write();
+
+        // Find node in sync_nodes
+        let node_info = sync_nodes
+            .iter_mut()
+            .find(|n| n.node_id == nodeid && n.pid == pid);
+
+        let node_info = match node_info {
+            Some(ni) => ni,
+            None => {
+                // Non-member sent state - immediate LEAVE (matches C: dfsm.c:823-828)
+                tracing::error!(
+                    "DFSM: received state from non-member {}/{} - entering LEAVE mode",
+                    nodeid,
+                    pid
+                );
+                drop(sync_nodes);
+                self.set_mode(DfsmMode::Leave);
+                return Err(anyhow::anyhow!("State from non-member"));
+            }
+        };
+
+        // Check for duplicate state (matches C: dfsm.c:830-835)
+        if node_info.state.is_some() {
+            tracing::error!(
+                "DFSM: received duplicate state from member {}/{} - entering LEAVE mode",
+                nodeid,
+                pid
+            );
+            drop(sync_nodes);
+            self.set_mode(DfsmMode::Leave);
+            return Err(anyhow::anyhow!("Duplicate state from member"));
+        }
+
+        // Store state
+        node_info.state = Some(state.to_vec());
+
+        let all_received = sync_nodes.iter().all(|n| n.state.is_some());
+        drop(sync_nodes);
+
+        if all_received {
+            tracing::info!("DFSM: received all states, processing synchronization");
+            self.process_state_sync()?;
+        }
+
+        Ok(())
+    }
+
+    fn process_state_sync(&self) -> Result<()> {
+        tracing::info!("DFSM: processing state synchronization");
+
+        let sync_nodes = {
+            let guard = self.sync_nodes.read();
+            guard.clone()
+        };
+
+        match self.callbacks.process_state_update(&sync_nodes) {
+            Ok(synced) => {
+                if synced {
+                    tracing::info!("DFSM: state synchronization successful");
+
+                    let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+                    let mut sync_nodes_write = self.sync_nodes.write();
+                    if let Some(node) = sync_nodes_write
+                        .iter_mut()
+                        .find(|n| n.node_id == my_nodeid && n.pid == self.pid)
+                    {
+                        node.synced = true;
+                    }
+                    drop(sync_nodes_write);
+
+                    self.set_mode(DfsmMode::Synced);
+                    self.release_sync_resources();
+                    self.callbacks.on_synced();
+                    self.deliver_message_queue()?;
+                } else {
+                    tracing::info!("DFSM: entering UPDATE mode, waiting for leader");
+                    self.set_mode(DfsmMode::Update);
+                    self.deliver_message_queue()?;
+                }
+            }
+            Err(e) => {
+                tracing::error!("DFSM: state synchronization failed: {}", e);
+                self.set_mode(DfsmMode::Error);
+                return Err(e);
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn queue_message(&self, nodeid: u32, pid: u32, msg_count: u64, message: M, timestamp: u64)
+    where
+        M: Clone,
+    {
+        tracing::debug!(
+            "DFSM: queueing message {} from {}/{}",
+            msg_count,
+            nodeid,
+            pid
+        );
+
+        let qm = QueuedMessage {
+            nodeid,
+            pid,
+            msg_count,
+            message,
+            timestamp,
+        };
+
+        // Hold mode read lock during queueing decision to prevent TOCTOU race
+        // This ensures mode cannot change between check and queue selection
+        let mode_guard = self.mode.read();
+        let mode = *mode_guard;
+
+        let node_synced = self
+            .sync_nodes
+            .read()
+            .iter()
+            .find(|n| n.node_id == nodeid && n.pid == pid)
+            .map(|n| n.synced)
+            .unwrap_or(false);
+
+        if mode == DfsmMode::Update && node_synced {
+            let mut sync_queue = self.sync_queue.lock();
+
+            // Check sync queue size limit
+            // Queues use a bounded size (MAX_QUEUE_LEN=500) to prevent memory exhaustion
+            // from slow or stuck nodes. When full, oldest messages are dropped.
+            // This matches distributed system semantics where old updates can be superseded.
+            //
+            // Monitoring: Track queue depth via metrics/logs to detect congestion:
+            // - Sustained high queue depth indicates slow message processing
+            // - Frequent drops indicate network partitions or overload
+            if sync_queue.len() >= MAX_QUEUE_LEN {
+                tracing::warn!(
+                    "DFSM: sync queue full ({} messages), dropping oldest - possible network congestion or slow node",
+                    sync_queue.len()
+                );
+                sync_queue.pop_front();
+            }
+
+            sync_queue.push_back(qm);
+        } else {
+            let mut msg_queue = self.msg_queue.lock();
+
+            // Check message queue size limit (same rationale as sync queue)
+            if msg_queue.len() >= MAX_QUEUE_LEN {
+                tracing::warn!(
+                    "DFSM: message queue full ({} messages), dropping oldest - possible network congestion or slow node",
+                    msg_queue.len()
+                );
+                // Drop oldest message (lowest count)
+                if let Some((&oldest_count, _)) = msg_queue.iter().next() {
+                    msg_queue.remove(&oldest_count);
+                }
+            }
+
+            msg_queue.insert(msg_count, qm);
+        }
+
+        // Release mode lock after queueing decision completes
+        drop(mode_guard);
+    }
+
+    pub(super) fn deliver_message_queue(&self) -> Result<()>
+    where
+        M: Clone,
+    {
+        let mut queue = self.msg_queue.lock();
+        if queue.is_empty() {
+            return Ok(());
+        }
+
+        tracing::info!("DFSM: delivering {} queued messages", queue.len());
+
+        // Hold mode lock during iteration to prevent mode changes mid-delivery
+        let mode_guard = self.mode.read();
+        let mode = *mode_guard;
+        let sync_nodes = {
+            let guard = self.sync_nodes.read();
+            guard.clone()
+        };
+
+        let mut to_remove = Vec::new();
+        let mut to_sync_queue = Vec::new();
+
+        for (count, qm) in queue.iter() {
+            let node_info = sync_nodes
+                .iter()
+                .find(|n| n.node_id == qm.nodeid && n.pid == qm.pid);
+
+            let Some(info) = node_info else {
+                tracing::debug!(
+                    "DFSM: removing message from non-member {}/{}",
+                    qm.nodeid,
+                    qm.pid
+                );
+                to_remove.push(*count);
+                continue;
+            };
+
+            if mode == DfsmMode::Synced && info.synced {
+                tracing::debug!("DFSM: delivering message {}", count);
+
+                match self.callbacks.deliver_message(
+                    qm.nodeid,
+                    qm.pid,
+                    qm.message.clone(),
+                    qm.timestamp,
+                ) {
+                    Ok((result, processed)) => {
+                        tracing::debug!(
+                            "DFSM: message delivered, result={}, processed={}",
+                            result,
+                            processed
+                        );
+                        // Record result for synchronous sends
+                        self.record_message_result(*count, result, processed);
+                    }
+                    Err(e) => {
+                        tracing::error!("DFSM: failed to deliver message: {}", e);
+                        // Record error result
+                        self.record_message_result(*count, -libc::EIO, false);
+                    }
+                }
+
+                to_remove.push(*count);
+            } else if mode == DfsmMode::Update && info.synced {
+                // Collect messages to move instead of acquiring sync_queue lock
+                // while holding msg_queue lock to prevent deadlock
+                to_sync_queue.push(qm.clone());
+                to_remove.push(*count);
+            }
+        }
+
+        // Remove processed messages from queue
+        for count in to_remove {
+            queue.remove(&count);
+        }
+
+        // Release locks before acquiring sync_queue to prevent deadlock
+        drop(mode_guard);
+        drop(queue);
+
+        // Now move messages to sync_queue without holding msg_queue
+        if !to_sync_queue.is_empty() {
+            let mut sync_queue = self.sync_queue.lock();
+            for qm in to_sync_queue {
+                sync_queue.push_back(qm);
+            }
+        }
+
+        Ok(())
+    }
+
+    pub(super) fn deliver_sync_queue(&self) -> Result<()> {
+        let mut sync_queue = self.sync_queue.lock();
+        let queue_len = sync_queue.len();
+
+        if queue_len == 0 {
+            return Ok(());
+        }
+
+        tracing::info!("DFSM: delivering {} sync queue messages", queue_len);
+
+        while let Some(qm) = sync_queue.pop_front() {
+            tracing::debug!(
+                "DFSM: delivering sync message from {}/{}",
+                qm.nodeid,
+                qm.pid
+            );
+
+            match self
+                .callbacks
+                .deliver_message(qm.nodeid, qm.pid, qm.message, qm.timestamp)
+            {
+                Ok((result, processed)) => {
+                    tracing::debug!(
+                        "DFSM: sync message delivered, result={}, processed={}",
+                        result,
+                        processed
+                    );
+                    // Record result for synchronous sends
+                    self.record_message_result(qm.msg_count, result, processed);
+                }
+                Err(e) => {
+                    tracing::error!("DFSM: failed to deliver sync message: {}", e);
+                    // Record error result
+                    self.record_message_result(qm.msg_count, -libc::EIO, false);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Send a message to the cluster
+    ///
+    /// Creates a properly formatted Normal message with C-compatible headers.
+    pub fn send_message(&self, message: M) -> Result<u64> {
+        let msg_count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1;
+
+        tracing::debug!("DFSM: sending message {}", msg_count);
+
+        let dfsm_msg = DfsmMessage::from_message(msg_count, message, self.protocol_version);
+
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        Ok(msg_count)
+    }
+
+    /// Send a message to the cluster and wait for delivery result
+    ///
+    /// This is the async equivalent of send_message(), matching C's dfsm_send_message_sync().
+    /// It broadcasts the message via CPG and waits for it to be delivered to the local node,
+    /// returning the result from the deliver callback.
+    ///
+    /// Uses a tokio oneshot channel - the idiomatic pattern for one-time async result delivery.
+    /// Apart from a brief mutex around the results map, no polling or condvar machinery is needed.
+    ///
+    /// # Cancellation Safety
+    /// If this future is dropped before completion, the cleanup guard ensures the HashMap
+    /// entry is removed, preventing memory leaks.
+    ///
+    /// # Arguments
+    /// * `message` - The message to send
+    /// * `timeout` - Maximum time to wait for delivery (typically 10 seconds)
+    ///
+    /// # Returns
+    /// * `Ok(MessageResult)` - The result from the local deliver callback
+    ///   - Caller should check `result.result < 0` for errno-based errors
+    /// * `Err(_)` - If send failed, timeout occurred, or channel closed unexpectedly
+    pub async fn send_message_sync(&self, message: M, timeout: Duration) -> Result<MessageResult> {
+        let msg_count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1;
+
+        tracing::debug!("DFSM: sending synchronous message {}", msg_count);
+
+        // Create oneshot channel for result delivery (tokio best practice)
+        let (tx, rx) = oneshot::channel();
+
+        // Register the sender before broadcasting
+        self.message_results.lock().insert(msg_count, tx);
+
+        // RAII guard ensures cleanup on timeout, send error, or cancellation
+        // (record_message_result also removes, so double-remove is harmless)
+        struct CleanupGuard<'a> {
+            msg_count: u64,
+            results: &'a ParkingMutex<HashMap<u64, oneshot::Sender<MessageResult>>>,
+        }
+        impl Drop for CleanupGuard<'_> {
+            fn drop(&mut self) {
+                self.results.lock().remove(&self.msg_count);
+            }
+        }
+        let _guard = CleanupGuard {
+            msg_count,
+            results: &self.message_results,
+        };
+
+        // Send the message
+        let dfsm_msg = DfsmMessage::from_message(msg_count, message, self.protocol_version);
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        // Wait for delivery with timeout (clean tokio pattern)
+        match tokio::time::timeout(timeout, rx).await {
+            Ok(Ok(result)) => {
+                // Got result successfully - return it to caller
+                // Caller should check result.result < 0 for errno-based errors
+                Ok(result)
+            }
+            Ok(Err(_)) => {
+                // Channel closed without sending - shouldn't happen
+                anyhow::bail!("DFSM: message {msg_count} sender dropped");
+            }
+            Err(_) => {
+                // Timeout - guard will clean up
+                anyhow::bail!("DFSM: message {msg_count} timed out after {timeout:?}");
+            }
+        }
+        // On cancellation (future dropped), guard cleans up automatically
+    }
+
+    /// Record the result of a delivered message (for synchronous sends)
+    ///
+    /// Called from deliver_message_queue() when a message is delivered.
+    /// Matches C's dfsm_record_local_result().
+    ///
+    /// Uses a tokio oneshot channel to send the result - non-blocking; a send to a dropped receiver is simply ignored.
+    fn record_message_result(&self, msg_count: u64, result: i32, processed: bool) {
+        tracing::debug!(
+            "DFSM: recording result for message {}: result={}, processed={}",
+            msg_count,
+            result,
+            processed
+        );
+
+        // Update msgcount_rcvd
+        self.msgcount_rcvd.store(msg_count, Ordering::SeqCst);
+
+        // Send result via oneshot channel if someone is waiting
+        let mut results = self.message_results.lock();
+        if let Some(tx) = results.remove(&msg_count) {
+            let msg_result = MessageResult {
+                msgcount: msg_count,
+                result,
+                processed,
+            };
+
+            // Send result through oneshot channel (non-blocking).
+            // If the receiver was dropped (timeout), send() errors and we discard it.
+            let _ = tx.send(msg_result);
+        }
+    }
+
+    /// Send a TreeEntry update to the cluster (leader only, during synchronization)
+    ///
+    /// This is used by the leader to send individual database entries to followers
+    /// that need to catch up. Matches C's dfsm_send_update().
+    pub fn send_update(&self, tree_entry: pmxcfs_memdb::TreeEntry) -> Result<()> {
+        tracing::debug!("DFSM: sending Update for inode {}", tree_entry.inode);
+
+        let sync_epoch = *self.sync_epoch.read();
+        let dfsm_msg: DfsmMessage<M> = DfsmMessage::from_tree_entry(tree_entry, sync_epoch);
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        Ok(())
+    }
+
+    /// Send UpdateComplete signal to cluster (leader only, after sending all updates)
+    ///
+    /// Signals to followers that all Update messages have been sent and they can
+    /// now transition to Synced mode. Matches C's dfsm_send_update_complete().
+    pub fn send_update_complete(&self) -> Result<()> {
+        tracing::info!("DFSM: sending UpdateComplete");
+
+        let sync_epoch = *self.sync_epoch.read();
+        let dfsm_msg: DfsmMessage<M> = DfsmMessage::UpdateComplete { sync_epoch };
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        Ok(())
+    }
+
+    /// Request checksum verification (leader only)
+    /// This should be called periodically by the leader to verify cluster state consistency
+    pub fn verify_request(&self) -> Result<()> {
+        // Only leader should send verify requests
+        if !self.is_leader() {
+            return Ok(());
+        }
+
+        // Only verify when synced
+        if self.get_mode() != DfsmMode::Synced {
+            return Ok(());
+        }
+
+        // Check if we need to wait for previous verification to complete
+        let mut checksum_counter = self.checksum_counter.lock();
+        let checksum_id = *self.checksum_id.lock();
+
+        if *checksum_counter != checksum_id {
+            tracing::debug!(
+                "DFSM: delaying verify request {:016x}",
+                *checksum_counter + 1
+            );
+            return Ok(());
+        }
+
+        // Increment counter and send verify request
+        *checksum_counter += 1;
+        let new_counter = *checksum_counter;
+        drop(checksum_counter);
+
+        tracing::debug!("DFSM: sending verify request {:016x}", new_counter);
+
+        // Send VERIFY_REQUEST message with counter
+        let sync_epoch = *self.sync_epoch.read();
+        let dfsm_msg: DfsmMessage<M> = DfsmMessage::VerifyRequest {
+            sync_epoch,
+            csum_id: new_counter,
+        };
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        Ok(())
+    }
+
+    /// Handle verify request from leader
+    pub fn handle_verify_request(&self, message_epoch: SyncEpoch, csum_id: u64) -> Result<()> {
+        tracing::debug!("DFSM: received verify request {:016x}", csum_id);
+
+        // Compute current state checksum
+        let mut checksum = [0u8; 32];
+        self.callbacks.compute_checksum(&mut checksum)?;
+
+        // Save checksum info
+        // Store the epoch FROM THE MESSAGE (matching C: dfsm.c:736)
+        *self.checksum.lock() = checksum;
+        *self.checksum_epoch.lock() = message_epoch;
+        *self.checksum_id.lock() = csum_id;
+
+        // Send the checksum verification response
+        tracing::debug!("DFSM: sending verify response");
+
+        let sync_epoch = *self.sync_epoch.read();
+        let dfsm_msg = DfsmMessage::Verify {
+            sync_epoch,
+            csum_id,
+            checksum,
+        };
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        Ok(())
+    }
+
+    /// Handle verify response from a node
+    pub fn handle_verify(
+        &self,
+        message_epoch: SyncEpoch,
+        csum_id: u64,
+        received_checksum: &[u8; 32],
+    ) -> Result<()> {
+        tracing::debug!("DFSM: received verify response");
+
+        let our_checksum_id = *self.checksum_id.lock();
+        let our_checksum_epoch = *self.checksum_epoch.lock();
+
+        // Check if this verification matches our saved checksum
+        // Compare with MESSAGE epoch, not current epoch (matching C: dfsm.c:766-767)
+        if our_checksum_id == csum_id && our_checksum_epoch == message_epoch {
+            let our_checksum = *self.checksum.lock();
+
+            // Compare checksums
+            if our_checksum != *received_checksum {
+                tracing::error!(
+                    "DFSM: checksum mismatch! Expected {:02x?}, got {:02x?}",
+                    &our_checksum[..8],
+                    &received_checksum[..8]
+                );
+                tracing::error!("DFSM: data divergence detected - restarting cluster sync");
+                self.set_mode(DfsmMode::Leave);
+                return Err(anyhow::anyhow!("Checksum verification failed"));
+            } else {
+                tracing::info!("DFSM: data verification successful");
+            }
+        } else {
+            tracing::debug!("DFSM: skipping verification - no checksum saved or epoch mismatch");
+        }
+
+        Ok(())
+    }
+
+    /// Invalidate saved checksum (called on membership changes)
+    pub fn invalidate_checksum(&self) {
+        let counter = *self.checksum_counter.lock();
+        *self.checksum_id.lock() = counter;
+
+        // Reset checksum epoch
+        *self.checksum_epoch.lock() = SyncEpoch {
+            epoch: 0,
+            time: 0,
+            nodeid: 0,
+            pid: 0,
+        };
+
+        tracing::debug!("DFSM: checksum invalidated");
+    }
+
+    /// Broadcast a message to the cluster
+    ///
+    /// Checks if the cluster is synced before broadcasting.
+    /// If not synced, the message is silently dropped.
+    pub fn broadcast(&self, msg: M) -> Result<()> {
+        if !self.is_synced() {
+            return Ok(());
+        }
+
+        tracing::debug!("Broadcasting {:?}", msg);
+        self.send_message(msg)?;
+        tracing::debug!("Broadcast successful");
+
+        Ok(())
+    }
+
+    /// Handle incoming DFSM message from cluster (called by CpgHandler)
+    fn handle_dfsm_message(
+        &self,
+        nodeid: u32,
+        pid: u32,
+        message: DfsmMessage<M>,
+    ) -> anyhow::Result<()> {
+        // Validate epoch for state messages (all except Normal and SyncStart)
+        // This matches the C implementation's epoch checking (dfsm.c:665-673)
+        let should_validate_epoch = !matches!(
+            message,
+            DfsmMessage::Normal { .. } | DfsmMessage::SyncStart { .. }
+        );
+
+        if should_validate_epoch {
+            let current_epoch = *self.sync_epoch.read();
+            let message_epoch = match &message {
+                DfsmMessage::State { sync_epoch, .. }
+                | DfsmMessage::Update { sync_epoch, .. }
+                | DfsmMessage::UpdateComplete { sync_epoch }
+                | DfsmMessage::VerifyRequest { sync_epoch, .. }
+                | DfsmMessage::Verify { sync_epoch, .. } => *sync_epoch,
+                _ => unreachable!(),
+            };
+
+            if message_epoch != current_epoch {
+                tracing::debug!(
+                    "DFSM: ignoring message with wrong epoch (expected {:?}, got {:?})",
+                    current_epoch,
+                    message_epoch
+                );
+                return Ok(());
+            }
+        }
+
+        // Match on typed message variants
+        match message {
+            DfsmMessage::Normal {
+                msg_count,
+                timestamp,
+                protocol_version: _,
+                message: app_msg,
+            } => self.handle_normal_message(nodeid, pid, msg_count, timestamp, app_msg),
+            DfsmMessage::SyncStart { sync_epoch } => self.handle_sync_start(nodeid, sync_epoch),
+            DfsmMessage::State {
+                sync_epoch: _,
+                data,
+            } => self.process_state(nodeid, pid, &data),
+            DfsmMessage::Update {
+                sync_epoch: _,
+                tree_entry,
+            } => self.handle_update(nodeid, pid, tree_entry),
+            DfsmMessage::UpdateComplete { sync_epoch: _ } => self.handle_update_complete(),
+            DfsmMessage::VerifyRequest {
+                sync_epoch,
+                csum_id,
+            } => self.handle_verify_request(sync_epoch, csum_id),
+            DfsmMessage::Verify {
+                sync_epoch,
+                csum_id,
+                checksum,
+            } => self.handle_verify(sync_epoch, csum_id, &checksum),
+        }
+    }
+
+    fn handle_membership_change_with<F>(
+        &self,
+        members: &[MemberInfo],
+        joined_member_count: usize,
+        mut send: F,
+    ) -> anyhow::Result<()>
+    where
+        F: FnMut(&DfsmMessage<M>) -> Result<()>,
+    {
+        tracing::info!(
+            "DFSM: handling membership change ({} members)",
+            members.len()
+        );
+
+        let current_mode = self.get_mode();
+        tracing::debug!("DFSM: membership change while in mode {:?}", current_mode);
+
+        // Invalidate saved checksum
+        self.invalidate_checksum();
+
+        // Update epoch
+        let mut counter = self.local_epoch_counter.lock();
+        *counter += 1;
+
+        let now = pmxcfs_api_types::unix_now_secs();
+
+        let new_epoch = SyncEpoch {
+            epoch: *counter,
+            time: now,
+            nodeid: self.nodeid.load(Ordering::Relaxed),
+            pid: self.pid,
+        };
+
+        *self.sync_epoch.write() = new_epoch;
+        drop(counter);
+
+        // Find lowest node ID (leader)
+        let lowest = members.iter().map(|m| m.node_id).min().unwrap_or(0);
+        *self.lowest_nodeid.write() = lowest;
+
+        self.sync_queue.lock().clear();
+
+        if current_mode.is_error() {
+            tracing::debug!("DFSM: already left group - ignoring membership change");
+            return Ok(());
+        }
+
+        let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+        let we_are_member = members
+            .iter()
+            .any(|member| member.node_id == my_nodeid && member.pid == self.pid);
+
+        // Call cleanup callback before releasing sync resources (matches C: dfsm.c:512-514)
+        let old_sync_nodes = {
+            let guard = self.sync_nodes.read();
+            guard.clone()
+        };
+        if !old_sync_nodes.is_empty() {
+            self.callbacks.cleanup_sync_resources(&old_sync_nodes);
+        }
+
+        // Initialize sync nodes
+        let mut sync_nodes = self.sync_nodes.write();
+        sync_nodes.clear();
+
+        for member in members {
+            sync_nodes.push(NodeSyncInfo {
+                node_id: member.node_id,
+                pid: member.pid,
+                state: None,
+                synced: false,
+            });
+        }
+        drop(sync_nodes);
+
+        if !we_are_member {
+            if current_mode == DfsmMode::Start {
+                tracing::debug!("DFSM: ignoring membership change before local join completes");
+                return Ok(());
+            }
+
+            tracing::info!(
+                "DFSM: we ({}/{}) left the process group",
+                my_nodeid,
+                self.pid
+            );
+            self.set_mode(DfsmMode::Leave);
+            return Ok(());
+        }
+
+        if members.len() > 1 && joined_member_count > 0 {
+            let queued_count = self.msg_queue.lock().len();
+            if queued_count > 0 {
+                tracing::info!(
+                    "DFSM: queue not empty after join, resending {} queued messages",
+                    queued_count
+                );
+                self.resend_queued_messages_with(&mut send)
+                    .context("Failed to resend queued messages after membership change")?;
+            }
+        }
+
+        // Determine next mode
+        if members.len() == 1 {
+            // Single node - already synced
+            tracing::info!("DFSM: single node cluster, marking as synced");
+            self.set_mode(DfsmMode::Synced);
+
+            // Mark ourselves as synced
+            let mut sync_nodes = self.sync_nodes.write();
+            if let Some(node) = sync_nodes.first_mut() {
+                node.synced = true;
+            }
+            drop(sync_nodes);
+
+            // Notify application layer (matches multi-node path in process_state_sync)
+            self.callbacks.on_synced();
+
+            // Deliver queued messages
+            self.deliver_message_queue()?;
+        } else if joined_member_count > 0 {
+            // Multi-node with new members joining - start synchronization so
+            // the joining node(s) can receive the full cluster state.
+            tracing::info!(
+                "DFSM: multi-node cluster with {} joining member(s), starting sync",
+                joined_member_count
+            );
+            self.set_mode(DfsmMode::StartSync);
+
+            // If we're the leader, initiate sync
+            if self.is_leader() {
+                tracing::info!("DFSM: we are leader, sending sync start");
+                self.send_sync_start()?;
+
+                // Leader also needs to send its own state:
+                // handle_sync_start skips the state send when the SyncStart
+                // originated from our own node id.
+                self.send_state().context("Failed to send leader state")?;
+            }
+        } else {
+            // Multi-node, only departures (joined_member_count == 0).
+            //
+            // The remaining nodes were already in sync before the departure —
+            // no new member needs to catch up, so re-synchronization is
+            // semantically unnecessary.
+            //
+            // BACKWARD-COMPAT HACK: C nodes without the pure-departure fix
+            // unconditionally enter START_SYNC on any membership change and
+            // wait for SYNC_START from the lowest-nodeid (leader). If we are
+            // the leader and skip SYNC_START, those C peers hang indefinitely.
+            //
+            // To preserve interoperability without modifying C (which would
+            // introduce its own rolling-upgrade hazard between fixed and
+            // unfixed C nodes), we apply the workaround on the Rust side:
+            //
+            //   • Leader: enter StartSync and send SYNC_START + own state.
+            //     CpgService::mcast retries on CS_ERR_TRY_AGAIN (matching C's
+            //     dfsm_send_message_full) so the send succeeds from within
+            //     this confchg callback.
+            //
+            //   • Non-leaders: stay in current mode (Synced). When SYNC_START
+            //     arrives, handle_sync_start responds with state regardless of
+            //     mode — no mode guard by design. process_state ignores State
+            //     messages received in Synced mode so on_synced() is not
+            //     fired a second time.
+            //
+            // In a pure-Rust cluster this results in a lightweight no-op sync
+            // round on each departure (leader enters StartSync briefly; the
+            // merge is a no-op since all nodes already agree). Once all C
+            // nodes are replaced by Rust nodes with this fix, this branch can
+            // be simplified to skip the sync entirely.
+            if self.is_leader() {
+                tracing::info!(
+                    "DFSM: pure departure — leader sending SYNC_START for C backward compat"
+                );
+                self.set_mode(DfsmMode::StartSync);
+                self.send_sync_start()?;
+                // Leader must also broadcast its own state: handle_sync_start
+                // skips re-sending when nodeid == my_nodeid, so we send it
+                // here directly (same as the joined_member_count > 0 path).
+                self.send_state()
+                    .context("Failed to send leader state in departure sync")?;
+            } else {
+                tracing::info!(
+                    "DFSM: pure departure — non-leader staying in {:?}, will respond to SYNC_START",
+                    current_mode
+                );
+            }
+        }
+
+        // Call membership change callback (matches C: dfsm.c:1180-1182)
+        self.callbacks.on_membership_change(members);
+
+        Ok(())
+    }
+
+    /// Handle membership change notification (called by CpgHandler)
+    fn handle_membership_change(
+        &self,
+        members: &[MemberInfo],
+        joined_member_count: usize,
+    ) -> anyhow::Result<()> {
+        self.handle_membership_change_with(members, joined_member_count, |message| {
+            self.send_dfsm_message(message)
+        })
+    }
+
+    /// Handle normal application message
+    fn handle_normal_message(
+        &self,
+        nodeid: u32,
+        pid: u32,
+        msg_count: u64,
+        timestamp: u32,
+        message: M,
+    ) -> Result<()> {
+        // C version: deliver immediately if in Synced mode, otherwise queue
+        if self.get_mode() == DfsmMode::Synced {
+            // Deliver immediately - message is already deserialized
+            match self.callbacks.deliver_message(
+                nodeid,
+                pid,
+                message,
+                timestamp as u64, // Convert back to u64 for callback compatibility
+            ) {
+                Ok((result, processed)) => {
+                    tracing::debug!(
+                        "DFSM: message delivered immediately, result={}, processed={}",
+                        result,
+                        processed
+                    );
+                    // Record result for synchronous sends
+                    self.record_message_result(msg_count, result, processed);
+                }
+                Err(e) => {
+                    tracing::error!("DFSM: failed to deliver message: {}", e);
+                    // Record error result
+                    self.record_message_result(msg_count, -libc::EIO, false);
+                }
+            }
+        } else {
+            // Queue for later delivery - store typed message directly
+            self.queue_message(nodeid, pid, msg_count, message, timestamp as u64);
+        }
+        Ok(())
+    }
+
+    /// Handle SyncStart message from leader
+    fn handle_sync_start(&self, nodeid: u32, new_epoch: SyncEpoch) -> Result<()> {
+        tracing::info!(
+            "DFSM: received SyncStart from node {} with epoch {:?}",
+            nodeid,
+            new_epoch
+        );
+
+        // Adopt the new epoch from the leader (critical for the sync protocol).
+        // This matches the C implementation, which updates dfsm->sync_epoch.
+        *self.sync_epoch.write() = new_epoch;
+        tracing::debug!("DFSM: adopted new sync epoch from leader");
+
+        // Send our state back to the cluster
+        // BUT: don't send if we're the leader (we already sent our state in handle_membership_change)
+        let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+        if nodeid != my_nodeid {
+            self.send_state()
+                .context("Failed to send state in response to SyncStart")?;
+            tracing::debug!("DFSM: sent state in response to SyncStart");
+        } else {
+            tracing::debug!("DFSM: skipping state send (we're the leader who already sent state)");
+        }
+
+        Ok(())
+    }
+
+    /// Handle Update message from leader
+    fn handle_update(
+        &self,
+        nodeid: u32,
+        pid: u32,
+        tree_entry: pmxcfs_memdb::TreeEntry,
+    ) -> Result<()> {
+        // Serialize TreeEntry for callback (process_update expects raw bytes for now)
+        let serialized = tree_entry.serialize_for_update();
+        if let Err(e) = self.callbacks.process_update(nodeid, pid, &serialized) {
+            tracing::error!("DFSM: failed to process update: {}", e);
+        }
+        Ok(())
+    }
+
+    /// Handle UpdateComplete message
+    fn handle_update_complete(&self) -> Result<()> {
+        tracing::info!("DFSM: received UpdateComplete from leader");
+
+        self.callbacks
+            .commit_state()
+            .context("DFSM: failed to commit synchronized state")?;
+
+        {
+            let mut sync_nodes = self.sync_nodes.write();
+            for node in sync_nodes.iter_mut() {
+                node.synced = true;
+            }
+        }
+
+        self.set_mode(DfsmMode::Synced);
+        self.deliver_sync_queue()?;
+        self.deliver_message_queue()?;
+        self.release_sync_resources();
+        self.callbacks.on_synced();
+        Ok(())
+    }
+}
+
+/// Implementation of CpgHandler trait for DFSM
+///
+/// This allows Dfsm to receive CPG callbacks in an idiomatic Rust way,
+/// with all unsafe pointer handling managed by the CpgService.
+impl<M: Message> CpgHandler for Dfsm<M> {
+    fn on_deliver(&self, _group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]) {
+        tracing::debug!(
+            "DFSM CPG message from node {} (pid {}): {} bytes",
+            u32::from(nodeid),
+            pid,
+            msg.len()
+        );
+
+        // Deserialize DFSM protocol message
+        match DfsmMessage::<M>::deserialize(msg) {
+            Ok(dfsm_msg) => {
+                if let Err(e) = self.handle_dfsm_message(u32::from(nodeid), pid, dfsm_msg) {
+                    tracing::error!("Error handling DFSM message: {}", e);
+                }
+            }
+            Err(e) => {
+                tracing::error!("Failed to deserialize DFSM message: {}", e);
+            }
+        }
+    }
+
+    fn on_confchg(
+        &self,
+        _group_name: &str,
+        member_list: &[cpg::Address],
+        _left_list: &[cpg::Address],
+        joined_list: &[cpg::Address],
+    ) {
+        tracing::info!("DFSM CPG membership change: {} members", member_list.len());
+
+        // Build MemberInfo list from CPG addresses
+        let members: Vec<MemberInfo> = member_list
+            .iter()
+            .map(|addr| MemberInfo {
+                node_id: u32::from(addr.nodeid),
+                pid: addr.pid,
+                joined_at: SystemTime::now()
+                    .duration_since(UNIX_EPOCH)
+                    .unwrap_or_default()
+                    .as_secs(),
+            })
+            .collect();
+
+        // Notify DFSM of membership change
+        if let Err(e) = self.handle_membership_change(&members, joined_list.len()) {
+            tracing::error!("Failed to handle membership change: {}", e);
+        }
+    }
+}
+
+impl<M: Message> Dfsm<M> {
+    /// Initialize CPG (Closed Process Group) for cluster communication
+    ///
+    /// Uses the idiomatic CpgService wrapper which handles all unsafe FFI
+    /// and callback management internally.
+    pub fn init_cpg(self: &Arc<Self>) -> Result<()> {
+        tracing::info!("DFSM: Initializing CPG");
+
+        // Create CPG service with this Dfsm as the handler
+        // CpgService handles all callback registration and context management
+        let cpg_service = Arc::new(CpgService::new(Arc::clone(self))?);
+
+        // Get our node ID from CPG (matches C's cpg_local_get)
+        // This MUST be done after cpg_initialize but before joining the group
+        let nodeid = cpg::local_get(cpg_service.handle())?;
+        let nodeid_u32 = u32::from(nodeid);
+        self.nodeid.store(nodeid_u32, Ordering::Relaxed);
+        tracing::info!("DFSM: Got node ID {} from CPG", nodeid_u32);
+
+        // Join the CPG group
+        let group_name = &self.cluster_name;
+        cpg_service
+            .join(group_name)
+            .context("Failed to join CPG group")?;
+
+        tracing::info!("DFSM joined CPG group '{}'", group_name);
+
+        // Store the service
+        *self.cpg_service.write() = Some(cpg_service);
+
+        // Dispatch once to get initial membership
+        if let Some(ref service) = *self.cpg_service.read()
+            && let Err(e) = service.dispatch()
+        {
+            tracing::warn!("Failed to dispatch CPG events: {:?}", e);
+        }
+
+        tracing::info!("DFSM CPG initialized successfully");
+        Ok(())
+    }
+
+    /// Dispatch CPG events (should be called periodically from event loop)
+    /// Matching C's service_dfsm_dispatch
+    pub fn dispatch_events(&self) -> Result<(), rust_corosync::CsError> {
+        if let Some(ref service) = *self.cpg_service.read() {
+            service.dispatch()
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Get CPG file descriptor for event monitoring
+    pub fn fd_get(&self) -> Result<i32> {
+        if let Some(ref service) = *self.cpg_service.read() {
+            service.fd()
+        } else {
+            Err(anyhow::anyhow!("CPG service not initialized"))
+        }
+    }
+
+    /// Stop DFSM services (leave CPG group and finalize)
+    pub fn stop_services(&self) -> Result<()> {
+        tracing::info!("DFSM: Stopping services");
+
+        // Leave the CPG group before dropping the service
+        let group_name = self.cluster_name.clone();
+        if let Some(ref service) = *self.cpg_service.read()
+            && let Err(e) = service.leave(&group_name)
+        {
+            tracing::warn!("Error leaving CPG group: {:?}", e);
+        }
+
+        // Drop the service (CpgService::drop handles finalization)
+        *self.cpg_service.write() = None;
+
+        tracing::info!("DFSM services stopped");
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::FuseMessage;
+    use std::sync::Mutex;
+
+    #[derive(Default)]
+    struct TrackingCallbacks {
+        deliveries: Mutex<Vec<String>>,
+        commit_count: AtomicU32,
+        synced_count: AtomicU32,
+        cleanup_count: AtomicU32,
+    }
+
+    impl Callbacks for TrackingCallbacks {
+        type Message = FuseMessage;
+
+        fn deliver_message(
+            &self,
+            _nodeid: u32,
+            _pid: u32,
+            message: Self::Message,
+            _timestamp: u64,
+        ) -> std::io::Result<(i32, bool)> {
+            let label = match message {
+                FuseMessage::Create { path } => format!("create:{path}"),
+                FuseMessage::Delete { path } => format!("delete:{path}"),
+                other => format!("{other:?}"),
+            };
+            self.deliveries.lock().unwrap().push(label);
+            Ok((0, true))
+        }
+
+        fn compute_checksum(&self, _output: &mut [u8; 32]) -> anyhow::Result<()> {
+            Ok(())
+        }
+
+        fn get_state(&self) -> anyhow::Result<Vec<u8>> {
+            Ok(Vec::new())
+        }
+
+        fn process_state_update(&self, _states: &[NodeSyncInfo]) -> anyhow::Result<bool> {
+            Ok(false)
+        }
+
+        fn process_update(&self, _nodeid: u32, _pid: u32, _data: &[u8]) -> anyhow::Result<()> {
+            Ok(())
+        }
+
+        fn commit_state(&self) -> anyhow::Result<()> {
+            self.commit_count.fetch_add(1, Ordering::SeqCst);
+            Ok(())
+        }
+
+        fn on_synced(&self) {
+            self.synced_count.fetch_add(1, Ordering::SeqCst);
+        }
+
+        fn cleanup_sync_resources(&self, _states: &[NodeSyncInfo]) {
+            self.cleanup_count.fetch_add(1, Ordering::SeqCst);
+        }
+    }
+
+    #[test]
+    fn test_update_complete_commits_and_drains_queues() {
+        let callbacks = Arc::new(TrackingCallbacks::default());
+        let dfsm = Dfsm::new("test-cluster".to_string(), callbacks.clone()).unwrap();
+
+        dfsm.set_mode(DfsmMode::Update);
+        dfsm.nodeid.store(1, Ordering::Relaxed);
+        *dfsm.sync_nodes.write() = vec![
+            NodeSyncInfo {
+                node_id: 1,
+                pid: dfsm.pid,
+                state: Some(vec![1]),
+                synced: false,
+            },
+            NodeSyncInfo {
+                node_id: 2,
+                pid: 2000,
+                state: Some(vec![2]),
+                synced: false,
+            },
+        ];
+
+        dfsm.sync_queue.lock().push_back(QueuedMessage {
+            nodeid: 2,
+            pid: 2000,
+            msg_count: 1,
+            message: FuseMessage::Create {
+                path: "/from-sync-queue".to_string(),
+            },
+            timestamp: 10,
+        });
+        dfsm.msg_queue.lock().insert(
+            2,
+            QueuedMessage {
+                nodeid: 2,
+                pid: 2000,
+                msg_count: 2,
+                message: FuseMessage::Delete {
+                    path: "/from-msg-queue".to_string(),
+                },
+                timestamp: 11,
+            },
+        );
+
+        dfsm.handle_update_complete().unwrap();
+
+        assert_eq!(dfsm.get_mode(), DfsmMode::Synced);
+        assert!(dfsm.sync_queue.lock().is_empty());
+        assert!(dfsm.msg_queue.lock().is_empty());
+        assert!(dfsm.sync_nodes.read().iter().all(|node| node.synced));
+        assert!(
+            dfsm.sync_nodes
+                .read()
+                .iter()
+                .all(|node| node.state.is_none())
+        );
+        assert_eq!(callbacks.commit_count.load(Ordering::SeqCst), 1);
+        assert_eq!(callbacks.synced_count.load(Ordering::SeqCst), 1);
+        assert_eq!(callbacks.cleanup_count.load(Ordering::SeqCst), 1);
+        assert_eq!(
+            callbacks.deliveries.lock().unwrap().as_slice(),
+            ["create:/from-sync-queue", "delete:/from-msg-queue"]
+        );
+    }
+
+    #[test]
+    fn test_membership_change_self_removed_enters_leave_mode() {
+        let callbacks = Arc::new(TrackingCallbacks::default());
+        let dfsm = Dfsm::new("test-cluster".to_string(), callbacks).unwrap();
+
+        dfsm.nodeid.store(1, Ordering::Relaxed);
+        dfsm.set_mode(DfsmMode::Synced);
+
+        dfsm.handle_membership_change(
+            &[MemberInfo {
+                node_id: 2,
+                pid: 2000,
+                joined_at: 0,
+            }],
+            0,
+        )
+        .unwrap();
+
+        assert_eq!(dfsm.get_mode(), DfsmMode::Leave);
+    }
+
+    #[test]
+    fn test_membership_change_ignored_in_leave_mode() {
+        let callbacks = Arc::new(TrackingCallbacks::default());
+        let dfsm = Dfsm::new("test-cluster".to_string(), callbacks).unwrap();
+
+        dfsm.nodeid.store(1, Ordering::Relaxed);
+        dfsm.set_mode(DfsmMode::Leave);
+        dfsm.sync_nodes.write().push(NodeSyncInfo {
+            node_id: 99,
+            pid: 9999,
+            state: Some(vec![9]),
+            synced: false,
+        });
+        dfsm.sync_queue.lock().push_back(QueuedMessage {
+            nodeid: 99,
+            pid: 9999,
+            msg_count: 7,
+            message: FuseMessage::Create {
+                path: "/ignored".to_string(),
+            },
+            timestamp: 0,
+        });
+
+        dfsm.handle_membership_change(
+            &[MemberInfo {
+                node_id: 1,
+                pid: dfsm.pid,
+                joined_at: 0,
+            }],
+            1,
+        )
+        .unwrap();
+
+        assert_eq!(dfsm.get_mode(), DfsmMode::Leave);
+        assert_eq!(dfsm.sync_nodes.read().len(), 1);
+        assert!(dfsm.sync_queue.lock().is_empty());
+    }
+
+    #[test]
+    fn test_single_node_membership_change_drains_queued_messages() {
+        let callbacks = Arc::new(TrackingCallbacks::default());
+        let dfsm = Dfsm::new("test-cluster".to_string(), callbacks.clone()).unwrap();
+
+        dfsm.nodeid.store(1, Ordering::Relaxed);
+        dfsm.msg_queue.lock().insert(
+            1,
+            QueuedMessage {
+                nodeid: 1,
+                pid: dfsm.pid,
+                msg_count: 1,
+                message: FuseMessage::Create {
+                    path: "/queued-create".to_string(),
+                },
+                timestamp: 1,
+            },
+        );
+
+        dfsm.handle_membership_change(
+            &[MemberInfo {
+                node_id: 1,
+                pid: dfsm.pid,
+                joined_at: 0,
+            }],
+            0,
+        )
+        .unwrap();
+
+        assert_eq!(dfsm.get_mode(), DfsmMode::Synced);
+        assert!(dfsm.msg_queue.lock().is_empty());
+        assert_eq!(
+            callbacks.deliveries.lock().unwrap().as_slice(),
+            ["create:/queued-create"]
+        );
+    }
+
+    #[test]
+    fn test_resend_queued_messages_replays_local_messages_and_clears_queue() {
+        let callbacks = Arc::new(TrackingCallbacks::default());
+        let dfsm = Dfsm::new("test-cluster".to_string(), callbacks).unwrap();
+
+        dfsm.nodeid.store(1, Ordering::Relaxed);
+        dfsm.msg_queue.lock().insert(
+            1,
+            QueuedMessage {
+                nodeid: 1,
+                pid: dfsm.pid,
+                msg_count: 1,
+                message: FuseMessage::Create {
+                    path: "/local".to_string(),
+                },
+                timestamp: 11,
+            },
+        );
+        dfsm.msg_queue.lock().insert(
+            2,
+            QueuedMessage {
+                nodeid: 2,
+                pid: 2000,
+                msg_count: 2,
+                message: FuseMessage::Delete {
+                    path: "/remote".to_string(),
+                },
+                timestamp: 12,
+            },
+        );
+
+        let sent = ParkingMutex::new(Vec::new());
+        let resent = dfsm
+            .resend_queued_messages_with(|message| {
+                sent.lock().push(message.clone());
+                Ok(())
+            })
+            .unwrap();
+
+        assert_eq!(resent, 1);
+        assert!(dfsm.msg_queue.lock().is_empty());
+
+        let sent = sent.lock();
+        assert_eq!(sent.len(), 1);
+        match &sent[0] {
+            DfsmMessage::Normal {
+                msg_count,
+                timestamp,
+                message: FuseMessage::Create { path },
+                ..
+            } => {
+                assert_eq!(*msg_count, 1);
+                assert_eq!(*timestamp, 11);
+                assert_eq!(path, "/local");
+            }
+            other => panic!("unexpected resent message: {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_membership_change_with_join_resends_queued_messages() {
+        let callbacks = Arc::new(TrackingCallbacks::default());
+        let dfsm = Dfsm::new("test-cluster".to_string(), callbacks).unwrap();
+
+        dfsm.nodeid.store(2, Ordering::Relaxed);
+        dfsm.msg_queue.lock().insert(
+            1,
+            QueuedMessage {
+                nodeid: 2,
+                pid: dfsm.pid,
+                msg_count: 1,
+                message: FuseMessage::Create {
+                    path: "/resend-before-sync".to_string(),
+                },
+                timestamp: 21,
+            },
+        );
+
+        let resent = ParkingMutex::new(Vec::new());
+        dfsm.handle_membership_change_with(
+            &[
+                MemberInfo {
+                    node_id: 1,
+                    pid: 1000,
+                    joined_at: 0,
+                },
+                MemberInfo {
+                    node_id: 2,
+                    pid: dfsm.pid,
+                    joined_at: 0,
+                },
+            ],
+            1,
+            |message| {
+                resent.lock().push(message.clone());
+                Ok(())
+            },
+        )
+        .unwrap();
+
+        assert!(dfsm.msg_queue.lock().is_empty());
+        assert_eq!(resent.lock().len(), 1);
+        assert_eq!(dfsm.get_mode(), DfsmMode::StartSync);
+    }
+}
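The resend tests above exercise a pattern that can be sketched in isolation: queued messages live in an ordered map keyed by `msg_count`, and on a membership change each still-queued message is replayed through a caller-supplied send closure before the queue is drained. The sketch below is illustrative only; `QueuedMsg` and `resend_with` are stand-in names, not the crate's API.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the crate's queued-message type.
#[derive(Debug, Clone, PartialEq)]
struct QueuedMsg {
    msg_count: u64,
    payload: String,
}

// Replay every queued message in msg_count order via `send`.
// The queue is cleared only if every message was sent successfully;
// an error leaves the remaining messages queued for the next attempt.
fn resend_with<F>(queue: &mut BTreeMap<u64, QueuedMsg>, mut send: F) -> Result<usize, String>
where
    F: FnMut(&QueuedMsg) -> Result<(), String>,
{
    let mut resent = 0;
    for msg in queue.values() {
        send(msg)?;
        resent += 1;
    }
    queue.clear();
    Ok(resent)
}

fn main() {
    let mut queue = BTreeMap::new();
    queue.insert(
        1,
        QueuedMsg {
            msg_count: 1,
            payload: "/local".to_string(),
        },
    );

    let mut sent = Vec::new();
    let resent = resend_with(&mut queue, |m| {
        sent.push(m.clone());
        Ok(())
    })
    .unwrap();

    assert_eq!(resent, 1);
    assert!(queue.is_empty());
    assert_eq!(sent[0].payload, "/local");
    println!("resent {resent} message(s)");
}
```

Using `BTreeMap` keeps replay in `msg_count` order, matching the ordering the tests assert on the real `msg_queue`.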
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
new file mode 100644
index 000000000..5ee2956f9
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
@@ -0,0 +1,113 @@
+//! Status Sync Service
+//!
+//! This service synchronizes ephemeral status data across the cluster using a separate
+//! DFSM instance with the "pve_kvstore_v1" CPG group.
+//!
+//! Equivalent to C implementation's service_status (the kvstore DFSM).
+//! Handles synchronization of:
+//! - RRD data (performance metrics from each node)
+//! - Node IP addresses
+//! - Cluster log entries
+//! - Other ephemeral status key-value data
+
+use async_trait::async_trait;
+use pmxcfs_services::{Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message;
+
+/// Status Sync Service
+///
+/// Synchronizes ephemeral status data across all nodes using a separate DFSM instance.
+/// Uses CPG group "pve_kvstore_v1" (separate from main config database "pmxcfs_v1").
+///
+/// This implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for status replication
+/// - Separation of status data from config data for better performance
+///
+/// This is equivalent to C implementation's service_status (the kvstore DFSM).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct StatusSyncService<M> {
+    dfsm: Arc<Dfsm<M>>,
+}
+
+impl<M: Message> StatusSyncService<M> {
+    /// Create a new status sync service
+    pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+        Self { dfsm }
+    }
+}
+
+#[async_trait]
+impl<M: Message> Service for StatusSyncService<M> {
+    fn name(&self) -> &str {
+        "status-sync"
+    }
+
+    async fn initialize(&mut self) -> pmxcfs_services::Result<std::os::unix::io::RawFd> {
+        info!("Initializing status sync service (kvstore)");
+
+        // Initialize CPG connection for kvstore group
+        self.dfsm.init_cpg().map_err(|e| {
+            ServiceError::InitializationFailed(format!(
+                "Status sync CPG initialization failed: {e}"
+            ))
+        })?;
+
+        // Get file descriptor for event monitoring
+        let fd = self.dfsm.fd_get().map_err(|e| {
+            self.dfsm.stop_services().ok();
+            ServiceError::InitializationFailed(format!("Failed to get status sync fd: {e}"))
+        })?;
+
+        info!(
+            "Status sync service initialized successfully with fd {}",
+            fd
+        );
+        Ok(fd)
+    }
+
+    async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> {
+        match self.dfsm.dispatch_events() {
+            Ok(_) => Ok(true),
+            Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+                warn!("Status sync connection lost, requesting reinitialization");
+                Ok(false)
+            }
+            Err(e) => {
+                error!("Status sync dispatch failed: {}", e);
+                Err(ServiceError::DispatchFailed(format!(
+                    "Status sync dispatch failed: {e}"
+                )))
+            }
+        }
+    }
+
+    async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+        info!("Finalizing status sync service");
+
+        if let Err(e) = self.dfsm.stop_services() {
+            warn!("Error stopping status sync services: {}", e);
+        }
+
+        info!("Status sync service finalized");
+        Ok(())
+    }
+
+    async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+        // Status sync doesn't need periodic verification like the main database
+        // Status data is ephemeral and doesn't require the same consistency guarantees
+        Ok(())
+    }
+
+    fn timer_period(&self) -> Option<Duration> {
+        // No periodic timer needed for status sync
+        None
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
new file mode 100644
index 000000000..cc18873f6
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
@@ -0,0 +1,107 @@
+//! DFSM type definitions
+//!
+//! This module contains all type definitions used by the DFSM state machine.
+
+/// DFSM operating modes
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum DfsmMode {
+    /// Initial state - starting cluster connection
+    Start = 0,
+
+    /// Starting data synchronization
+    StartSync = 1,
+
+    /// All data is up to date
+    Synced = 2,
+
+    /// Waiting for updates from leader
+    Update = 3,
+
+    /// Error states (>= 128)
+    Leave = 253,
+    VersionError = 254,
+    Error = 255,
+}
+
+impl DfsmMode {
+    /// Check if this is an error mode
+    pub fn is_error(&self) -> bool {
+        (*self as u8) >= 128
+    }
+}
+
+impl std::fmt::Display for DfsmMode {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DfsmMode::Start => write!(f, "start cluster connection"),
+            DfsmMode::StartSync => write!(f, "starting data synchronization"),
+            DfsmMode::Synced => write!(f, "all data is up to date"),
+            DfsmMode::Update => write!(f, "waiting for updates from leader"),
+            DfsmMode::Leave => write!(f, "leaving cluster"),
+            DfsmMode::VersionError => write!(f, "protocol version mismatch"),
+            DfsmMode::Error => write!(f, "serious internal error"),
+        }
+    }
+}
+
+/// DFSM message types (internal protocol messages)
+/// Matches C's dfsm_message_t enum values
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum DfsmMessageType {
+    Normal = 0,
+    SyncStart = 1,
+    State = 2,
+    Update = 3,
+    UpdateComplete = 4,
+    VerifyRequest = 5,
+    Verify = 6,
+}
+
+/// Sync epoch - identifies a synchronization session
+/// Matches C's dfsm_sync_epoch_t structure (16 bytes total)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct SyncEpoch {
+    pub epoch: u32,
+    pub time: u32,
+    pub nodeid: u32,
+    pub pid: u32,
+}
+
+impl SyncEpoch {
+    /// Serialize to C-compatible wire format (16 bytes)
+    /// Format: [epoch: u32][time: u32][nodeid: u32][pid: u32]
+    pub fn serialize(&self) -> [u8; 16] {
+        let mut bytes = [0u8; 16];
+        bytes[0..4].copy_from_slice(&self.epoch.to_le_bytes());
+        bytes[4..8].copy_from_slice(&self.time.to_le_bytes());
+        bytes[8..12].copy_from_slice(&self.nodeid.to_le_bytes());
+        bytes[12..16].copy_from_slice(&self.pid.to_le_bytes());
+        bytes
+    }
+
+    /// Deserialize from C-compatible wire format (16 bytes)
+    pub fn deserialize(bytes: &[u8]) -> Result<Self, &'static str> {
+        if bytes.len() < 16 {
+            return Err("SyncEpoch requires 16 bytes");
+        }
+        Ok(SyncEpoch {
+            epoch: u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
+            time: u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
+            nodeid: u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
+            pid: u32::from_le_bytes([bytes[12], bytes[13], bytes[14], bytes[15]]),
+        })
+    }
+}
+
+/// Queued message awaiting delivery
+#[derive(Debug, Clone)]
+pub(super) struct QueuedMessage<M> {
+    pub nodeid: u32,
+    pub pid: u32,
+    pub msg_count: u64,
+    pub message: M,
+    pub timestamp: u64,
+}
+
+// Re-export NodeSyncInfo from pmxcfs-api-types for use in Callbacks trait
+pub use pmxcfs_api_types::NodeSyncInfo;
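The 16-byte `SyncEpoch` wire layout above can be exercised on its own. The sketch below mirrors the struct's serialize/deserialize pair as a standalone program (the `Epoch` type here is a local copy for illustration, not the crate's export) and round-trips one epoch:

```rust
// Standalone mirror of SyncEpoch's 16-byte wire layout:
// [epoch: u32][time: u32][nodeid: u32][pid: u32], all little-endian.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Epoch {
    epoch: u32,
    time: u32,
    nodeid: u32,
    pid: u32,
}

impl Epoch {
    fn serialize(&self) -> [u8; 16] {
        let mut b = [0u8; 16];
        b[0..4].copy_from_slice(&self.epoch.to_le_bytes());
        b[4..8].copy_from_slice(&self.time.to_le_bytes());
        b[8..12].copy_from_slice(&self.nodeid.to_le_bytes());
        b[12..16].copy_from_slice(&self.pid.to_le_bytes());
        b
    }

    fn deserialize(b: &[u8]) -> Result<Self, &'static str> {
        if b.len() < 16 {
            return Err("SyncEpoch requires 16 bytes");
        }
        let u = |i: usize| u32::from_le_bytes([b[i], b[i + 1], b[i + 2], b[i + 3]]);
        Ok(Epoch {
            epoch: u(0),
            time: u(4),
            nodeid: u(8),
            pid: u(12),
        })
    }
}

fn main() {
    let e = Epoch {
        epoch: 7,
        time: 1_700_000_000,
        nodeid: 2,
        pid: 4242,
    };
    let wire = e.serialize();
    // First field on the wire is the epoch counter, little-endian.
    assert_eq!(&wire[0..4], &7u32.to_le_bytes());
    assert_eq!(Epoch::deserialize(&wire).unwrap(), e);
    println!("round-trip ok");
}
```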
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
new file mode 100644
index 000000000..e9578b6cc
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
@@ -0,0 +1,283 @@
+//! C-compatible wire format for cluster communication
+//!
+//! This module implements the exact wire protocol used by the C version of pmxcfs
+//! to ensure compatibility with C-based cluster nodes.
+//!
+//! The C version uses a simple format with iovec arrays containing raw C types.
+
+use anyhow::{Context, Result};
+use bytemuck::{Pod, Zeroable};
+use std::ffi::CStr;
+
+/// C message types (must match dcdb.h)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum CMessageType {
+    Write = 1,
+    Mkdir = 2,
+    Delete = 3,
+    Rename = 4,
+    Create = 5,
+    Mtime = 6,
+    UnlockRequest = 7,
+    Unlock = 8,
+}
+
+/// C-compatible FUSE message header
+/// Layout matches the iovec array from C: [size][offset][pathlen][tolen][flags]
+#[derive(Debug, Clone, Copy, Pod, Zeroable)]
+#[repr(C)]
+struct CFuseMessageHeader {
+    size: u32,
+    offset: u32,
+    pathlen: u32,
+    tolen: u32,
+    flags: u32,
+}
+
+/// Parsed C FUSE message
+#[derive(Debug, Clone)]
+pub struct CFuseMessage {
+    pub size: u32,
+    pub offset: u32,
+    pub flags: u32,
+    pub path: String,
+    pub to: Option<String>,
+    pub data: Vec<u8>,
+}
+
+impl CFuseMessage {
+    /// Maximum message size to prevent DoS attacks (16MB)
+    pub const MAX_MESSAGE_SIZE: u32 = 16 * 1024 * 1024;
+
+    /// Parse a C FUSE message from raw bytes
+    pub fn parse(data: &[u8]) -> Result<Self> {
+        if data.len() < std::mem::size_of::<CFuseMessageHeader>() {
+            return Err(anyhow::anyhow!(
+                "Message too short: {} < {}",
+                data.len(),
+                std::mem::size_of::<CFuseMessageHeader>()
+            ));
+        }
+
+        // Parse header manually to avoid alignment issues
+        let header = CFuseMessageHeader {
+            size: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
+            offset: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
+            pathlen: u32::from_le_bytes([data[8], data[9], data[10], data[11]]),
+            tolen: u32::from_le_bytes([data[12], data[13], data[14], data[15]]),
+            flags: u32::from_le_bytes([data[16], data[17], data[18], data[19]]),
+        };
+
+        // Check for integer overflow in total size calculation
+        let total_size = header
+            .pathlen
+            .checked_add(header.tolen)
+            .and_then(|s| s.checked_add(header.size))
+            .ok_or_else(|| anyhow::anyhow!("Integer overflow in message size calculation"))?;
+
+        // Validate total size is reasonable (prevent DoS)
+        if total_size > Self::MAX_MESSAGE_SIZE {
+            return Err(anyhow::anyhow!(
+                "Message size {total_size} exceeds maximum {}",
+                Self::MAX_MESSAGE_SIZE
+            ));
+        }
+
+        // Validate total size matches actual buffer size (prevent reading beyond buffer)
+        let header_size = std::mem::size_of::<CFuseMessageHeader>();
+        let expected_total = header_size
+            .checked_add(total_size as usize)
+            .ok_or_else(|| anyhow::anyhow!("Total message size overflow"))?;
+
+        if expected_total != data.len() {
+            return Err(anyhow::anyhow!(
+                "Message size mismatch: expected {}, got {}",
+                expected_total,
+                data.len()
+            ));
+        }
+
+        let mut offset = header_size;
+
+        // Parse path with overflow-checked arithmetic
+        let path = if header.pathlen > 0 {
+            let end_offset = offset
+                .checked_add(header.pathlen as usize)
+                .ok_or_else(|| anyhow::anyhow!("Integer overflow in path offset"))?;
+
+            if end_offset > data.len() {
+                return Err(anyhow::anyhow!(
+                    "Invalid path length: {} bytes at offset {} exceeds message size {}",
+                    header.pathlen,
+                    offset,
+                    data.len()
+                ));
+            }
+            let path_bytes = &data[offset..end_offset];
+            offset = end_offset;
+
+            // C strings are null-terminated
+            CStr::from_bytes_until_nul(path_bytes)
+                .context("Invalid path string")?
+                .to_str()
+                .context("Path not valid UTF-8")?
+                .to_string()
+        } else {
+            String::new()
+        };
+
+        // Parse 'to' (for rename operations) with overflow-checked arithmetic
+        let to = if header.tolen > 0 {
+            let end_offset = offset
+                .checked_add(header.tolen as usize)
+                .ok_or_else(|| anyhow::anyhow!("Integer overflow in 'to' offset"))?;
+
+            if end_offset > data.len() {
+                return Err(anyhow::anyhow!(
+                    "Invalid 'to' length: {} bytes at offset {} exceeds message size {}",
+                    header.tolen,
+                    offset,
+                    data.len()
+                ));
+            }
+            let to_bytes = &data[offset..end_offset];
+            offset = end_offset;
+
+            Some(
+                CStr::from_bytes_until_nul(to_bytes)
+                    .context("Invalid to string")?
+                    .to_str()
+                    .context("To path not valid UTF-8")?
+                    .to_string(),
+            )
+        } else {
+            None
+        };
+
+        // Parse data buffer with overflow-checked arithmetic
+        let buf_data = if header.size > 0 {
+            let end_offset = offset
+                .checked_add(header.size as usize)
+                .ok_or_else(|| anyhow::anyhow!("Integer overflow in data offset"))?;
+
+            if end_offset > data.len() {
+                return Err(anyhow::anyhow!(
+                    "Invalid data size: {} bytes at offset {} exceeds message size {}",
+                    header.size,
+                    offset,
+                    data.len()
+                ));
+            }
+            data[offset..end_offset].to_vec()
+        } else {
+            Vec::new()
+        };
+
+        Ok(CFuseMessage {
+            size: header.size,
+            offset: header.offset,
+            flags: header.flags,
+            path,
+            to,
+            data: buf_data,
+        })
+    }
+
+    /// Serialize to C wire format
+    pub fn serialize(&self) -> Vec<u8> {
+        let path_bytes = self.path.as_bytes();
+        let pathlen = if path_bytes.is_empty() {
+            0
+        } else {
+            (path_bytes.len() + 1) as u32 // +1 for null terminator
+        };
+
+        let to_bytes = self.to.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
+        let tolen = if to_bytes.is_empty() {
+            0
+        } else {
+            (to_bytes.len() + 1) as u32
+        };
+
+        let header = CFuseMessageHeader {
+            size: self.size,
+            offset: self.offset,
+            pathlen,
+            tolen,
+            flags: self.flags,
+        };
+
+        let total_len = std::mem::size_of::<CFuseMessageHeader>()
+            + pathlen as usize
+            + tolen as usize
+            + self.size as usize;
+        let mut result = Vec::with_capacity(total_len);
+
+        // Serialize header
+        result.extend_from_slice(bytemuck::bytes_of(&header));
+
+        // Serialize path (with null terminator)
+        if pathlen > 0 {
+            result.extend_from_slice(path_bytes);
+            result.push(0); // null terminator
+        }
+
+        // Serialize 'to' (with null terminator)
+        if tolen > 0 {
+            result.extend_from_slice(to_bytes);
+            result.push(0); // null terminator
+        }
+
+        // Serialize data
+        if self.size > 0 {
+            result.extend_from_slice(&self.data);
+        }
+
+        result
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_serialize_deserialize_write() {
+        let msg = CFuseMessage {
+            size: 13,
+            offset: 0,
+            flags: 0,
+            path: "/test.txt".to_string(),
+            to: None,
+            data: b"Hello, World!".to_vec(),
+        };
+
+        let serialized = msg.serialize();
+        let parsed = CFuseMessage::parse(&serialized).unwrap();
+
+        assert_eq!(parsed.size, msg.size);
+        assert_eq!(parsed.offset, msg.offset);
+        assert_eq!(parsed.flags, msg.flags);
+        assert_eq!(parsed.path, msg.path);
+        assert_eq!(parsed.to, msg.to);
+        assert_eq!(parsed.data, msg.data);
+    }
+
+    #[test]
+    fn test_serialize_deserialize_rename() {
+        let msg = CFuseMessage {
+            size: 0,
+            offset: 0,
+            flags: 0,
+            path: "/old.txt".to_string(),
+            to: Some("/new.txt".to_string()),
+            data: Vec::new(),
+        };
+
+        let serialized = msg.serialize();
+        let parsed = CFuseMessage::parse(&serialized).unwrap();
+
+        assert_eq!(parsed.path, msg.path);
+        assert_eq!(parsed.to, msg.to);
+    }
+}
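The layout `CFuseMessage::serialize` produces can be summarized as: a 20-byte header of five little-endian `u32`s ([size][offset][pathlen][tolen][flags]), followed by a NUL-terminated path, an optional NUL-terminated rename target, then the raw data. The standalone sketch below builds such a buffer (the `encode` helper is illustrative, not the crate's API; it simply sets `size` to the data length) and checks the expected byte positions:

```rust
// Build a buffer in the CFuseMessage wire layout:
// 20-byte header [size][offset][pathlen][tolen][flags] (little-endian u32s),
// followed by a NUL-terminated path, optional NUL-terminated 'to', then data.
fn encode(path: &str, to: Option<&str>, data: &[u8], offset: u32, flags: u32) -> Vec<u8> {
    let pathlen = if path.is_empty() { 0 } else { path.len() as u32 + 1 };
    let tolen = to.map_or(0, |t| t.len() as u32 + 1);
    let mut out = Vec::with_capacity(20 + (pathlen + tolen) as usize + data.len());
    for field in [data.len() as u32, offset, pathlen, tolen, flags] {
        out.extend_from_slice(&field.to_le_bytes());
    }
    if pathlen > 0 {
        out.extend_from_slice(path.as_bytes());
        out.push(0); // NUL terminator, as in the C wire format
    }
    if let Some(t) = to {
        out.extend_from_slice(t.as_bytes());
        out.push(0);
    }
    out.extend_from_slice(data);
    out
}

fn main() {
    let buf = encode("/test.txt", None, b"Hello, World!", 0, 0);
    // Header: size=13, offset=0, pathlen=10 ("/test.txt" + NUL), tolen=0, flags=0.
    assert_eq!(buf.len(), 20 + 10 + 13);
    assert_eq!(u32::from_le_bytes(buf[0..4].try_into().unwrap()), 13);
    assert_eq!(u32::from_le_bytes(buf[8..12].try_into().unwrap()), 10);
    assert_eq!(&buf[20..29], b"/test.txt");
    assert_eq!(buf[29], 0);
    println!("encoded {} bytes", buf.len());
}
```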
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
new file mode 100644
index 000000000..719a0a692
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
@@ -0,0 +1,568 @@
+//! Multi-node integration tests for DFSM cluster synchronization
+//!
+//! These tests simulate multi-node clusters to verify the complete synchronization
+//! protocol works correctly with multiple Rust nodes exchanging state.
+
+use anyhow::Result;
+use pmxcfs_dfsm::{Callbacks, FuseMessage, NodeSyncInfo};
+use pmxcfs_memdb::{MemDb, MemDbIndex, ROOT_INODE, TreeEntry};
+use std::sync::{Arc, Mutex};
+use tempfile::TempDir;
+
+/// Mock callbacks for testing DFSM without full pmxcfs integration
+struct MockCallbacks {
+    memdb: MemDb,
+    states_received: Arc<Mutex<Vec<NodeSyncInfo>>>,
+    updates_received: Arc<Mutex<Vec<TreeEntry>>>,
+    synced_count: Arc<Mutex<usize>>,
+}
+
+impl MockCallbacks {
+    fn new(memdb: MemDb) -> Self {
+        Self {
+            memdb,
+            states_received: Arc::new(Mutex::new(Vec::new())),
+            updates_received: Arc::new(Mutex::new(Vec::new())),
+            synced_count: Arc::new(Mutex::new(0)),
+        }
+    }
+
+    #[allow(dead_code)]
+    fn get_states(&self) -> Vec<NodeSyncInfo> {
+        self.states_received.lock().unwrap().clone()
+    }
+
+    #[allow(dead_code)]
+    fn get_updates(&self) -> Vec<TreeEntry> {
+        self.updates_received.lock().unwrap().clone()
+    }
+
+    #[allow(dead_code)]
+    fn get_synced_count(&self) -> usize {
+        *self.synced_count.lock().unwrap()
+    }
+}
+
+impl Callbacks for MockCallbacks {
+    type Message = FuseMessage;
+
+    fn deliver_message(
+        &self,
+        _node_id: u32,
+        _pid: u32,
+        _message: FuseMessage,
+        _timestamp: u64,
+    ) -> std::io::Result<(i32, bool)> {
+        Ok((0, true))
+    }
+
+    fn compute_checksum(&self, output: &mut [u8; 32]) -> Result<()> {
+        let checksum = self.memdb.compute_database_checksum()?;
+        output.copy_from_slice(&checksum);
+        Ok(())
+    }
+
+    fn get_state(&self) -> Result<Vec<u8>> {
+        let index = self.memdb.encode_index()?;
+        Ok(index.serialize())
+    }
+
+    fn process_state_update(&self, states: &[NodeSyncInfo]) -> Result<bool> {
+        // Store received states for verification
+        *self.states_received.lock().unwrap() = states.to_vec();
+
+        // Parse indices from states
+        let mut indices: Vec<(u32, u32, MemDbIndex)> = Vec::new();
+        for node in states {
+            if let Some(state_data) = &node.state {
+                match MemDbIndex::deserialize(state_data) {
+                    Ok(index) => indices.push((node.node_id, node.pid, index)),
+                    Err(_) => continue,
+                }
+            }
+        }
+
+        if indices.is_empty() {
+            return Ok(true);
+        }
+
+        // Find leader (highest version, or if tie, highest mtime)
+        let mut leader_idx = 0;
+        for i in 1..indices.len() {
+            let (_, _, current_index) = &indices[i];
+            let (_, _, leader_index) = &indices[leader_idx];
+            if current_index > leader_index {
+                leader_idx = i;
+            }
+        }
+
+        let (_leader_nodeid, _leader_pid, leader_index) = &indices[leader_idx];
+
+        // Check if WE are synced with leader
+        let our_index = self.memdb.encode_index()?;
+        let we_are_synced = our_index.version == leader_index.version
+            && our_index.mtime == leader_index.mtime
+            && our_index.size == leader_index.size
+            && our_index.entries.len() == leader_index.entries.len()
+            && our_index
+                .entries
+                .iter()
+                .zip(leader_index.entries.iter())
+                .all(|(a, b)| a.inode == b.inode && a.digest == b.digest);
+
+        Ok(we_are_synced)
+    }
+
+    fn process_update(&self, _node_id: u32, _pid: u32, data: &[u8]) -> Result<()> {
+        // Deserialize and store update
+        let tree_entry = TreeEntry::deserialize_from_update(data)?;
+        self.updates_received
+            .lock()
+            .unwrap()
+            .push(tree_entry.clone());
+
+        // Apply to database
+        self.memdb.apply_tree_entry(tree_entry)?;
+        Ok(())
+    }
+
+    fn commit_state(&self) -> Result<()> {
+        Ok(())
+    }
+
+    fn on_synced(&self) {
+        *self.synced_count.lock().unwrap() += 1;
+    }
+}
+
+fn create_test_node(node_id: u32) -> Result<(MemDb, TempDir, Arc<MockCallbacks>)> {
+    let temp_dir = TempDir::new()?;
+    let db_path = temp_dir.path().join(format!("node{node_id}.db"));
+    let memdb = MemDb::open(&db_path, true)?;
+    // Note: Local operations always use writer=0 (matching C implementation)
+    // Remote DFSM updates use the writer field from the incoming TreeEntry
+
+    let callbacks = Arc::new(MockCallbacks::new(memdb.clone()));
+    Ok((memdb, temp_dir, callbacks))
+}
+
+#[test]
+fn test_two_node_empty_sync() -> Result<()> {
+    // Create two nodes with empty databases
+    let (_memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+    // Generate states from both nodes
+    let state1 = callbacks1.get_state()?;
+    let state2 = callbacks2.get_state()?;
+
+    // Simulate state exchange
+    let states = vec![
+        NodeSyncInfo {
+            node_id: 1,
+            pid: 1000,
+            state: Some(state1),
+            synced: false,
+        },
+        NodeSyncInfo {
+            node_id: 2,
+            pid: 2000,
+            state: Some(state2),
+            synced: false,
+        },
+    ];
+
+    // Both nodes process states
+    let synced1 = callbacks1.process_state_update(&states)?;
+    let synced2 = callbacks2.process_state_update(&states)?;
+
+    // Both should be synced (empty databases are identical)
+    assert!(synced1, "Node 1 should be synced");
+    assert!(synced2, "Node 2 should be synced");
+
+    Ok(())
+}
+
+#[test]
+fn test_two_node_leader_election() -> Result<()> {
+    // Create two nodes
+    let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+    // Node 1 has more data (higher version)
+    memdb1.create("/file1.txt", 0, 0, 1000)?;
+    memdb1.write("/file1.txt", 0, 0, 1001, b"data from node 1", false)?;
+
+    // Generate states
+    let state1 = callbacks1.get_state()?;
+    let state2 = callbacks2.get_state()?;
+
+    // Parse to check versions
+    let index1 = MemDbIndex::deserialize(&state1)?;
+    let index2 = MemDbIndex::deserialize(&state2)?;
+
+    // Node 1 should have higher version
+    assert!(
+        index1.version > index2.version,
+        "Node 1 version {} should be > Node 2 version {}",
+        index1.version,
+        index2.version
+    );
+
+    // Simulate state exchange
+    let states = vec![
+        NodeSyncInfo {
+            node_id: 1,
+            pid: 1000,
+            state: Some(state1),
+            synced: false,
+        },
+        NodeSyncInfo {
+            node_id: 2,
+            pid: 2000,
+            state: Some(state2),
+            synced: false,
+        },
+    ];
+
+    // Process states
+    let synced1 = callbacks1.process_state_update(&states)?;
+    let synced2 = callbacks2.process_state_update(&states)?;
+
+    // Node 1 (leader) should be synced, Node 2 (follower) should not
+    assert!(synced1, "Node 1 (leader) should be synced");
+    assert!(!synced2, "Node 2 (follower) should not be synced");
+
+    Ok(())
+}
+
+#[test]
+fn test_incremental_update_transfer() -> Result<()> {
+    // Create leader and follower
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // Leader has data
+    leader_db.create("/config", libc::S_IFDIR, 0, 1000)?;
+    leader_db.create("/config/node.conf", 0, 0, 1001)?;
+    leader_db.write("/config/node.conf", 0, 0, 1002, b"hostname=pve1", false)?;
+
+    // Get entries from leader
+    let leader_entries = leader_db.get_all_entries()?;
+
+    // Simulate sending updates to follower
+    for entry in leader_entries {
+        if entry.inode == ROOT_INODE {
+            continue; // Skip root (both have it)
+        }
+
+        // Serialize as update message
+        let update_msg = entry.serialize_for_update();
+
+        // Follower receives and processes update
+        follower_callbacks.process_update(1, 1000, &update_msg)?;
+    }
+
+    // Verify follower has the data
+    let config_dir = follower_db.lookup_path("/config");
+    assert!(
+        config_dir.is_some(),
+        "Follower should have /config directory"
+    );
+    assert!(config_dir.unwrap().is_dir());
+
+    let config_file = follower_db.lookup_path("/config/node.conf");
+    assert!(
+        config_file.is_some(),
+        "Follower should have /config/node.conf"
+    );
+
+    let config_data = follower_db.read("/config/node.conf", 0, 1024)?;
+    assert_eq!(
+        config_data, b"hostname=pve1",
+        "Follower should have correct data"
+    );
+
+    Ok(())
+}
+
+#[test]
+fn test_three_node_sync() -> Result<()> {
+    // Create three nodes
+    let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (memdb2, _temp2, callbacks2) = create_test_node(2)?;
+    let (_memdb3, _temp3, callbacks3) = create_test_node(3)?;
+
+    // Node 1 has the most recent data
+    memdb1.create("/cluster.conf", 0, 0, 5000)?;
+    memdb1.write("/cluster.conf", 0, 0, 5001, b"version=3", false)?;
+
+    // Node 2 has older data
+    memdb2.create("/cluster.conf", 0, 0, 4000)?;
+    memdb2.write("/cluster.conf", 0, 0, 4001, b"version=2", false)?;
+
+    // Node 3 is empty (new node joining)
+
+    // Generate states
+    let state1 = callbacks1.get_state()?;
+    let state2 = callbacks2.get_state()?;
+    let state3 = callbacks3.get_state()?;
+
+    let states = vec![
+        NodeSyncInfo {
+            node_id: 1,
+            pid: 1000,
+            state: Some(state1.clone()),
+            synced: false,
+        },
+        NodeSyncInfo {
+            node_id: 2,
+            pid: 2000,
+            state: Some(state2.clone()),
+            synced: false,
+        },
+        NodeSyncInfo {
+            node_id: 3,
+            pid: 3000,
+            state: Some(state3.clone()),
+            synced: false,
+        },
+    ];
+
+    // All nodes process states
+    let synced1 = callbacks1.process_state_update(&states)?;
+    let synced2 = callbacks2.process_state_update(&states)?;
+    let synced3 = callbacks3.process_state_update(&states)?;
+
+    // Node 1 (leader) should be synced
+    assert!(synced1, "Node 1 (leader) should be synced");
+
+    // Nodes 2 and 3 need updates
+    assert!(!synced2, "Node 2 should need updates");
+    assert!(!synced3, "Node 3 should need updates");
+
+    // Verify leader has highest version
+    let index1 = MemDbIndex::deserialize(&state1)?;
+    let index2 = MemDbIndex::deserialize(&state2)?;
+    let index3 = MemDbIndex::deserialize(&state3)?;
+
+    assert!(index1.version >= index2.version);
+    assert!(index1.version >= index3.version);
+
+    Ok(())
+}
+
+#[test]
+fn test_update_message_wire_format_compatibility() -> Result<()> {
+    // Verify our wire format matches C implementation exactly
+    let entry = TreeEntry {
+        inode: 42,
+        parent: 1,
+        version: 100,
+        writer: 2,
+        mtime: 12345,
+        size: 11,
+        entry_type: 8, // DT_REG
+        name: "test.conf".to_string(),
+        data: b"hello world".to_vec(),
+    };
+
+    let serialized = entry.serialize_for_update();
+
+    // Verify header size (41 bytes)
+    // parent(8) + inode(8) + version(8) + writer(4) + mtime(4) + size(4) + namelen(4) + type(1)
+    let expected_header_size = 8 + 8 + 8 + 4 + 4 + 4 + 4 + 1;
+    assert_eq!(expected_header_size, 41);
+
+    // Verify total size
+    let namelen = "test.conf".len() + 1; // Include null terminator
+    let expected_total = expected_header_size + namelen + 11;
+    assert_eq!(serialized.len(), expected_total);
+
+    // Verify we can deserialize it back
+    let deserialized = TreeEntry::deserialize_from_update(&serialized)?;
+    assert_eq!(deserialized.inode, entry.inode);
+    assert_eq!(deserialized.parent, entry.parent);
+    assert_eq!(deserialized.version, entry.version);
+    assert_eq!(deserialized.writer, entry.writer);
+    assert_eq!(deserialized.mtime, entry.mtime);
+    assert_eq!(deserialized.size, entry.size);
+    assert_eq!(deserialized.entry_type, entry.entry_type);
+    assert_eq!(deserialized.name, entry.name);
+    assert_eq!(deserialized.data, entry.data);
+
+    Ok(())
+}
+
+#[test]
+fn test_index_wire_format_compatibility() -> Result<()> {
+    // Verify memdb_index_t wire format matches C implementation
+    use pmxcfs_memdb::IndexEntry;
+
+    let entries = vec![
+        IndexEntry {
+            inode: 1,
+            digest: [0u8; 32],
+        },
+        IndexEntry {
+            inode: 2,
+            digest: [1u8; 32],
+        },
+    ];
+
+    let index = MemDbIndex::new(
+        100,   // version
+        2,     // last_inode
+        1,     // writer
+        12345, // mtime
+        entries,
+    );
+
+    let serialized = index.serialize();
+
+    // Verify header size (32 bytes)
+    // version(8) + last_inode(8) + writer(4) + mtime(4) + size(4) + bytes(4)
+    let expected_header_size = 8 + 8 + 4 + 4 + 4 + 4;
+    assert_eq!(expected_header_size, 32);
+
+    // Verify entry size (40 bytes each)
+    // inode(8) + digest(32)
+    let expected_entry_size = 8 + 32;
+    assert_eq!(expected_entry_size, 40);
+
+    // Verify total size
+    let expected_total = expected_header_size + 2 * expected_entry_size;
+    assert_eq!(serialized.len(), expected_total);
+    assert_eq!(serialized.len(), index.bytes as usize);
+
+    // Verify deserialization
+    let deserialized = MemDbIndex::deserialize(&serialized)?;
+    assert_eq!(deserialized.version, index.version);
+    assert_eq!(deserialized.last_inode, index.last_inode);
+    assert_eq!(deserialized.writer, index.writer);
+    assert_eq!(deserialized.mtime, index.mtime);
+    assert_eq!(deserialized.size, index.size);
+    assert_eq!(deserialized.bytes, index.bytes);
+    assert_eq!(deserialized.entries.len(), 2);
+
+    Ok(())
+}
+
+#[test]
+fn test_sync_with_conflicts() -> Result<()> {
+    // Test scenario: two nodes modified different files
+    let (memdb1, _temp1, _callbacks1) = create_test_node(1)?;
+    let (memdb2, _temp2, _callbacks2) = create_test_node(2)?;
+
+    // Both start with same base
+    memdb1.create("/base.conf", 0, 0, 1000)?;
+    memdb1.write("/base.conf", 0, 0, 1001, b"shared", false)?;
+
+    memdb2.create("/base.conf", 0, 0, 1000)?;
+    memdb2.write("/base.conf", 0, 0, 1001, b"shared", false)?;
+
+    // Node 1 adds file1
+    memdb1.create("/file1.txt", 0, 0, 2000)?;
+    memdb1.write("/file1.txt", 0, 0, 2001, b"from node 1", false)?;
+
+    // Node 2 adds file2
+    memdb2.create("/file2.txt", 0, 0, 2000)?;
+    memdb2.write("/file2.txt", 0, 0, 2001, b"from node 2", false)?;
+
+    // Generate indices
+    let index1 = memdb1.encode_index()?;
+    let index2 = memdb2.encode_index()?;
+
+    // Find differences
+    let diffs_1_vs_2 = index1.find_differences(&index2);
+    let diffs_2_vs_1 = index2.find_differences(&index1);
+
+    // Node 1 has file1 that node 2 doesn't have
+    assert!(
+        !diffs_1_vs_2.is_empty(),
+        "Node 1 should have entries node 2 doesn't have"
+    );
+
+    // Node 2 has file2 that node 1 doesn't have
+    assert!(
+        !diffs_2_vs_1.is_empty(),
+        "Node 2 should have entries node 1 doesn't have"
+    );
+
+    // Leader election: the higher version wins. Here both nodes performed the
+    // same sequence of operations, so their versions match and mtime would be
+    // the tiebreaker.
+
+    Ok(())
+}
+
+#[test]
+fn test_large_file_update() -> Result<()> {
+    // Test updating a file with significant data
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // Create a file with 10KB of data
+    let large_data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect();
+
+    leader_db.create("/large.bin", 0, 0, 1000)?;
+    leader_db.write("/large.bin", 0, 0, 1001, &large_data, false)?;
+
+    // Get the entry
+    let entry = leader_db.lookup_path("/large.bin").unwrap();
+
+    // Serialize and send
+    let update_msg = entry.serialize_for_update();
+
+    // Follower receives
+    follower_callbacks.process_update(1, 1000, &update_msg)?;
+
+    // Verify
+    let follower_entry = follower_db.lookup_path("/large.bin").unwrap();
+    assert_eq!(follower_entry.size, large_data.len());
+    assert_eq!(follower_entry.data, large_data);
+
+    Ok(())
+}
+
+#[test]
+fn test_directory_hierarchy_sync() -> Result<()> {
+    // Test syncing nested directory structure
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // Create directory hierarchy on leader
+    leader_db.create("/etc", libc::S_IFDIR, 0, 1000)?;
+    leader_db.create("/etc/pve", libc::S_IFDIR, 0, 1001)?;
+    leader_db.create("/etc/pve/nodes", libc::S_IFDIR, 0, 1002)?;
+    leader_db.create("/etc/pve/nodes/pve1", libc::S_IFDIR, 0, 1003)?;
+    leader_db.create("/etc/pve/nodes/pve1/config", 0, 0, 1004)?;
+    leader_db.write(
+        "/etc/pve/nodes/pve1/config",
+        0,
+        0,
+        1005,
+        b"cpu: 2\nmem: 4096",
+        false,
+    )?;
+
+    // Send all entries to follower
+    let entries = leader_db.get_all_entries()?;
+    for entry in entries {
+        if entry.inode == ROOT_INODE {
+            continue; // Skip root
+        }
+        let update_msg = entry.serialize_for_update();
+        follower_callbacks.process_update(1, 1000, &update_msg)?;
+    }
+
+    // Verify entire hierarchy
+    assert!(follower_db.lookup_path("/etc").is_some());
+    assert!(follower_db.lookup_path("/etc/pve").is_some());
+    assert!(follower_db.lookup_path("/etc/pve/nodes").is_some());
+    assert!(follower_db.lookup_path("/etc/pve/nodes/pve1").is_some());
+
+    let config = follower_db.lookup_path("/etc/pve/nodes/pve1/config");
+    assert!(config.is_some());
+    assert_eq!(config.unwrap().data, b"cpu: 2\nmem: 4096");
+
+    Ok(())
+}
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
index ac0a3cb32..1ddb9b971 100644
--- a/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
@@ -463,7 +463,7 @@ impl MemDb {
             return Err(std::io::Error::other(DB_ERRORS_MSG));
         }
 
-        // CRITICAL FIX: Acquire write guard BEFORE any checks to prevent TOCTOU race
+        // Acquire write guard before any checks to prevent TOCTOU race
         // This ensures all validation and mutation happen atomically
         let _guard = self.inner.write_guard.lock();
 
@@ -617,7 +617,7 @@ impl MemDb {
             return Err(std::io::Error::other(DB_ERRORS_MSG));
         }
 
-        // CRITICAL FIX: Acquire write guard BEFORE any checks to prevent TOCTOU race
+        // Acquire write guard before any checks to prevent TOCTOU race
         // This ensures lookup and mutation happen atomically
         let _guard = self.inner.write_guard.lock();
 
-- 
2.47.3





Thread overview: 13+ messages
2026-03-23 11:32 [PATCH pve-cluster v3 00/13] Rewrite pmxcfs with Rust Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 01/13] pmxcfs-rs: add pmxcfs-api-types crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 02/13] pmxcfs-rs: add pmxcfs-config crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 03/13] pmxcfs-rs: add pmxcfs-logger crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 04/13] pmxcfs-rs: add pmxcfs-rrd crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 05/13] pmxcfs-rs: add pmxcfs-memdb crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 06/13] pmxcfs-rs: add pmxcfs-status and pmxcfs-test-utils crates Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 07/13] pmxcfs-rs: add pmxcfs-services crate Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 08/13] pmxcfs-rs: add pmxcfs-ipc crate Kefu Chai
2026-03-23 11:32 ` Kefu Chai [this message]
2026-03-23 11:32 ` [PATCH pve-cluster v3 10/13] pmxcfs-rs: vendor patched rust-corosync for CPG compatibility Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 11/13] pmxcfs-rs: add pmxcfs main daemon binary Kefu Chai
2026-03-23 11:32 ` [PATCH pve-cluster v3 13/13] pmxcfs-rs: add project documentation Kefu Chai
