From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [PATCH pve-cluster 10/14 v2] pmxcfs-rs: add pmxcfs-dfsm crate
Date: Fri, 13 Feb 2026 17:33:47 +0800 [thread overview]
Message-ID: <20260213094119.2379288-11-k.chai@proxmox.com> (raw)
In-Reply-To: <20260213094119.2379288-1-k.chai@proxmox.com>
Add a Distributed Finite State Machine (DFSM) for cluster synchronization:
- Dfsm: Core state machine implementation
- ClusterDatabaseService: MemDb sync (pmxcfs_v1 CPG group)
- StatusSyncService: Status sync (pve_kvstore_v1 CPG group)
- Protocol: SyncStart, State, Update, UpdateComplete, Verify
- Leader election based on version and mtime
- Incremental updates for efficiency
This integrates pmxcfs-memdb, pmxcfs-services, and rust-corosync
to provide cluster-wide database synchronization. It implements
the wire-compatible protocol used by the C version.
Includes unit tests for:
- Index serialization and comparison
- Leader election logic
- Tree entry serialization
- Diff computation between indices
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
src/pmxcfs-rs/Cargo.toml | 9 +
src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml | 46 +
src/pmxcfs-rs/pmxcfs-dfsm/README.md | 340 +++++
src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs | 80 ++
.../src/cluster_database_service.rs | 116 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs | 235 ++++
src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs | 722 ++++++++++
src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs | 194 +++
.../pmxcfs-dfsm/src/kv_store_message.rs | 387 +++++
src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs | 32 +
src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs | 21 +
.../pmxcfs-dfsm/src/state_machine.rs | 1251 +++++++++++++++++
.../pmxcfs-dfsm/src/status_sync_service.rs | 118 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs | 107 ++
src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs | 279 ++++
.../tests/multi_node_sync_tests.rs | 563 ++++++++
src/pmxcfs-rs/pmxcfs-memdb/src/database.rs | 4 +-
src/pmxcfs-rs/pmxcfs-status/src/status.rs | 4 +-
18 files changed, 4504 insertions(+), 4 deletions(-)
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 07c450fb4..4dfb1c1a8 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -10,6 +10,7 @@ members = [
"pmxcfs-test-utils", # Test utilities and helpers (dev-only)
"pmxcfs-services", # Service framework for automatic retry and lifecycle management
"pmxcfs-ipc", # libqb-compatible IPC server
+ "pmxcfs-dfsm", # Distributed Finite State Machine
]
resolver = "2"
@@ -32,6 +33,10 @@ pmxcfs-status = { path = "pmxcfs-status" }
pmxcfs-test-utils = { path = "pmxcfs-test-utils" }
pmxcfs-services = { path = "pmxcfs-services" }
pmxcfs-ipc = { path = "pmxcfs-ipc" }
+pmxcfs-dfsm = { path = "pmxcfs-dfsm" }
+
+# Corosync integration
+rust-corosync = "0.1"
# Core async runtime
tokio = { version = "1.35", features = ["full"] }
@@ -51,6 +56,7 @@ async-trait = "0.1"
# Serialization
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3"
+bytemuck = { version = "1.14", features = ["derive"] }
# Network and cluster
bytes = "1.5"
@@ -63,6 +69,9 @@ parking_lot = "0.12"
libc = "0.2"
nix = { version = "0.29", features = ["socket", "poll"] }
+# Utilities
+num_enum = "0.7"
+
# Development dependencies
tempfile = "3.8"
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
new file mode 100644
index 000000000..b495f6343
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
@@ -0,0 +1,46 @@
+[package]
+name = "pmxcfs-dfsm"
+description = "Distributed Finite State Machine for cluster state synchronization"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+# Internal dependencies
+pmxcfs-api-types.workspace = true
+pmxcfs-memdb.workspace = true
+pmxcfs-services.workspace = true
+
+# Corosync integration
+rust-corosync.workspace = true
+
+# Error handling
+anyhow.workspace = true
+thiserror.workspace = true
+
+# Async and concurrency
+parking_lot.workspace = true
+async-trait.workspace = true
+tokio.workspace = true
+
+# Serialization
+serde.workspace = true
+bincode.workspace = true
+bytemuck.workspace = true
+
+# Logging
+tracing.workspace = true
+
+# Utilities
+num_enum.workspace = true
+libc.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
+libc.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/README.md b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
new file mode 100644
index 000000000..a8412f1b0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
@@ -0,0 +1,340 @@
+# pmxcfs-dfsm
+
+**Distributed Finite State Machine** for cluster-wide state synchronization in pmxcfs.
+
+This crate implements the DFSM protocol used to replicate configuration changes and status updates across all nodes in a Proxmox cluster via Corosync CPG (Closed Process Group).
+
+## Overview
+
+The DFSM is the core mechanism for maintaining consistency across cluster nodes. It ensures that:
+
+- All nodes see filesystem operations (writes, creates, deletes) in the same order
+- Database state remains synchronized even after network partitions
+- Status information (VM states, RRD data) is broadcast to all nodes
+- State verification catches inconsistencies
+
+## Architecture
+
+### Key Components
+
+- `Dfsm`: the core state machine, generic over the application message type
+- `ClusterDatabaseService`: MemDb synchronization over the `pmxcfs_v1` CPG group
+- `StatusSyncService`: status/kvstore synchronization over the `pve_kvstore_v1` CPG group
+- `CpgService`: safe wrapper around the Corosync CPG API
+
+### Module Structure
+
+| Module | Purpose | C Equivalent |
+|--------|---------|--------------|
+| `state_machine.rs` | Core DFSM logic, state transitions | `dfsm.c` |
+| `cluster_database_service.rs` | MemDb sync service | `dcdb.c`, `loop.c:service_dcdb` |
+| `status_sync_service.rs` | Status/kvstore sync service | `loop.c:service_status` |
+| `cpg_service.rs` | Corosync CPG integration | `dfsm.c:cpg_callbacks` |
+| `dfsm_message.rs` | Protocol message types | `dfsm.c:dfsm_message_*_header_t` |
+| `message.rs` | Message trait and serialization | (inline in C) |
+| `wire_format.rs` | C-compatible wire format | `dcdb.c:c_fuse_message_header_t` |
+| `broadcast.rs` | Cluster-wide message broadcast | `dcdb.c:dcdb_send_fuse_message` |
+| `types.rs` | Type definitions (modes, epochs) | `dfsm.c:dfsm_mode_t` |
+
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| `dfsm_t` | `Dfsm` | Main state machine |
+| `dfsm_mode_t` | `DfsmMode` | Enum with type safety |
+| `dfsm_node_info_t` | (internal) | Node state tracking |
+| `dfsm_sync_info_t` | (internal) | Sync session info |
+| `dfsm_callbacks_t` | Trait-based callbacks | Type-safe callbacks via traits |
+| `dfsm_message_*_header_t` | `DfsmMessage` | Type-safe enum variants |
+
+### Functions
+
+#### Core DFSM Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dfsm_new()` | `Dfsm::new()` | state_machine.rs |
+| `dfsm_initialize()` | `Dfsm::init_cpg()` | state_machine.rs |
+| `dfsm_join()` | (part of init_cpg) | state_machine.rs |
+| `dfsm_dispatch()` | `Dfsm::dispatch_events()` | state_machine.rs |
+| `dfsm_send_message()` | `Dfsm::send_message()` | state_machine.rs |
+| `dfsm_send_update()` | `Dfsm::send_update()` | state_machine.rs |
+| `dfsm_verify_request()` | `Dfsm::verify_request()` | state_machine.rs |
+| `dfsm_finalize()` | `Dfsm::stop_services()` | state_machine.rs |
+
+#### DCDB (Cluster Database) Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dcdb_new()` | `ClusterDatabaseService::new()` | cluster_database_service.rs |
+| `dcdb_send_fuse_message()` | `broadcast()` | broadcast.rs |
+| `dcdb_send_unlock()` | `FuseMessage::Unlock` + broadcast | broadcast.rs |
+| `service_dcdb()` | `ClusterDatabaseService` | cluster_database_service.rs |
+
+#### Status Sync Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `service_status()` | `StatusSyncService` | status_sync_service.rs |
+| (kvstore CPG group) | `StatusSyncService` | Uses separate CPG group |
+
+### Callback System
+
+**C Implementation:**
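+
+The C version wires these callbacks through a struct of function pointers.
+A rough sketch (the field names mirror the Rust trait methods and are
+illustrative; see `src/pmxcfs/dfsm.h` for the authoritative definition):
+
+```text
+typedef struct {
+    dfsm_deliver_fn_t        deliver;        /* -> deliver_message()        */
+    dfsm_get_state_fn_t      get_state;      /* -> get_state()              */
+    dfsm_process_update_fn_t process_update; /* -> process_update()         */
+    dfsm_commit_fn_t         commit;         /* -> commit_state()           */
+    dfsm_cleanup_fn_t        cleanup;        /* -> cleanup_sync_resources() */
+    dfsm_checksum_fn_t       checksum;       /* -> compute_checksum()       */
+    dfsm_confchg_fn_t        confchg;        /* -> on_membership_change()   */
+} dfsm_callbacks_t;
+```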
+
+**Rust Implementation:**
+- Uses trait-based callbacks instead of function pointers
+- Callbacks are implemented by `MemDbCallbacks` (memdb integration)
+- Defined in external crates (pmxcfs-memdb)
+
+## Synchronization Protocol
+
+The DFSM ensures all nodes maintain consistent database state through a multi-phase synchronization protocol:
+
+### Protocol Phases
+
+#### Phase 1: Membership Change
+
+When nodes join or leave the cluster:
+
+1. **Corosync CPG** delivers membership change notification
+2. **DFSM invalidates** cached checksums
+3. **Message queues** are cleared
+4. **Epoch counter** is incremented
+
+**CPG Leader** (lowest node ID):
+- Initiates sync by sending `SyncStart` message
+- Sends its own `State` (CPG doesn't loop back messages)
+
+**All Followers**:
+- Respond to `SyncStart` by sending their `State`
+- Wait for other nodes' states
+
+#### Phase 2: State Exchange
+
+Each node collects `State` messages containing serialized **MemDbIndex** (compact state summary using C-compatible wire format).
+
+State digests are computed using SHA-256 hashing to detect differences between nodes.
+
+#### Phase 3: Leader Election
+
+When all states are collected, `process_state_update()` is called:
+
+1. **Parse indices** from all node states
+2. **Elect data leader** (may differ from CPG leader):
+ - Highest `version` wins
+ - If tied, highest `mtime` wins
+3. **Identify synced nodes**: Nodes whose index matches leader exactly
+4. **Determine own status**:
+ - If we're the data leader → send updates to followers
+ - If we're synced with leader → mark as Synced
+ - Otherwise → enter Update mode and wait
+
+**Leader Election Algorithm**:
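+
+A minimal pseudocode sketch of the election rule described above
+(illustrative, not the exact implementation):
+
+```text
+leader = states[0]
+for s in states[1..]:
+    if (s.version, s.mtime) > (leader.version, leader.mtime):
+        leader = s                  # highest version wins, mtime breaks ties
+synced = [n for n in states if n.index == leader.index]
+```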
+
+#### Phase 4: Incremental Updates
+
+**Data Leader** (node with highest version):
+
+1. **Compare indices** using `find_differences()` for each follower
+2. **Serialize differing entries** to C-compatible TreeEntry format
+3. **Send Update messages** via CPG
+4. **Send UpdateComplete** when all updates sent
+
+**Followers** (out-of-sync nodes):
+
+1. **Receive Update messages**
+2. **Deserialize TreeEntry** via `TreeEntry::deserialize_from_update()`
+3. **Apply to database** via `MemDb::apply_tree_entry()`:
+ - INSERT OR REPLACE in SQLite
+ - Update in-memory structures
+ - Handle entry moves (parent/name changes)
+4. **On UpdateComplete**: Transition to Synced mode
+
+#### Phase 5: Normal Operations
+
+When in **Synced** mode:
+
+- FUSE operations are broadcast via `send_fuse_message()`
+- Messages are delivered immediately via `deliver_message()`
+- Leader periodically sends `VerifyRequest` for checksum comparison
+- Nodes respond with `Verify` containing SHA-256 of entire database
+- Mismatches trigger cluster resync
+
+---
+
+## Protocol Details
+
+### State Machine Transitions
+
+Based on analysis of C implementation (`dfsm.c` lines 795-1209):
+
+#### Critical Protocol Rules
+
+1. **Epoch Management**:
+ - Each node creates local epoch during confchg: `(counter++, time, own_nodeid, own_pid)`
+ - **Leader sends SYNC_START with its epoch**
+ - **Followers MUST adopt leader's epoch from SYNC_START** (`dfsm->sync_epoch = header->epoch`)
+ - All STATE messages in sync round use adopted epoch
+ - Epoch mismatch → message discarded (may lead to LEAVE)
+
+2. **Member List Validation**:
+ - Built from `member_list` in confchg callback
+ - Stored in `dfsm->sync_info->nodes[]`
+ - STATE sender MUST be in this list
+ - Non-member STATE → immediate LEAVE
+
+3. **Duplicate Detection**:
+ - Each node sends STATE exactly once per sync round
+ - Tracked via `ni->state` pointer (NULL = not received, non-NULL = received)
+ - Duplicate STATE from same nodeid/pid → immediate LEAVE
+   - The Rust implementation matches this C behavior (fixed in commit c321869cc)
+
+4. **Message Ordering** (one sync round):
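+
+   One sync round is expected to proceed in this order (condensed from the
+   protocol phases above):
+
+   ```text
+   SYNC_START       leader only, carries the sync epoch
+   STATE x N        one per member (leader included)
+   UPDATE x M       data leader only, zero or more
+   UPDATE_COMPLETE  data leader only
+   NORMAL ...       application messages once Synced
+   ```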
+
+5. **Leader Selection**:
+ - Determined by `lowest_nodeid` from member list
+ - Set in confchg callback before any messages sent
+ - Used to validate SYNC_START sender (logged but not enforced)
+ - Re-elected during state processing based on DB versions
+
+### DFSM States (DfsmMode)
+
+| State | Value | Description | C Equivalent |
+|-------|-------|-------------|--------------|
+| `Start` | 0 | Initial connection | `DFSM_MODE_START` |
+| `StartSync` | 1 | Beginning sync | `DFSM_MODE_START_SYNC` |
+| `Synced` | 2 | Fully synchronized | `DFSM_MODE_SYNCED` |
+| `Update` | 3 | Receiving updates | `DFSM_MODE_UPDATE` |
+| `Leave` | 253 | Leaving group | `DFSM_MODE_LEAVE` |
+| `VersionError` | 254 | Protocol mismatch | `DFSM_MODE_VERSION_ERROR` |
+| `Error` | 255 | Error state | `DFSM_MODE_ERROR` |
+
+### Message Types (DfsmMessageType)
+
+| Type | Value | Purpose |
+|------|-------|---------|
+| `Normal` | 0 | Application messages (with header + payload) |
+| `SyncStart` | 1 | Start sync (from leader) |
+| `State` | 2 | Full state data |
+| `Update` | 3 | Incremental update |
+| `UpdateComplete` | 4 | End of updates |
+| `VerifyRequest` | 5 | Request state verification |
+| `Verify` | 6 | State checksum response |
+
+All messages use C-compatible wire format with headers and payloads.
+
+### Application Message Types
+
+The DFSM can carry two types of application messages:
+
+1. **Fuse Messages** (Filesystem operations)
+ - CPG Group: `pmxcfs_v1` (DCDB)
+ - Message types: `Write`, `Create`, `Delete`, `Mkdir`, `Rename`, `SetMtime`, `Unlock`
+ - Defined in: `pmxcfs-api-types::FuseMessage`
+
+2. **KvStore Messages** (Status/RRD sync)
+ - CPG Group: `pve_kvstore_v1`
+ - Message types: `Data` (key-value pairs for status sync)
+ - Defined in: `pmxcfs-api-types::KvStoreMessage`
+
+### Wire Format Compatibility
+
+All wire formats are **byte-compatible** with the C implementation. Messages include appropriate headers and payloads as defined in the C protocol.
+
+## Synchronization Flow
+
+### 1. Node Join
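+
+Condensed from the protocol phases above, the join flow looks roughly like:
+
+```text
+node joins CPG group
+  -> confchg fires on all members (epoch++, queues cleared, checksums invalidated)
+  -> CPG leader (lowest nodeid) sends SyncStart and its own State
+  -> every other member replies with its State
+  -> data leader (highest version/mtime) sends Update messages, then UpdateComplete
+  -> all nodes transition to Synced
+```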
+
+### 2. Normal Operation
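+
+In Synced mode, filesystem operations flow through CPG so that every node
+applies them in the same total order:
+
+```text
+FUSE operation on node A
+  -> send_fuse_message() multicasts a Normal message
+  -> CPG delivers it in total order on every node (including A)
+  -> deliver_message() applies the operation locally
+```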
+
+### 3. State Verification (Periodic)
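+
+Once per hour (`DCDB_VERIFY_TIME`), a verification round runs:
+
+```text
+leader sends VerifyRequest
+  -> each node computes a SHA-256 checksum of its database
+  -> each node answers with a Verify message carrying the checksum
+  -> checksums are compared; a mismatch triggers a cluster resync
+```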
+
+## Key Differences from C Implementation
+
+### Event Loop Architecture
+
+**C Version:**
+- Uses libqb's `qb_loop` for event loop
+- CPG fd registered with `qb_loop_poll_add()`
+- Dispatch called from qb_loop when fd is readable
+
+**Rust Version:**
+- Uses tokio async runtime
+- Service trait provides `dispatch()` method
+- ServiceManager polls fd using tokio's async I/O
+- No qb_loop dependency
+
+### CPG Instance Management
+
+**C Version:**
+- Single DFSM struct with callbacks
+- Two different CPG groups created separately
+
+**Rust Version:**
+- Each CPG group gets its own `Dfsm` instance
+- `ClusterDatabaseService` - manages `pmxcfs_v1` CPG group (MemDb)
+- `StatusSyncService` - manages `pve_kvstore_v1` CPG group (Status/RRD)
+- Both use same DFSM protocol but different callbacks
+
+## Error Handling
+
+### Split-Brain Prevention
+
+- Checksum verification detects divergence
+- Automatic resync on mismatch
+- Version monotonicity ensures forward progress
+
+### Network Partition Recovery
+
+- Membership changes trigger sync
+- Highest version always wins
+- Stale data is safely replaced
+
+### Consistency Guarantees
+
+- SQLite transactions ensure atomic updates
+- In-memory structures updated atomically
+- Version increments are monotonic
+- All nodes converge to same state
+
+## Compatibility Matrix
+
+| Feature | C Version | Rust Version | Compatible |
+|---------|-----------|--------------|------------|
+| Wire format | `dfsm_message_*_header_t` | `DfsmMessage::serialize()` | Yes |
+| CPG protocol | libcorosync | rust-corosync | Yes |
+| Message types | 0-6 | `DfsmMessageType` | Yes |
+| State machine | `dfsm_mode_t` | `DfsmMode` | Yes |
+| Protocol version | 1 | 1 | Yes |
+| Group names | `pmxcfs_v1`, `pve_kvstore_v1` | Same | Yes |
+
+## Known Issues / TODOs
+
+### Missing Features
+- [ ] **Sync message batching**: C version can batch updates, Rust sends individually
+- [ ] **Message queue limits**: C has MAX_QUEUE_LEN, Rust unbounded (potential memory issue)
+- [ ] **Detailed error codes**: C returns specific CS_ERR_* codes, Rust uses anyhow errors
+
+### Behavioral Differences (Benign)
+- **Logging**: Rust uses `tracing` instead of `qb_log` (compatible with journald)
+- **Threading**: Rust uses tokio tasks, C uses qb_loop single-threaded model
+- **Timers**: Rust uses tokio timers, C uses qb_loop timers (same timeout values)
+
+### Incompatibilities (None Known)
+No incompatibilities have been identified. The Rust implementation is fully wire-compatible and can operate in a mixed C/Rust cluster.
+
+## References
+
+### C Implementation
+- `src/pmxcfs/dfsm.c` / `dfsm.h` - Core DFSM implementation
+- `src/pmxcfs/dcdb.c` / `dcdb.h` - Distributed database coordination
+- `src/pmxcfs/loop.c` / `loop.h` - Service loop and management
+
+### Related Crates
+- **pmxcfs-memdb**: Database callbacks for DFSM
+- **pmxcfs-status**: Status tracking and kvstore
+- **pmxcfs-api-types**: Message type definitions
+- **pmxcfs-services**: Service framework for lifecycle management
+- **rust-corosync**: CPG bindings (external dependency)
+
+### Corosync Documentation
+- CPG (Closed Process Group) API: https://github.com/corosync/corosync
+- Group communication semantics: Total order, virtual synchrony
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
new file mode 100644
index 000000000..f2f48619b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
@@ -0,0 +1,80 @@
+/// DFSM application callbacks
+///
+/// This module defines the callback trait that application layers implement
+/// to integrate with the DFSM state machine.
+use crate::NodeSyncInfo;
+
+/// Callback trait for DFSM operations
+///
+/// The application layer implements this to receive DFSM events.
+/// The associated type `Message` specifies the message type this callback handles:
+/// - `FuseMessage` for main database operations
+/// - `KvStoreMessage` for status synchronization
+///
+/// This provides type safety by ensuring each DFSM instance only delivers
+/// the correct message type to its callbacks.
+pub trait Callbacks: Send + Sync {
+ /// The message type this callback handles
+ type Message: crate::message::Message;
+
+ /// Deliver an application message
+ ///
+ /// The message type is determined by the associated type:
+ /// - FuseMessage for main database operations
+ /// - KvStoreMessage for status synchronization
+ fn deliver_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ message: Self::Message,
+ timestamp: u64,
+ ) -> anyhow::Result<(i32, bool)>;
+
+ /// Compute state checksum for verification
+ fn compute_checksum(&self, output: &mut [u8; 32]) -> anyhow::Result<()>;
+
+ /// Get current state for synchronization
+ ///
+ /// Called when we need to send our state to other nodes during sync.
+ fn get_state(&self) -> anyhow::Result<Vec<u8>>;
+
+ /// Process state update during synchronization
+ fn process_state_update(&self, states: &[NodeSyncInfo]) -> anyhow::Result<bool>;
+
+ /// Process incremental update from leader
+ ///
+ /// The leader sends individual TreeEntry updates during synchronization.
+ /// The data is serialized TreeEntry in C-compatible wire format.
+ fn process_update(&self, nodeid: u32, pid: u32, data: &[u8]) -> anyhow::Result<()>;
+
+ /// Commit synchronized state
+ fn commit_state(&self) -> anyhow::Result<()>;
+
+ /// Called when cluster becomes synced
+ fn on_synced(&self);
+
+ /// Clean up sync resources (matches C's dfsm_cleanup_fn)
+ ///
+    /// Called to release resources allocated during state synchronization,
+    /// typically on membership changes or when transitioning out of sync mode.
+ ///
+ /// Default implementation does nothing (Rust's RAII handles most cleanup).
+ fn cleanup_sync_resources(&self, _states: &[NodeSyncInfo]) {
+ // Default: no-op, Rust's Drop trait handles cleanup
+ }
+
+ /// Called on membership changes (matches C's dfsm_confchg_fn)
+ ///
+ /// Notifies the application layer when cluster membership changes.
+ /// This can be used for logging, monitoring, or application-specific
+ /// membership tracking.
+ ///
+ /// # Arguments
+ /// * `member_list` - Current list of cluster members after the change
+ ///
+ /// Default implementation does nothing (membership handled internally).
+ fn on_membership_change(&self, _member_list: &[pmxcfs_api_types::MemberInfo]) {
+ // Default: no-op, membership changes handled internally
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
new file mode 100644
index 000000000..f2847062d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
@@ -0,0 +1,116 @@
+//! Cluster Database Service
+//!
+//! This service synchronizes the distributed cluster database (pmxcfs-memdb) across
+//! all cluster nodes using DFSM (Distributed Finite State Machine).
+//!
+//! Equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+//! Provides automatic retry, event-driven CPG dispatching, and periodic state verification.
+
+use async_trait::async_trait;
+use pmxcfs_services::{Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{debug, error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message;
+
+/// Cluster Database Service
+///
+/// Synchronizes the distributed cluster database (pmxcfs-memdb) across all nodes.
+/// Implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for database replication
+/// - Periodic state verification via timer callback
+///
+/// This is equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct ClusterDatabaseService<M> {
+ dfsm: Arc<Dfsm<M>>,
+ fd: Option<i32>,
+}
+
+impl<M: Message> ClusterDatabaseService<M> {
+ /// Create a new cluster database service
+ pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+ Self { dfsm, fd: None }
+ }
+}
+
+#[async_trait]
+impl<M: Message> Service for ClusterDatabaseService<M> {
+ fn name(&self) -> &str {
+ "cluster-database"
+ }
+
+ async fn initialize(&mut self) -> pmxcfs_services::Result<std::os::unix::io::RawFd> {
+ info!("Initializing cluster database service (dcdb)");
+
+ // Initialize CPG connection (this also joins the group)
+ self.dfsm.init_cpg().map_err(|e| {
+ ServiceError::InitializationFailed(format!("DFSM CPG initialization failed: {e}"))
+ })?;
+
+ // Get file descriptor for event monitoring
+ let fd = self.dfsm.fd_get().map_err(|e| {
+ self.dfsm.stop_services().ok();
+ ServiceError::InitializationFailed(format!("Failed to get DFSM fd: {e}"))
+ })?;
+
+ self.fd = Some(fd);
+
+ info!(
+ "Cluster database service initialized successfully with fd {}",
+ fd
+ );
+ Ok(fd)
+ }
+
+ async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> {
+ match self.dfsm.dispatch_events() {
+ Ok(_) => Ok(true),
+ Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+ warn!("DFSM connection lost, requesting reinitialization");
+ Ok(false)
+ }
+ Err(e) => {
+ error!("DFSM dispatch failed: {}", e);
+ Err(ServiceError::DispatchFailed(format!(
+ "DFSM dispatch failed: {e}"
+ )))
+ }
+ }
+ }
+
+ async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+ info!("Finalizing cluster database service");
+
+ self.fd = None;
+
+ if let Err(e) = self.dfsm.stop_services() {
+ warn!("Error stopping cluster database services: {}", e);
+ }
+
+ info!("Cluster database service finalized");
+ Ok(())
+ }
+
+ async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+ debug!("Cluster database timer callback: initiating state verification");
+
+ // Request state verification
+ if let Err(e) = self.dfsm.verify_request() {
+ warn!("DFSM state verification request failed: {}", e);
+ }
+
+ Ok(())
+ }
+
+ fn timer_period(&self) -> Option<Duration> {
+ // Match C implementation's DCDB_VERIFY_TIME (60 * 60 seconds)
+ // Periodic state verification happens once per hour
+ Some(Duration::from_secs(3600))
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
new file mode 100644
index 000000000..6e74238e9
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
@@ -0,0 +1,235 @@
+//! Safe, idiomatic wrapper for Corosync CPG (Closed Process Group)
+//!
+//! This module provides a trait-based abstraction over the Corosync CPG C API,
+//! handling the unsafe FFI boundary and callback lifecycle management internally.
+
+use anyhow::Result;
+use rust_corosync::{NodeId, cpg};
+use std::sync::Arc;
+
+/// Helper to extract CpgHandler from CPG context
+///
+/// # Safety
+/// - Context must point to a valid Arc<dyn CpgHandler> leaked via Box::into_raw()
+/// - Handler must still be alive (CpgService not dropped)
+/// - Pointer must be properly aligned for Arc<dyn CpgHandler>
+///
+/// # Errors
+/// Returns error if context is invalid, null, or misaligned
+unsafe fn handler_from_context<'a>(handle: cpg::Handle) -> Result<&'a dyn CpgHandler> {
+ let context = cpg::context_get(handle)
+ .map_err(|e| anyhow::anyhow!("Failed to get CPG context: {e:?}"))?;
+
+ if context == 0 {
+ return Err(anyhow::anyhow!("CPG context is null - not initialized"));
+ }
+
+ // Validate pointer alignment
+ if context % std::mem::align_of::<Arc<dyn CpgHandler>>() as u64 != 0 {
+ return Err(anyhow::anyhow!("CPG context pointer misaligned"));
+ }
+
+ // Context points to a leaked Arc<dyn CpgHandler>
+ // We borrow the Arc to get a reference to the handler
+ let arc_ptr = context as *const Arc<dyn CpgHandler>;
+ let arc_ref: &Arc<dyn CpgHandler> = unsafe { &*arc_ptr };
+ Ok(arc_ref.as_ref())
+}
+
+/// Trait for handling CPG events in a safe, idiomatic way
+///
+/// Implementors receive callbacks when CPG events occur. The trait handles
+/// all unsafe pointer conversion and context management internally.
+pub trait CpgHandler: Send + Sync + 'static {
+ fn on_deliver(&self, group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]);
+
+ fn on_confchg(
+ &self,
+ group_name: &str,
+ member_list: &[cpg::Address],
+ left_list: &[cpg::Address],
+ joined_list: &[cpg::Address],
+ );
+}
+
+/// Safe wrapper for CPG handle that manages callback lifecycle
+///
+/// This service registers callbacks with the CPG handle and ensures proper
+/// cleanup when dropped. It uses Arc reference counting to safely manage
+/// the handler lifetime across the FFI boundary.
+pub struct CpgService {
+ handle: cpg::Handle,
+ handler: Arc<dyn CpgHandler>,
+}
+
+impl CpgService {
+ pub fn new<T: CpgHandler>(handler: Arc<T>) -> Result<Self> {
+ fn cpg_deliver_callback(
+ handle: &cpg::Handle,
+ group_name: String,
+ nodeid: NodeId,
+ pid: u32,
+ msg: &[u8],
+ _msg_len: usize,
+ ) {
+ // Catch panics to prevent unwinding through C code (UB)
+ let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+ match unsafe { handler_from_context(*handle) } {
+ Ok(handler) => handler.on_deliver(&group_name, nodeid, pid, msg),
+ Err(e) => {
+ tracing::error!("CPG deliver callback error: {}", e);
+ }
+ }
+ }));
+
+ if let Err(panic) = result {
+ tracing::error!("PANIC in CPG deliver callback: {:?}", panic);
+ }
+ }
+
+ fn cpg_confchg_callback(
+ handle: &cpg::Handle,
+ group_name: &str,
+ member_list: Vec<cpg::Address>,
+ left_list: Vec<cpg::Address>,
+ joined_list: Vec<cpg::Address>,
+ ) {
+ // Catch panics to prevent unwinding through C code (UB)
+ let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+ match unsafe { handler_from_context(*handle) } {
+ Ok(handler) => handler.on_confchg(group_name, &member_list, &left_list, &joined_list),
+ Err(e) => {
+ tracing::error!("CPG confchg callback error: {}", e);
+ }
+ }
+ }));
+
+ if let Err(panic) = result {
+ tracing::error!("PANIC in CPG confchg callback: {:?}", panic);
+ }
+ }
+
+ let model_data = cpg::ModelData::ModelV1(cpg::Model1Data {
+ flags: cpg::Model1Flags::None,
+ deliver_fn: Some(cpg_deliver_callback),
+ confchg_fn: Some(cpg_confchg_callback),
+ totem_confchg_fn: None,
+ });
+
+ let handle = cpg::initialize(&model_data, 0)?;
+
+ let handler_dyn: Arc<dyn CpgHandler> = handler;
+ let leaked_arc = Box::new(Arc::clone(&handler_dyn));
+ let arc_ptr = Box::into_raw(leaked_arc) as u64;
+
+ // Set context with error handling to prevent Arc leak
+ if let Err(e) = cpg::context_set(handle, arc_ptr) {
+ // Recover the leaked Arc on error
+ unsafe {
+ let _ = Box::from_raw(arc_ptr as *mut Arc<dyn CpgHandler>);
+ }
+ // Finalize CPG handle
+ let _ = cpg::finalize(handle);
+ return Err(e.into());
+ }
+
+ Ok(Self {
+ handle,
+ handler: handler_dyn,
+ })
+ }
+
+ pub fn join(&self, group_name: &str) -> Result<()> {
+ // Group names are hardcoded in the application, so assert they don't contain nulls
+ debug_assert!(!group_name.contains('\0'), "Group name cannot contain null bytes");
+
+ // IMPORTANT: C implementation uses strlen(name) + 1 for CPG name length,
+ // which includes the trailing nul. To ensure compatibility with C nodes,
+ // we must add \0 to the group name.
+ // See src/pmxcfs/dfsm.c: dfsm->cpg_group_name.length = strlen(group_name) + 1;
+ let group_string = format!("{group_name}\0");
+        tracing::debug!("CPG JOIN: joining group '{}'", group_name);
+ cpg::join(self.handle, &group_string)?;
+ tracing::info!("CPG JOIN: Successfully joined group '{}'", group_name);
+ Ok(())
+ }
+
+ pub fn leave(&self, group_name: &str) -> Result<()> {
+ // Group names are hardcoded in the application, so assert they don't contain nulls
+ debug_assert!(!group_name.contains('\0'), "Group name cannot contain null bytes");
+
+ // Include trailing nul to match C's behavior (see join() comment)
+ let group_string = format!("{group_name}\0");
+ cpg::leave(self.handle, &group_string)?;
+ Ok(())
+ }
+
+ pub fn mcast(&self, guarantee: cpg::Guarantee, msg: &[u8]) -> Result<()> {
+ cpg::mcast_joined(self.handle, guarantee, msg)?;
+ Ok(())
+ }
+
+ pub fn dispatch(&self) -> Result<(), rust_corosync::CsError> {
+ cpg::dispatch(self.handle, rust_corosync::DispatchFlags::All)
+ }
+
+ pub fn fd(&self) -> Result<i32> {
+ Ok(cpg::fd_get(self.handle)?)
+ }
+
+ pub fn handler(&self) -> &Arc<dyn CpgHandler> {
+ &self.handler
+ }
+
+ pub fn handle(&self) -> cpg::Handle {
+ self.handle
+ }
+}
+
+impl Drop for CpgService {
+ fn drop(&mut self) {
+ // CRITICAL: Finalize BEFORE recovering Arc to prevent race condition
+ // where callbacks could fire while we're deallocating the Arc
+ if let Err(e) = cpg::finalize(self.handle) {
+ tracing::error!("Failed to finalize CPG handle: {:?}", e);
+ }
+
+ // Now safe to recover Arc - no more callbacks can fire
+ match cpg::context_get(self.handle) {
+ Ok(context) if context != 0 => {
+ unsafe {
+ let _boxed = Box::from_raw(context as *mut Arc<dyn CpgHandler>);
+ }
+ }
+ Ok(_) => {
+ tracing::warn!("CPG context was null during drop");
+ }
+ Err(e) => {
+ // Context_get might fail after finalize - this is acceptable
+ tracing::debug!("Context get failed during drop: {:?}", e);
+ }
+ }
+ }
+}
+
+/// SAFETY: CpgService is thread-safe with the following guarantees:
+///
+/// 1. cpg::Handle is thread-safe per Corosync 2.x/3.x library design
+/// - CPG library uses internal mutex for concurrent access protection
+///
+/// 2. Handler is protected by Arc reference counting
+/// - Multiple threads can safely hold references to the handler
+///
+/// 3. CpgHandler trait requires Send + Sync
+/// - Implementations must handle concurrent callbacks safely
+///
+/// 4. Methods join/leave/mcast are safe to call concurrently from multiple threads
+///
+/// LIMITATIONS:
+/// - Do NOT call dispatch() concurrently from multiple threads on the same handle
+/// (CPG library dispatch is not reentrant)
+unsafe impl Send for CpgService {}
+unsafe impl Sync for CpgService {}
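
The trailing-NUL convention used by `join()` and `leave()` can be sketched in isolation. This is a hypothetical helper, not part of the patch: C's dfsm.c sets `cpg_group_name.length = strlen(group_name) + 1`, so the group identity seen by corosync includes the terminating NUL byte, and the Rust side must append `'\0'` to produce the same byte sequence.

```rust
// Illustrative only: reproduce C's group-name bytes (strlen(name) + 1,
// i.e. including the terminating NUL) so Rust and C nodes join the
// same CPG group.
fn cpg_group_bytes(group_name: &str) -> Vec<u8> {
    // Group names are application constants, so a NUL inside one is a bug.
    assert!(!group_name.contains('\0'), "group name must not contain NUL");
    let mut bytes = group_name.as_bytes().to_vec();
    bytes.push(0); // trailing NUL, matching C's strlen(group_name) + 1
    bytes
}
```

A Rust node joining `pmxcfs_v1` with these bytes is indistinguishable, at the CPG level, from a C node joining the same group.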
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
new file mode 100644
index 000000000..5d03eae45
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
@@ -0,0 +1,722 @@
+//! DFSM protocol message types
+//!
+//! This module defines the `DfsmMessage` enum, which encapsulates all DFSM protocol
+//! messages with their associated data and provides type-safe serialization and
+//! deserialization.
+//!
+//! The wire format matches the C implementation's dfsm_message_*_header_t structures
+//! for compatibility.
+use anyhow::Result;
+use pmxcfs_memdb::TreeEntry;
+
+use super::message::Message;
+use super::types::{DfsmMessageType, SyncEpoch};
+
+/// DFSM protocol message with typed variants
+///
+/// Each variant corresponds to a message type in the DFSM protocol and carries
+/// the appropriate payload data. The wire format matches the C implementation:
+///
+/// For Normal messages: dfsm_message_normal_header_t (24 bytes) + fuse_data
+/// ```text
+/// [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][fuse_data...]
+/// ```
+///
+/// The generic parameter `M` specifies the application message type and must implement
+/// the `Message` trait for serialization/deserialization:
+/// - `DfsmMessage<FuseMessage>` for database operations
+/// - `DfsmMessage<KvStoreMessage>` for status synchronization
+#[derive(Debug, Clone)]
+pub enum DfsmMessage<M: Message> {
+ /// Regular application message
+ ///
+ /// Contains a typed application message (FuseMessage or KvStoreMessage).
+ /// C wire format: dfsm_message_normal_header_t + application_message data
+ Normal {
+ msg_count: u64,
+ timestamp: u32, // Unix timestamp (matches C's u32)
+ protocol_version: u32, // Protocol version
+ message: M, // Typed message (FuseMessage or KvStoreMessage)
+ },
+
+ /// Start synchronization signal from leader (no payload)
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+ SyncStart { sync_epoch: SyncEpoch },
+
+ /// State data from another node during sync
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [state_data: raw bytes]
+ State {
+ sync_epoch: SyncEpoch,
+ data: Vec<u8>,
+ },
+
+ /// State update from leader
+ ///
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + TreeEntry fields
+ /// This is sent by the leader during synchronization to update followers
+ /// with individual database entries that differ from their state.
+ Update {
+ sync_epoch: SyncEpoch,
+ tree_entry: TreeEntry,
+ },
+
+ /// Update complete signal from leader (no payload)
+ /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+ UpdateComplete { sync_epoch: SyncEpoch },
+
+ /// Verification request from leader
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64]
+ VerifyRequest { sync_epoch: SyncEpoch, csum_id: u64 },
+
+ /// Verification response with checksum
+ ///
+ /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64][checksum: [u8; 32]]
+ Verify {
+ sync_epoch: SyncEpoch,
+ csum_id: u64,
+ checksum: [u8; 32],
+ },
+}
+
+impl<M: Message> DfsmMessage<M> {
+ /// Protocol version (should match cluster-wide)
+ pub const DEFAULT_PROTOCOL_VERSION: u32 = 1;
+
+ /// Get the message type discriminant
+ pub fn message_type(&self) -> DfsmMessageType {
+ match self {
+ DfsmMessage::Normal { .. } => DfsmMessageType::Normal,
+ DfsmMessage::SyncStart { .. } => DfsmMessageType::SyncStart,
+ DfsmMessage::State { .. } => DfsmMessageType::State,
+ DfsmMessage::Update { .. } => DfsmMessageType::Update,
+ DfsmMessage::UpdateComplete { .. } => DfsmMessageType::UpdateComplete,
+ DfsmMessage::VerifyRequest { .. } => DfsmMessageType::VerifyRequest,
+ DfsmMessage::Verify { .. } => DfsmMessageType::Verify,
+ }
+ }
+
+    /// Serialize message to C-compatible wire format
+    ///
+    /// Normal messages use dfsm_message_normal_header_t (24 bytes) + application data:
+    /// [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][data...]
+    /// All other variants use dfsm_message_state_header_t (32 bytes) + payload.
+ pub fn serialize(&self) -> Vec<u8> {
+ match self {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ } => self.serialize_normal_message(*msg_count, *timestamp, *protocol_version, message),
+ _ => self.serialize_state_message(),
+ }
+ }
+
+ /// Serialize a Normal message with C-compatible header
+ fn serialize_normal_message(
+ &self,
+ msg_count: u64,
+ timestamp: u32,
+ protocol_version: u32,
+ message: &M,
+ ) -> Vec<u8> {
+ let msg_type = self.message_type() as u16;
+ let subtype = message.message_type();
+ let app_data = message.serialize();
+
+ // C header: type (u16) + subtype (u16) + protocol (u32) + time (u32) + reserved (u32) + count (u64) = 24 bytes
+ let mut message = Vec::with_capacity(24 + app_data.len());
+
+ // dfsm_message_header_t fields
+ message.extend_from_slice(&msg_type.to_le_bytes());
+ message.extend_from_slice(&subtype.to_le_bytes());
+ message.extend_from_slice(&protocol_version.to_le_bytes());
+ message.extend_from_slice(×tamp.to_le_bytes());
+ message.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // count field
+ message.extend_from_slice(&msg_count.to_le_bytes());
+
+ // application message data
+ message.extend_from_slice(&app_data);
+
+ message
+ }
+
+ /// Serialize state messages (non-Normal) with C-compatible header
+ /// C wire format: dfsm_message_state_header_t (32 bytes) + payload
+ /// Header breakdown: base (16 bytes) + epoch (16 bytes)
+ fn serialize_state_message(&self) -> Vec<u8> {
+ let msg_type = self.message_type() as u16;
+ let (sync_epoch, payload) = self.extract_epoch_and_payload();
+
+ // For state messages: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + payload
+ let mut message = Vec::with_capacity(32 + payload.len());
+
+ // Base header (16 bytes): type, subtype, protocol, time, reserved
+ message.extend_from_slice(&msg_type.to_le_bytes());
+ message.extend_from_slice(&0u16.to_le_bytes()); // subtype (unused)
+ message.extend_from_slice(&Self::DEFAULT_PROTOCOL_VERSION.to_le_bytes());
+
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+ message.extend_from_slice(×tamp.to_le_bytes());
+ message.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // Epoch header (16 bytes): epoch, time, nodeid, pid
+ message.extend_from_slice(&sync_epoch.serialize());
+
+ // Payload
+ message.extend_from_slice(&payload);
+
+ message
+ }
+
+ /// Extract sync_epoch and payload from state messages
+ fn extract_epoch_and_payload(&self) -> (SyncEpoch, Vec<u8>) {
+ match self {
+ DfsmMessage::Normal { .. } => {
+ unreachable!("Normal messages use serialize_normal_message")
+ }
+ DfsmMessage::SyncStart { sync_epoch } => (*sync_epoch, Vec::new()),
+ DfsmMessage::State { sync_epoch, data } => (*sync_epoch, data.clone()),
+ DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ } => (*sync_epoch, tree_entry.serialize_for_update()),
+ DfsmMessage::UpdateComplete { sync_epoch } => (*sync_epoch, Vec::new()),
+ DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ } => (*sync_epoch, csum_id.to_le_bytes().to_vec()),
+ DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ } => {
+ let mut data = Vec::with_capacity(8 + 32);
+ data.extend_from_slice(&csum_id.to_le_bytes());
+ data.extend_from_slice(checksum);
+ (*sync_epoch, data)
+ }
+ }
+ }
+
+ /// Deserialize message from C-compatible wire format
+ ///
+ /// Normal messages: [base header: 16 bytes][count: u64][app data]
+ /// State messages: [base header: 16 bytes][epoch: 16 bytes][payload]
+ ///
+ /// # Arguments
+ /// * `data` - Raw message bytes from CPG
+ pub fn deserialize(data: &[u8]) -> Result<Self> {
+ if data.len() < 16 {
+ anyhow::bail!(
+ "Message too short: {} bytes (need at least 16 for header)",
+ data.len()
+ );
+ }
+
+ // Parse dfsm_message_header_t (16 bytes)
+ let msg_type = u16::from_le_bytes([data[0], data[1]]);
+ let subtype = u16::from_le_bytes([data[2], data[3]]);
+ let protocol_version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
+ let timestamp = u32::from_le_bytes([data[8], data[9], data[10], data[11]]);
+ let _reserved = u32::from_le_bytes([data[12], data[13], data[14], data[15]]);
+
+ let dfsm_type = DfsmMessageType::try_from(msg_type)?;
+
+ // Normal messages have different structure than state messages
+ if dfsm_type == DfsmMessageType::Normal {
+ // Normal: [base: 16][count: 8][app_data: ...]
+ let payload = &data[16..];
+ Self::deserialize_normal_message(subtype, protocol_version, timestamp, payload)
+ } else {
+ // State messages: [base: 16][epoch: 16][payload: ...]
+ if data.len() < 32 {
+ anyhow::bail!(
+ "State message too short: {} bytes (need at least 32 for state header)",
+ data.len()
+ );
+ }
+ let sync_epoch = SyncEpoch::deserialize(&data[16..32])
+ .map_err(|e| anyhow::anyhow!("Failed to deserialize sync epoch: {e}"))?;
+ let payload = &data[32..];
+ Self::deserialize_state_message(dfsm_type, sync_epoch, payload)
+ }
+ }
+
+ /// Deserialize a Normal message
+ fn deserialize_normal_message(
+ subtype: u16,
+ protocol_version: u32,
+ timestamp: u32,
+ payload: &[u8],
+ ) -> Result<Self> {
+ // Normal messages have count field (u64) after base header
+ if payload.len() < 8 {
+ anyhow::bail!("Normal message too short: need count field");
+ }
+ let msg_count = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ let app_data = &payload[8..];
+
+ // Deserialize using the Message trait
+ let message = M::deserialize(subtype, app_data)?;
+
+ Ok(DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ })
+ }
+
+ /// Deserialize a state message (with epoch)
+ fn deserialize_state_message(
+ dfsm_type: DfsmMessageType,
+ sync_epoch: SyncEpoch,
+ payload: &[u8],
+ ) -> Result<Self> {
+ match dfsm_type {
+ DfsmMessageType::Normal => {
+ unreachable!("Normal messages use deserialize_normal_message")
+ }
+ DfsmMessageType::Update => {
+ let tree_entry = TreeEntry::deserialize_from_update(payload)?;
+ Ok(DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ })
+ }
+ DfsmMessageType::SyncStart => Ok(DfsmMessage::SyncStart { sync_epoch }),
+ DfsmMessageType::State => Ok(DfsmMessage::State {
+ sync_epoch,
+ data: payload.to_vec(),
+ }),
+ DfsmMessageType::UpdateComplete => Ok(DfsmMessage::UpdateComplete { sync_epoch }),
+ DfsmMessageType::VerifyRequest => {
+ if payload.len() < 8 {
+ anyhow::bail!("VerifyRequest message too short");
+ }
+ let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ Ok(DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ })
+ }
+ DfsmMessageType::Verify => {
+ if payload.len() < 40 {
+ anyhow::bail!("Verify message too short");
+ }
+ let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+ let mut checksum = [0u8; 32];
+ checksum.copy_from_slice(&payload[8..40]);
+ Ok(DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ })
+ }
+ }
+ }
+
+ /// Helper to create a Normal message from an application message
+ pub fn from_message(msg_count: u64, message: M, protocol_version: u32) -> Self {
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ }
+ }
+
+ /// Helper to create an Update message from a TreeEntry
+ ///
+ /// Used by the leader during synchronization to send individual database entries
+ /// to nodes that need to catch up. Matches C's dcdb_send_update_inode().
+ pub fn from_tree_entry(tree_entry: TreeEntry, sync_epoch: SyncEpoch) -> Self {
+ DfsmMessage::Update {
+ sync_epoch,
+ tree_entry,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::FuseMessage;
+
+ #[test]
+ fn test_sync_start_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 1234567890,
+ nodeid: 1,
+ pid: 1000,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ assert!(
+ matches!(deserialized, DfsmMessage::SyncStart { sync_epoch: e } if e == sync_epoch)
+ );
+ }
+
+ #[test]
+ fn test_normal_roundtrip() {
+ let fuse_msg = FuseMessage::Create {
+ path: "/test/file".to_string(),
+ };
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Normal {
+ msg_count: 42,
+ timestamp: 1234567890,
+ protocol_version: DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION,
+ message: fuse_msg.clone(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version,
+ message,
+ } => {
+ assert_eq!(msg_count, 42);
+ assert_eq!(timestamp, 1234567890);
+ assert_eq!(
+ protocol_version,
+ DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION
+ );
+ assert_eq!(message, fuse_msg);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_verify_request_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 2,
+ time: 1234567891,
+ nodeid: 2,
+ pid: 2000,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: 0x123456789ABCDEF0,
+ };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::VerifyRequest {
+ sync_epoch: e,
+ csum_id,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, 0x123456789ABCDEF0);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_verify_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 3,
+ time: 1234567892,
+ nodeid: 3,
+ pid: 3000,
+ };
+ let checksum = [42u8; 32];
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Verify {
+ sync_epoch,
+ csum_id: 0x1122334455667788,
+ checksum,
+ };
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Verify {
+ sync_epoch: e,
+ csum_id,
+ checksum: recv_checksum,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, 0x1122334455667788);
+ assert_eq!(recv_checksum, checksum);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+    #[test]
+    fn test_invalid_message_type() {
+        // Full 16-byte header whose type field (0xAAAA) maps to no DfsmMessageType
+        let mut data = vec![0u8; 16];
+        data[0..2].copy_from_slice(&0xAAAAu16.to_le_bytes());
+        assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+    }
+
+ #[test]
+ fn test_too_short() {
+ let data = vec![0xFF];
+ assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+ }
+
+ // ===== Edge Case Tests =====
+
+ #[test]
+ fn test_state_message_too_short() {
+ // State messages need at least 32 bytes (16 base + 16 epoch)
+ let mut data = vec![0u8; 31]; // One byte short
+ // Set message type to State (2)
+ data[0..2].copy_from_slice(&2u16.to_le_bytes());
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(result.is_err(), "State message with 31 bytes should fail");
+ assert!(result.unwrap_err().to_string().contains("too short"));
+ }
+
+ #[test]
+ fn test_normal_message_missing_count() {
+ // Normal messages need count field (u64) after 16-byte header
+ let mut data = vec![0u8; 20]; // Header + 4 bytes (not enough for u64 count)
+ // Set message type to Normal (0)
+ data[0..2].copy_from_slice(&0u16.to_le_bytes());
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(
+ result.is_err(),
+ "Normal message without full count field should fail"
+ );
+ }
+
+ #[test]
+ fn test_verify_message_truncated_checksum() {
+ // Verify messages need csum_id (8 bytes) + checksum (32 bytes) = 40 bytes payload
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 123,
+ nodeid: 1,
+ pid: 100,
+ };
+ let mut data = Vec::new();
+
+ // Base header (16 bytes)
+ data.extend_from_slice(&6u16.to_le_bytes()); // Verify message type
+ data.extend_from_slice(&0u16.to_le_bytes()); // subtype
+ data.extend_from_slice(&1u32.to_le_bytes()); // protocol
+ data.extend_from_slice(&123u32.to_le_bytes()); // time
+ data.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+ // Epoch (16 bytes)
+ data.extend_from_slice(&sync_epoch.serialize());
+
+ // Truncated payload (only 39 bytes instead of 40)
+ data.extend_from_slice(&0x12345678u64.to_le_bytes());
+ data.extend_from_slice(&[0u8; 31]); // Only 31 bytes of checksum
+
+ let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+ assert!(
+ result.is_err(),
+ "Verify message with truncated checksum should fail"
+ );
+ }
+
+ #[test]
+ fn test_update_message_with_tree_entry() {
+ use pmxcfs_memdb::TreeEntry;
+
+ // Create a valid tree entry with matching size
+ let data = vec![1, 2, 3, 4, 5];
+ let tree_entry = TreeEntry {
+ inode: 42,
+ parent: 0,
+ version: 1,
+ writer: 0,
+ name: "testfile".to_string(),
+ mtime: 1234567890,
+ size: data.len(), // size must match data.len()
+ entry_type: 8, // DT_REG (regular file)
+ data,
+ };
+
+ let sync_epoch = SyncEpoch {
+ epoch: 5,
+ time: 999,
+ nodeid: 2,
+ pid: 200,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::Update {
+ sync_epoch,
+ tree_entry: tree_entry.clone(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::Update {
+ sync_epoch: e,
+ tree_entry: recv_entry,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(recv_entry.inode, tree_entry.inode);
+ assert_eq!(recv_entry.name, tree_entry.name);
+ assert_eq!(recv_entry.size, tree_entry.size);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_update_complete_roundtrip() {
+ let sync_epoch = SyncEpoch {
+ epoch: 10,
+ time: 5555,
+ nodeid: 3,
+ pid: 300,
+ };
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+
+ let serialized = msg.serialize();
+ assert_eq!(
+ serialized.len(),
+ 32,
+ "UpdateComplete should be exactly 32 bytes (header + epoch)"
+ );
+
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ assert!(
+ matches!(deserialized, DfsmMessage::UpdateComplete { sync_epoch: e } if e == sync_epoch)
+ );
+ }
+
+ #[test]
+ fn test_state_message_with_large_payload() {
+ let sync_epoch = SyncEpoch {
+ epoch: 7,
+ time: 7777,
+ nodeid: 4,
+ pid: 400,
+ };
+ // Create a large payload (1MB)
+ let large_data = vec![0xAB; 1024 * 1024];
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::State {
+ sync_epoch,
+ data: large_data.clone(),
+ };
+
+ let serialized = msg.serialize();
+ // Should be 32 bytes header + 1MB data
+ assert_eq!(serialized.len(), 32 + 1024 * 1024);
+
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::State {
+ sync_epoch: e,
+ data,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(data.len(), large_data.len());
+ assert_eq!(data, large_data);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+
+ #[test]
+ fn test_message_type_detection() {
+ let sync_epoch = SyncEpoch {
+ epoch: 1,
+ time: 100,
+ nodeid: 1,
+ pid: 50,
+ };
+
+ let sync_start: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+ assert_eq!(sync_start.message_type(), DfsmMessageType::SyncStart);
+
+ let state: DfsmMessage<FuseMessage> = DfsmMessage::State {
+ sync_epoch,
+ data: vec![1, 2, 3],
+ };
+ assert_eq!(state.message_type(), DfsmMessageType::State);
+
+ let update_complete: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+ assert_eq!(
+ update_complete.message_type(),
+ DfsmMessageType::UpdateComplete
+ );
+ }
+
+ #[test]
+ fn test_from_message_helper() {
+ let fuse_msg = FuseMessage::Mkdir {
+ path: "/new/dir".to_string(),
+ };
+ let msg_count = 123;
+ let protocol_version = DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION;
+
+ let dfsm_msg = DfsmMessage::from_message(msg_count, fuse_msg.clone(), protocol_version);
+
+ match dfsm_msg {
+ DfsmMessage::Normal {
+ msg_count: count,
+ timestamp: _,
+ protocol_version: pv,
+ message,
+ } => {
+ assert_eq!(count, msg_count);
+ assert_eq!(pv, protocol_version);
+ assert_eq!(message, fuse_msg);
+ }
+ _ => panic!("from_message should create Normal variant"),
+ }
+ }
+
+ #[test]
+ fn test_verify_request_with_max_csum_id() {
+ let sync_epoch = SyncEpoch {
+ epoch: 99,
+ time: 9999,
+ nodeid: 5,
+ pid: 500,
+ };
+ let max_csum_id = u64::MAX; // Test with maximum value
+
+ let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: max_csum_id,
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+ match deserialized {
+ DfsmMessage::VerifyRequest {
+ sync_epoch: e,
+ csum_id,
+ } => {
+ assert_eq!(e, sync_epoch);
+ assert_eq!(csum_id, max_csum_id);
+ }
+ _ => panic!("Wrong message type"),
+ }
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
new file mode 100644
index 000000000..4d639ea1f
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
@@ -0,0 +1,194 @@
+//! FUSE message types for cluster synchronization
+//!
+//! These are the high-level operations that get broadcast through the cluster
+//! via the main database DFSM (pmxcfs_v1 CPG group).
+use anyhow::{Context, Result};
+
+use crate::message::Message;
+use crate::wire_format::{CFuseMessage, CMessageType};
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum FuseMessage {
+ /// Create a regular file
+ Create { path: String },
+ /// Create a directory
+ Mkdir { path: String },
+ /// Write data to a file
+ Write {
+ path: String,
+ offset: u64,
+ data: Vec<u8>,
+ },
+ /// Delete a file or directory
+ Delete { path: String },
+ /// Rename/move a file or directory
+ Rename { from: String, to: String },
+ /// Update modification time
+ ///
+ /// Note: mtime is sent via offset field in CFuseMessage (C: dcdb.c:900)
+ ///
+ /// WARNING: mtime is limited to u32 (matching C implementation).
+ /// This will overflow in 2038 (Year 2038 problem).
+ /// Consider migrating to u64 timestamps in a future protocol version.
+ Mtime { path: String, mtime: u32 },
+ /// Request unlock (not yet implemented)
+ UnlockRequest { path: String },
+ /// Unlock (not yet implemented)
+ Unlock { path: String },
+}
+
+impl Message for FuseMessage {
+ fn message_type(&self) -> u16 {
+ match self {
+ FuseMessage::Create { .. } => CMessageType::Create as u16,
+ FuseMessage::Mkdir { .. } => CMessageType::Mkdir as u16,
+ FuseMessage::Write { .. } => CMessageType::Write as u16,
+ FuseMessage::Delete { .. } => CMessageType::Delete as u16,
+ FuseMessage::Rename { .. } => CMessageType::Rename as u16,
+ FuseMessage::Mtime { .. } => CMessageType::Mtime as u16,
+ FuseMessage::UnlockRequest { .. } => CMessageType::UnlockRequest as u16,
+ FuseMessage::Unlock { .. } => CMessageType::Unlock as u16,
+ }
+ }
+
+ fn serialize(&self) -> Vec<u8> {
+ let c_msg = match self {
+ FuseMessage::Create { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Mkdir { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Write { path, offset, data } => CFuseMessage {
+ size: data.len() as u32,
+ offset: *offset as u32,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: data.clone(),
+ },
+ FuseMessage::Delete { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Rename { from, to } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: from.clone(),
+ to: Some(to.clone()),
+ data: Vec::new(),
+ },
+ FuseMessage::Mtime { path, mtime } => CFuseMessage {
+ size: 0,
+ offset: *mtime, // mtime is sent via offset field (C: dcdb.c:900)
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::UnlockRequest { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ FuseMessage::Unlock { path } => CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: path.clone(),
+ to: None,
+ data: Vec::new(),
+ },
+ };
+
+ c_msg.serialize()
+ }
+
+ fn deserialize(message_type: u16, data: &[u8]) -> Result<Self> {
+ let c_msg = CFuseMessage::parse(data).context("Failed to parse C FUSE message")?;
+ let msg_type = CMessageType::try_from(message_type).context("Invalid C message type")?;
+
+ Ok(match msg_type {
+ CMessageType::Create => FuseMessage::Create { path: c_msg.path },
+ CMessageType::Mkdir => FuseMessage::Mkdir { path: c_msg.path },
+ CMessageType::Write => FuseMessage::Write {
+ path: c_msg.path,
+ offset: c_msg.offset as u64,
+ data: c_msg.data,
+ },
+ CMessageType::Delete => FuseMessage::Delete { path: c_msg.path },
+ CMessageType::Rename => FuseMessage::Rename {
+ from: c_msg.path,
+ to: c_msg.to.unwrap_or_default(),
+ },
+ CMessageType::Mtime => FuseMessage::Mtime {
+ path: c_msg.path,
+ mtime: c_msg.offset, // mtime is sent via offset field (C: dcdb.c:900)
+ },
+ CMessageType::UnlockRequest => FuseMessage::UnlockRequest { path: c_msg.path },
+ CMessageType::Unlock => FuseMessage::Unlock { path: c_msg.path },
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_fuse_message_create() {
+ let msg = FuseMessage::Create {
+ path: "/test/file".to_string(),
+ };
+ assert_eq!(msg.message_type(), CMessageType::Create as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_fuse_message_write() {
+ let msg = FuseMessage::Write {
+ path: "/test/file".to_string(),
+ offset: 100,
+ data: vec![1, 2, 3, 4, 5],
+ };
+ assert_eq!(msg.message_type(), CMessageType::Write as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_fuse_message_rename() {
+ let msg = FuseMessage::Rename {
+ from: "/old/path".to_string(),
+ to: "/new/path".to_string(),
+ };
+ assert_eq!(msg.message_type(), CMessageType::Rename as u16);
+
+ let serialized = msg.serialize();
+ let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
new file mode 100644
index 000000000..79196eca7
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
@@ -0,0 +1,387 @@
+//! KvStore message types for DFSM status synchronization
+//!
+//! This module defines the KvStore message types that are delivered through
+//! the status DFSM state machine (pve_kvstore_v1 CPG group).
+use anyhow::Context;
+
+use crate::message::Message;
+
+/// Key size in kvstore update messages (matches C's hardcoded 256 byte buffer)
+const KVSTORE_KEY_SIZE: usize = 256;
+
+/// Wire format header for clog_entry_t messages
+///
+/// This represents only the portion that's actually transmitted over the network.
+/// The full C struct includes prev/next/uid/digests/pid fields, but those are only
+/// used in the in-memory representation and not sent on the wire.
+#[repr(C)]
+#[derive(Copy, Clone)]
+struct ClogEntryHeader {
+ time: u32,
+ priority: u8,
+ padding: [u8; 3],
+ node_len: u32,
+ ident_len: u32,
+ tag_len: u32,
+ msg_len: u32,
+}
+
+impl ClogEntryHeader {
+ /// Convert the header to bytes for serialization
+ ///
+ /// SAFETY: This is safe because:
+ /// 1. ClogEntryHeader is #[repr(C)] with explicit layout
+ /// 2. All fields are plain data types (u32, u8, [u8; 3])
+ /// 3. No padding issues - padding is explicit in the struct
+    /// 4. The build targets a little-endian platform, matching the wire format
+ fn to_bytes(&self) -> [u8; std::mem::size_of::<Self>()] {
+ unsafe { std::mem::transmute(*self) }
+ }
+
+ /// Parse header from bytes
+ ///
+ /// SAFETY: This is safe because:
+ /// 1. We validate the input size before transmuting
+ /// 2. ClogEntryHeader is #[repr(C)] with well-defined layout
+ /// 3. All bit patterns are valid for the struct's field types
+    /// 4. The build targets a little-endian platform, matching the wire format
+ fn from_bytes(data: &[u8]) -> anyhow::Result<Self> {
+        if data.len() < std::mem::size_of::<ClogEntryHeader>() {
+            anyhow::bail!(
+                "LOG message too short: {} < {}",
+                data.len(),
+                std::mem::size_of::<ClogEntryHeader>()
+            );
+        }
+
+        let header_bytes: [u8; std::mem::size_of::<ClogEntryHeader>()] =
+            data[..std::mem::size_of::<ClogEntryHeader>()]
+                .try_into()
+                .expect("slice length verified above");
+
+ Ok(unsafe { std::mem::transmute(header_bytes) })
+ }
+}
+
+/// KvStore message type IDs (matches C's kvstore_message_t enum)
+#[derive(
+ Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive,
+)]
+#[repr(u16)]
+enum KvStoreMessageType {
+ Update = 1, // KVSTORE_MESSAGE_UPDATE
+ UpdateComplete = 2, // KVSTORE_MESSAGE_UPDATE_COMPLETE
+ Log = 3, // KVSTORE_MESSAGE_LOG
+}
+
+/// KvStore message types for ephemeral status synchronization
+///
+/// These messages are used by the kvstore DFSM (pve_kvstore_v1 CPG group)
+/// to synchronize ephemeral data like RRD metrics, node IPs, and cluster logs.
+///
+/// Matches C implementation's KVSTORE_MESSAGE_* types in status.c
+#[derive(Debug, Clone, PartialEq)]
+pub enum KvStoreMessage {
+ /// Update key-value data from a node
+ ///
+ /// Wire format: key (256 bytes, null-terminated) + value (variable length)
+ /// Matches C's KVSTORE_MESSAGE_UPDATE
+ Update { key: String, value: Vec<u8> },
+
+ /// Cluster log entry
+ ///
+ /// Wire format: clog_entry_t struct
+ /// Matches C's KVSTORE_MESSAGE_LOG
+ Log {
+ time: u32,
+ priority: u8,
+ node: String,
+ ident: String,
+ tag: String,
+ message: String,
+ },
+
+ /// Update complete signal (not currently used)
+ ///
+ /// Matches C's KVSTORE_MESSAGE_UPDATE_COMPLETE
+ UpdateComplete,
+}
+
+impl KvStoreMessage {
+ /// Get message type ID (matches C's kvstore_message_t enum)
+ pub fn message_type(&self) -> u16 {
+ let msg_type = match self {
+ KvStoreMessage::Update { .. } => KvStoreMessageType::Update,
+ KvStoreMessage::UpdateComplete => KvStoreMessageType::UpdateComplete,
+ KvStoreMessage::Log { .. } => KvStoreMessageType::Log,
+ };
+ msg_type.into()
+ }
+
+ /// Serialize to C-compatible wire format
+ ///
+ /// Update format: key (256 bytes, null-terminated) + value (variable)
+ /// Log format: clog_entry_t struct
+ pub fn serialize(&self) -> Vec<u8> {
+ match self {
+ KvStoreMessage::Update { key, value } => {
+ // C format: char key[KVSTORE_KEY_SIZE] + data
+ let mut buf = vec![0u8; KVSTORE_KEY_SIZE];
+ let key_bytes = key.as_bytes();
+ let copy_len = key_bytes.len().min(KVSTORE_KEY_SIZE - 1); // Leave room for null terminator
+ buf[..copy_len].copy_from_slice(&key_bytes[..copy_len]);
+ // buf is already zero-filled, so null terminator is automatic
+
+ buf.extend_from_slice(value);
+ buf
+ }
+ KvStoreMessage::Log {
+ time,
+ priority,
+ node,
+ ident,
+ tag,
+ message,
+ } => {
+ let node_bytes = node.as_bytes();
+ let ident_bytes = ident.as_bytes();
+ let tag_bytes = tag.as_bytes();
+ let msg_bytes = message.as_bytes();
+
+ let node_len = (node_bytes.len() + 1) as u32; // +1 for null
+ let ident_len = (ident_bytes.len() + 1) as u32;
+ let tag_len = (tag_bytes.len() + 1) as u32;
+ let msg_len = (msg_bytes.len() + 1) as u32;
+
+                let total_len = std::mem::size_of::<ClogEntryHeader>() as u32
+                    + node_len + ident_len + tag_len + msg_len;
+ let mut buf = Vec::with_capacity(total_len as usize);
+
+ // Serialize header using the struct
+ let header = ClogEntryHeader {
+ time: *time,
+ priority: *priority,
+ padding: [0u8; 3],
+ node_len,
+ ident_len,
+ tag_len,
+ msg_len,
+ };
+ buf.extend_from_slice(&header.to_bytes());
+
+ // Append string data (all null-terminated)
+ buf.extend_from_slice(node_bytes);
+ buf.push(0); // null terminator
+ buf.extend_from_slice(ident_bytes);
+ buf.push(0);
+ buf.extend_from_slice(tag_bytes);
+ buf.push(0);
+ buf.extend_from_slice(msg_bytes);
+ buf.push(0);
+
+ buf
+ }
+ KvStoreMessage::UpdateComplete => {
+ // No payload
+ Vec::new()
+ }
+ }
+ }
+
+ /// Deserialize from C-compatible wire format
+ pub fn deserialize(msg_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+ use KvStoreMessageType::*;
+
+ let msg_type = KvStoreMessageType::try_from(msg_type)
+ .map_err(|_| anyhow::anyhow!("Unknown kvstore message type: {msg_type}"))?;
+
+ match msg_type {
+ Update => {
+ if data.len() < KVSTORE_KEY_SIZE {
+ anyhow::bail!("UPDATE message too short: {} < {}", data.len(), KVSTORE_KEY_SIZE);
+ }
+
+ // Find null terminator in first KVSTORE_KEY_SIZE bytes
+ let key_end = data[..KVSTORE_KEY_SIZE]
+ .iter()
+ .position(|&b| b == 0)
+ .ok_or_else(|| anyhow::anyhow!("UPDATE key not null-terminated"))?;
+
+ let key = std::str::from_utf8(&data[..key_end])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in UPDATE key: {e}"))?
+ .to_string();
+
+ let value = data[KVSTORE_KEY_SIZE..].to_vec();
+
+ Ok(KvStoreMessage::Update { key, value })
+ }
+ UpdateComplete => Ok(KvStoreMessage::UpdateComplete),
+ Log => {
+ // Parse header using the struct
+ let header = ClogEntryHeader::from_bytes(data)?;
+
+ let node_len = header.node_len as usize;
+ let ident_len = header.ident_len as usize;
+ let tag_len = header.tag_len as usize;
+ let msg_len = header.msg_len as usize;
+
+ // Validate individual field lengths are non-zero (strings must have at least null terminator)
+ if node_len == 0 || ident_len == 0 || tag_len == 0 || msg_len == 0 {
+ anyhow::bail!("LOG message contains zero-length field");
+ }
+
+ // Check for integer overflow in total size calculation
+ let expected_len = std::mem::size_of::<ClogEntryHeader>()
+ .checked_add(node_len)
+ .and_then(|s| s.checked_add(ident_len))
+ .and_then(|s| s.checked_add(tag_len))
+ .and_then(|s| s.checked_add(msg_len))
+ .ok_or_else(|| anyhow::anyhow!("LOG message size overflow"))?;
+
+ if data.len() != expected_len {
+ anyhow::bail!(
+ "LOG message size mismatch: {} != {}",
+ data.len(),
+ expected_len
+ );
+ }
+
+ let mut offset = std::mem::size_of::<ClogEntryHeader>();
+
+ // Safe string extraction with bounds checking
+ let extract_string = |offset: &mut usize, len: usize| -> Result<String, anyhow::Error> {
+ let end = offset.checked_add(len)
+ .ok_or_else(|| anyhow::anyhow!("String offset overflow"))?;
+
+ if end > data.len() {
+ return Err(anyhow::anyhow!("String exceeds buffer"));
+ }
+
+                    // len includes the null terminator; verify it, then read len-1 bytes
+                    if data[end - 1] != 0 {
+                        return Err(anyhow::anyhow!("String not null-terminated"));
+                    }
+                    let s = std::str::from_utf8(&data[*offset..end - 1])
+                        .map_err(|e| anyhow::anyhow!("Invalid UTF-8: {e}"))?
+                        .to_string();
+
+ *offset = end;
+ Ok(s)
+ };
+
+ let node = extract_string(&mut offset, node_len)?;
+ let ident = extract_string(&mut offset, ident_len)?;
+ let tag = extract_string(&mut offset, tag_len)?;
+ let message = extract_string(&mut offset, msg_len)?;
+
+ Ok(KvStoreMessage::Log {
+ time: header.time,
+ priority: header.priority,
+ node,
+ ident,
+ tag,
+ message,
+ })
+ }
+ }
+ }
+}
+
+impl Message for KvStoreMessage {
+ fn message_type(&self) -> u16 {
+ // Delegate to the existing method
+ KvStoreMessage::message_type(self)
+ }
+
+ fn serialize(&self) -> Vec<u8> {
+ // Delegate to the existing method
+ KvStoreMessage::serialize(self)
+ }
+
+ fn deserialize(message_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+ // Delegate to the existing method
+ KvStoreMessage::deserialize(message_type, data)
+ .context("Failed to deserialize KvStoreMessage")
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_kvstore_message_update_serialization() {
+ let msg = KvStoreMessage::Update {
+ key: "test_key".to_string(),
+ value: vec![1, 2, 3, 4, 5],
+ };
+
+ let serialized = msg.serialize();
+ assert_eq!(serialized.len(), KVSTORE_KEY_SIZE + 5);
+ assert_eq!(&serialized[..8], b"test_key");
+ assert_eq!(serialized[8], 0); // null terminator
+ assert_eq!(&serialized[KVSTORE_KEY_SIZE..], &[1, 2, 3, 4, 5]);
+
+ let deserialized = KvStoreMessage::deserialize(1, &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_kvstore_message_log_serialization() {
+ let msg = KvStoreMessage::Log {
+ time: 1234567890,
+ priority: 5,
+ node: "node1".to_string(),
+ ident: "pmxcfs".to_string(),
+ tag: "info".to_string(),
+ message: "test message".to_string(),
+ };
+
+ let serialized = msg.serialize();
+ let deserialized = KvStoreMessage::deserialize(3, &serialized).unwrap();
+ assert_eq!(msg, deserialized);
+ }
+
+ #[test]
+ fn test_kvstore_message_type() {
+ assert_eq!(
+ KvStoreMessage::Update {
+ key: "".into(),
+ value: vec![]
+ }
+ .message_type(),
+ 1
+ );
+ assert_eq!(KvStoreMessage::UpdateComplete.message_type(), 2);
+ assert_eq!(
+ KvStoreMessage::Log {
+ time: 0,
+ priority: 0,
+ node: "".into(),
+ ident: "".into(),
+ tag: "".into(),
+ message: "".into()
+ }
+ .message_type(),
+ 3
+ );
+ }
+
+ #[test]
+ fn test_kvstore_message_type_roundtrip() {
+ // Test that message_type() and deserialize() are consistent
+ use super::KvStoreMessageType;
+
+ assert_eq!(u16::from(KvStoreMessageType::Update), 1);
+ assert_eq!(u16::from(KvStoreMessageType::UpdateComplete), 2);
+ assert_eq!(u16::from(KvStoreMessageType::Log), 3);
+
+ assert_eq!(
+ KvStoreMessageType::try_from(1).unwrap(),
+ KvStoreMessageType::Update
+ );
+ assert_eq!(
+ KvStoreMessageType::try_from(2).unwrap(),
+ KvStoreMessageType::UpdateComplete
+ );
+ assert_eq!(
+ KvStoreMessageType::try_from(3).unwrap(),
+ KvStoreMessageType::Log
+ );
+
+ assert!(KvStoreMessageType::try_from(0).is_err());
+ assert!(KvStoreMessageType::try_from(4).is_err());
+ }
+}
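The Update framing described above (a fixed 256-byte, null-terminated key field followed by the raw value bytes) can be sketched independently of the crate types. This is an illustrative, dependency-free sketch; `frame_update` and `parse_update` are hypothetical names, not part of the crate API:

```rust
// Sketch of the KVSTORE_MESSAGE_UPDATE wire framing: a fixed-size,
// null-terminated key field followed by the value bytes. The constant
// matches the 256-byte key field documented above; function names are
// illustrative only.
const KVSTORE_KEY_SIZE: usize = 256;

fn frame_update(key: &str, value: &[u8]) -> Vec<u8> {
    let mut buf = vec![0u8; KVSTORE_KEY_SIZE];
    let n = key.len().min(KVSTORE_KEY_SIZE - 1); // keep room for the NUL
    buf[..n].copy_from_slice(&key.as_bytes()[..n]);
    buf.extend_from_slice(value);
    buf
}

fn parse_update(data: &[u8]) -> Option<(String, Vec<u8>)> {
    if data.len() < KVSTORE_KEY_SIZE {
        return None;
    }
    // Key ends at the first NUL within the fixed-size field.
    let key_end = data[..KVSTORE_KEY_SIZE].iter().position(|&b| b == 0)?;
    let key = std::str::from_utf8(&data[..key_end]).ok()?.to_string();
    Some((key, data[KVSTORE_KEY_SIZE..].to_vec()))
}

fn main() {
    let framed = frame_update("rrd/node1", &[1, 2, 3]);
    assert_eq!(framed.len(), KVSTORE_KEY_SIZE + 3);
    let (key, value) = parse_update(&framed).unwrap();
    assert_eq!(key, "rrd/node1");
    assert_eq!(value, vec![1, 2, 3]);
    println!("round-trip ok");
}
```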
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
new file mode 100644
index 000000000..892404833
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
@@ -0,0 +1,32 @@
+/// Distributed Finite State Machine (DFSM) for cluster state synchronization
+///
+/// This crate implements the state machine for synchronizing configuration
+/// changes across the cluster nodes using Corosync CPG.
+///
+/// The DFSM handles:
+/// - State synchronization between nodes
+/// - Message ordering and queuing
+/// - Leader-based state updates
+/// - Split-brain prevention
+/// - Membership change handling
+mod callbacks;
+pub mod cluster_database_service;
+mod cpg_service;
+mod dfsm_message;
+mod fuse_message;
+mod kv_store_message;
+mod message;
+mod state_machine;
+pub mod status_sync_service;
+mod types;
+mod wire_format;
+
+// Re-export public API
+pub use callbacks::Callbacks;
+pub use cluster_database_service::ClusterDatabaseService;
+pub use cpg_service::{CpgHandler, CpgService};
+pub use fuse_message::FuseMessage;
+pub use kv_store_message::KvStoreMessage;
+pub use state_machine::{Dfsm, DfsmBroadcast};
+pub use status_sync_service::StatusSyncService;
+pub use types::NodeSyncInfo;
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
new file mode 100644
index 000000000..a2401d030
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
@@ -0,0 +1,21 @@
+/// High-level message abstraction for DFSM
+///
+/// This module provides a Message trait for working with cluster messages
+/// at a higher abstraction level than raw bytes.
+use anyhow::Result;
+
+/// Trait for messages that can be sent through DFSM
+pub trait Message: Clone + std::fmt::Debug + Send + Sync + Sized + 'static {
+ /// Get the message type identifier
+ fn message_type(&self) -> u16;
+
+ /// Serialize the message to bytes (application message payload only)
+ ///
+ /// This serializes only the application-level payload. The DFSM protocol
+ /// headers (msg_count, timestamp, protocol_version, etc.) are added by
+ /// DfsmMessage::serialize() when wrapping in DfsmMessage::Normal.
+ fn serialize(&self) -> Vec<u8>;
+
+ /// Deserialize from bytes given a message type
+ fn deserialize(message_type: u16, data: &[u8]) -> Result<Self>;
+}
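As a sketch of what an implementor of this trait looks like: the trait is restated below with `anyhow::Result` swapped for a String-error `Result` alias so the snippet stands alone, and `PingMessage` is a hypothetical example type, not part of the crate:

```rust
// Self-contained sketch of a Message implementor. The trait is restated
// with a String-error Result so the snippet compiles without anyhow;
// PingMessage is hypothetical.
type Result<T> = std::result::Result<T, String>;

pub trait Message: Clone + std::fmt::Debug + Send + Sync + Sized + 'static {
    fn message_type(&self) -> u16;
    fn serialize(&self) -> Vec<u8>;
    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self>;
}

#[derive(Debug, Clone, PartialEq)]
struct PingMessage {
    seq: u64,
}

impl Message for PingMessage {
    fn message_type(&self) -> u16 {
        1
    }

    // Payload only: the DFSM protocol headers are added by the wrapper.
    fn serialize(&self) -> Vec<u8> {
        self.seq.to_le_bytes().to_vec()
    }

    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self> {
        if message_type != 1 {
            return Err(format!("unknown message type {message_type}"));
        }
        let bytes: [u8; 8] = data.try_into().map_err(|_| "bad length".to_string())?;
        Ok(PingMessage { seq: u64::from_le_bytes(bytes) })
    }
}

fn main() {
    let msg = PingMessage { seq: 42 };
    let wire = msg.serialize();
    let back = PingMessage::deserialize(msg.message_type(), &wire).unwrap();
    assert_eq!(msg, back);
    println!("round-trip ok");
}
```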
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
new file mode 100644
index 000000000..d4eecc690
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
@@ -0,0 +1,1251 @@
+/// DFSM state machine implementation
+///
+/// This module contains the main Dfsm struct and its implementation
+/// for managing distributed state synchronization.
+use anyhow::{Context, Result};
+use parking_lot::{Mutex as ParkingMutex, RwLock};
+use pmxcfs_api_types::MemberInfo;
+use rust_corosync::{NodeId, cpg};
+use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tokio::sync::oneshot;
+
+use super::cpg_service::{CpgHandler, CpgService};
+use super::dfsm_message::DfsmMessage;
+use super::message::Message;
+use super::types::{DfsmMode, QueuedMessage, SyncEpoch};
+use crate::{Callbacks, NodeSyncInfo};
+
+/// Maximum queue length to prevent memory exhaustion
+/// C implementation uses unbounded GSequence/GList, but we add a limit for safety
+/// This value should be tuned based on production workload
+const MAX_QUEUE_LEN: usize = 500;
+
+/// Result of a synchronous message send
+/// Matches C's dfsm_result_t structure
+#[derive(Debug, Clone)]
+pub struct MessageResult {
+ /// Message count for tracking
+ pub msgcount: u64,
+ /// Result code from deliver callback (0 = success, negative = errno)
+ pub result: i32,
+ /// Whether the message was processed successfully
+ pub processed: bool,
+}
+
+/// Extension trait to add broadcast() method to Option<Arc<Dfsm<M>>>
+///
+/// This allows calling `.broadcast()` directly on Option<Arc<Dfsm<M>>> fields
+/// without explicit None checking at call sites.
+pub trait DfsmBroadcast<M: Message> {
+ fn broadcast(&self, msg: M);
+}
+
+impl<M: Message> DfsmBroadcast<M> for Option<Arc<Dfsm<M>>> {
+ fn broadcast(&self, msg: M) {
+ if let Some(dfsm) = self {
+ let _ = dfsm.broadcast(msg);
+ }
+ }
+}
+
+/// DFSM state machine
+///
+/// The generic parameter `M` specifies the message type this DFSM handles:
+/// - `Dfsm<FuseMessage>` for main database operations
+/// - `Dfsm<KvStoreMessage>` for status synchronization
+pub struct Dfsm<M> {
+ /// CPG service for cluster communication (matching C's dfsm_t->cpg_handle)
+ cpg_service: RwLock<Option<Arc<CpgService>>>,
+
+ /// Cluster group name for CPG
+ cluster_name: String,
+
+ /// Callbacks for application integration
+ callbacks: Arc<dyn Callbacks<Message = M>>,
+
+ /// Current operating mode
+ mode: RwLock<DfsmMode>,
+
+ /// Current sync epoch
+ sync_epoch: RwLock<SyncEpoch>,
+
+ /// Local epoch counter
+ local_epoch_counter: ParkingMutex<u32>,
+
+ /// Node synchronization info
+ sync_nodes: RwLock<Vec<NodeSyncInfo>>,
+
+ /// Message queue (ordered by count)
+ msg_queue: ParkingMutex<BTreeMap<u64, QueuedMessage<M>>>,
+
+ /// Sync queue for messages during update mode
+ sync_queue: ParkingMutex<VecDeque<QueuedMessage<M>>>,
+
+ /// Message counter for ordering (atomic for lock-free increment)
+ msg_counter: AtomicU64,
+
+ /// Lowest node ID in cluster (leader)
+ lowest_nodeid: RwLock<u32>,
+
+ /// Our node ID (set during init_cpg via cpg_local_get)
+ nodeid: AtomicU32,
+
+ /// Our process ID
+ pid: u32,
+
+ /// Protocol version for cluster compatibility
+ protocol_version: u32,
+
+ /// State verification - SHA-256 checksum
+ checksum: ParkingMutex<[u8; 32]>,
+
+ /// Checksum epoch (when it was computed)
+ checksum_epoch: ParkingMutex<SyncEpoch>,
+
+ /// Checksum ID for verification
+ checksum_id: ParkingMutex<u64>,
+
+ /// Checksum counter for verify requests
+ checksum_counter: ParkingMutex<u64>,
+
+ /// Message count received (for synchronous send tracking)
+ /// Matches C's dfsm->msgcount_rcvd
+ msgcount_rcvd: AtomicU64,
+
+ /// Pending message results for synchronous sends
+ /// Matches C's dfsm->results (GHashTable)
+ /// Maps msgcount -> oneshot sender for result delivery
+ /// Uses tokio oneshot channels - the idiomatic pattern for one-time async notifications
+ message_results: ParkingMutex<HashMap<u64, oneshot::Sender<MessageResult>>>,
+}
+
+impl<M: Message> Dfsm<M> {
+ /// Create a new DFSM instance
+ ///
+ /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+ pub fn new(cluster_name: String, callbacks: Arc<dyn Callbacks<Message = M>>) -> Result<Self> {
+ Self::new_with_protocol_version(cluster_name, callbacks, DfsmMessage::<M>::DEFAULT_PROTOCOL_VERSION)
+ }
+
+ /// Create a new DFSM instance with a specific protocol version
+ ///
+ /// This is used when the DFSM needs to use a non-default protocol version,
+ /// such as the status/kvstore DFSM which uses protocol version 0 for
+ /// compatibility with the C implementation.
+ ///
+ /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+ pub fn new_with_protocol_version(
+ cluster_name: String,
+ callbacks: Arc<dyn Callbacks<Message = M>>,
+ protocol_version: u32,
+ ) -> Result<Self> {
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+ let pid = std::process::id();
+
+ Ok(Self {
+ cpg_service: RwLock::new(None),
+ cluster_name,
+ callbacks,
+ mode: RwLock::new(DfsmMode::Start),
+ sync_epoch: RwLock::new(SyncEpoch {
+ epoch: 0,
+ time: now,
+ nodeid: 0,
+ pid,
+ }),
+ local_epoch_counter: ParkingMutex::new(0),
+ sync_nodes: RwLock::new(Vec::new()),
+ msg_queue: ParkingMutex::new(BTreeMap::new()),
+ sync_queue: ParkingMutex::new(VecDeque::new()),
+ msg_counter: AtomicU64::new(0),
+ lowest_nodeid: RwLock::new(0),
+ nodeid: AtomicU32::new(0), // Will be set by init_cpg() using cpg_local_get()
+ pid,
+ protocol_version,
+ checksum: ParkingMutex::new([0u8; 32]),
+ checksum_epoch: ParkingMutex::new(SyncEpoch {
+ epoch: 0,
+ time: 0,
+ nodeid: 0,
+ pid: 0,
+ }),
+ checksum_id: ParkingMutex::new(0),
+ checksum_counter: ParkingMutex::new(0),
+ msgcount_rcvd: AtomicU64::new(0),
+ message_results: ParkingMutex::new(HashMap::new()),
+ })
+ }
+
+ pub fn get_mode(&self) -> DfsmMode {
+ *self.mode.read()
+ }
+
+ pub fn set_mode(&self, new_mode: DfsmMode) {
+ let mut mode = self.mode.write();
+ let old_mode = *mode;
+
+ // Match C's dfsm_set_mode logic (dfsm.c:450-456):
+ // Allow transition if:
+ // 1. new_mode < DFSM_ERROR_MODE_START (normal modes), OR
+ // 2. (old_mode < DFSM_ERROR_MODE_START OR new_mode >= old_mode)
+ // - If already in error mode, only allow transitions to higher error codes
+ if old_mode != new_mode {
+ let allow_transition = !new_mode.is_error() ||
+ (!old_mode.is_error() || new_mode >= old_mode);
+
+ if !allow_transition {
+ tracing::debug!(
+ "DFSM: blocking transition from {:?} to {:?} (error mode can only go to higher codes)",
+ old_mode, new_mode
+ );
+ return;
+ }
+ } else {
+ // No-op transition
+ return;
+ }
+
+ *mode = new_mode;
+ drop(mode);
+
+ if new_mode.is_error() {
+ tracing::error!("DFSM: {}", new_mode);
+ } else {
+ tracing::info!("DFSM: {}", new_mode);
+ }
+ }
+
+ pub fn is_leader(&self) -> bool {
+ let lowest = *self.lowest_nodeid.read();
+ lowest > 0 && lowest == self.nodeid.load(Ordering::Relaxed)
+ }
+
+ pub fn get_nodeid(&self) -> u32 {
+ self.nodeid.load(Ordering::Relaxed)
+ }
+
+ pub fn get_pid(&self) -> u32 {
+ self.pid
+ }
+
+ /// Check if DFSM is synced and ready
+ pub fn is_synced(&self) -> bool {
+ self.get_mode() == DfsmMode::Synced
+ }
+
+ /// Check if DFSM encountered an error
+ pub fn is_error(&self) -> bool {
+ self.get_mode().is_error()
+ }
+}
+
+impl<M: Message> Dfsm<M> {
+ fn send_sync_start(&self) -> Result<()> {
+ tracing::debug!("DFSM: sending SYNC_START message");
+ let sync_epoch = *self.sync_epoch.read();
+ self.send_dfsm_message(&DfsmMessage::<M>::SyncStart { sync_epoch })
+ }
+
+ fn send_state(&self) -> Result<()> {
+ tracing::debug!("DFSM: generating and sending state");
+
+ let state_data = self
+ .callbacks
+ .get_state()
+ .context("Failed to get state from callbacks")?;
+
+ tracing::info!("DFSM: sending state ({} bytes)", state_data.len());
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::State {
+ sync_epoch,
+ data: state_data,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ pub(super) fn send_dfsm_message(&self, message: &DfsmMessage<M>) -> Result<()> {
+ let serialized = message.serialize();
+
+ if let Some(ref service) = *self.cpg_service.read() {
+ service
+ .mcast(cpg::Guarantee::TypeAgreed, &serialized)
+ .context("Failed to broadcast DFSM message")?;
+ Ok(())
+ } else {
+ anyhow::bail!("CPG not initialized")
+ }
+ }
+
+ pub fn process_state(&self, nodeid: u32, pid: u32, state: &[u8]) -> Result<()> {
+ tracing::debug!(
+ "DFSM: processing state from node {}/{} ({} bytes)",
+ nodeid,
+ pid,
+ state.len()
+ );
+
+ let mut sync_nodes = self.sync_nodes.write();
+
+ // Find node in sync_nodes
+ let node_info = sync_nodes
+ .iter_mut()
+ .find(|n| n.node_id == nodeid && n.pid == pid);
+
+ let node_info = match node_info {
+ Some(ni) => ni,
+ None => {
+ // Non-member sent state - immediate LEAVE (matches C: dfsm.c:823-828)
+ tracing::error!(
+ "DFSM: received state from non-member {}/{} - entering LEAVE mode",
+ nodeid, pid
+ );
+ drop(sync_nodes);
+ self.set_mode(DfsmMode::Leave);
+ return Err(anyhow::anyhow!("State from non-member"));
+ }
+ };
+
+ // Check for duplicate state (matches C: dfsm.c:830-835)
+ if node_info.state.is_some() {
+ tracing::error!(
+ "DFSM: received duplicate state from member {}/{} - entering LEAVE mode",
+ nodeid, pid
+ );
+ drop(sync_nodes);
+ self.set_mode(DfsmMode::Leave);
+ return Err(anyhow::anyhow!("Duplicate state from member"));
+ }
+
+ // Store state
+ node_info.state = Some(state.to_vec());
+
+ let all_received = sync_nodes.iter().all(|n| n.state.is_some());
+ drop(sync_nodes);
+
+ if all_received {
+ tracing::info!("DFSM: received all states, processing synchronization");
+ self.process_state_sync()?;
+ }
+
+ Ok(())
+ }
+
+ fn process_state_sync(&self) -> Result<()> {
+ tracing::info!("DFSM: processing state synchronization");
+
+ let sync_nodes = self.sync_nodes.read().clone();
+
+ match self.callbacks.process_state_update(&sync_nodes) {
+ Ok(synced) => {
+ if synced {
+ tracing::info!("DFSM: state synchronization successful");
+
+ let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+ let mut sync_nodes_write = self.sync_nodes.write();
+ if let Some(node) = sync_nodes_write
+ .iter_mut()
+ .find(|n| n.node_id == my_nodeid && n.pid == self.pid)
+ {
+ node.synced = true;
+ }
+ drop(sync_nodes_write);
+
+ self.set_mode(DfsmMode::Synced);
+ self.callbacks.on_synced();
+ self.deliver_message_queue()?;
+ } else {
+ tracing::info!("DFSM: entering UPDATE mode, waiting for leader");
+ self.set_mode(DfsmMode::Update);
+ self.deliver_message_queue()?;
+ }
+ }
+ Err(e) => {
+ tracing::error!("DFSM: state synchronization failed: {}", e);
+ self.set_mode(DfsmMode::Error);
+ return Err(e);
+ }
+ }
+
+ Ok(())
+ }
+
+    pub fn queue_message(&self, nodeid: u32, pid: u32, msg_count: u64, message: M, timestamp: u64) {
+ tracing::debug!(
+ "DFSM: queueing message {} from {}/{}",
+ msg_count,
+ nodeid,
+ pid
+ );
+
+ let qm = QueuedMessage {
+ nodeid,
+ pid,
+ _msg_count: msg_count,
+ message,
+ timestamp,
+ };
+
+ // Hold mode read lock during queueing decision to prevent TOCTOU race
+ // This ensures mode cannot change between check and queue selection
+ let mode_guard = self.mode.read();
+ let mode = *mode_guard;
+
+ let node_synced = self
+ .sync_nodes
+ .read()
+ .iter()
+ .find(|n| n.node_id == nodeid && n.pid == pid)
+ .map(|n| n.synced)
+ .unwrap_or(false);
+
+ if mode == DfsmMode::Update && node_synced {
+ let mut sync_queue = self.sync_queue.lock();
+
+ // Check sync queue size limit
+ // Queues use a bounded size (MAX_QUEUE_LEN=500) to prevent memory exhaustion
+ // from slow or stuck nodes. When full, oldest messages are dropped.
+ // This matches distributed system semantics where old updates can be superseded.
+ //
+ // Monitoring: Track queue depth via metrics/logs to detect congestion:
+ // - Sustained high queue depth indicates slow message processing
+ // - Frequent drops indicate network partitions or overload
+ if sync_queue.len() >= MAX_QUEUE_LEN {
+ tracing::warn!(
+ "DFSM: sync queue full ({} messages), dropping oldest - possible network congestion or slow node",
+ sync_queue.len()
+ );
+ sync_queue.pop_front();
+ }
+
+ sync_queue.push_back(qm);
+ } else {
+ let mut msg_queue = self.msg_queue.lock();
+
+ // Check message queue size limit (same rationale as sync queue)
+ if msg_queue.len() >= MAX_QUEUE_LEN {
+ tracing::warn!(
+ "DFSM: message queue full ({} messages), dropping oldest - possible network congestion or slow node",
+ msg_queue.len()
+ );
+ // Drop oldest message (lowest count)
+ if let Some((&oldest_count, _)) = msg_queue.iter().next() {
+ msg_queue.remove(&oldest_count);
+ }
+ }
+
+ msg_queue.insert(msg_count, qm);
+ }
+
+ // Release mode lock after queueing decision completes
+ drop(mode_guard);
+ }
+
+    pub(super) fn deliver_message_queue(&self) -> Result<()> {
+ let mut queue = self.msg_queue.lock();
+ if queue.is_empty() {
+ return Ok(());
+ }
+
+ tracing::info!("DFSM: delivering {} queued messages", queue.len());
+
+ // Hold mode lock during iteration to prevent mode changes mid-delivery
+ let mode_guard = self.mode.read();
+ let mode = *mode_guard;
+ let sync_nodes = self.sync_nodes.read().clone();
+
+ let mut to_remove = Vec::new();
+ let mut to_sync_queue = Vec::new();
+
+ for (count, qm) in queue.iter() {
+ let node_info = sync_nodes
+ .iter()
+ .find(|n| n.node_id == qm.nodeid && n.pid == qm.pid);
+
+ let Some(info) = node_info else {
+ tracing::debug!(
+ "DFSM: removing message from non-member {}/{}",
+ qm.nodeid,
+ qm.pid
+ );
+ to_remove.push(*count);
+ continue;
+ };
+
+ if mode == DfsmMode::Synced && info.synced {
+ tracing::debug!("DFSM: delivering message {}", count);
+
+ match self.callbacks.deliver_message(
+ qm.nodeid,
+ qm.pid,
+ qm.message.clone(),
+ qm.timestamp,
+ ) {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: message delivered, result={}, processed={}",
+ result,
+ processed
+ );
+ // Record result for synchronous sends
+ self.record_message_result(*count, result, processed);
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver message: {}", e);
+ // Record error result
+ self.record_message_result(*count, -libc::EIO, false);
+ }
+ }
+
+ to_remove.push(*count);
+ } else if mode == DfsmMode::Update && info.synced {
+ // Collect messages to move instead of acquiring sync_queue lock
+ // while holding msg_queue lock to prevent deadlock
+ to_sync_queue.push(qm.clone());
+ to_remove.push(*count);
+ }
+ }
+
+ // Remove processed messages from queue
+ for count in to_remove {
+ queue.remove(&count);
+ }
+
+ // Release locks before acquiring sync_queue to prevent deadlock
+ drop(mode_guard);
+ drop(queue);
+
+ // Now move messages to sync_queue without holding msg_queue
+ if !to_sync_queue.is_empty() {
+ let mut sync_queue = self.sync_queue.lock();
+ for qm in to_sync_queue {
+ sync_queue.push_back(qm);
+ }
+ }
+
+ Ok(())
+ }
+
+ pub(super) fn deliver_sync_queue(&self) -> Result<()> {
+ let mut sync_queue = self.sync_queue.lock();
+ let queue_len = sync_queue.len();
+
+ if queue_len == 0 {
+ return Ok(());
+ }
+
+ tracing::info!("DFSM: delivering {} sync queue messages", queue_len);
+
+ while let Some(qm) = sync_queue.pop_front() {
+ tracing::debug!(
+ "DFSM: delivering sync message from {}/{}",
+ qm.nodeid,
+ qm.pid
+ );
+
+ match self
+ .callbacks
+ .deliver_message(qm.nodeid, qm.pid, qm.message, qm.timestamp)
+ {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: sync message delivered, result={}, processed={}",
+ result,
+ processed
+ );
+ // Record result for synchronous sends
+ self.record_message_result(qm._msg_count, result, processed);
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver sync message: {}", e);
+ // Record error result
+ self.record_message_result(qm._msg_count, -libc::EIO, false);
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Send a message to the cluster
+ ///
+ /// Creates a properly formatted Normal message with C-compatible headers.
+ pub fn send_message(&self, message: M) -> Result<u64> {
+ let msg_count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1;
+
+ tracing::debug!("DFSM: sending message {}", msg_count);
+
+ let dfsm_msg = DfsmMessage::from_message(msg_count, message, self.protocol_version);
+
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(msg_count)
+ }
+
+ /// Send a message to the cluster and wait for delivery result
+ ///
+ /// This is the async equivalent of send_message(), matching C's dfsm_send_message_sync().
+ /// It broadcasts the message via CPG and waits for it to be delivered to the local node,
+ /// returning the result from the deliver callback.
+ ///
+ /// Uses tokio oneshot channels - the idiomatic pattern for one-time async result delivery.
+ /// This avoids any locking or notification complexity.
+ ///
+ /// # Cancellation Safety
+ /// If this future is dropped before completion, the cleanup guard ensures the HashMap
+ /// entry is removed, preventing memory leaks.
+ ///
+ /// # Arguments
+ /// * `message` - The message to send
+ /// * `timeout` - Maximum time to wait for delivery (typically 10 seconds)
+ ///
+ /// # Returns
+ /// * `Ok(MessageResult)` - The result from the local deliver callback
+ /// - Caller should check `result.result < 0` for errno-based errors
+ /// * `Err(_)` - If send failed, timeout occurred, or channel closed unexpectedly
+ pub async fn send_message_sync(&self, message: M, timeout: Duration) -> Result<MessageResult> {
+ let msg_count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1;
+
+ tracing::debug!("DFSM: sending synchronous message {}", msg_count);
+
+ // Create oneshot channel for result delivery (tokio best practice)
+ let (tx, rx) = oneshot::channel();
+
+ // Register the sender before broadcasting
+ self.message_results.lock().insert(msg_count, tx);
+
+ // RAII guard ensures cleanup on timeout, send error, or cancellation
+ // (record_message_result also removes, so double-remove is harmless)
+ struct CleanupGuard<'a> {
+ msg_count: u64,
+ results: &'a ParkingMutex<HashMap<u64, oneshot::Sender<MessageResult>>>,
+ }
+ impl Drop for CleanupGuard<'_> {
+ fn drop(&mut self) {
+ self.results.lock().remove(&self.msg_count);
+ }
+ }
+ let _guard = CleanupGuard {
+ msg_count,
+ results: &self.message_results,
+ };
+
+ // Send the message
+ let dfsm_msg = DfsmMessage::from_message(msg_count, message, self.protocol_version);
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ // Wait for delivery with timeout (clean tokio pattern)
+ match tokio::time::timeout(timeout, rx).await {
+ Ok(Ok(result)) => {
+ // Got result successfully - return it to caller
+ // Caller should check result.result < 0 for errno-based errors
+ Ok(result)
+ }
+ Ok(Err(_)) => {
+ // Channel closed without sending - shouldn't happen
+ anyhow::bail!("DFSM: message {} sender dropped", msg_count);
+ }
+ Err(_) => {
+ // Timeout - guard will clean up
+ anyhow::bail!("DFSM: message {} timed out after {:?}", msg_count, timeout);
+ }
+ }
+ // On cancellation (future dropped), guard cleans up automatically
+ }
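The register-then-wait shape of `send_message_sync()` (register a channel keyed by msgcount, broadcast, wait with timeout, clean up via an RAII guard) can be sketched with std primitives; `std::sync::mpsc` stands in for the tokio oneshot channel so the snippet runs without dependencies, and all names here are illustrative:

```rust
// Sketch of the register -> broadcast -> wait-with-timeout -> cleanup
// pattern, with std::sync::mpsc standing in for the tokio oneshot.
use std::collections::HashMap;
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;

type Pending = Arc<Mutex<HashMap<u64, mpsc::Sender<i32>>>>;

// RAII guard: removes the pending entry on timeout or early drop.
// A double-remove (delivery already removed it) is harmless.
struct CleanupGuard {
    msg_count: u64,
    pending: Pending,
}
impl Drop for CleanupGuard {
    fn drop(&mut self) {
        self.pending.lock().unwrap().remove(&self.msg_count);
    }
}

fn send_sync(pending: &Pending, msg_count: u64, timeout: Duration) -> Result<i32, String> {
    let (tx, rx) = mpsc::channel();
    pending.lock().unwrap().insert(msg_count, tx);
    let _guard = CleanupGuard { msg_count, pending: pending.clone() };
    // ...the CPG broadcast would happen here; delivery resolves the entry...
    rx.recv_timeout(timeout)
        .map_err(|_| format!("message {msg_count} timed out"))
}

// Delivery side: resolve the waiter if one is registered.
fn record_result(pending: &Pending, msg_count: u64, result: i32) {
    if let Some(tx) = pending.lock().unwrap().remove(&msg_count) {
        let _ = tx.send(result); // receiver may have timed out; ignore
    }
}

fn main() {
    let pending: Pending = Arc::new(Mutex::new(HashMap::new()));
    let p2 = pending.clone();
    // Simulate delivery from another thread once the sender has registered.
    let handle = std::thread::spawn(move || loop {
        std::thread::sleep(Duration::from_millis(5));
        if p2.lock().unwrap().contains_key(&1) {
            record_result(&p2, 1, 0);
            break;
        }
    });
    let res = send_sync(&pending, 1, Duration::from_secs(1));
    handle.join().unwrap();
    assert_eq!(res, Ok(0));
    assert!(pending.lock().unwrap().is_empty()); // guard cleaned up
    println!("sync send resolved: {res:?}");
}
```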
+
+ /// Record the result of a delivered message (for synchronous sends)
+ ///
+ /// Called from deliver_message_queue() when a message is delivered.
+ /// Matches C's dfsm_record_local_result().
+ ///
+ /// Uses tokio oneshot channel to send result - clean, non-blocking, and can't fail.
+ fn record_message_result(&self, msg_count: u64, result: i32, processed: bool) {
+ tracing::debug!(
+ "DFSM: recording result for message {}: result={}, processed={}",
+ msg_count,
+ result,
+ processed
+ );
+
+ // Update msgcount_rcvd
+ self.msgcount_rcvd.store(msg_count, Ordering::SeqCst);
+
+ // Send result via oneshot channel if someone is waiting
+ let mut results = self.message_results.lock();
+ if let Some(tx) = results.remove(&msg_count) {
+ let msg_result = MessageResult {
+ msgcount: msg_count,
+ result,
+ processed,
+ };
+
+            // Send the result through the oneshot channel (non-blocking).
+            // If the receiver was dropped (timeout), the send fails silently - which is fine
+ let _ = tx.send(msg_result);
+ }
+ }
+
+ /// Send a TreeEntry update to the cluster (leader only, during synchronization)
+ ///
+ /// This is used by the leader to send individual database entries to followers
+ /// that need to catch up. Matches C's dfsm_send_update().
+ pub fn send_update(&self, tree_entry: pmxcfs_memdb::TreeEntry) -> Result<()> {
+ tracing::debug!("DFSM: sending Update for inode {}", tree_entry.inode);
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::from_tree_entry(tree_entry, sync_epoch);
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Send UpdateComplete signal to cluster (leader only, after sending all updates)
+ ///
+ /// Signals to followers that all Update messages have been sent and they can
+ /// now transition to Synced mode. Matches C's dfsm_send_update_complete().
+ pub fn send_update_complete(&self) -> Result<()> {
+ tracing::info!("DFSM: sending UpdateComplete");
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::UpdateComplete { sync_epoch };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Request checksum verification (leader only)
+ /// This should be called periodically by the leader to verify cluster state consistency
+ pub fn verify_request(&self) -> Result<()> {
+ // Only leader should send verify requests
+ if !self.is_leader() {
+ return Ok(());
+ }
+
+ // Only verify when synced
+ if self.get_mode() != DfsmMode::Synced {
+ return Ok(());
+ }
+
+ // Check if we need to wait for previous verification to complete
+ let checksum_counter = *self.checksum_counter.lock();
+ let checksum_id = *self.checksum_id.lock();
+
+ if checksum_counter != checksum_id {
+ tracing::debug!(
+ "DFSM: delaying verify request {:016x}",
+ checksum_counter + 1
+ );
+ return Ok(());
+ }
+
+ // Increment counter and send verify request
+        let new_counter = checksum_counter + 1;
+        *self.checksum_counter.lock() = new_counter;
+
+ tracing::debug!("DFSM: sending verify request {:016x}", new_counter);
+
+ // Send VERIFY_REQUEST message with counter
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg: DfsmMessage<M> = DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id: new_counter,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Handle verify request from leader
+ pub fn handle_verify_request(&self, message_epoch: SyncEpoch, csum_id: u64) -> Result<()> {
+ tracing::debug!("DFSM: received verify request {:016x}", csum_id);
+
+ // Compute current state checksum
+ let mut checksum = [0u8; 32];
+ self.callbacks.compute_checksum(&mut checksum)?;
+
+ // Save checksum info
+ // Store the epoch FROM THE MESSAGE (matching C: dfsm.c:736)
+ *self.checksum.lock() = checksum;
+ *self.checksum_epoch.lock() = message_epoch;
+ *self.checksum_id.lock() = csum_id;
+
+ // Send the checksum verification response
+ tracing::debug!("DFSM: sending verify response");
+
+ let sync_epoch = *self.sync_epoch.read();
+ let dfsm_msg = DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ };
+ self.send_dfsm_message(&dfsm_msg)?;
+
+ Ok(())
+ }
+
+ /// Handle verify response from a node
+ pub fn handle_verify(
+ &self,
+ message_epoch: SyncEpoch,
+ csum_id: u64,
+ received_checksum: &[u8; 32],
+ ) -> Result<()> {
+ tracing::debug!("DFSM: received verify response");
+
+ let our_checksum_id = *self.checksum_id.lock();
+ let our_checksum_epoch = *self.checksum_epoch.lock();
+
+ // Check if this verification matches our saved checksum
+ // Compare with MESSAGE epoch, not current epoch (matching C: dfsm.c:766-767)
+ if our_checksum_id == csum_id && our_checksum_epoch == message_epoch {
+ let our_checksum = *self.checksum.lock();
+
+ // Compare checksums
+ if our_checksum != *received_checksum {
+ tracing::error!(
+ "DFSM: checksum mismatch! Expected {:02x?}, got {:02x?}",
+ &our_checksum[..8],
+ &received_checksum[..8]
+ );
+ tracing::error!("DFSM: data divergence detected - restarting cluster sync");
+ self.set_mode(DfsmMode::Leave);
+ return Err(anyhow::anyhow!("Checksum verification failed"));
+ } else {
+ tracing::info!("DFSM: data verification successful");
+ }
+ } else {
+ tracing::debug!("DFSM: skipping verification - no checksum saved or epoch mismatch");
+ }
+
+ Ok(())
+ }
+
+ /// Invalidate saved checksum (called on membership changes)
+ pub fn invalidate_checksum(&self) {
+ let counter = *self.checksum_counter.lock();
+ *self.checksum_id.lock() = counter;
+
+ // Reset checksum epoch
+ *self.checksum_epoch.lock() = SyncEpoch {
+ epoch: 0,
+ time: 0,
+ nodeid: 0,
+ pid: 0,
+ };
+
+ tracing::debug!("DFSM: checksum invalidated");
+ }
+
+ /// Broadcast a message to the cluster
+ ///
+ /// Checks if the cluster is synced before broadcasting.
+ /// If not synced, the message is silently dropped.
+ pub fn broadcast(&self, msg: M) -> Result<()> {
+ if !self.is_synced() {
+ return Ok(());
+ }
+
+ tracing::debug!("Broadcasting {:?}", msg);
+ self.send_message(msg)?;
+ tracing::debug!("Broadcast successful");
+
+ Ok(())
+ }
+
+ /// Handle incoming DFSM message from cluster (called by CpgHandler)
+ fn handle_dfsm_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ message: DfsmMessage<M>,
+ ) -> anyhow::Result<()> {
+ // Validate epoch for state messages (all except Normal and SyncStart)
+ // This matches C implementation's epoch checking in dfsm.c:665-673
+ let should_validate_epoch = !matches!(
+ message,
+ DfsmMessage::Normal { .. } | DfsmMessage::SyncStart { .. }
+ );
+
+ if should_validate_epoch {
+ let current_epoch = *self.sync_epoch.read();
+ let message_epoch = match &message {
+ DfsmMessage::State { sync_epoch, .. }
+ | DfsmMessage::Update { sync_epoch, .. }
+ | DfsmMessage::UpdateComplete { sync_epoch }
+ | DfsmMessage::VerifyRequest { sync_epoch, .. }
+ | DfsmMessage::Verify { sync_epoch, .. } => *sync_epoch,
+ _ => unreachable!(),
+ };
+
+ if message_epoch != current_epoch {
+ tracing::debug!(
+ "DFSM: ignoring message with wrong epoch (expected {:?}, got {:?})",
+ current_epoch,
+ message_epoch
+ );
+ return Ok(());
+ }
+ }
+
+ // Match on typed message variants
+ match message {
+ DfsmMessage::Normal {
+ msg_count,
+ timestamp,
+ protocol_version: _,
+ message: app_msg,
+ } => self.handle_normal_message(nodeid, pid, msg_count, timestamp, app_msg),
+ DfsmMessage::SyncStart { sync_epoch } => self.handle_sync_start(nodeid, sync_epoch),
+ DfsmMessage::State {
+ sync_epoch: _,
+ data,
+ } => self.process_state(nodeid, pid, &data),
+ DfsmMessage::Update {
+ sync_epoch: _,
+ tree_entry,
+ } => self.handle_update(nodeid, pid, tree_entry),
+ DfsmMessage::UpdateComplete { sync_epoch: _ } => self.handle_update_complete(),
+ DfsmMessage::VerifyRequest {
+ sync_epoch,
+ csum_id,
+ } => self.handle_verify_request(sync_epoch, csum_id),
+ DfsmMessage::Verify {
+ sync_epoch,
+ csum_id,
+ checksum,
+ } => self.handle_verify(sync_epoch, csum_id, &checksum),
+ }
+ }
+
+ /// Handle membership change notification (called by CpgHandler)
+ fn handle_membership_change(&self, members: &[MemberInfo]) -> anyhow::Result<()> {
+ tracing::info!(
+ "DFSM: handling membership change ({} members)",
+ members.len()
+ );
+
+ // Invalidate saved checksum
+ self.invalidate_checksum();
+
+ // Update epoch
+ let mut counter = self.local_epoch_counter.lock();
+ *counter += 1;
+
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as u32;
+
+ let new_epoch = SyncEpoch {
+ epoch: *counter,
+ time: now,
+ nodeid: self.nodeid.load(Ordering::Relaxed),
+ pid: self.pid,
+ };
+
+ *self.sync_epoch.write() = new_epoch;
+ drop(counter);
+
+ // Find lowest node ID (leader)
+ let lowest = members.iter().map(|m| m.node_id).min().unwrap_or(0);
+ *self.lowest_nodeid.write() = lowest;
+
+ // Call cleanup callback before releasing sync resources (matches C: dfsm.c:512-514)
+ let old_sync_nodes = self.sync_nodes.read().clone();
+ if !old_sync_nodes.is_empty() {
+ self.callbacks.cleanup_sync_resources(&old_sync_nodes);
+ }
+
+ // Initialize sync nodes
+ let mut sync_nodes = self.sync_nodes.write();
+ sync_nodes.clear();
+
+ for member in members {
+ sync_nodes.push(NodeSyncInfo {
+ node_id: member.node_id,
+ pid: member.pid,
+ state: None,
+ synced: false,
+ });
+ }
+ drop(sync_nodes);
+
+ // Clear queues
+ self.sync_queue.lock().clear();
+
+ // Call membership change callback (matches C: dfsm.c:1180-1182)
+ self.callbacks.on_membership_change(members);
+
+ // Determine next mode
+ if members.len() == 1 {
+ // Single node - already synced
+ tracing::info!("DFSM: single node cluster, marking as synced");
+ self.set_mode(DfsmMode::Synced);
+
+ // Mark ourselves as synced
+ let mut sync_nodes = self.sync_nodes.write();
+ if let Some(node) = sync_nodes.first_mut() {
+ node.synced = true;
+ }
+
+ // Deliver queued messages
+ self.deliver_message_queue()?;
+ } else {
+ // Multi-node - start synchronization
+ tracing::info!("DFSM: multi-node cluster, starting sync");
+ self.set_mode(DfsmMode::StartSync);
+
+ // If we're the leader, initiate sync
+ if self.is_leader() {
+ tracing::info!("DFSM: we are leader, sending sync start");
+ self.send_sync_start()?;
+
+ // Leader also sends its own state here; handle_sync_start()
+ // skips the redundant send when our own SyncStart loops back to us
+ self.send_state().context("Failed to send leader state")?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Handle normal application message
+ fn handle_normal_message(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ msg_count: u64,
+ timestamp: u32,
+ message: M,
+ ) -> Result<()> {
+ // C version: deliver immediately if in Synced mode, otherwise queue
+ if self.get_mode() == DfsmMode::Synced {
+ // Deliver immediately - message is already deserialized
+ match self.callbacks.deliver_message(
+ nodeid,
+ pid,
+ message,
+ timestamp as u64, // Convert back to u64 for callback compatibility
+ ) {
+ Ok((result, processed)) => {
+ tracing::debug!(
+ "DFSM: message delivered immediately, result={}, processed={}",
+ result,
+ processed
+ );
+ // Record result for synchronous sends
+ self.record_message_result(msg_count, result, processed);
+ }
+ Err(e) => {
+ tracing::error!("DFSM: failed to deliver message: {}", e);
+ // Record error result
+ self.record_message_result(msg_count, -libc::EIO, false);
+ }
+ }
+ } else {
+ // Queue for later delivery - store typed message directly
+ self.queue_message(nodeid, pid, msg_count, message, timestamp as u64);
+ }
+ Ok(())
+ }
+
+ /// Handle SyncStart message from leader
+ fn handle_sync_start(&self, nodeid: u32, new_epoch: SyncEpoch) -> Result<()> {
+ tracing::info!(
+ "DFSM: received SyncStart from node {} with epoch {:?}",
+ nodeid,
+ new_epoch
+ );
+
+ // Adopt the new epoch from the leader (critical for sync protocol!)
+ // This matches C implementation which updates dfsm->sync_epoch
+ *self.sync_epoch.write() = new_epoch;
+ tracing::debug!("DFSM: adopted new sync epoch from leader");
+
+ // Send our state back to the cluster
+ // BUT: don't send if we're the leader (we already sent our state in handle_membership_change)
+ let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+ if nodeid != my_nodeid {
+ self.send_state()
+ .context("Failed to send state in response to SyncStart")?;
+ tracing::debug!("DFSM: sent state in response to SyncStart");
+ } else {
+ tracing::debug!("DFSM: skipping state send (we're the leader who already sent state)");
+ }
+
+ Ok(())
+ }
+
+ /// Handle Update message from leader
+ fn handle_update(
+ &self,
+ nodeid: u32,
+ pid: u32,
+ tree_entry: pmxcfs_memdb::TreeEntry,
+ ) -> Result<()> {
+ // Serialize TreeEntry for callback (process_update expects raw bytes for now)
+ let serialized = tree_entry.serialize_for_update();
+ if let Err(e) = self.callbacks.process_update(nodeid, pid, &serialized) {
+ tracing::error!("DFSM: failed to process update: {}", e);
+ }
+ Ok(())
+ }
+
+ /// Handle UpdateComplete message
+ fn handle_update_complete(&self) -> Result<()> {
+ tracing::info!("DFSM: received UpdateComplete from leader");
+ self.deliver_sync_queue()?;
+ self.set_mode(DfsmMode::Synced);
+ self.callbacks.on_synced();
+ Ok(())
+ }
+}
+
+/// Implementation of CpgHandler trait for DFSM
+///
+/// This allows Dfsm to receive CPG callbacks in an idiomatic Rust way,
+/// with all unsafe pointer handling managed by the CpgService.
+impl<M: Message> CpgHandler for Dfsm<M> {
+ fn on_deliver(&self, _group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]) {
+ tracing::debug!(
+ "DFSM CPG message from node {} (pid {}): {} bytes",
+ u32::from(nodeid),
+ pid,
+ msg.len()
+ );
+
+ // Deserialize DFSM protocol message
+ match DfsmMessage::<M>::deserialize(msg) {
+ Ok(dfsm_msg) => {
+ if let Err(e) = self.handle_dfsm_message(u32::from(nodeid), pid, dfsm_msg) {
+ tracing::error!("Error handling DFSM message: {}", e);
+ }
+ }
+ Err(e) => {
+ tracing::error!("Failed to deserialize DFSM message: {}", e);
+ }
+ }
+ }
+
+ fn on_confchg(
+ &self,
+ _group_name: &str,
+ member_list: &[cpg::Address],
+ _left_list: &[cpg::Address],
+ _joined_list: &[cpg::Address],
+ ) {
+ tracing::info!("DFSM CPG membership change: {} members", member_list.len());
+
+ // Build MemberInfo list from CPG addresses
+ let members: Vec<MemberInfo> = member_list
+ .iter()
+ .map(|addr| MemberInfo {
+ node_id: u32::from(addr.nodeid),
+ pid: addr.pid,
+ joined_at: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs(),
+ })
+ .collect();
+
+ // Notify DFSM of membership change
+ if let Err(e) = self.handle_membership_change(&members) {
+ tracing::error!("Failed to handle membership change: {}", e);
+ }
+ }
+}
+
+impl<M: Message> Dfsm<M> {
+ /// Initialize CPG (Closed Process Group) for cluster communication
+ ///
+ /// Uses the idiomatic CpgService wrapper which handles all unsafe FFI
+ /// and callback management internally.
+ pub fn init_cpg(self: &Arc<Self>) -> Result<()> {
+ tracing::info!("DFSM: Initializing CPG");
+
+ // Create CPG service with this Dfsm as the handler
+ // CpgService handles all callback registration and context management
+ let cpg_service = Arc::new(CpgService::new(Arc::clone(self))?);
+
+ // Get our node ID from CPG (matches C's cpg_local_get)
+ // This MUST be done after cpg_initialize but before joining the group
+ let nodeid = cpg::local_get(cpg_service.handle())?;
+ let nodeid_u32 = u32::from(nodeid);
+ self.nodeid.store(nodeid_u32, Ordering::Relaxed);
+ tracing::info!("DFSM: Got node ID {} from CPG", nodeid_u32);
+
+ // Join the CPG group
+ let group_name = &self.cluster_name;
+ cpg_service
+ .join(group_name)
+ .context("Failed to join CPG group")?;
+
+ tracing::info!("DFSM joined CPG group '{}'", group_name);
+
+ // Store the service
+ *self.cpg_service.write() = Some(cpg_service);
+
+ // Dispatch once to get initial membership
+ if let Some(ref service) = *self.cpg_service.read()
+ && let Err(e) = service.dispatch()
+ {
+ tracing::warn!("Failed to dispatch CPG events: {:?}", e);
+ }
+
+ tracing::info!("DFSM CPG initialized successfully");
+ Ok(())
+ }
+
+ /// Dispatch CPG events (should be called periodically from event loop)
+ /// Matching C's service_dfsm_dispatch
+ pub fn dispatch_events(&self) -> Result<(), rust_corosync::CsError> {
+ if let Some(ref service) = *self.cpg_service.read() {
+ service.dispatch()
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Get CPG file descriptor for event monitoring
+ pub fn fd_get(&self) -> Result<i32> {
+ if let Some(ref service) = *self.cpg_service.read() {
+ service.fd()
+ } else {
+ Err(anyhow::anyhow!("CPG service not initialized"))
+ }
+ }
+
+ /// Stop DFSM services (leave CPG group and finalize)
+ pub fn stop_services(&self) -> Result<()> {
+ tracing::info!("DFSM: Stopping services");
+
+ // Leave the CPG group before dropping the service
+ let group_name = self.cluster_name.clone();
+ if let Some(ref service) = *self.cpg_service.read()
+ && let Err(e) = service.leave(&group_name)
+ {
+ tracing::warn!("Error leaving CPG group: {:?}", e);
+ }
+
+ // Drop the service (CpgService::drop handles finalization)
+ *self.cpg_service.write() = None;
+
+ tracing::info!("DFSM services stopped");
+ Ok(())
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
new file mode 100644
index 000000000..203fe208b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
@@ -0,0 +1,118 @@
+//! Status Sync Service
+//!
+//! This service synchronizes ephemeral status data across the cluster using a separate
+//! DFSM instance with the "pve_kvstore_v1" CPG group.
+//!
+//! Equivalent to C implementation's service_status (the kvstore DFSM).
+//! Handles synchronization of:
+//! - RRD data (performance metrics from each node)
+//! - Node IP addresses
+//! - Cluster log entries
+//! - Other ephemeral status key-value data
+
+use async_trait::async_trait;
+use pmxcfs_services::{Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message;
+
+/// Status Sync Service
+///
+/// Synchronizes ephemeral status data across all nodes using a separate DFSM instance.
+/// Uses CPG group "pve_kvstore_v1" (separate from main config database "pmxcfs_v1").
+///
+/// This implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for status replication
+/// - Separation of status data from config data for better performance
+///
+/// This is equivalent to C implementation's service_status (the kvstore DFSM).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct StatusSyncService<M> {
+ dfsm: Arc<Dfsm<M>>,
+ fd: Option<i32>,
+}
+
+impl<M: Message> StatusSyncService<M> {
+ /// Create a new status sync service
+ pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+ Self { dfsm, fd: None }
+ }
+}
+
+#[async_trait]
+impl<M: Message> Service for StatusSyncService<M> {
+ fn name(&self) -> &str {
+ "status-sync"
+ }
+
+ async fn initialize(&mut self) -> pmxcfs_services::Result<std::os::unix::io::RawFd> {
+ info!("Initializing status sync service (kvstore)");
+
+ // Initialize CPG connection for kvstore group
+ self.dfsm.init_cpg().map_err(|e| {
+ ServiceError::InitializationFailed(format!(
+ "Status sync CPG initialization failed: {e}"
+ ))
+ })?;
+
+ // Get file descriptor for event monitoring
+ let fd = self.dfsm.fd_get().map_err(|e| {
+ self.dfsm.stop_services().ok();
+ ServiceError::InitializationFailed(format!("Failed to get status sync fd: {e}"))
+ })?;
+
+ self.fd = Some(fd);
+
+ info!(
+ "Status sync service initialized successfully with fd {}",
+ fd
+ );
+ Ok(fd)
+ }
+
+ async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> {
+ match self.dfsm.dispatch_events() {
+ Ok(_) => Ok(true),
+ Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+ warn!("Status sync connection lost, requesting reinitialization");
+ Ok(false)
+ }
+ Err(e) => {
+ error!("Status sync dispatch failed: {}", e);
+ Err(ServiceError::DispatchFailed(format!(
+ "Status sync dispatch failed: {e}"
+ )))
+ }
+ }
+ }
+
+ async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+ info!("Finalizing status sync service");
+
+ self.fd = None;
+
+ if let Err(e) = self.dfsm.stop_services() {
+ warn!("Error stopping status sync services: {}", e);
+ }
+
+ info!("Status sync service finalized");
+ Ok(())
+ }
+
+ async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+ // Status sync doesn't need periodic verification like the main database
+ // Status data is ephemeral and doesn't require the same consistency guarantees
+ Ok(())
+ }
+
+ fn timer_period(&self) -> Option<Duration> {
+ // No periodic timer needed for status sync
+ None
+ }
+}
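The `dispatch()` implementation above encodes a three-way error policy: library/handle errors request reinitialization, anything else is reported as a dispatch failure. A minimal standalone sketch of that policy (local stand-in enum, not the real `rust_corosync::CsError`):

```rust
// Local stand-ins; the real service returns pmxcfs_services::Result<bool>.
#[derive(Debug, PartialEq)]
enum CsError {
    Library,
    BadHandle,
    TryAgain,
}

/// Map a dispatch result to the Service contract:
/// Ok(true) = keep running, Ok(false) = ask for reinitialization,
/// Err(_) = report the failure.
fn classify(r: Result<(), CsError>) -> Result<bool, String> {
    match r {
        Ok(()) => Ok(true),
        Err(CsError::Library) | Err(CsError::BadHandle) => Ok(false),
        Err(e) => Err(format!("dispatch failed: {e:?}")),
    }
}

fn main() {
    assert_eq!(classify(Ok(())), Ok(true));
    assert_eq!(classify(Err(CsError::BadHandle)), Ok(false));
    assert!(classify(Err(CsError::TryAgain)).is_err());
    println!("policy ok");
}
```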
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
new file mode 100644
index 000000000..5a2eb9645
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
@@ -0,0 +1,107 @@
+//! DFSM type definitions
+//!
+//! This module contains all type definitions used by the DFSM state machine.
+/// DFSM operating modes
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum DfsmMode {
+ /// Initial state - starting cluster connection
+ Start = 0,
+
+ /// Starting data synchronization
+ StartSync = 1,
+
+ /// All data is up to date
+ Synced = 2,
+
+ /// Waiting for updates from leader
+ Update = 3,
+
+ /// Error states (>= 128)
+ Leave = 253,
+ VersionError = 254,
+ Error = 255,
+}
+
+impl DfsmMode {
+ /// Check if this is an error mode
+ pub fn is_error(&self) -> bool {
+ (*self as u8) >= 128
+ }
+}
+
+impl std::fmt::Display for DfsmMode {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ DfsmMode::Start => write!(f, "start cluster connection"),
+ DfsmMode::StartSync => write!(f, "starting data synchronization"),
+ DfsmMode::Synced => write!(f, "all data is up to date"),
+ DfsmMode::Update => write!(f, "waiting for updates from leader"),
+ DfsmMode::Leave => write!(f, "leaving cluster"),
+ DfsmMode::VersionError => write!(f, "protocol version mismatch"),
+ DfsmMode::Error => write!(f, "serious internal error"),
+ }
+ }
+}
+
+/// DFSM message types (internal protocol messages)
+/// Matches C's dfsm_message_t enum values
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum DfsmMessageType {
+ Normal = 0,
+ SyncStart = 1,
+ State = 2,
+ Update = 3,
+ UpdateComplete = 4,
+ VerifyRequest = 5,
+ Verify = 6,
+}
+
+/// Sync epoch - identifies a synchronization session
+/// Matches C's dfsm_sync_epoch_t structure (16 bytes total)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct SyncEpoch {
+ pub epoch: u32,
+ pub time: u32,
+ pub nodeid: u32,
+ pub pid: u32,
+}
+
+impl SyncEpoch {
+ /// Serialize to C-compatible wire format (16 bytes)
+ /// Format: [epoch: u32][time: u32][nodeid: u32][pid: u32]
+ pub fn serialize(&self) -> [u8; 16] {
+ let mut bytes = [0u8; 16];
+ bytes[0..4].copy_from_slice(&self.epoch.to_le_bytes());
+ bytes[4..8].copy_from_slice(&self.time.to_le_bytes());
+ bytes[8..12].copy_from_slice(&self.nodeid.to_le_bytes());
+ bytes[12..16].copy_from_slice(&self.pid.to_le_bytes());
+ bytes
+ }
+
+ /// Deserialize from C-compatible wire format (16 bytes)
+ pub fn deserialize(bytes: &[u8]) -> Result<Self, &'static str> {
+ if bytes.len() < 16 {
+ return Err("SyncEpoch requires 16 bytes");
+ }
+ Ok(SyncEpoch {
+ epoch: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
+ time: u32::from_le_bytes(bytes[4..8].try_into().unwrap()),
+ nodeid: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
+ pid: u32::from_le_bytes(bytes[12..16].try_into().unwrap()),
+ })
+ }
+}
+
+/// Queued message awaiting delivery
+#[derive(Debug, Clone)]
+pub(super) struct QueuedMessage<M> {
+ pub nodeid: u32,
+ pub pid: u32,
+ pub _msg_count: u64,
+ pub message: M,
+ pub timestamp: u64,
+}
+
+// Re-export NodeSyncInfo from pmxcfs-api-types for use in Callbacks trait
+pub use pmxcfs_api_types::NodeSyncInfo;
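The 16-byte little-endian layout documented for `SyncEpoch` (`[epoch: u32][time: u32][nodeid: u32][pid: u32]`) can be exercised with a standalone round-trip check that mirrors `serialize()`/`deserialize()` without depending on the crate:

```rust
// Standalone mirror of the SyncEpoch wire layout; tuple fields stand in
// for (epoch, time, nodeid, pid).
fn serialize(e: (u32, u32, u32, u32)) -> [u8; 16] {
    let mut b = [0u8; 16];
    b[0..4].copy_from_slice(&e.0.to_le_bytes());
    b[4..8].copy_from_slice(&e.1.to_le_bytes());
    b[8..12].copy_from_slice(&e.2.to_le_bytes());
    b[12..16].copy_from_slice(&e.3.to_le_bytes());
    b
}

fn deserialize(b: &[u8; 16]) -> (u32, u32, u32, u32) {
    // Each field is a 4-byte little-endian u32 at a fixed offset.
    let u = |i: usize| u32::from_le_bytes(b[i..i + 4].try_into().unwrap());
    (u(0), u(4), u(8), u(12))
}

fn main() {
    let e = (7, 1_700_000_000, 2, 31337);
    let bytes = serialize(e);
    assert_eq!(bytes.len(), 16);
    assert_eq!(deserialize(&bytes), e);
    println!("round-trip ok");
}
```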
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
new file mode 100644
index 000000000..3022d9704
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
@@ -0,0 +1,279 @@
+//! C-compatible wire format for cluster communication
+//!
+//! This module implements the exact wire protocol used by the C version of pmxcfs
+//! to ensure compatibility with C-based cluster nodes.
+//!
+//! The C version uses a simple format with iovec arrays containing raw C types.
+use anyhow::{Context, Result};
+use bytemuck::{Pod, Zeroable};
+use std::ffi::CStr;
+
+/// C message types (must match dcdb.h)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum CMessageType {
+ Write = 1,
+ Mkdir = 2,
+ Delete = 3,
+ Rename = 4,
+ Create = 5,
+ Mtime = 6,
+ UnlockRequest = 7,
+ Unlock = 8,
+}
+
+/// C-compatible FUSE message header
+/// Layout matches the iovec array from C: [size][offset][pathlen][tolen][flags]
+#[derive(Debug, Clone, Copy, Pod, Zeroable)]
+#[repr(C)]
+struct CFuseMessageHeader {
+ size: u32,
+ offset: u32,
+ pathlen: u32,
+ tolen: u32,
+ flags: u32,
+}
+
+/// Parsed C FUSE message
+#[derive(Debug, Clone)]
+pub struct CFuseMessage {
+ pub size: u32,
+ pub offset: u32,
+ pub flags: u32,
+ pub path: String,
+ pub to: Option<String>,
+ pub data: Vec<u8>,
+}
+
+impl CFuseMessage {
+ /// Maximum message size to prevent DoS attacks (16MB)
+ pub const MAX_MESSAGE_SIZE: u32 = 16 * 1024 * 1024;
+
+ /// Parse a C FUSE message from raw bytes
+ pub fn parse(data: &[u8]) -> Result<Self> {
+ if data.len() < std::mem::size_of::<CFuseMessageHeader>() {
+ return Err(anyhow::anyhow!(
+ "Message too short: {} < {}",
+ data.len(),
+ std::mem::size_of::<CFuseMessageHeader>()
+ ));
+ }
+
+ // Parse header manually to avoid alignment issues
+ let header = CFuseMessageHeader {
+ size: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
+ offset: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
+ pathlen: u32::from_le_bytes([data[8], data[9], data[10], data[11]]),
+ tolen: u32::from_le_bytes([data[12], data[13], data[14], data[15]]),
+ flags: u32::from_le_bytes([data[16], data[17], data[18], data[19]]),
+ };
+
+ // Check for integer overflow in total size calculation
+ let total_size = header
+ .pathlen
+ .checked_add(header.tolen)
+ .and_then(|s| s.checked_add(header.size))
+ .ok_or_else(|| anyhow::anyhow!("Integer overflow in message size calculation"))?;
+
+ // Validate total size is reasonable (prevent DoS)
+ if total_size > Self::MAX_MESSAGE_SIZE {
+ return Err(anyhow::anyhow!(
+ "Message size {total_size} exceeds maximum {}",
+ Self::MAX_MESSAGE_SIZE
+ ));
+ }
+
+ // Validate total size matches actual buffer size (prevent reading beyond buffer)
+ let header_size = std::mem::size_of::<CFuseMessageHeader>();
+ let expected_total = header_size
+ .checked_add(total_size as usize)
+ .ok_or_else(|| anyhow::anyhow!("Total message size overflow"))?;
+
+ if expected_total != data.len() {
+ return Err(anyhow::anyhow!(
+ "Message size mismatch: expected {}, got {}",
+ expected_total,
+ data.len()
+ ));
+ }
+
+ let mut offset = header_size;
+
+ // Parse path with overflow-checked arithmetic
+ let path = if header.pathlen > 0 {
+ let end_offset = offset
+ .checked_add(header.pathlen as usize)
+ .ok_or_else(|| anyhow::anyhow!("Integer overflow in path offset"))?;
+
+ if end_offset > data.len() {
+ return Err(anyhow::anyhow!(
+ "Invalid path length: {} bytes at offset {} exceeds message size {}",
+ header.pathlen,
+ offset,
+ data.len()
+ ));
+ }
+ let path_bytes = &data[offset..end_offset];
+ offset = end_offset;
+
+ // C strings are null-terminated
+ CStr::from_bytes_until_nul(path_bytes)
+ .context("Invalid path string")?
+ .to_str()
+ .context("Path not valid UTF-8")?
+ .to_string()
+ } else {
+ String::new()
+ };
+
+ // Parse 'to' (for rename operations) with overflow-checked arithmetic
+ let to = if header.tolen > 0 {
+ let end_offset = offset
+ .checked_add(header.tolen as usize)
+ .ok_or_else(|| anyhow::anyhow!("Integer overflow in 'to' offset"))?;
+
+ if end_offset > data.len() {
+ return Err(anyhow::anyhow!(
+ "Invalid 'to' length: {} bytes at offset {} exceeds message size {}",
+ header.tolen,
+ offset,
+ data.len()
+ ));
+ }
+ let to_bytes = &data[offset..end_offset];
+ offset = end_offset;
+
+ Some(
+ CStr::from_bytes_until_nul(to_bytes)
+ .context("Invalid to string")?
+ .to_str()
+ .context("To path not valid UTF-8")?
+ .to_string(),
+ )
+ } else {
+ None
+ };
+
+ // Parse data buffer with overflow-checked arithmetic
+ let buf_data = if header.size > 0 {
+ let end_offset = offset
+ .checked_add(header.size as usize)
+ .ok_or_else(|| anyhow::anyhow!("Integer overflow in data offset"))?;
+
+ if end_offset > data.len() {
+ return Err(anyhow::anyhow!(
+ "Invalid data size: {} bytes at offset {} exceeds message size {}",
+ header.size,
+ offset,
+ data.len()
+ ));
+ }
+ data[offset..end_offset].to_vec()
+ } else {
+ Vec::new()
+ };
+
+ Ok(CFuseMessage {
+ size: header.size,
+ offset: header.offset,
+ flags: header.flags,
+ path,
+ to,
+ data: buf_data,
+ })
+ }
+
+ /// Serialize to C wire format
+ pub fn serialize(&self) -> Vec<u8> {
+ let path_bytes = self.path.as_bytes();
+ let pathlen = if path_bytes.is_empty() {
+ 0
+ } else {
+ (path_bytes.len() + 1) as u32 // +1 for null terminator
+ };
+
+ let to_bytes = self.to.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
+ let tolen = if to_bytes.is_empty() {
+ 0
+ } else {
+ (to_bytes.len() + 1) as u32
+ };
+
+ let header = CFuseMessageHeader {
+ size: self.size,
+ offset: self.offset,
+ pathlen,
+ tolen,
+ flags: self.flags,
+ };
+
+ let mut result = Vec::new();
+
+ // Serialize header
+ result.extend_from_slice(bytemuck::bytes_of(&header));
+
+ // Serialize path (with null terminator)
+ if pathlen > 0 {
+ result.extend_from_slice(path_bytes);
+ result.push(0); // null terminator
+ }
+
+ // Serialize 'to' (with null terminator)
+ if tolen > 0 {
+ result.extend_from_slice(to_bytes);
+ result.push(0); // null terminator
+ }
+
+ // Serialize data
+ if self.size > 0 {
+ result.extend_from_slice(&self.data);
+ }
+
+ result
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_serialize_deserialize_write() {
+ let msg = CFuseMessage {
+ size: 13,
+ offset: 0,
+ flags: 0,
+ path: "/test.txt".to_string(),
+ to: None,
+ data: b"Hello, World!".to_vec(),
+ };
+
+ let serialized = msg.serialize();
+ let parsed = CFuseMessage::parse(&serialized).unwrap();
+
+ assert_eq!(parsed.size, msg.size);
+ assert_eq!(parsed.offset, msg.offset);
+ assert_eq!(parsed.flags, msg.flags);
+ assert_eq!(parsed.path, msg.path);
+ assert_eq!(parsed.to, msg.to);
+ assert_eq!(parsed.data, msg.data);
+ }
+
+ #[test]
+ fn test_serialize_deserialize_rename() {
+ let msg = CFuseMessage {
+ size: 0,
+ offset: 0,
+ flags: 0,
+ path: "/old.txt".to_string(),
+ to: Some("/new.txt".to_string()),
+ data: Vec::new(),
+ };
+
+ let serialized = msg.serialize();
+ let parsed = CFuseMessage::parse(&serialized).unwrap();
+
+ assert_eq!(parsed.path, msg.path);
+ assert_eq!(parsed.to, msg.to);
+ }
+}
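`CFuseMessage::parse()` above accepts a buffer only when the 20-byte header plus `pathlen + tolen + size` exactly matches the buffer length, with every addition overflow-checked. That length rule can be sketched in isolation (a hypothetical helper, not crate code):

```rust
/// Expected total buffer length for a C FUSE message, or None on overflow.
/// Header is 20 bytes: [size][offset][pathlen][tolen][flags], each u32.
fn expected_len(pathlen: u32, tolen: u32, size: u32) -> Option<usize> {
    let payload = pathlen.checked_add(tolen)?.checked_add(size)?;
    20usize.checked_add(payload as usize)
}

fn main() {
    // Empty message: just the header.
    assert_eq!(expected_len(0, 0, 0), Some(20));
    // "/test.txt\0" is 10 bytes, no rename target, 13 data bytes.
    assert_eq!(expected_len(10, 0, 13), Some(43));
    // Overflowing u32 arithmetic is rejected rather than wrapping.
    assert_eq!(expected_len(u32::MAX, 1, 0), None);
    println!("expected_len(10, 0, 13) = {:?}", expected_len(10, 0, 13));
}
```

Requiring exact equality (rather than `<=`) means a sender cannot smuggle trailing bytes past the parser, which matters for a wire format shared with the C implementation.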
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
new file mode 100644
index 000000000..3a371ad86
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
@@ -0,0 +1,563 @@
+//! Multi-node integration tests for DFSM cluster synchronization
+//!
+//! These tests simulate multi-node clusters to verify the complete synchronization
+//! protocol works correctly with multiple Rust nodes exchanging state.
+use anyhow::Result;
+use pmxcfs_dfsm::{Callbacks, FuseMessage, NodeSyncInfo};
+use pmxcfs_memdb::{MemDb, MemDbIndex, ROOT_INODE, TreeEntry};
+use std::sync::{Arc, Mutex};
+use tempfile::TempDir;
+
+/// Mock callbacks for testing DFSM without full pmxcfs integration
+struct MockCallbacks {
+ memdb: MemDb,
+ states_received: Arc<Mutex<Vec<NodeSyncInfo>>>,
+ updates_received: Arc<Mutex<Vec<TreeEntry>>>,
+ synced_count: Arc<Mutex<usize>>,
+}
+
+impl MockCallbacks {
+ fn new(memdb: MemDb) -> Self {
+ Self {
+ memdb,
+ states_received: Arc::new(Mutex::new(Vec::new())),
+ updates_received: Arc::new(Mutex::new(Vec::new())),
+ synced_count: Arc::new(Mutex::new(0)),
+ }
+ }
+
+ #[allow(dead_code)]
+ fn get_states(&self) -> Vec<NodeSyncInfo> {
+ self.states_received.lock().unwrap().clone()
+ }
+
+ #[allow(dead_code)]
+ fn get_updates(&self) -> Vec<TreeEntry> {
+ self.updates_received.lock().unwrap().clone()
+ }
+
+ #[allow(dead_code)]
+ fn get_synced_count(&self) -> usize {
+ *self.synced_count.lock().unwrap()
+ }
+}
+
+impl Callbacks for MockCallbacks {
+ type Message = FuseMessage;
+
+ fn deliver_message(
+ &self,
+ _node_id: u32,
+ _pid: u32,
+ _message: FuseMessage,
+ _timestamp: u64,
+ ) -> Result<(i32, bool)> {
+ Ok((0, true))
+ }
+
+ fn compute_checksum(&self, output: &mut [u8; 32]) -> Result<()> {
+ let checksum = self.memdb.compute_database_checksum()?;
+ output.copy_from_slice(&checksum);
+ Ok(())
+ }
+
+ fn get_state(&self) -> Result<Vec<u8>> {
+ let index = self.memdb.encode_index()?;
+ Ok(index.serialize())
+ }
+
+ fn process_state_update(&self, states: &[NodeSyncInfo]) -> Result<bool> {
+ // Store received states for verification
+ *self.states_received.lock().unwrap() = states.to_vec();
+
+ // Parse indices from states
+ let mut indices: Vec<(u32, u32, MemDbIndex)> = Vec::new();
+ for node in states {
+ if let Some(state_data) = &node.state {
+ match MemDbIndex::deserialize(state_data) {
+ Ok(index) => indices.push((node.node_id, node.pid, index)),
+ Err(_) => continue,
+ }
+ }
+ }
+
+ if indices.is_empty() {
+ return Ok(true);
+ }
+
+ // Find leader (highest version, or if tie, highest mtime)
+ let mut leader_idx = 0;
+ for i in 1..indices.len() {
+ let (_, _, current_index) = &indices[i];
+ let (_, _, leader_index) = &indices[leader_idx];
+ if current_index > leader_index {
+ leader_idx = i;
+ }
+ }
+
+ let (_leader_nodeid, _leader_pid, leader_index) = &indices[leader_idx];
+
+ // Check if WE are synced with leader
+ let our_index = self.memdb.encode_index()?;
+ let we_are_synced = our_index.version == leader_index.version
+ && our_index.mtime == leader_index.mtime
+ && our_index.size == leader_index.size
+ && our_index.entries.len() == leader_index.entries.len()
+ && our_index
+ .entries
+ .iter()
+ .zip(leader_index.entries.iter())
+ .all(|(a, b)| a.inode == b.inode && a.digest == b.digest);
+
+ Ok(we_are_synced)
+ }
+
+ fn process_update(&self, _node_id: u32, _pid: u32, data: &[u8]) -> Result<()> {
+ // Deserialize and store update
+ let tree_entry = TreeEntry::deserialize_from_update(data)?;
+ self.updates_received
+ .lock()
+ .unwrap()
+ .push(tree_entry.clone());
+
+ // Apply to database
+ self.memdb.apply_tree_entry(tree_entry)?;
+ Ok(())
+ }
+
+ fn commit_state(&self) -> Result<()> {
+ Ok(())
+ }
+
+ fn on_synced(&self) {
+ *self.synced_count.lock().unwrap() += 1;
+ }
+}
+
+fn create_test_node(node_id: u32) -> Result<(MemDb, TempDir, Arc<MockCallbacks>)> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join(format!("node{node_id}.db"));
+ let memdb = MemDb::open(&db_path, true)?;
+ // Note: Local operations always use writer=0 (matching C implementation)
+ // Remote DFSM updates use the writer field from the incoming TreeEntry
+
+ let callbacks = Arc::new(MockCallbacks::new(memdb.clone()));
+ Ok((memdb, temp_dir, callbacks))
+}
+
+#[test]
+fn test_two_node_empty_sync() -> Result<()> {
+ // Create two nodes with empty databases
+ let (_memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+ // Generate states from both nodes
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+
+ // Simulate state exchange
+ let states = vec![
+ NodeSyncInfo {
+ node_id: 1,
+ pid: 1000,
+ state: Some(state1),
+ synced: false,
+ },
+ NodeSyncInfo {
+ node_id: 2,
+ pid: 2000,
+ state: Some(state2),
+ synced: false,
+ },
+ ];
+
+ // Both nodes process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+
+ // Both should be synced (empty databases are identical)
+ assert!(synced1, "Node 1 should be synced");
+ assert!(synced2, "Node 2 should be synced");
+
+ Ok(())
+}
+
+#[test]
+fn test_two_node_leader_election() -> Result<()> {
+ // Create two nodes
+ let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+ // Node 1 has more data (higher version)
+ memdb1.create("/file1.txt", 0, 0, 1000)?;
+ memdb1.write("/file1.txt", 0, 0, 1001, b"data from node 1", false)?;
+
+ // Generate states
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+
+ // Parse to check versions
+ let index1 = MemDbIndex::deserialize(&state1)?;
+ let index2 = MemDbIndex::deserialize(&state2)?;
+
+ // Node 1 should have higher version
+ assert!(
+ index1.version > index2.version,
+ "Node 1 version {} should be > Node 2 version {}",
+ index1.version,
+ index2.version
+ );
+
+ // Simulate state exchange
+ let states = vec![
+ NodeSyncInfo {
+ node_id: 1,
+ pid: 1000,
+ state: Some(state1),
+ synced: false,
+ },
+ NodeSyncInfo {
+ node_id: 2,
+ pid: 2000,
+ state: Some(state2),
+ synced: false,
+ },
+ ];
+
+ // Process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+
+ // Node 1 (leader) should be synced, Node 2 (follower) should not
+ assert!(synced1, "Node 1 (leader) should be synced");
+ assert!(!synced2, "Node 2 (follower) should not be synced");
+
+ Ok(())
+}
+
+#[test]
+fn test_incremental_update_transfer() -> Result<()> {
+ // Create leader and follower
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Leader has data
+ leader_db.create("/config", libc::S_IFDIR, 0, 1000)?;
+ leader_db.create("/config/node.conf", 0, 0, 1001)?;
+ leader_db.write("/config/node.conf", 0, 0, 1002, b"hostname=pve1", false)?;
+
+ // Get entries from leader
+ let leader_entries = leader_db.get_all_entries()?;
+
+ // Simulate sending updates to follower
+ for entry in leader_entries {
+ if entry.inode == ROOT_INODE {
+ continue; // Skip root (both have it)
+ }
+
+ // Serialize as update message
+ let update_msg = entry.serialize_for_update();
+
+ // Follower receives and processes update
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+ }
+
+ // Verify follower has the data
+ let config_dir = follower_db.lookup_path("/config");
+ assert!(
+ config_dir.is_some(),
+ "Follower should have /config directory"
+ );
+ assert!(config_dir.unwrap().is_dir());
+
+ let config_file = follower_db.lookup_path("/config/node.conf");
+ assert!(
+ config_file.is_some(),
+ "Follower should have /config/node.conf"
+ );
+
+ let config_data = follower_db.read("/config/node.conf", 0, 1024)?;
+ assert_eq!(
+ config_data, b"hostname=pve1",
+ "Follower should have correct data"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_three_node_sync() -> Result<()> {
+ // Create three nodes
+ let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+ let (memdb2, _temp2, callbacks2) = create_test_node(2)?;
+ let (_memdb3, _temp3, callbacks3) = create_test_node(3)?;
+
+ // Node 1 has the most recent data
+ memdb1.create("/cluster.conf", 0, 0, 5000)?;
+ memdb1.write("/cluster.conf", 0, 0, 5001, b"version=3", false)?;
+
+ // Node 2 has older data
+ memdb2.create("/cluster.conf", 0, 0, 4000)?;
+ memdb2.write("/cluster.conf", 0, 0, 4001, b"version=2", false)?;
+
+ // Node 3 is empty (new node joining)
+
+ // Generate states
+ let state1 = callbacks1.get_state()?;
+ let state2 = callbacks2.get_state()?;
+ let state3 = callbacks3.get_state()?;
+
+ let states = vec![
+ NodeSyncInfo {
+ node_id: 1,
+ pid: 1000,
+ state: Some(state1.clone()),
+ synced: false,
+ },
+ NodeSyncInfo {
+ node_id: 2,
+ pid: 2000,
+ state: Some(state2.clone()),
+ synced: false,
+ },
+ NodeSyncInfo {
+ node_id: 3,
+ pid: 3000,
+ state: Some(state3.clone()),
+ synced: false,
+ },
+ ];
+
+ // All nodes process states
+ let synced1 = callbacks1.process_state_update(&states)?;
+ let synced2 = callbacks2.process_state_update(&states)?;
+ let synced3 = callbacks3.process_state_update(&states)?;
+
+ // Node 1 (leader) should be synced
+ assert!(synced1, "Node 1 (leader) should be synced");
+
+ // Nodes 2 and 3 need updates
+ assert!(!synced2, "Node 2 should need updates");
+ assert!(!synced3, "Node 3 should need updates");
+
+ // Verify leader has highest version
+ let index1 = MemDbIndex::deserialize(&state1)?;
+ let index2 = MemDbIndex::deserialize(&state2)?;
+ let index3 = MemDbIndex::deserialize(&state3)?;
+
+ assert!(index1.version >= index2.version);
+ assert!(index1.version >= index3.version);
+
+ Ok(())
+}
+
+#[test]
+fn test_update_message_wire_format_compatibility() -> Result<()> {
+ // Verify our wire format matches C implementation exactly
+ let entry = TreeEntry {
+ inode: 42,
+ parent: 1,
+ version: 100,
+ writer: 2,
+ mtime: 12345,
+ size: 11,
+ entry_type: 8, // DT_REG
+ name: "test.conf".to_string(),
+ data: b"hello world".to_vec(),
+ };
+
+ let serialized = entry.serialize_for_update();
+
+ // Verify header size (41 bytes)
+ // parent(8) + inode(8) + version(8) + writer(4) + mtime(4) + size(4) + namelen(4) + type(1)
+ let expected_header_size = 8 + 8 + 8 + 4 + 4 + 4 + 4 + 1;
+ assert_eq!(expected_header_size, 41);
+
+ // Verify total size
+ let namelen = "test.conf".len() + 1; // Include null terminator
+ let expected_total = expected_header_size + namelen + 11;
+ assert_eq!(serialized.len(), expected_total);
+
+ // Verify we can deserialize it back
+ let deserialized = TreeEntry::deserialize_from_update(&serialized)?;
+ assert_eq!(deserialized.inode, entry.inode);
+ assert_eq!(deserialized.parent, entry.parent);
+ assert_eq!(deserialized.version, entry.version);
+ assert_eq!(deserialized.writer, entry.writer);
+ assert_eq!(deserialized.mtime, entry.mtime);
+ assert_eq!(deserialized.size, entry.size);
+ assert_eq!(deserialized.entry_type, entry.entry_type);
+ assert_eq!(deserialized.name, entry.name);
+ assert_eq!(deserialized.data, entry.data);
+
+ Ok(())
+}
+
+#[test]
+fn test_index_wire_format_compatibility() -> Result<()> {
+ // Verify memdb_index_t wire format matches C implementation
+ use pmxcfs_memdb::IndexEntry;
+
+ let entries = vec![
+ IndexEntry {
+ inode: 1,
+ digest: [0u8; 32],
+ },
+ IndexEntry {
+ inode: 2,
+ digest: [1u8; 32],
+ },
+ ];
+
+ let index = MemDbIndex::new(
+ 100, // version
+ 2, // last_inode
+ 1, // writer
+ 12345, // mtime
+ entries,
+ );
+
+ let serialized = index.serialize();
+
+ // Verify header size (32 bytes)
+ // version(8) + last_inode(8) + writer(4) + mtime(4) + size(4) + bytes(4)
+ let expected_header_size = 8 + 8 + 4 + 4 + 4 + 4;
+ assert_eq!(expected_header_size, 32);
+
+ // Verify entry size (40 bytes each)
+ // inode(8) + digest(32)
+ let expected_entry_size = 8 + 32;
+ assert_eq!(expected_entry_size, 40);
+
+ // Verify total size
+ let expected_total = expected_header_size + 2 * expected_entry_size;
+ assert_eq!(serialized.len(), expected_total);
+ assert_eq!(serialized.len(), index.bytes as usize);
+
+ // Verify deserialization
+ let deserialized = MemDbIndex::deserialize(&serialized)?;
+ assert_eq!(deserialized.version, index.version);
+ assert_eq!(deserialized.last_inode, index.last_inode);
+ assert_eq!(deserialized.writer, index.writer);
+ assert_eq!(deserialized.mtime, index.mtime);
+ assert_eq!(deserialized.size, index.size);
+ assert_eq!(deserialized.bytes, index.bytes);
+ assert_eq!(deserialized.entries.len(), 2);
+
+ Ok(())
+}
+
+#[test]
+fn test_sync_with_conflicts() -> Result<()> {
+ // Test scenario: two nodes modified different files
+ let (memdb1, _temp1, _callbacks1) = create_test_node(1)?;
+ let (memdb2, _temp2, _callbacks2) = create_test_node(2)?;
+
+ // Both start with same base
+ memdb1.create("/base.conf", 0, 0, 1000)?;
+ memdb1.write("/base.conf", 0, 0, 1001, b"shared", false)?;
+
+ memdb2.create("/base.conf", 0, 0, 1000)?;
+ memdb2.write("/base.conf", 0, 0, 1001, b"shared", false)?;
+
+ // Node 1 adds file1
+ memdb1.create("/file1.txt", 0, 0, 2000)?;
+ memdb1.write("/file1.txt", 0, 0, 2001, b"from node 1", false)?;
+
+ // Node 2 adds file2
+ memdb2.create("/file2.txt", 0, 0, 2000)?;
+ memdb2.write("/file2.txt", 0, 0, 2001, b"from node 2", false)?;
+
+ // Generate indices
+ let index1 = memdb1.encode_index()?;
+ let index2 = memdb2.encode_index()?;
+
+ // Find differences
+ let diffs_1_vs_2 = index1.find_differences(&index2);
+ let diffs_2_vs_1 = index2.find_differences(&index1);
+
+ // Node 1 has file1 that node 2 doesn't have
+ assert!(
+ !diffs_1_vs_2.is_empty(),
+ "Node 1 should have entries node 2 doesn't have"
+ );
+
+ // Node 2 has file2 that node 1 doesn't have
+ assert!(
+ !diffs_2_vs_1.is_empty(),
+ "Node 2 should have entries node 1 doesn't have"
+ );
+
+ // Higher version wins - here both nodes performed the same sequence of
+ // operations, so their versions are equal and mtime is the tiebreaker
+
+ Ok(())
+}
+
+#[test]
+fn test_large_file_update() -> Result<()> {
+ // Test updating a file with significant data
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Create a file with 10KB of data
+ let large_data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect();
+
+ leader_db.create("/large.bin", 0, 0, 1000)?;
+ leader_db.write("/large.bin", 0, 0, 1001, &large_data, false)?;
+
+ // Get the entry
+ let entry = leader_db.lookup_path("/large.bin").unwrap();
+
+ // Serialize and send
+ let update_msg = entry.serialize_for_update();
+
+ // Follower receives
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+
+ // Verify
+ let follower_entry = follower_db.lookup_path("/large.bin").unwrap();
+ assert_eq!(follower_entry.size, large_data.len());
+ assert_eq!(follower_entry.data, large_data);
+
+ Ok(())
+}
+
+#[test]
+fn test_directory_hierarchy_sync() -> Result<()> {
+ // Test syncing nested directory structure
+ let (leader_db, _temp_leader, _) = create_test_node(1)?;
+ let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+ // Create directory hierarchy on leader
+ leader_db.create("/etc", libc::S_IFDIR, 0, 1000)?;
+ leader_db.create("/etc/pve", libc::S_IFDIR, 0, 1001)?;
+ leader_db.create("/etc/pve/nodes", libc::S_IFDIR, 0, 1002)?;
+ leader_db.create("/etc/pve/nodes/pve1", libc::S_IFDIR, 0, 1003)?;
+ leader_db.create("/etc/pve/nodes/pve1/config", 0, 0, 1004)?;
+ leader_db.write(
+ "/etc/pve/nodes/pve1/config", 0, 0, 1005, b"cpu: 2\nmem: 4096", false,
+ )?;
+
+ // Send all entries to follower
+ let entries = leader_db.get_all_entries()?;
+ for entry in entries {
+ if entry.inode == ROOT_INODE {
+ continue; // Skip root
+ }
+ let update_msg = entry.serialize_for_update();
+ follower_callbacks.process_update(1, 1000, &update_msg)?;
+ }
+
+ // Verify entire hierarchy
+ assert!(follower_db.lookup_path("/etc").is_some());
+ assert!(follower_db.lookup_path("/etc/pve").is_some());
+ assert!(follower_db.lookup_path("/etc/pve/nodes").is_some());
+ assert!(follower_db.lookup_path("/etc/pve/nodes/pve1").is_some());
+
+ let config = follower_db.lookup_path("/etc/pve/nodes/pve1/config");
+ assert!(config.is_some());
+ assert_eq!(config.unwrap().data, b"cpu: 2\nmem: 4096");
+
+ Ok(())
+}
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
index 106f5016e..1c9b6cad8 100644
--- a/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
@@ -506,7 +506,7 @@ impl MemDb {
anyhow::bail!("Database has errors, refusing operation");
}
- // CRITICAL FIX: Acquire write guard BEFORE any checks to prevent TOCTOU race
+ // Acquire write guard before any checks to prevent TOCTOU race
// This ensures all validation and mutation happen atomically
let _guard = self.inner.write_guard.lock();
@@ -666,7 +666,7 @@ impl MemDb {
anyhow::bail!("Database has errors, refusing operation");
}
- // CRITICAL FIX: Acquire write guard BEFORE any checks to prevent TOCTOU race
+ // Acquire write guard before any checks to prevent TOCTOU race
// This ensures lookup and mutation happen atomically
let _guard = self.inner.write_guard.lock();
diff --git a/src/pmxcfs-rs/pmxcfs-status/src/status.rs b/src/pmxcfs-rs/pmxcfs-status/src/status.rs
index 58d81b8ed..3a0243a62 100644
--- a/src/pmxcfs-rs/pmxcfs-status/src/status.rs
+++ b/src/pmxcfs-rs/pmxcfs-status/src/status.rs
@@ -695,8 +695,8 @@ impl Status {
/// This updates the CPG member list and synchronizes the online status
/// in cluster_info to match current membership.
///
- /// IMPORTANT: Both members and cluster_info are updated atomically under locks
- /// to prevent TOCTOU where readers could see inconsistent state.
+ /// Both members and cluster_info are updated atomically under locks
+ /// to prevent readers from seeing inconsistent state.
pub fn update_members(&self, members: Vec<pmxcfs_api_types::MemberInfo>) {
// Acquire both locks before any updates to ensure atomicity
// (matches C's single mutex protection in status.c)
--
2.47.3