From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Cc: Kefu Chai <tchaikov@gmail.com>
Subject: [PATCH pve-cluster v3 05/13] pmxcfs-rs: add pmxcfs-memdb crate
Date: Mon, 23 Mar 2026 19:32:20 +0800 [thread overview]
Message-ID: <20260323113239.942866-6-k.chai@proxmox.com> (raw)
In-Reply-To: <20260323113239.942866-1-k.chai@proxmox.com>
Add in-memory database with SQLite persistence:
- MemDb: Main database handle (thread-safe via Arc)
- TreeEntry: File/directory entries with metadata
- SQLite schema version 5 (C-compatible)
- Resource locking with timeout-based expiration
- Version tracking and checksumming
- Index encoding/decoding for cluster synchronization
This crate depends only on pmxcfs-api-types and external
libraries (rusqlite, sha2, bincode). It provides the core
storage layer used by the distributed file system.
Includes comprehensive unit tests for:
- CRUD operations on files and directories
- Lock acquisition and expiration
- SQLite persistence and recovery
- Index encoding/decoding for sync
- Tree entry application
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
src/pmxcfs-rs/Cargo.toml | 10 +
src/pmxcfs-rs/pmxcfs-memdb/Cargo.toml | 42 +
src/pmxcfs-rs/pmxcfs-memdb/README.md | 263 +++
src/pmxcfs-rs/pmxcfs-memdb/src/database.rs | 2008 +++++++++++++++++
src/pmxcfs-rs/pmxcfs-memdb/src/index.rs | 579 +++++
src/pmxcfs-rs/pmxcfs-memdb/src/lib.rs | 26 +
src/pmxcfs-rs/pmxcfs-memdb/src/locks.rs | 315 +++
src/pmxcfs-rs/pmxcfs-memdb/src/sync.rs | 720 ++++++
src/pmxcfs-rs/pmxcfs-memdb/src/traits.rs | 102 +
src/pmxcfs-rs/pmxcfs-memdb/src/types.rs | 344 +++
src/pmxcfs-rs/pmxcfs-memdb/src/vmlist.rs | 239 ++
.../pmxcfs-memdb/tests/checksum_test.rs | 208 ++
.../pmxcfs-memdb/tests/database_tests.rs | 461 ++++
.../tests/sync_integration_tests.rs | 390 ++++
14 files changed, 5707 insertions(+)
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/src/index.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/src/locks.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/src/sync.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/src/traits.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/src/types.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/src/vmlist.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/tests/checksum_test.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/tests/database_tests.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-memdb/tests/sync_integration_tests.rs
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 7b4498267..db9db41b0 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -5,6 +5,7 @@ members = [
"pmxcfs-config", # Configuration management
"pmxcfs-logger", # Cluster log with ring buffer and deduplication
"pmxcfs-rrd", # RRD (Round-Robin Database) persistence
+ "pmxcfs-memdb", # In-memory database with SQLite persistence
]
resolver = "2"
@@ -22,6 +23,7 @@ pmxcfs-api-types = { path = "pmxcfs-api-types" }
pmxcfs-config = { path = "pmxcfs-config" }
pmxcfs-logger = { path = "pmxcfs-logger" }
pmxcfs-rrd = { path = "pmxcfs-rrd" }
+pmxcfs-memdb = { path = "pmxcfs-memdb" }
# Core async runtime
tokio = { version = "1.35", features = ["full"] }
@@ -33,6 +35,14 @@ thiserror = "2.0"
# Logging and tracing
tracing = "0.1"
+# Serialization
+serde = { version = "1.0", features = ["derive"] }
+bincode = "1.3"
+
+# Network and cluster
+bytes = "1.5"
+sha2 = "0.10"
+
# Concurrency primitives
parking_lot = "0.12"
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/Cargo.toml b/src/pmxcfs-rs/pmxcfs-memdb/Cargo.toml
new file mode 100644
index 000000000..4d340f4b6
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/Cargo.toml
@@ -0,0 +1,42 @@
+[package]
+name = "pmxcfs-memdb"
+description = "In-memory database with SQLite persistence for pmxcfs"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+# Error handling
+anyhow.workspace = true
+
+# Database
+rusqlite = "0.29"
+
+# Concurrency primitives
+parking_lot.workspace = true
+
+# System integration
+libc.workspace = true
+
+# Cryptography (for checksums)
+sha2.workspace = true
+
+# Serialization
+serde.workspace = true
+bincode.workspace = true
+
+# Logging
+tracing.workspace = true
+
+# Temporary files
+tempfile.workspace = true
+
+# pmxcfs types
+pmxcfs-api-types = { path = "../pmxcfs-api-types" }
+
+[dev-dependencies]
+pmxcfs-test-utils = { path = "../pmxcfs-test-utils" }
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/README.md b/src/pmxcfs-rs/pmxcfs-memdb/README.md
new file mode 100644
index 000000000..ff7737dcb
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/README.md
@@ -0,0 +1,263 @@
+# pmxcfs-memdb
+
+**In-Memory Database** with SQLite persistence for pmxcfs cluster filesystem.
+
+This crate provides a thread-safe, cluster-synchronized in-memory database that serves as the backend storage for the Proxmox cluster filesystem. All filesystem operations (read, write, create, delete) are performed on in-memory structures with SQLite providing durable persistence.
+
+## Overview
+
+The MemDb is the core data structure that stores all cluster configuration files in memory for fast access while maintaining durability through SQLite. Changes are synchronized across the cluster using the DFSM protocol.
+
+### Key Features
+
+- **In-memory tree structure**: All filesystem entries cached in memory
+- **SQLite persistence**: Durable storage with ACID guarantees
+- **Cluster synchronization**: State replication via DFSM (pmxcfs-dfsm crate)
+- **Version tracking**: Monotonically increasing version numbers for conflict detection
+- **Resource locking**: File-level locks with timeout-based expiration
+- **Thread-safe**: All operations protected by internal mutexes
+- **Size limits**: Enforces max file size (1 MiB) and total filesystem size (128 MiB)
+
+## Architecture
+
+### Module Structure
+
+| Module | Purpose | C Equivalent |
+|--------|---------|--------------|
+| `database.rs` | Core MemDb struct and CRUD operations | `memdb.c` (main functions) |
+| `types.rs` | TreeEntry, LockInfo, constants | `memdb.h:38-51, 71-74` |
+| `locks.rs` | Resource locking functionality | `memdb.c:memdb_lock_*` |
+| `sync.rs` | State serialization for cluster sync | `memdb.c:memdb_encode_index` |
+| `index.rs` | Index comparison for DFSM updates | `memdb.c:memdb_index_*` |
+
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| `memdb_t` | `MemDb` | Main database handle (Clone-able via Arc) |
+| `memdb_tree_entry_t` | `TreeEntry` | File/directory entry |
+| `memdb_index_t` | `MemDbIndex` | Serialized state for sync |
+| `memdb_index_extry_t` | `IndexEntry` | Single index entry |
+| `memdb_lock_info_t` | `LockInfo` | Lock metadata |
+| `db_backend_t` | `Connection` | SQLite backend (rusqlite) |
+| `GHashTable *index` | `HashMap<u64, TreeEntry>` | Inode index |
+| `GHashTable *locks` | `HashMap<String, LockInfo>` | Lock table |
+| `GMutex mutex` | `Mutex` | Thread synchronization |
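
In Rust terms, the mapping above boils down to the following simplified sketch. This is not the real `MemDbInner` (see database.rs in this patch): std's `Mutex` stands in for `parking_lot`, and a `()` placeholder stands in for `LockInfo`.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;

// Simplified tree entry; the real TreeEntry in types.rs carries more metadata.
struct TreeEntry { inode: u64, parent: u64, version: u64, name: String, data: Vec<u8> }

// Simplified shape of the database handle.
struct MemDbSketch {
    index: Mutex<HashMap<u64, TreeEntry>>,           // inode -> entry
    tree: Mutex<HashMap<u64, HashMap<String, u64>>>, // parent inode -> (name -> child inode)
    locks: Mutex<HashMap<String, ()>>,               // path -> lock info (placeholder)
    version: AtomicU64,                              // monotonic version counter
}

fn main() {
    let db = MemDbSketch {
        index: Mutex::new(HashMap::new()),
        tree: Mutex::new(HashMap::new()),
        locks: Mutex::new(HashMap::new()),
        version: AtomicU64::new(1),
    };
    // Insert a root-like entry (root's parent is itself, name is empty).
    db.index.lock().unwrap().insert(1, TreeEntry {
        inode: 1, parent: 1, version: 1, name: String::new(), data: Vec::new(),
    });
    assert_eq!(db.index.lock().unwrap().len(), 1);
    println!("ok");
}
```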
+
+### Core Functions
+
+#### Database Lifecycle
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `memdb_open()` | `MemDb::open()` | database.rs |
+| `memdb_close()` | (Drop trait) | Automatic |
+| `memdb_checkpoint()` | (implicit in writes) | Auto-commit |
+
+#### File Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `memdb_read()` | `MemDb::read()` | database.rs |
+| `memdb_write()` | `MemDb::write()` | database.rs |
+| `memdb_create()` | `MemDb::create()` | database.rs |
+| `memdb_delete()` | `MemDb::delete()` | database.rs |
+| `memdb_mkdir()` | `MemDb::create()` (with DT_DIR) | database.rs |
+| `memdb_rename()` | `MemDb::rename()` | database.rs |
+| `memdb_mtime()` | (included in write) | database.rs |
+
+#### Directory Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `memdb_readdir()` | `MemDb::readdir()` | database.rs |
+| `memdb_dirlist_free()` | (automatic) | Rust's Vec drops automatically |
+
+#### Metadata Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `memdb_getattr()` | `MemDb::lookup_path()` | database.rs |
+| `memdb_statfs()` | `MemDb::statfs()` | database.rs |
+
+#### Tree Entry Functions
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `memdb_tree_entry_new()` | `TreeEntry { ... }` | Struct initialization |
+| `memdb_tree_entry_copy()` | `.clone()` | Automatic (derive Clone) |
+| `memdb_tree_entry_free()` | (Drop trait) | Automatic |
+| `tree_entry_debug()` | `{:?}` format | Automatic (derive Debug) |
+| `memdb_tree_entry_csum()` | `TreeEntry::compute_checksum()` | types.rs |
+
+#### Lock Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `memdb_lock_expired()` | `MemDb::is_lock_expired()` | locks.rs |
+| `memdb_update_locks()` | `MemDb::update_locks()` | locks.rs |
+
+#### Index/Sync Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `memdb_encode_index()` | `MemDb::get_index()` | sync.rs |
+| `memdb_index_copy()` | `.clone()` | Automatic (derive Clone) |
+| `memdb_compute_checksum()` | `MemDb::compute_checksum()` | sync.rs |
+| `bdb_backend_commit_update()` | `MemDb::apply_tree_entry()` | database.rs |
+
+#### State Synchronization
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `memdb_recreate_vmlist()` | (handled by status crate) | External |
+| (implicit) | `MemDb::replace_all_entries()` | database.rs |
+
+### SQLite Backend
+
+**C Version (database.c):**
+- Direct SQLite3 C API
+- Manual statement preparation
+- Explicit transaction management
+- Manual memory management
+
+**Rust Version (database.rs):**
+- `rusqlite` crate for type-safe SQLite access
+
+## Database Schema
+
+The SQLite schema stores all filesystem entries with metadata:
+- `inode = 1` is always the root directory
+- `parent` is the parent directory's inode; the root entry's parent is its own inode
+- `version` increments on each modification (monotonic)
+- `writer` is the node ID that made the change
+- `mtime` is seconds since UNIX epoch
+- `data` is NULL for directories, BLOB for files
+
+## TreeEntry Wire Format
+
+For cluster synchronization (DFSM Update messages), TreeEntry uses C-compatible serialization that is byte-compatible with C's implementation.
+
+## Key Differences from C Implementation
+
+### Thread Safety
+
+**C Version:**
+- Single `GMutex` protects entire memdb_t
+- Callback-based access from qb_loop (single-threaded)
+
+**Rust Version:**
+- Mutex for each data structure (index, tree, locks, conn)
+- More granular locking
+- Can be shared across tokio tasks
+
+### Data Structures
+
+**C Version:**
+- `GHashTable` (GLib) for index and tree
+- Recursive tree structure with pointers
+
+**Rust Version:**
+- `HashMap` from std
+- Flat structure: `HashMap<u64, HashMap<String, u64>>` for tree
+- Separate `HashMap<u64, TreeEntry>` for index
+- No recursive pointers (eliminates cycles)
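
A minimal sketch of a path lookup over this flat representation. The real `lookup_path()` in database.rs also consults the entry index; `ROOT_INODE = 1` is assumed here per the schema notes above.

```rust
use std::collections::HashMap;

const ROOT_INODE: u64 = 1;

// Flat tree: parent inode -> (child name -> child inode).
// Lookup is an iterative walk from the root; no recursive pointers needed.
fn lookup(tree: &HashMap<u64, HashMap<String, u64>>, path: &str) -> Option<u64> {
    let mut current = ROOT_INODE;
    for part in path.split('/').filter(|s| !s.is_empty()) {
        current = *tree.get(&current)?.get(part)?;
    }
    Some(current)
}

fn main() {
    let mut tree: HashMap<u64, HashMap<String, u64>> = HashMap::new();
    tree.entry(ROOT_INODE).or_default().insert("nodes".to_string(), 2);
    tree.entry(2).or_default().insert("node1".to_string(), 3);

    assert_eq!(lookup(&tree, "/nodes/node1"), Some(3));
    assert_eq!(lookup(&tree, "/"), Some(ROOT_INODE));
    assert_eq!(lookup(&tree, "/missing"), None);
    println!("ok");
}
```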
+
+### SQLite Integration
+
+**C Version (database.c):**
+- Direct SQLite3 C API
+
+**Rust Version (database.rs):**
+- `rusqlite` crate for type-safe SQLite access
+
+## Constants
+
+| Constant | Value | Purpose |
+|----------|-------|---------|
+| `MEMDB_MAX_FILE_SIZE` | 1 MiB | Maximum file size (matches C) |
+| `MEMDB_MAX_FSSIZE` | 128 MiB | Maximum total filesystem size |
+| `MEMDB_MAX_INODES` | 256k | Maximum number of files/dirs |
+| `MEMDB_BLOCKSIZE` | 4096 | Block size for statfs |
+| `LOCK_TIMEOUT` | 120 sec | Lock expiration timeout |
+| `DT_DIR` | 4 | Directory type (matches POSIX) |
+| `DT_REG` | 8 | Regular file type (matches POSIX) |
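
For illustration, a hedged sketch of timeout-based lock expiry. The actual `LockInfo` fields live in types.rs and may differ; the acquisition-time field below is an assumption.

```rust
use std::time::{Duration, SystemTime};

const LOCK_TIMEOUT: Duration = Duration::from_secs(120);

// Hypothetical lock record; the real LockInfo is defined in types.rs.
struct LockInfo {
    ltime: SystemTime, // time the lock was taken (assumed field)
}

// A lock is considered expired once LOCK_TIMEOUT has elapsed.
fn is_expired(lock: &LockInfo, now: SystemTime) -> bool {
    now.duration_since(lock.ltime)
        .map(|age| age >= LOCK_TIMEOUT)
        .unwrap_or(false) // clock went backwards: treat lock as still held
}

fn main() {
    let now = SystemTime::now();
    let fresh = LockInfo { ltime: now };
    let stale = LockInfo { ltime: now - Duration::from_secs(121) };
    assert!(!is_expired(&fresh, now));
    assert!(is_expired(&stale, now));
    println!("ok");
}
```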
+
+## Known Issues / TODOs
+
+### Missing Features
+
+- [ ] **vmlist regeneration**: `memdb_recreate_vmlist()` not implemented (handled by status crate's `scan_vmlist()`)
+- [ ] **C integration tests**: No tests with real C-generated databases or Update messages
+- [ ] **Concurrent access tests**: No multi-threaded stress tests for lock contention
+
+### Behavioral Differences (Benign)
+
+- **Lock storage**: both versions rebuild the lock table from the filesystem tree at startup, but the traversal code differs
+- **Index encoding**: Rust uses `Vec<IndexEntry>` instead of flexible array member
+- **Checksum algorithm**: Same (SHA-256) but implementation differs (`sha2` crate vs OpenSSL)
+
+### Error Handling & Recovery
+
+**Error Flag Behavior:**
+
+When a database operation fails (e.g., SQLite error, transaction failure), the `errors` flag is set to `true` (matching C behavior in `memdb->errors`). Once set:
+- **All subsequent operations will fail** with "Database has errors, refusing operation"
+- **No automatic recovery mechanism** is provided
+- **Manual intervention required:** Restart the pmxcfs daemon to clear the error state
+
+This is a **fail-safe design** to prevent data corruption. If the database enters an inconsistent state due to an error, the system refuses all further operations rather than risk corrupting the cluster state.
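
The fail-safe pattern can be sketched with a shared atomic flag, mirroring the `errors: AtomicBool` checks at the top of each mutating operation in database.rs:

```rust
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};

const DB_ERRORS_MSG: &str = "Database has errors, refusing operation";

// Fail-fast guard: once `errors` is set, every subsequent operation refuses to run.
fn guarded_op(errors: &AtomicBool) -> io::Result<()> {
    if errors.load(Ordering::SeqCst) {
        return Err(io::Error::other(DB_ERRORS_MSG));
    }
    // ... perform the mutation; on a SQLite/commit failure the real code does:
    // errors.store(true, Ordering::SeqCst);
    Ok(())
}

fn main() {
    let errors = AtomicBool::new(false);
    assert!(guarded_op(&errors).is_ok());
    errors.store(true, Ordering::SeqCst); // simulate a failed commit setting the flag
    assert!(guarded_op(&errors).is_err()); // all later operations refuse
    println!("ok");
}
```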
+
+**Production Impact:**
+- A single database error will make the node unable to process further memdb operations
+- The node must be restarted to recover
+- This matches C implementation behavior
+
+**Future Improvements:**
+- [ ] Add error context to help diagnose which operation caused the error
+- [ ] Consider adding a recovery mechanism (e.g., re-open database, validate consistency)
+- [ ] Add monitoring/alerting for error flag state
+
+### Path Normalization Strategy
+
+**Internal Path Format:**
+
+All paths are internally stored and processed as **absolute paths** with:
+- Leading `/` (e.g., "/nodes/node1/qemu-server/100.conf")
+- No trailing `/` except for root ("/")
+- No `..` or `.` components
+
+**C Compatibility:**
+
+The C implementation sometimes sends paths without leading `/` (see `find_plug` in pmxcfs.c). The Rust implementation automatically normalizes these to absolute paths using `normalize_path()`.
+
+**Security:**
+
+Path traversal is prevented by:
+1. Normalization strips redundant leading/trailing slashes and yields a canonical absolute path
+2. Lock paths explicitly reject `..` components
+3. All lookups go through `lookup_path()` which only follows valid tree structure
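
The normalization rules can be exercised directly; this standalone sketch mirrors `normalize_path()` from database.rs in this patch:

```rust
// Mirror of normalize_path() from database.rs: always return an absolute
// path with a leading '/' and no trailing '/' (root stays "/").
fn normalize_path(path: &str) -> String {
    if path.is_empty() || path == "/" || path == "." {
        return "/".to_string();
    }
    let trimmed = path.trim_matches('/');
    if trimmed.is_empty() {
        "/".to_string()
    } else {
        format!("/{trimmed}")
    }
}

fn main() {
    assert_eq!(normalize_path("nodes/node1/qemu-server"), "/nodes/node1/qemu-server");
    assert_eq!(normalize_path("/nodes/node1/qemu-server/"), "/nodes/node1/qemu-server");
    assert_eq!(normalize_path(""), "/");
    assert_eq!(normalize_path("/"), "/");
    assert_eq!(normalize_path("."), "/");
    println!("ok");
}
```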
+
+### Compatibility
+
+- **Database format**: 100% compatible with C version (same SQLite schema)
+- **Wire format**: TreeEntry serialization matches C byte-for-byte
+- **Constants**: All limits match C version exactly
+
+## References
+
+### C Implementation
+- `src/pmxcfs/memdb.c` / `memdb.h` - In-memory database
+- `src/pmxcfs/database.c` - SQLite backend
+
+### Related Crates
+- **pmxcfs-dfsm**: Uses MemDb for cluster synchronization
+- **pmxcfs-api-types**: Message types for FUSE operations
+- **pmxcfs**: Main daemon and FUSE integration
+
+### External Dependencies
+- **rusqlite**: SQLite bindings
+- **parking_lot**: Fast mutex implementation
+- **sha2**: SHA-256 checksums
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
new file mode 100644
index 000000000..ac0a3cb32
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/database.rs
@@ -0,0 +1,2008 @@
+/// Core MemDb implementation - in-memory database with SQLite persistence
+use parking_lot::Mutex;
+use pmxcfs_api_types::{VmType, errno};
+use rusqlite::{Connection, params};
+use std::collections::HashMap;
+use std::path::Path;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::SystemTime;
+
+use super::locks::lock_cache_key;
+use super::types::LockInfo;
+use super::types::{
+ DT_DIR, DT_REG, LoadDbResult, MEMDB_MAX_FILE_SIZE, MEMDB_MAX_FSSIZE, MEMDB_MAX_INODES,
+ NODES_DIR, ROOT_INODE, TreeEntry, VERSION_FILENAME,
+};
+
+const DB_ERRORS_MSG: &str = "Database has errors, refusing operation";
+
+/// In-memory database with SQLite persistence
+#[derive(Clone)]
+pub struct MemDb {
+ pub(super) inner: Arc<MemDbInner>,
+}
+
+pub(super) struct MemDbInner {
+ /// SQLite connection for persistence (wrapped in Mutex for thread-safety)
+ pub(super) conn: Mutex<Connection>,
+
+ /// In-memory index of all entries (inode -> TreeEntry)
+ /// This is a cache of the database for fast lookups
+ pub(super) index: Mutex<HashMap<u64, TreeEntry>>,
+
+ /// In-memory tree structure (parent inode -> children)
+ pub(super) tree: Mutex<HashMap<u64, HashMap<String, u64>>>,
+
+ /// Current version (incremented on each write)
+ pub(super) version: AtomicU64,
+
+ /// Resource locks (path -> LockInfo)
+ pub(super) locks: Mutex<HashMap<String, LockInfo>>,
+
+ /// Error flag - set to true on database errors (matches C's memdb->errors)
+ /// When true, all operations should fail to prevent data corruption
+ pub(super) errors: AtomicBool,
+
+ /// Single write guard mutex to serialize all mutating operations
+ /// Matches C's single GMutex approach, eliminates lock ordering issues
+ pub(super) write_guard: Mutex<()>,
+}
+
+impl MemDb {
+ pub fn open(path: &Path, create: bool) -> anyhow::Result<Self> {
+ let conn = Connection::open(path)?;
+
+ // Set SQLite pragmas to match C implementation (database.c:112-127)
+ // - WAL mode: Write-Ahead Logging for better concurrent read access
+ // - NORMAL sync: Faster writes (fsync only at critical moments)
+ // - 10s busy timeout: Retry on SQLITE_BUSY instead of instant failure
+ conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
+ conn.busy_timeout(std::time::Duration::from_secs(10))?;
+
+ if create {
+ Self::init_schema(&conn)?;
+ }
+
+ let (index, tree, _root_inode, version) = Self::load_from_db(&conn)?;
+
+ let memdb = Self {
+ inner: Arc::new(MemDbInner {
+ conn: Mutex::new(conn),
+ index: Mutex::new(index),
+ tree: Mutex::new(tree),
+ version: AtomicU64::new(version),
+ locks: Mutex::new(HashMap::new()),
+ errors: AtomicBool::new(false),
+ write_guard: Mutex::new(()),
+ }),
+ };
+
+ memdb.update_locks();
+
+ Ok(memdb)
+ }
+
+ fn init_schema(conn: &Connection) -> anyhow::Result<()> {
+ conn.execute_batch(
+ r#"
+ CREATE TABLE tree (
+ inode INTEGER PRIMARY KEY,
+ parent INTEGER NOT NULL,
+ version INTEGER NOT NULL,
+ writer INTEGER NOT NULL,
+ mtime INTEGER NOT NULL,
+ type INTEGER NOT NULL,
+ name TEXT NOT NULL,
+ data BLOB
+ );
+
+ CREATE INDEX tree_parent_idx ON tree(parent, name);
+
+ CREATE TABLE config (
+ name TEXT PRIMARY KEY,
+ value TEXT
+ );
+ "#,
+ )?;
+
+ // Create root metadata entry as inode ROOT_INODE with name "__version__"
+ // Matching C implementation: root inode is NEVER in database as a regular entry
+ // Root metadata is stored as inode ROOT_INODE with special name "__version__"
+ let now = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)?
+ .as_secs() as u32;
+
+ conn.execute(
+ "INSERT INTO tree (inode, parent, version, writer, mtime, type, name, data) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
+ params![ROOT_INODE, ROOT_INODE, 1, 0, now, DT_REG, VERSION_FILENAME, None::<Vec<u8>>],
+ )?;
+
+ Ok(())
+ }
+
+ fn load_from_db(conn: &Connection) -> anyhow::Result<LoadDbResult> {
+ let mut index = HashMap::new();
+ let mut tree: HashMap<u64, HashMap<String, u64>> = HashMap::new();
+ let mut max_version = 0u64;
+
+ let mut stmt = conn
+ .prepare("SELECT inode, parent, version, writer, mtime, type, name, data FROM tree")?;
+ let rows = stmt.query_map([], |row| {
+ let inode: u64 = row.get(0)?;
+ let parent: u64 = row.get(1)?;
+ let version: u64 = row.get(2)?;
+ let writer: u32 = row.get(3)?;
+ let mtime: u32 = row.get(4)?;
+ let entry_type: u8 = row.get(5)?;
+ let name: String = row.get(6)?;
+ let data: Option<Vec<u8>> = row.get(7)?;
+
+ // Derive size from data length (matching C behavior: sqlite3_column_bytes)
+ let data_vec = data.unwrap_or_default();
+ let size = data_vec.len();
+
+ Ok(TreeEntry {
+ inode,
+ parent,
+ version,
+ writer,
+ mtime,
+ size,
+ entry_type,
+ name,
+ data: data_vec,
+ })
+ })?;
+
+ // Create root entry in memory first (matching C implementation in database.c:559-567)
+ // Root is NEVER stored in database, only its metadata via inode ROOT_INODE
+ let now = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)?
+ .as_secs() as u32;
+ let mut root = TreeEntry {
+ inode: ROOT_INODE,
+ parent: ROOT_INODE, // Root's parent is itself
+ version: 0, // Will be populated from __version__ entry
+ writer: 0,
+ mtime: now,
+ size: 0,
+ entry_type: DT_DIR,
+ name: String::new(),
+ data: Vec::new(),
+ };
+
+ for row in rows {
+ let entry = row?;
+
+ // Handle __version__ entry (inode ROOT_INODE) - populate root metadata (C: database.c:372-382)
+ if entry.inode == ROOT_INODE {
+ if entry.name == VERSION_FILENAME {
+ tracing::debug!(
+ "Loading root metadata from __version__: version={}, writer={}, mtime={}",
+ entry.version,
+ entry.writer,
+ entry.mtime
+ );
+ root.version = entry.version;
+ root.writer = entry.writer;
+ root.mtime = entry.mtime;
+ if entry.version > max_version {
+ max_version = entry.version;
+ }
+ } else {
+ tracing::warn!("Ignoring root inode entry with unexpected name: {}", entry.name);
+ }
+ continue; // Don't add __version__ to index
+ }
+
+ // Track max version from all entries
+ if entry.version > max_version {
+ max_version = entry.version;
+ }
+
+ // Add to tree structure
+ tree.entry(entry.parent)
+ .or_default()
+ .insert(entry.name.clone(), entry.inode);
+
+ // If this is a directory, ensure it has an entry in the tree map
+ if entry.is_dir() {
+ tree.entry(entry.inode).or_default();
+ }
+
+ // Add to index
+ index.insert(entry.inode, entry);
+ }
+
+ // If root version is still 0, set it to 1 (new database)
+ if root.version == 0 {
+ root.version = 1;
+ max_version = 1;
+ tracing::debug!("No __version__ entry found, initializing root with version 1");
+ }
+
+ // Add root to index and ensure it has a tree entry (use entry() to not overwrite children!)
+ index.insert(ROOT_INODE, root);
+ tree.entry(ROOT_INODE).or_default();
+
+ Ok((index, tree, ROOT_INODE, max_version))
+ }
+
+ pub(crate) fn reload_runtime_state_from_db(&self) -> anyhow::Result<()> {
+ let conn = self.inner.conn.lock();
+ let (index, tree, _root_inode, version) = Self::load_from_db(&conn)?;
+ drop(conn);
+
+ *self.inner.index.lock() = index;
+ *self.inner.tree.lock() = tree;
+ self.inner.version.store(version, Ordering::SeqCst);
+ self.update_locks();
+
+ Ok(())
+ }
+
+ pub fn get_entry_by_inode(&self, inode: u64) -> Option<TreeEntry> {
+ let index = self.inner.index.lock();
+ index.get(&inode).cloned()
+ }
+
+ /// Get the __version__ entry for sending updates to C nodes
+ ///
+ /// The __version__ entry (inode ROOT_INODE) stores root metadata in the database
+ /// but is not kept in the in-memory index. This method queries it directly
+ /// from the database to send as an UPDATE message to C nodes.
+ pub fn get_version_entry(&self) -> anyhow::Result<TreeEntry> {
+ let index = self.inner.index.lock();
+ let root_entry = index
+ .get(&ROOT_INODE)
+ .ok_or_else(|| anyhow::anyhow!("Root entry not found"))?;
+
+ // Create a __version__ entry matching C's format
+ // This is what C expects to receive as inode ROOT_INODE
+ Ok(TreeEntry {
+ inode: ROOT_INODE, // __version__ is always inode ROOT_INODE in database/wire format
+ parent: ROOT_INODE, // Root's parent is itself
+ version: root_entry.version,
+ writer: root_entry.writer,
+ mtime: root_entry.mtime,
+ size: 0,
+ entry_type: DT_REG,
+ name: VERSION_FILENAME.to_string(),
+ data: Vec::new(),
+ })
+ }
+
+ pub fn lookup_path(&self, path: &str) -> Option<TreeEntry> {
+ let index = self.inner.index.lock();
+ let tree = self.inner.tree.lock();
+
+ if path.is_empty() || path == "/" || path == "." {
+ return index.get(&ROOT_INODE).cloned();
+ }
+
+ let mut current_inode = ROOT_INODE;
+
+ for part in path.split('/').filter(|s| !s.is_empty()) {
+ let children = tree.get(&current_inode)?;
+ current_inode = *children.get(part)?;
+ }
+
+ index.get(&current_inode).cloned()
+ }
+
+ /// Normalize a path to internal format
+ ///
+ /// # Path Normalization Strategy
+ ///
+ /// Internal paths are always stored as absolute paths with:
+ /// - Leading `/` (e.g., "/nodes/node1/qemu-server/100.conf")
+ /// - No trailing `/` except for root ("/")
+ /// - No `..` or `.` components
+ ///
+ /// C compatibility: The C implementation sometimes sends paths without leading `/`
+ /// (see find_plug in pmxcfs.c). This function normalizes all inputs to absolute paths.
+ ///
+ /// # Arguments
+ ///
+ /// * `path` - Input path (may or may not have leading `/`)
+ ///
+ /// # Returns
+ ///
+ /// Normalized absolute path with leading `/` and no trailing `/`
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// normalize_path("nodes/node1/qemu-server") -> "/nodes/node1/qemu-server"
+ /// normalize_path("/nodes/node1/qemu-server") -> "/nodes/node1/qemu-server"
+ /// normalize_path("/nodes/node1/qemu-server/") -> "/nodes/node1/qemu-server"
+ /// normalize_path("") -> "/"
+ /// normalize_path("/") -> "/"
+ /// ```
+ fn normalize_path(path: &str) -> String {
+ // Handle empty path as root
+ if path.is_empty() || path == "/" || path == "." {
+ return "/".to_string();
+ }
+
+ // Remove leading and trailing slashes, then add single leading slash
+ let trimmed = path.trim_matches('/');
+ if trimmed.is_empty() {
+ "/".to_string()
+ } else {
+ format!("/{trimmed}")
+ }
+ }
+
+ /// Split a path into parent directory and basename
+ ///
+ /// Uses the internal path normalization strategy to ensure consistent behavior.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if the path is invalid (e.g., empty).
+ fn split_path(path: &str) -> std::io::Result<(String, String)> {
+ if path.is_empty() {
+ return Err(errno::einval());
+ }
+
+ // Normalize to absolute path
+ let normalized_path = Self::normalize_path(path);
+
+ if let Some(pos) = normalized_path.rfind('/') {
+ let dirname = if pos == 0 {
+ "/"
+ } else {
+ &normalized_path[..pos]
+ };
+ let basename = &normalized_path[pos + 1..];
+ Ok((dirname.to_string(), basename.to_string()))
+ } else {
+ // This shouldn't happen after normalization, but handle it anyway
+ Ok(("/".to_string(), normalized_path.to_string()))
+ }
+ }
+
+ /// Check if path is a lock directory (cfs-utils.c:306-312)
+ fn is_lock_dir(path: &str) -> bool {
+ super::locks::is_lock_path(path)
+ }
+
+ pub fn exists(&self, path: &str) -> std::io::Result<bool> {
+ Ok(self.lookup_path(path).is_some())
+ }
+
+ pub fn read(&self, path: &str, offset: u64, size: usize) -> std::io::Result<Vec<u8>> {
+ let entry = self.lookup_path(path).ok_or_else(errno::enoent)?;
+
+ if entry.is_dir() {
+ return Err(errno::os_err(libc::EISDIR));
+ }
+
+ let offset = offset as usize;
+ if offset >= entry.data.len() {
+ return Ok(Vec::new());
+ }
+
+ let end = std::cmp::min(offset + size, entry.data.len());
+ Ok(entry.data[offset..end].to_vec())
+ }
+
+ /// Helper to update __version__ entry in database
+ ///
+ /// This is called for EVERY write operation to keep root metadata synchronized
+ /// (matching C behavior in database.c:275-278)
+ pub(crate) fn update_version_entry(
+ conn: &rusqlite::Connection,
+ version: u64,
+ writer: u32,
+ mtime: u32,
+ ) -> std::io::Result<()> {
+ conn.execute(
+ "UPDATE tree SET version = ?1, writer = ?2, mtime = ?3 WHERE inode = ?4",
+ params![version, writer, mtime, ROOT_INODE],
+ )
+ .map_err(std::io::Error::other)?;
+ Ok(())
+ }
+
+ /// Helper to update root entry in index
+ ///
+ /// Keeps the in-memory root entry synchronized with database __version__
+ pub(crate) fn update_root_metadata(
+ index: &mut HashMap<u64, TreeEntry>,
+ root_inode: u64,
+ version: u64,
+ writer: u32,
+ mtime: u32,
+ ) {
+ if let Some(root_entry) = index.get_mut(&root_inode) {
+ root_entry.version = version;
+ root_entry.writer = writer;
+ root_entry.mtime = mtime;
+ }
+ }
+
+ /// Create a new file or directory (FUSE layer).
+ ///
+ /// Enforces VMID uniqueness: refuses to create a VM config if the same VMID
+ /// already exists on another node. Use `create_replicated()` in DFSM callbacks.
+ pub fn create(&self, path: &str, mode: u32, writer: u32, mtime: u32) -> std::io::Result<()> {
+ self.create_impl(path, mode, writer, mtime, true)
+ }
+
+ /// Create a new file or directory from a DFSM replication message.
+ ///
+ /// Skips the VMID uniqueness check because:
+ /// 1. DFSM messages are trusted cluster broadcasts accepted by the quorum leader.
+ /// 2. During PVE VM migration the destination config is created while the source
+ /// (same VMID, different node) still exists — this is intentional.
+ /// 3. The C implementation uses a lazy vmlist hash table for this check, which
+ /// may not yet reflect remote state, effectively bypassing it for remote creates.
+ pub fn create_replicated(
+ &self,
+ path: &str,
+ mode: u32,
+ writer: u32,
+ mtime: u32,
+ ) -> std::io::Result<()> {
+ self.create_impl(path, mode, writer, mtime, false)
+ }
+
+ fn create_impl(
+ &self,
+ path: &str,
+ mode: u32,
+ writer: u32,
+ mtime: u32,
+ check_vmid: bool,
+ ) -> std::io::Result<()> {
+ // Check error flag first (matches C's memdb->errors check)
+ if self.inner.errors.load(Ordering::SeqCst) {
+ return Err(std::io::Error::other(DB_ERRORS_MSG));
+ }
+
+ // CRITICAL FIX: Acquire write guard BEFORE any checks to prevent TOCTOU race
+ // This ensures all validation and mutation happen atomically
+ let _guard = self.inner.write_guard.lock();
+
+ // Now perform checks under write guard protection
+ if self.exists(path)? {
+ return Err(errno::eexist());
+ }
+
+ let (parent_path, basename) = Self::split_path(path)?;
+
+ // Reject '.' and '..' basenames (memdb.c:577-582)
+ if basename.is_empty() || basename == "." || basename == ".." {
+ return Err(errno::eacces());
+ }
+
+ let parent_entry = self.lookup_path(&parent_path).ok_or_else(errno::enoent)?;
+
+ if !parent_entry.is_dir() {
+ return Err(errno::os_err(libc::ENOTDIR));
+ }
+
+ // VMID uniqueness check on create (C parity: memdb_create() lines 720-725).
+ //
+ // Skipped for DFSM-replicated creates (check_vmid=false): during PVE VM
+ // migration the destination config exists simultaneously with the source.
+ if check_vmid {
+ if let Some((to_node, _vmtype, to_vmid)) = crate::vmlist::parse_vm_config_path(path)
+ && self.vmid_on_other_node(to_vmid, &to_node)
+ {
+ return Err(errno::eexist());
+ }
+ }
+
+ // Check inode limit (matches C implementation in memdb.c)
+ let index = self.inner.index.lock();
+ let current_inodes = index.len();
+ drop(index);
+
+ if current_inodes >= MEMDB_MAX_INODES {
+ return Err(errno::os_err(libc::ENOSPC));
+ }
+
+ let entry_type = if mode & libc::S_IFMT == libc::S_IFDIR {
+ DT_DIR
+ } else {
+ DT_REG
+ };
+
+ // Increment version
+ let new_version = self.inner.version.fetch_add(1, Ordering::SeqCst) + 1;
+
+ // Begin transaction
+ let conn = self.inner.conn.lock();
+ let tx = conn
+ .unchecked_transaction()
+ .map_err(|e| std::io::Error::other(format!("Failed to begin transaction: {e}")))?;
+
+ // Update __version__ entry in database (matches C's database.c:275-278)
+ tx.execute(
+ "UPDATE tree SET version = ?1, writer = ?2, mtime = ?3 WHERE inode = ?4",
+ params![new_version, writer, mtime, ROOT_INODE],
+ )
+ .map_err(|e| std::io::Error::other(format!("Failed to update __version__ entry: {e}")))?;
+
+ // Inode equals version number (C compatibility)
+ let new_inode = new_version;
+
+ let entry = TreeEntry {
+ inode: new_inode,
+ parent: parent_entry.inode,
+ version: new_version,
+ writer,
+ mtime,
+ size: 0,
+ entry_type,
+ name: basename.clone(),
+ data: Vec::new(),
+ };
+
+ if let Err(e) = tx.execute(
+ "INSERT INTO tree (inode, parent, version, writer, mtime, type, name, data) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
+ params![
+ entry.inode,
+ entry.parent,
+ entry.version,
+ entry.writer,
+ entry.mtime,
+ entry.entry_type,
+ entry.name,
+ if entry.is_dir() { None::<Vec<u8>> } else { Some(entry.data.clone()) }
+ ],
+ ) {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Database mutation failed: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ // Commit transaction
+ if let Err(e) = tx.commit() {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Failed to commit transaction: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ drop(conn);
+
+ // Update in-memory structures (root metadata + new entry in single lock acquisition)
+ {
+ let mut index = self.inner.index.lock();
+ let mut tree = self.inner.tree.lock();
+
+ // Update root entry metadata
+ if let Some(root_entry) = index.get_mut(&ROOT_INODE) {
+ root_entry.version = new_version;
+ root_entry.writer = writer;
+ root_entry.mtime = mtime;
+ }
+
+ index.insert(new_inode, entry.clone());
+
+ tree.entry(parent_entry.inode)
+ .or_default()
+ .insert(basename, new_inode);
+
+ if entry.is_dir() {
+ tree.insert(new_inode, HashMap::new());
+ }
+ }
+
+ // If this is a directory in priv/lock/, register it in the lock table
+ if entry.is_dir() && super::locks::is_lock_path(path) {
+ let csum = entry.compute_checksum();
+ let _ = self.lock_expired(path, &csum);
+ tracing::debug!("Registered lock directory: {}", path);
+ }
+
+ Ok(())
+ }
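The entry-type decision in create() hinges on how the mode bits are tested. A minimal sketch of why masking with `S_IFMT` is the robust form; the constants and `classify` helper below are illustrative stand-ins, not the crate's definitions:

```rust
// File-type constants as defined in <sys/stat.h> and dirent.h.
const S_IFMT: u32 = 0o170000;
const S_IFDIR: u32 = 0o040000;
const S_IFSOCK: u32 = 0o140000;
const DT_DIR: u8 = 4;
const DT_REG: u8 = 8;

/// Illustrative entry-type classification. Masking with S_IFMT matters:
/// a bare `mode & S_IFDIR != 0` test would also match socket modes,
/// because S_IFSOCK (0o140000) contains the S_IFDIR bit (0o040000).
fn classify(mode: u32) -> u8 {
    if mode & S_IFMT == S_IFDIR { DT_DIR } else { DT_REG }
}
```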
+
+ pub fn write(
+ &self,
+ path: &str,
+ offset: u64,
+ writer: u32,
+ mtime: u32,
+ data: &[u8],
+ truncate: bool,
+ ) -> std::io::Result<usize> {
+ // Check error flag first (matches C's memdb->errors check)
+ if self.inner.errors.load(Ordering::SeqCst) {
+ return Err(std::io::Error::other(DB_ERRORS_MSG));
+ }
+
+ // Acquire the write guard BEFORE any checks so that the lookup and the
+ // mutation below happen atomically (prevents a TOCTOU race with other writers)
+ let _guard = self.inner.write_guard.lock();
+
+ let mut entry = self.lookup_path(path).ok_or_else(errno::enoent)?;
+
+ if entry.is_dir() {
+ return Err(errno::os_err(libc::EISDIR));
+ }
+
+ // Overflow protection: validate offset fits in usize and check arithmetic
+ let offset_usize = usize::try_from(offset).map_err(|_| errno::efbig())?;
+ let end_offset = offset_usize
+ .checked_add(data.len())
+ .ok_or_else(errno::efbig)?;
+
+ if end_offset > MEMDB_MAX_FILE_SIZE {
+ return Err(errno::efbig());
+ }
+
+ // Check total filesystem size limit (matches C implementation)
+ // Calculate size delta for this write operation
+ let size_delta = end_offset.saturating_sub(entry.data.len());
+
+ if size_delta > 0 {
+ // Calculate current filesystem usage
+ let index = self.inner.index.lock();
+ let total_size: usize = index
+ .values()
+ .filter_map(|e| e.is_file().then_some(e.size))
+ .sum();
+ drop(index);
+
+ // Check if adding this data would exceed filesystem limit
+ let new_total_size = total_size
+ .checked_add(size_delta)
+ .ok_or_else(|| errno::os_err(libc::ENOSPC))?;
+
+ if new_total_size > MEMDB_MAX_FSSIZE {
+ return Err(errno::os_err(libc::ENOSPC));
+ }
+ }
+
+ // Truncate behavior: preserve prefix bytes (matching C)
+ // C implementation (memdb.c:724-726): if truncate, resize to offset, then write
+ if truncate {
+ // Preserve bytes before offset, clear from offset onwards
+ entry.data.truncate(offset_usize);
+ }
+
+ // Extend if necessary
+ if end_offset > entry.data.len() {
+ entry.data.resize(end_offset, 0);
+ }
+
+ // Write data
+ entry.data[offset_usize..end_offset].copy_from_slice(data);
+ entry.size = entry.data.len();
+ entry.mtime = mtime;
+ entry.writer = writer;
+
+ // Inline mutation logic to maintain write guard throughout
+ // Increment version
+ let new_version = self.inner.version.fetch_add(1, Ordering::SeqCst) + 1;
+ entry.version = new_version;
+
+ // Begin transaction
+ let conn = self.inner.conn.lock();
+ let tx = conn
+ .unchecked_transaction()
+ .map_err(|e| std::io::Error::other(format!("Failed to begin transaction: {e}")))?;
+
+ // Update __version__ entry in database (matches C's database.c:275-278)
+ tx.execute(
+ "UPDATE tree SET version = ?1, writer = ?2, mtime = ?3 WHERE inode = ?4",
+ params![new_version, writer, mtime, ROOT_INODE],
+ )
+ .map_err(|e| std::io::Error::other(format!("Failed to update __version__ entry: {e}")))?;
+
+ // Execute the update
+ if let Err(e) = tx.execute(
+ "UPDATE tree SET version = ?1, writer = ?2, mtime = ?3, data = ?4 WHERE inode = ?5",
+ params![
+ entry.version,
+ entry.writer,
+ entry.mtime,
+ &entry.data,
+ entry.inode
+ ],
+ ) {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Database mutation failed: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ // Commit transaction
+ if let Err(e) = tx.commit() {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Failed to commit transaction: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ drop(conn);
+
+ // Update in-memory state (root metadata + written entry in single lock acquisition)
+ {
+ let mut index = self.inner.index.lock();
+ if let Some(root_entry) = index.get_mut(&ROOT_INODE) {
+ root_entry.version = new_version;
+ root_entry.writer = writer;
+ root_entry.mtime = mtime;
+ }
+ index.insert(entry.inode, entry);
+ }
+
+ Ok(data.len())
+ }
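The truncate-then-write semantics above (preserve the prefix up to `offset`, discard the rest, then apply the new bytes, zero-filling any gap) can be sketched as a pure buffer operation. `apply_write` is an illustrative helper, not part of the pmxcfs-memdb API:

```rust
/// Sketch of the buffer semantics used by write(): if `truncate` is set,
/// bytes from `offset` onward are discarded before the new data lands.
fn apply_write(buf: &mut Vec<u8>, offset: usize, data: &[u8], truncate: bool) {
    if truncate {
        // Preserve bytes before offset, clear from offset onwards.
        buf.truncate(offset);
    }
    let end = offset + data.len();
    if end > buf.len() {
        // Zero-fill any gap (sparse write), matching Vec::resize semantics.
        buf.resize(end, 0);
    }
    buf[offset..end].copy_from_slice(data);
}
```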
+
+ /// Update modification time of a file or directory
+ ///
+ /// This implements the C version's `memdb_mtime` function (memdb.c:860-932)
+ /// with full lock protection semantics for directories in `priv/lock/`.
+ ///
+ /// # Lock Protection
+ ///
+ /// For lock directories (`priv/lock/*`), this function enforces:
+ /// 1. Only the same writer (node ID) can update the lock
+ /// 2. Only newer mtime values are accepted (to prevent replay attacks)
+ /// 3. Lock cache is refreshed after successful update
+ ///
+ /// # Arguments
+ ///
+ /// * `path` - Path to the file/directory
+ /// * `writer` - Writer ID (node ID in cluster)
+ /// * `mtime` - New modification time (seconds since UNIX epoch)
+ pub fn set_mtime(&self, path: &str, writer: u32, mtime: u32) -> std::io::Result<()> {
+ // Check error flag first (matches C's memdb->errors check)
+ if self.inner.errors.load(Ordering::SeqCst) {
+ return Err(std::io::Error::other(DB_ERRORS_MSG));
+ }
+
+ // Acquire write guard before lookup to prevent TOCTOU race
+ // (another writer could delete the entry between lookup and mutation)
+ let _guard = self.inner.write_guard.lock();
+
+ let mut entry = self.lookup_path(path).ok_or_else(errno::enoent)?;
+
+ // Don't allow updating root
+ if entry.inode == ROOT_INODE {
+ return Err(errno::eperm());
+ }
+
+ // Check if this is a lock directory (matching C logic in memdb.c:882).
+ //
+ // Use is_lock_dir() (which delegates to is_lock_path()) rather than
+ // comparing only the parent path: a parent-path comparison matches only
+ // direct children of priv/lock/ (e.g. /priv/lock/mylock) and misses
+ // deeper entries such as /priv/lock/qemu-server/100.conf, whose
+ // parent path is /priv/lock/qemu-server, not /priv/lock.
+ let is_lock = Self::is_lock_dir(path) && entry.is_dir();
+
+ if is_lock {
+ // Lock protection: Only allow newer mtime (C: memdb.c:886-889)
+ // This prevents replay attacks and ensures lock renewal works correctly
+ if mtime < entry.mtime {
+ tracing::warn!(
+ "Rejecting mtime update for lock '{}': {} < {} (locked)",
+ path,
+ mtime,
+ entry.mtime
+ );
+ return Err(errno::eacces());
+ }
+
+ // Lock protection: Only same writer can update (C: memdb.c:890-894)
+ // This prevents lock hijacking from other nodes
+ if entry.writer != writer {
+ tracing::warn!(
+ "Rejecting mtime update for lock '{}': writer {} != {} (wrong owner)",
+ path,
+ writer,
+ entry.writer
+ );
+ return Err(errno::eacces());
+ }
+
+ tracing::debug!(
+ "Updating lock directory: {} (mtime: {} -> {})",
+ path,
+ entry.mtime,
+ mtime
+ );
+ }
+
+ // Inline mutation logic: write_guard already held, cannot call with_mutation
+ // (with_mutation would try to re-acquire write_guard, which would deadlock)
+ let new_version = self.inner.version.fetch_add(1, Ordering::SeqCst) + 1;
+ entry.version = new_version;
+ entry.writer = writer;
+ entry.mtime = mtime;
+
+ let conn = self.inner.conn.lock();
+ let tx = conn
+ .unchecked_transaction()
+ .map_err(|e| std::io::Error::other(format!("Failed to begin transaction: {e}")))?;
+
+ // Update __version__ entry in database (matches C's database.c:275-278)
+ tx.execute(
+ "UPDATE tree SET version = ?1, writer = ?2, mtime = ?3 WHERE inode = ?4",
+ params![new_version, writer, mtime, ROOT_INODE],
+ )
+ .map_err(|e| std::io::Error::other(format!("Failed to update __version__ entry: {e}")))?;
+
+ if let Err(e) = tx.execute(
+ "UPDATE tree SET version = ?1, writer = ?2, mtime = ?3 WHERE inode = ?4",
+ params![entry.version, entry.writer, entry.mtime, entry.inode],
+ ) {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Database mutation failed: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ if let Err(e) = tx.commit() {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Failed to commit transaction: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ drop(conn);
+
+ // Update in-memory index (root metadata + updated entry in single lock acquisition)
+ {
+ let mut index = self.inner.index.lock();
+ if let Some(root_entry) = index.get_mut(&ROOT_INODE) {
+ root_entry.version = new_version;
+ root_entry.writer = writer;
+ root_entry.mtime = mtime;
+ }
+ index.insert(entry.inode, entry.clone());
+ }
+
+ // Refresh lock cache if this is a lock directory (C: memdb.c:924-929)
+ // Remove old entry and insert new one with updated checksum
+ if is_lock {
+ let lock_key = lock_cache_key(path);
+ let mut locks = self.inner.locks.lock();
+ locks.remove(&lock_key);
+
+ let csum = entry.compute_checksum();
+ locks.insert(
+ lock_key,
+ LockInfo {
+ ltime: super::locks::now_secs(),
+ csum,
+ },
+ );
+
+ tracing::debug!("Refreshed lock cache for: {}", path);
+ }
+
+ Ok(())
+ }
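The lock-protection rules enforced above reduce to a simple predicate: only the owning writer may renew a lock, and only with a non-decreasing mtime. `lock_update_allowed` is an illustrative helper, not part of the pmxcfs-memdb API:

```rust
/// Sketch of set_mtime()'s guard for priv/lock/ directories.
/// Rejects older mtimes (replay protection) and foreign writers
/// (hijack protection); an equal mtime is accepted, since only a
/// strictly older value is refused.
fn lock_update_allowed(owner: u32, current_mtime: u32, writer: u32, new_mtime: u32) -> bool {
    new_mtime >= current_mtime && writer == owner
}
```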
+
+ pub fn readdir(&self, path: &str) -> std::io::Result<Vec<TreeEntry>> {
+ let entry = self.lookup_path(path).ok_or_else(errno::enoent)?;
+
+ if !entry.is_dir() {
+ return Err(errno::os_err(libc::ENOTDIR));
+ }
+
+ // Lock ordering: index before tree, matching the mutation paths
+ // (create/delete/rename) to avoid a lock-order inversion deadlock.
+ let index = self.inner.index.lock();
+ let tree = self.inner.tree.lock();
+
+ let children = tree.get(&entry.inode).ok_or_else(errno::eio)?;
+
+ let mut entries = Vec::new();
+ for child_inode in children.values() {
+ if let Some(child) = index.get(child_inode) {
+ entries.push(child.clone());
+ }
+ }
+
+ Ok(entries)
+ }
+
+ pub fn delete(&self, path: &str, writer: u32, mtime: u32) -> std::io::Result<()> {
+ // Check error flag first (matches C's memdb->errors check)
+ if self.inner.errors.load(Ordering::SeqCst) {
+ return Err(std::io::Error::other(DB_ERRORS_MSG));
+ }
+
+ // Acquire write guard before lookup to prevent TOCTOU race
+ // (entry could be deleted or directory could gain children between lookup and mutation)
+ let _guard = self.inner.write_guard.lock();
+
+ let entry = self.lookup_path(path).ok_or_else(errno::enoent)?;
+
+ // Don't allow deleting root
+ if entry.inode == ROOT_INODE {
+ return Err(errno::eperm());
+ }
+
+ // If directory, check if empty
+ if entry.is_dir() {
+ let tree = self.inner.tree.lock();
+ if let Some(children) = tree.get(&entry.inode)
+ && !children.is_empty()
+ {
+ return Err(errno::enotempty());
+ }
+ }
+
+ // Inline mutation logic: write_guard already held, cannot call with_mutation
+ let new_version = self.inner.version.fetch_add(1, Ordering::SeqCst) + 1;
+
+ let conn = self.inner.conn.lock();
+ let tx = conn
+ .unchecked_transaction()
+ .map_err(|e| std::io::Error::other(format!("Failed to begin transaction: {e}")))?;
+
+ // Update __version__ entry in database (matches C's database.c:275-278)
+ tx.execute(
+ "UPDATE tree SET version = ?1, writer = ?2, mtime = ?3 WHERE inode = ?4",
+ params![new_version, writer, mtime, ROOT_INODE],
+ )
+ .map_err(|e| std::io::Error::other(format!("Failed to update __version__ entry: {e}")))?;
+
+ if let Err(e) = tx.execute("DELETE FROM tree WHERE inode = ?1", params![entry.inode]) {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Database mutation failed: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ if let Err(e) = tx.commit() {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Failed to commit transaction: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ drop(conn);
+
+ // Update in-memory structures
+ {
+ let mut index = self.inner.index.lock();
+ let mut tree = self.inner.tree.lock();
+
+ // Update root entry metadata
+ if let Some(root_entry) = index.get_mut(&ROOT_INODE) {
+ root_entry.version = new_version;
+ root_entry.writer = writer;
+ root_entry.mtime = mtime;
+ }
+
+ // Remove from index
+ index.remove(&entry.inode);
+
+ // Remove from parent's children
+ if let Some(parent_children) = tree.get_mut(&entry.parent) {
+ parent_children.remove(&entry.name);
+ }
+
+ // Remove from tree if directory
+ if entry.is_dir() {
+ tree.remove(&entry.inode);
+ }
+ }
+
+ // Clean up lock cache for directories (matching C behavior in memdb.c:1235)
+ // This prevents stale lock cache entries and memory leaks
+ if entry.is_dir() {
+ let lock_key = lock_cache_key(path);
+ let mut locks = self.inner.locks.lock();
+ locks.remove(&lock_key);
+ tracing::debug!("Removed lock cache entry for deleted directory: {}", path);
+ }
+
+ Ok(())
+ }
+
+ pub fn rename(
+ &self,
+ old_path: &str,
+ new_path: &str,
+ writer: u32,
+ mtime: u32,
+ ) -> std::io::Result<()> {
+ // Check error flag first (matches C's memdb->errors check)
+ if self.inner.errors.load(Ordering::SeqCst) {
+ return Err(std::io::Error::other(DB_ERRORS_MSG));
+ }
+
+ // Acquire write guard before any lookups to prevent TOCTOU races
+ // (entries could be created, deleted, or modified between the lookups and mutation)
+ let _guard = self.inner.write_guard.lock();
+
+ let mut entry = self.lookup_path(old_path).ok_or_else(errno::enoent)?;
+
+ if entry.inode == ROOT_INODE {
+ return Err(errno::eperm());
+ }
+
+ // Protect lock directories from being renamed (memdb.c:1107-1111)
+ if entry.is_dir() && Self::is_lock_dir(old_path) {
+ return Err(errno::eacces());
+ }
+
+ // Cannot rename non-empty directories (pmxcfs POSIX restriction).
+ // The C implementation enforces this to guarantee VMID uniqueness —
+ // renaming a nodes/<name>/qemu-server dir with live .conf files would
+ // silently move all VMIDs without updating the cluster vmlist.
+ if entry.is_dir() && self.dir_has_children(entry.inode) {
+ return Err(errno::enotempty());
+ }
+
+ // VMID uniqueness check: if renaming a regular file to a VM config path,
+ // block the rename when the target VMID already exists on a different node.
+ //
+ // C parity: memdb_rename() lines 1098-1105 (memdb.c).
+ // C checks vmlist_different_vm_exists(); we scan the index directly since
+ // the Rust implementation rebuilds the vmlist on demand rather than
+ // maintaining a live registry.
+ //
+ // Exception: renaming a VM config to a path with the same VMID is allowed
+ // (e.g., moving a VM between nodes — C: "if (!(from_node && vmid == from_vmid))").
+ if !entry.is_dir()
+ && let Some((to_node, _to_vmtype, to_vmid)) =
+ crate::vmlist::parse_vm_config_path(new_path)
+ {
+ let from_vmid = crate::vmlist::parse_vm_config_path(old_path).map(|(_, _, id)| id);
+ if from_vmid != Some(to_vmid) && self.vmid_on_other_node(to_vmid, &to_node) {
+ return Err(errno::eexist());
+ }
+ }
+
+ // If the target exists it is replaced atomically (POSIX rename semantics).
+ // Capture the full entry so we can enforce POSIX type constraints and
+ // clean up the tree map for directory targets (memdb.c:1113-1125).
+ let target_entry = self.lookup_path(new_path);
+
+ if let Some(ref tgt) = target_entry {
+ // POSIX rename(2): renaming a non-directory over a directory must fail.
+ if !entry.is_dir() && tgt.is_dir() {
+ return Err(errno::os_err(libc::EISDIR));
+ }
+ // POSIX rename(2): renaming a directory over a non-directory must fail.
+ if entry.is_dir() && !tgt.is_dir() {
+ return Err(errno::os_err(libc::ENOTDIR));
+ }
+ // POSIX rename(2): a directory target must be empty before replacement.
+ if tgt.is_dir() && self.dir_has_children(tgt.inode) {
+ return Err(errno::enotempty());
+ }
+ }
+
+ let target_inode = target_entry.as_ref().map(|e| e.inode);
+
+ let (new_parent_path, new_basename) = Self::split_path(new_path)?;
+
+ let new_parent_entry = self
+ .lookup_path(&new_parent_path)
+ .ok_or_else(errno::enoent)?;
+
+ if !new_parent_entry.is_dir() {
+ return Err(errno::os_err(libc::ENOTDIR));
+ }
+
+ let old_parent = entry.parent;
+ let old_name = entry.name.clone();
+
+ entry.parent = new_parent_entry.inode;
+ entry.name = new_basename.clone();
+
+ // Update writer and mtime on the renamed entry (memdb.c:1156-1164)
+ entry.writer = writer;
+ entry.mtime = mtime;
+
+ // Inline mutation logic: write_guard already held, cannot call with_mutation
+ let new_version = self.inner.version.fetch_add(1, Ordering::SeqCst) + 1;
+ entry.version = new_version;
+
+ let conn = self.inner.conn.lock();
+ let tx = conn
+ .unchecked_transaction()
+ .map_err(|e| std::io::Error::other(format!("Failed to begin transaction: {e}")))?;
+
+ // Update __version__ entry in database (matches C's database.c:275-278)
+ tx.execute(
+ "UPDATE tree SET version = ?1, writer = ?2, mtime = ?3 WHERE inode = ?4",
+ params![new_version, writer, mtime, ROOT_INODE],
+ )
+ .map_err(|e| std::io::Error::other(format!("Failed to update __version__ entry: {e}")))?;
+
+ // Delete target if it exists (atomic replacement)
+ if let Some(t_inode) = target_inode {
+ if let Err(e) = tx.execute("DELETE FROM tree WHERE inode = ?1", params![t_inode]) {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Database mutation failed: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+ }
+
+ // Update writer and mtime in database (memdb.c:1171-1173)
+ if let Err(e) = tx.execute(
+ "UPDATE tree SET parent = ?1, name = ?2, version = ?3, writer = ?4, mtime = ?5 WHERE inode = ?6",
+ params![entry.parent, entry.name, entry.version, entry.writer, entry.mtime, entry.inode],
+ ) {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Database mutation failed: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ if let Err(e) = tx.commit() {
+ self.inner.errors.store(true, Ordering::SeqCst);
+ tracing::error!("Failed to commit transaction: {}", e);
+ return Err(std::io::Error::other(e));
+ }
+
+ drop(conn);
+
+ // Update in-memory structures
+ {
+ let mut index = self.inner.index.lock();
+ let mut tree = self.inner.tree.lock();
+
+ // Update root entry metadata
+ if let Some(root_entry) = index.get_mut(&ROOT_INODE) {
+ root_entry.version = new_version;
+ root_entry.writer = writer;
+ root_entry.mtime = mtime;
+ }
+
+ // Remove target from in-memory structures if it existed.
+ // For directory targets we must also remove their tree map entry to
+ // avoid an orphaned HashMap leaking memory indefinitely.
+ if let Some(t_inode) = target_inode {
+ index.remove(&t_inode);
+ // If the target was a directory, remove its children map from tree.
+ // (The non-empty-directory guard above ensures this map is empty.)
+ if target_entry.as_ref().is_some_and(|e| e.is_dir()) {
+ tree.remove(&t_inode);
+ }
+ // The target name in new_parent_entry's children will be replaced below.
+ }
+
+ index.insert(entry.inode, entry.clone());
+
+ if let Some(old_parent_children) = tree.get_mut(&old_parent) {
+ old_parent_children.remove(&old_name);
+ }
+
+ tree.entry(new_parent_entry.inode)
+ .or_default()
+ .insert(new_basename, entry.inode);
+ }
+
+ Ok(())
+ }
+
+ /// Returns true if the directory at `inode` has any children in the tree.
+ fn dir_has_children(&self, inode: u64) -> bool {
+ self.inner
+ .tree
+ .lock()
+ .get(&inode)
+ .is_some_and(|children| !children.is_empty())
+ }
+
+ /// Check if `vmid` appears as a VM config on any node OTHER than `exclude_node`.
+ ///
+ /// Used by rename() to enforce the VMID uniqueness invariant (C parity:
+ /// vmlist_different_vm_exists in memdb.c:1099).
+ ///
+ /// Traverses the index looking for entries whose basename is `<vmid>.conf`,
+ /// whose parent is a `qemu-server` or `lxc` directory, and whose grandparent
+ /// (the node directory) is not `exclude_node`.
+ fn vmid_on_other_node(&self, vmid: u32, exclude_node: &str) -> bool {
+ let filename = format!("{vmid}.conf");
+ let index = self.inner.index.lock();
+ for entry in index.values() {
+ if entry.name != filename {
+ continue;
+ }
+ // Walk up: parent = qemu-server/lxc dir, grandparent = node dir
+ let Some(parent) = index.get(&entry.parent) else {
+ continue;
+ };
+ if parent.name != VmType::Qemu.config_dir() && parent.name != VmType::Lxc.config_dir() {
+ continue;
+ }
+ let Some(grandparent) = index.get(&parent.parent) else {
+ continue;
+ };
+ // grandparent is the node dir; its parent should be the "nodes" dir
+ let Some(nodes_dir) = index.get(&grandparent.parent) else {
+ continue;
+ };
+ if nodes_dir.name != NODES_DIR {
+ continue;
+ }
+ if grandparent.name != exclude_node {
+ return true;
+ }
+ }
+ false
+ }
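The directory layout this index walk validates (`nodes/<node>/{qemu-server,lxc}/<vmid>.conf`) is the same one a path-based parse would express. The helper below is a hypothetical stand-in for `crate::vmlist::parse_vm_config_path`, shown only to illustrate the layout being checked:

```rust
/// Illustrative parse of a VM config path into (node, vmtype, vmid).
/// Returns None for anything that is not a well-formed VM config path.
fn parse_vm_config_path(path: &str) -> Option<(String, String, u32)> {
    let parts: Vec<&str> = path.trim_start_matches('/').split('/').collect();
    match parts.as_slice() {
        // Exactly: nodes / <node> / qemu-server|lxc / <vmid>.conf
        ["nodes", node, vmtype @ ("qemu-server" | "lxc"), file] => {
            let vmid: u32 = file.strip_suffix(".conf")?.parse().ok()?;
            Some((node.to_string(), vmtype.to_string(), vmid))
        }
        _ => None,
    }
}
```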
+
+ pub fn get_version(&self) -> u64 {
+ self.inner.version.load(Ordering::SeqCst)
+ }
+
+ /// **TEST ONLY**: Manually set lock timestamp for testing expiration behavior
+ ///
+ /// This method is exposed for testing purposes only to simulate lock expiration
+ /// without waiting the full 120 seconds. Do not use in production code.
+ #[cfg(test)]
+ pub fn test_set_lock_timestamp(&self, path: &str, timestamp_secs: u64) {
+ // Normalize path to remove leading slash for consistency
+ let normalized_path = path.strip_prefix('/').unwrap_or(path);
+
+ let mut locks = self.inner.locks.lock();
+ if let Some(lock_info) = locks.get_mut(normalized_path) {
+ lock_info.ltime = timestamp_secs;
+ }
+ }
+
+ /// Get filesystem statistics
+ ///
+ /// Returns information about the filesystem usage, matching C's memdb_statfs
+ /// implementation. This is used by FUSE to report filesystem statistics.
+ ///
+ /// # Returns
+ ///
+ /// A tuple of (blocks, bfree, bavail, files, ffree) where:
+ /// - blocks: Total data blocks in filesystem
+ /// - bfree: Free blocks available
+ /// - bavail: Free blocks available to non-privileged user
+ /// - files: Total file nodes (inodes)
+ /// - ffree: Free file nodes
+ pub fn statfs(&self) -> (u64, u64, u64, u64, u64) {
+ const MEMDB_BLOCKSIZE: u64 = 4096;
+
+ let index = self.inner.index.lock();
+
+ // Calculate total size used by all files
+ let total_size: u64 = index
+ .values()
+ .filter_map(|e| e.is_file().then_some(e.size as u64))
+ .sum();
+
+ // Calculate blocks
+ let blocks = MEMDB_MAX_FSSIZE as u64 / MEMDB_BLOCKSIZE;
+ let blocks_used = total_size.div_ceil(MEMDB_BLOCKSIZE);
+ let bfree = blocks.saturating_sub(blocks_used);
+ let bavail = bfree; // Same as bfree for non-privileged users
+
+ // Calculate inodes
+ let files = MEMDB_MAX_INODES as u64;
+ let files_used = index.len() as u64;
+ let ffree = files.saturating_sub(files_used);
+
+ (blocks, bfree, bavail, files, ffree)
+ }
+}
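The statfs() block arithmetic rounds usage up to whole 4 KiB blocks and subtracts it from the fixed capacity. A sketch with an illustrative capacity argument (not the pmxcfs MEMDB_MAX_FSSIZE constant):

```rust
/// Sketch of statfs()'s block accounting: a partially used block
/// counts as a full block (div_ceil), and free space saturates at zero.
fn statfs_blocks(total_size: u64, max_fssize: u64) -> (u64, u64) {
    const BLOCKSIZE: u64 = 4096;
    let blocks = max_fssize / BLOCKSIZE;
    let used = total_size.div_ceil(BLOCKSIZE); // round up to whole blocks
    (blocks, blocks.saturating_sub(used))
}
```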
+
+// ============================================================================
+// Trait Implementation for Dependency Injection
+// ============================================================================
+
+impl crate::traits::MemDbOps for MemDb {
+ fn create(&self, path: &str, mode: u32, writer: u32, mtime: u32) -> std::io::Result<()> {
+ self.create(path, mode, writer, mtime)
+ }
+
+ fn read(&self, path: &str, offset: u64, size: usize) -> std::io::Result<Vec<u8>> {
+ self.read(path, offset, size)
+ }
+
+ fn write(
+ &self,
+ path: &str,
+ offset: u64,
+ writer: u32,
+ mtime: u32,
+ data: &[u8],
+ truncate: bool,
+ ) -> std::io::Result<usize> {
+ self.write(path, offset, writer, mtime, data, truncate)
+ }
+
+ fn delete(&self, path: &str, writer: u32, mtime: u32) -> std::io::Result<()> {
+ self.delete(path, writer, mtime)
+ }
+
+ fn rename(
+ &self,
+ old_path: &str,
+ new_path: &str,
+ writer: u32,
+ mtime: u32,
+ ) -> std::io::Result<()> {
+ self.rename(old_path, new_path, writer, mtime)
+ }
+
+ fn exists(&self, path: &str) -> std::io::Result<bool> {
+ self.exists(path)
+ }
+
+ fn readdir(&self, path: &str) -> std::io::Result<Vec<crate::types::TreeEntry>> {
+ self.readdir(path)
+ }
+
+ fn set_mtime(&self, path: &str, writer: u32, mtime: u32) -> std::io::Result<()> {
+ self.set_mtime(path, writer, mtime)
+ }
+
+ fn lookup_path(&self, path: &str) -> Option<crate::types::TreeEntry> {
+ self.lookup_path(path)
+ }
+
+ fn get_entry_by_inode(&self, inode: u64) -> Option<crate::types::TreeEntry> {
+ self.get_entry_by_inode(inode)
+ }
+
+ fn acquire_lock(&self, path: &str, csum: &[u8; 32]) -> std::io::Result<()> {
+ self.acquire_lock(path, csum)
+ }
+
+ fn release_lock(&self, path: &str, csum: &[u8; 32]) -> std::io::Result<()> {
+ self.release_lock(path, csum)
+ }
+
+ fn is_locked(&self, path: &str) -> bool {
+ self.is_locked(path)
+ }
+
+ fn lock_expired(&self, path: &str, csum: &[u8; 32]) -> bool {
+ self.lock_expired(path, csum)
+ }
+
+ fn get_version(&self) -> u64 {
+ self.get_version()
+ }
+
+ fn get_all_entries(&self) -> std::io::Result<Vec<crate::types::TreeEntry>> {
+ self.get_all_entries()
+ }
+
+ fn replace_all_entries(&self, entries: Vec<crate::types::TreeEntry>) -> std::io::Result<()> {
+ self.replace_all_entries(entries)
+ }
+
+ fn apply_tree_entry(&self, entry: crate::types::TreeEntry) -> std::io::Result<()> {
+ self.apply_tree_entry(entry)
+ }
+
+ fn encode_database(&self) -> std::io::Result<Vec<u8>> {
+ self.encode_database()
+ }
+
+ fn compute_database_checksum(&self) -> std::io::Result<[u8; 32]> {
+ self.compute_database_checksum()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ //! Unit tests for MemDb database operations
+ //!
+ //! This test module provides comprehensive coverage for:
+ //! - Basic CRUD operations (create, read, write, delete, rename)
+ //! - Lock management (acquisition, release, expiration, contention)
+ //! - Checksum operations
+ //! - Persistence verification
+ //! - Error handling and edge cases
+ //! - Security (path traversal, type mismatches)
+ //!
+ //! ## Test Organization
+ //!
+ //! Tests are organized into several categories:
+ //! - **Basic Operations**: File and directory CRUD
+ //! - **Lock Management**: Lock lifecycle, expiration, renewal
+ //! - **Error Handling**: Path validation, type checking, duplicates
+ //! - **Edge Cases**: Empty paths, sparse files, boundary conditions
+ //!
+ //! ## Lock Expiration Testing
+ //!
+ //! Lock timeout is 120 seconds. Tests use `test_set_lock_timestamp()` helper
+ //! to simulate time passage without waiting 120 actual seconds.
+
+ use super::*;
+ use anyhow::Result;
+ use std::thread::sleep;
+ use std::time::{Duration, SystemTime, UNIX_EPOCH};
+ use tempfile::TempDir;
+
+ #[test]
+ fn test_lock_expiration() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+ let path = "/priv/lock/test-resource";
+ let csum = [42u8; 32];
+
+ // Create lock directory structure
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Acquire lock
+ db.acquire_lock(path, &csum)?;
+ assert!(db.is_locked(path), "Lock should be active");
+ assert!(
+ !db.lock_expired(path, &csum),
+ "Lock should not be expired initially"
+ );
+
+ // Wait a short time (should still not be expired)
+ sleep(Duration::from_secs(2));
+ assert!(
+ db.is_locked(path),
+ "Lock should still be active after 2 seconds"
+ );
+ assert!(
+ !db.lock_expired(path, &csum),
+ "Lock should not be expired after 2 seconds"
+ );
+
+ // Manually set the lock timestamp to simulate expiration.
+ // In the C implementation, LOCK_TIMEOUT is 120 seconds (memdb.h:27);
+ // set ltime to 121 seconds ago, just past the timeout.
+ let now_secs = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ db.test_set_lock_timestamp(path, now_secs - 121);
+
+ // Now the lock should be expired
+ assert!(
+ db.lock_expired(path, &csum),
+ "Lock should be expired after 121 seconds"
+ );
+
+ // is_locked() should also return false for expired locks
+ assert!(
+ !db.is_locked(path),
+ "is_locked() should return false for expired locks"
+ );
+
+ // Test checksum mismatch resets timeout
+ let different_csum = [99u8; 32];
+ assert!(
+ !db.lock_expired(path, &different_csum),
+ "lock_expired() with different checksum should reset timeout and return false"
+ );
+
+ // After checksum mismatch, lock should be active again (with new checksum)
+ assert!(
+ db.is_locked(path),
+ "Lock should be active after checksum reset"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_management() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+
+ // Create parent directory and resource
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock/qemu-server", libc::S_IFDIR, 0, now)?;
+
+ let path = "/priv/lock/resource";
+ let csum1 = [1u8; 32];
+ let csum2 = [2u8; 32];
+
+ // Create the lock file
+ db.create(path, libc::S_IFREG, 0, now)?;
+
+ // Test lock acquisition
+ assert!(!db.is_locked(path), "Path should not be locked initially");
+
+ db.acquire_lock(path, &csum1)?;
+ assert!(
+ db.is_locked(path),
+ "Path should be locked after acquisition"
+ );
+
+ // Test lock contention
+ let result = db.acquire_lock(path, &csum2);
+ assert!(result.is_err(), "Lock with different checksum should fail");
+
+ // Test lock refresh (same checksum)
+ let result = db.acquire_lock(path, &csum1);
+ assert!(
+ result.is_ok(),
+ "Lock refresh with same checksum should succeed"
+ );
+
+ // Test lock release
+ db.release_lock(path, &csum1)?;
+ assert!(
+ !db.is_locked(path),
+ "Path should not be locked after release"
+ );
+
+ // Test release non-existent lock
+ let result = db.release_lock(path, &csum1);
+ assert!(result.is_err(), "Releasing non-existent lock should fail");
+
+ // Test lock access using config path (maps to priv/lock)
+ let config_path = "/qemu-server/100.conf";
+ let csum3 = [3u8; 32];
+ db.acquire_lock(config_path, &csum3)?;
+ assert!(db.is_locked(config_path), "Config path should be locked");
+ db.release_lock(config_path, &csum3)?;
+ assert!(
+ !db.is_locked(config_path),
+ "Config path should be unlocked after release"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_cache_cleanup_on_delete() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+
+ // Create priv/lock directory
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Create a lock directory
+ db.create("/priv/lock/testlock", libc::S_IFDIR, 0, now)?;
+
+ // Verify lock directory exists
+ assert!(db.exists("/priv/lock/testlock")?);
+
+ // Delete the lock directory
+ db.delete("/priv/lock/testlock", 0, now)?;
+
+ // Verify lock directory is deleted
+ assert!(!db.exists("/priv/lock/testlock")?);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_protection_same_writer() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+
+ // Create priv/lock directory
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Create a lock directory
+ db.create("/priv/lock/mylock", libc::S_IFDIR, 0, now)?;
+
+ // Get the actual writer ID from the created lock
+ let entry = db.lookup_path("/priv/lock/mylock").unwrap();
+ let writer_id = entry.writer;
+
+ // Same writer (node 1) should be able to update mtime
+ let new_mtime = now + 10;
+ let result = db.set_mtime("/priv/lock/mylock", writer_id, new_mtime);
+ assert!(
+ result.is_ok(),
+ "Same writer should be able to update lock mtime"
+ );
+
+ // Verify mtime was updated
+ let updated_entry = db.lookup_path("/priv/lock/mylock").unwrap();
+ assert_eq!(updated_entry.mtime, new_mtime);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_protection_different_writer() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+
+ // Create priv/lock directory
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Create a lock directory
+ db.create("/priv/lock/mylock", libc::S_IFDIR, 0, now)?;
+
+ // Get the current writer ID
+ let entry = db.lookup_path("/priv/lock/mylock").unwrap();
+ let original_writer = entry.writer;
+
+ // Try to update from different writer (simulating another node trying to steal lock)
+ let different_writer = original_writer + 1;
+ let new_mtime = now + 10;
+ let result = db.set_mtime("/priv/lock/mylock", different_writer, new_mtime);
+
+ // Should fail - cannot hijack lock from different writer
+ assert!(
+ result.is_err(),
+ "Different writer should NOT be able to hijack lock"
+ );
+ assert_eq!(
+ result.unwrap_err().raw_os_error(),
+ Some(libc::EACCES),
+ "Error should be EACCES for lock ownership conflict"
+ );
+
+ // Verify mtime was NOT updated
+ let unchanged_entry = db.lookup_path("/priv/lock/mylock").unwrap();
+ assert_eq!(unchanged_entry.mtime, now, "Mtime should not have changed");
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_protection_older_mtime() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+
+ // Create priv/lock directory
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Create a lock directory
+ db.create("/priv/lock/mylock", libc::S_IFDIR, 0, now)?;
+
+ let entry = db.lookup_path("/priv/lock/mylock").unwrap();
+ let writer_id = entry.writer;
+
+ // Try to set an older mtime (replay attack simulation)
+ let older_mtime = now - 10;
+ let result = db.set_mtime("/priv/lock/mylock", writer_id, older_mtime);
+
+ // Should fail - cannot set older mtime
+ assert!(result.is_err(), "Cannot set older mtime on lock");
+ assert_eq!(
+ result.unwrap_err().raw_os_error(),
+ Some(libc::EACCES),
+ "Error should be EACCES for mtime protection"
+ );
+
+ // Verify mtime was NOT changed
+ let unchanged_entry = db.lookup_path("/priv/lock/mylock").unwrap();
+ assert_eq!(unchanged_entry.mtime, now, "Mtime should not have changed");
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_protection_newer_mtime() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+
+ // Create priv/lock directory
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Create a lock directory
+ db.create("/priv/lock/mylock", libc::S_IFDIR, 0, now)?;
+
+ let entry = db.lookup_path("/priv/lock/mylock").unwrap();
+ let writer_id = entry.writer;
+
+ // Set a newer mtime (normal lock refresh)
+ let newer_mtime = now + 60;
+ let result = db.set_mtime("/priv/lock/mylock", writer_id, newer_mtime);
+
+ // Should succeed
+ assert!(result.is_ok(), "Should be able to set newer mtime on lock");
+
+ // Verify mtime was updated
+ let updated_entry = db.lookup_path("/priv/lock/mylock").unwrap();
+ assert_eq!(updated_entry.mtime, newer_mtime, "Mtime should be updated");
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_lifecycle_with_cache() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+
+ // Setup: Create priv/lock directory
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Step 1: Create lock
+ db.create("/priv/lock/lifecycle_lock", libc::S_IFDIR, 0, now)?;
+ assert!(db.exists("/priv/lock/lifecycle_lock")?);
+
+ let entry = db.lookup_path("/priv/lock/lifecycle_lock").unwrap();
+ let writer_id = entry.writer;
+
+ // Step 2: Refresh lock multiple times (simulate lock renewals)
+ for i in 1..=5 {
+ let refresh_mtime = now + (i * 30); // Refresh every 30 seconds
+ let result = db.set_mtime("/priv/lock/lifecycle_lock", writer_id, refresh_mtime);
+ assert!(result.is_ok(), "Lock refresh #{i} should succeed");
+
+ // Verify mtime was updated
+ let refreshed_entry = db.lookup_path("/priv/lock/lifecycle_lock").unwrap();
+ assert_eq!(refreshed_entry.mtime, refresh_mtime);
+ }
+
+ // Step 3: Delete lock (release)
+ db.delete("/priv/lock/lifecycle_lock", 0, now)?;
+ assert!(!db.exists("/priv/lock/lifecycle_lock")?);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_renewal_before_expiration() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+ let path = "/priv/lock/renewal-test";
+ let csum = [55u8; 32];
+
+ // Create lock directory structure
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Acquire initial lock
+ db.acquire_lock(path, &csum)?;
+ assert!(db.is_locked(path), "Lock should be active");
+
+ // Simulate time passing (119 seconds - just before expiration)
+ let now_secs = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ db.test_set_lock_timestamp(path, now_secs - 119);
+
+ // Lock should still be valid (not yet expired)
+ assert!(
+ !db.lock_expired(path, &csum),
+ "Lock should not be expired at 119 seconds"
+ );
+ assert!(
+ db.is_locked(path),
+ "is_locked() should return true before expiration"
+ );
+
+ // Renew the lock by acquiring again with same checksum
+ db.acquire_lock(path, &csum)?;
+
+ // After renewal, lock should definitely not be expired
+ assert!(
+ !db.lock_expired(path, &csum),
+ "Lock should not be expired after renewal"
+ );
+ assert!(
+ db.is_locked(path),
+ "Lock should still be active after renewal"
+ );
+
+ // Now simulate expiration time (121 seconds from renewal)
+ let now_secs = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ db.test_set_lock_timestamp(path, now_secs - 121);
+
+ // Lock should now be expired
+ assert!(
+ db.lock_expired(path, &csum),
+ "Lock should be expired after 121 seconds without renewal"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_acquire_lock_after_expiration() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+ let path = "/priv/lock/reacquire-test";
+ let csum1 = [11u8; 32];
+ let csum2 = [22u8; 32];
+
+ // Create lock directory structure
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Acquire initial lock with csum1
+ db.acquire_lock(path, &csum1)?;
+ assert!(db.is_locked(path), "Lock should be active");
+
+ // Simulate lock expiration (121 seconds)
+ let now_secs = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ db.test_set_lock_timestamp(path, now_secs - 121);
+
+ // Verify lock is expired
+ assert!(db.lock_expired(path, &csum1), "Lock should be expired");
+ assert!(
+ !db.is_locked(path),
+ "is_locked() should return false for expired lock"
+ );
+
+ // A different process should be able to acquire the expired lock
+ let result = db.acquire_lock(path, &csum2);
+ assert!(
+ result.is_ok(),
+ "Should be able to acquire expired lock with different checksum"
+ );
+
+ // Lock should now be active with new checksum
+ assert!(
+ db.is_locked(path),
+ "Lock should be active with new checksum"
+ );
+ assert!(
+ !db.lock_expired(path, &csum2),
+ "New lock should not be expired"
+ );
+
+        // Checking with the old checksum hits the csum-mismatch path,
+        // which resets the timeout and reports the lock as not expired
+ assert!(
+ !db.lock_expired(path, &csum1),
+ "lock_expired() with old checksum should reset timeout and return false"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_multiple_locks_expiring() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+
+ // Create lock directory structure
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Create three locks
+ let locks = [
+ ("/priv/lock/lock1", [1u8; 32]),
+ ("/priv/lock/lock2", [2u8; 32]),
+ ("/priv/lock/lock3", [3u8; 32]),
+ ];
+
+ // Acquire all locks
+ for (path, csum) in &locks {
+ db.acquire_lock(path, csum)?;
+ assert!(db.is_locked(path), "Lock {path} should be active");
+ }
+
+ let now_secs = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+
+ // Set different expiration times
+ // lock1: 121 seconds ago (expired)
+ // lock2: 119 seconds ago (not expired)
+ // lock3: 121 seconds ago (expired)
+ db.test_set_lock_timestamp(locks[0].0, now_secs - 121);
+ db.test_set_lock_timestamp(locks[1].0, now_secs - 119);
+ db.test_set_lock_timestamp(locks[2].0, now_secs - 121);
+
+ // Check expiration states
+ assert!(
+ db.lock_expired(locks[0].0, &locks[0].1),
+ "lock1 should be expired"
+ );
+ assert!(
+ !db.lock_expired(locks[1].0, &locks[1].1),
+ "lock2 should not be expired"
+ );
+ assert!(
+ db.lock_expired(locks[2].0, &locks[2].1),
+ "lock3 should be expired"
+ );
+
+ // Check is_locked states
+ assert!(
+ !db.is_locked(locks[0].0),
+ "lock1 is_locked should return false"
+ );
+ assert!(
+ db.is_locked(locks[1].0),
+ "lock2 is_locked should return true"
+ );
+ assert!(
+ !db.is_locked(locks[2].0),
+ "lock3 is_locked should return false"
+ );
+
+ // Re-acquire expired locks with different checksums
+ let new_csum1 = [11u8; 32];
+ let new_csum3 = [33u8; 32];
+
+ assert!(
+ db.acquire_lock(locks[0].0, &new_csum1).is_ok(),
+ "Should be able to re-acquire expired lock1"
+ );
+ assert!(
+ db.acquire_lock(locks[2].0, &new_csum3).is_ok(),
+ "Should be able to re-acquire expired lock3"
+ );
+
+ // Verify all locks are now active
+ assert!(db.is_locked(locks[0].0), "lock1 should be active again");
+ assert!(db.is_locked(locks[1].0), "lock2 should still be active");
+ assert!(db.is_locked(locks[2].0), "lock3 should be active again");
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_lock_expiration_boundary() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let db = MemDb::open(&db_path, true)?;
+
+ let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as u32;
+ let path = "/priv/lock/boundary-test";
+ let csum = [77u8; 32];
+
+ // Create lock directory structure
+ db.create("/priv", libc::S_IFDIR, 0, now)?;
+ db.create("/priv/lock", libc::S_IFDIR, 0, now)?;
+
+ // Acquire lock
+ db.acquire_lock(path, &csum)?;
+
+ let now_secs = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+
+ // Test exact boundary: 120 seconds (LOCK_TIMEOUT)
+ db.test_set_lock_timestamp(path, now_secs - 120);
+ assert!(
+ !db.lock_expired(path, &csum),
+ "Lock should NOT be expired at exactly 120 seconds (boundary)"
+ );
+ assert!(
+ db.is_locked(path),
+ "Lock should still be considered active at 120 seconds"
+ );
+
+ // Test 121 seconds (just past timeout)
+ db.test_set_lock_timestamp(path, now_secs - 121);
+ assert!(
+ db.lock_expired(path, &csum),
+ "Lock SHOULD be expired at 121 seconds"
+ );
+ assert!(
+ !db.is_locked(path),
+ "Lock should not be considered active at 121 seconds"
+ );
+
+ Ok(())
+ }
+}
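For reviewers, the expiration rules the boundary tests above exercise (not expired at exactly 120 s, expired at 121 s, checksum mismatch resets the clock) can be sketched standalone. The names `LockInfo`, `lock_expired`, and `LOCK_TIMEOUT` mirror the crate, but this is an illustrative reduction over a plain `HashMap`, not the crate's API:

```rust
use std::collections::HashMap;

const LOCK_TIMEOUT: u64 = 120; // seconds, matching the boundary tested above

struct LockInfo {
    ltime: u64,     // last time the lock was seen or renewed
    csum: [u8; 32], // checksum identifying the lock contents
}

/// Returns true only when the cached checksum matches and more than
/// LOCK_TIMEOUT seconds have elapsed; a mismatch or first sighting
/// (re)creates the cache entry and restarts the clock.
fn lock_expired(
    cache: &mut HashMap<String, LockInfo>,
    key: &str,
    csum: &[u8; 32],
    now: u64,
) -> bool {
    match cache.get_mut(key) {
        Some(info) if info.csum != *csum => {
            // Checksum mismatch: adopt the new csum and reset the timeout
            info.csum = *csum;
            info.ltime = now;
            false
        }
        // saturating_sub tolerates backward clock jumps
        Some(info) => now.saturating_sub(info.ltime) > LOCK_TIMEOUT,
        None => {
            // First sighting: create a cache entry; not expired yet
            cache.insert(key.to_string(), LockInfo { ltime: now, csum: *csum });
            false
        }
    }
}

fn main() {
    let mut cache = HashMap::new();
    let csum = [7u8; 32];
    assert!(!lock_expired(&mut cache, "priv/lock/demo", &csum, 1000)); // creates entry
    assert!(!lock_expired(&mut cache, "priv/lock/demo", &csum, 1120)); // exactly 120 s: not expired
    assert!(lock_expired(&mut cache, "priv/lock/demo", &csum, 1121)); // 121 s: expired
    println!("ok");
}
```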
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/index.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/index.rs
new file mode 100644
index 000000000..96ddc9237
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/index.rs
@@ -0,0 +1,579 @@
+//! MemDB index structures for C-compatible state synchronization
+//!
+//! This module implements the memdb_index_t format used by the C implementation
+//! for efficient state comparison during cluster synchronization.
+use anyhow::Result;
+use sha2::{Digest, Sha256};
+
+/// Size of the memdb_index_t header in bytes (version + last_inode + writer + mtime + size + bytes)
+/// Wire format: 8 + 8 + 4 + 4 + 4 + 4 = 32 bytes
+const MEMDB_INDEX_HEADER_SIZE: u32 = 32;
+
+/// Size of each memdb_index_extry_t in bytes (inode + digest)
+/// Wire format: 8 + 32 = 40 bytes
+const MEMDB_INDEX_ENTRY_SIZE: u32 = 40;
+
+/// Index entry matching C's memdb_index_extry_t
+///
+/// Wire format (40 bytes):
+/// ```c
+/// typedef struct {
+/// guint64 inode; // 8 bytes
+/// char digest[32]; // 32 bytes (SHA256)
+/// } memdb_index_extry_t;
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct IndexEntry {
+ pub inode: u64,
+ pub digest: [u8; 32],
+}
+
+impl IndexEntry {
+ pub fn serialize(&self) -> Vec<u8> {
+ let mut data = Vec::with_capacity(40);
+ data.extend_from_slice(&self.inode.to_le_bytes());
+ data.extend_from_slice(&self.digest);
+ data
+ }
+
+ pub fn deserialize(data: &[u8]) -> Result<Self> {
+ if data.len() < 40 {
+ anyhow::bail!("IndexEntry too short: {} bytes (need 40)", data.len());
+ }
+
+ let inode = u64::from_le_bytes(data[0..8].try_into().unwrap());
+ let mut digest = [0u8; 32];
+ digest.copy_from_slice(&data[8..40]);
+
+ Ok(Self { inode, digest })
+ }
+}
+
+/// MemDB index matching C's memdb_index_t
+///
+/// Wire format header (32 bytes) + entries:
+/// ```c
+/// typedef struct {
+/// guint64 version; // 8 bytes
+/// guint64 last_inode; // 8 bytes
+/// guint32 writer; // 4 bytes
+/// guint32 mtime; // 4 bytes
+/// guint32 size; // 4 bytes (number of entries)
+/// guint32 bytes; // 4 bytes (total bytes allocated)
+/// memdb_index_extry_t entries[]; // variable length
+/// } memdb_index_t;
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MemDbIndex {
+ pub version: u64,
+ pub last_inode: u64,
+ pub writer: u32,
+ pub mtime: u32,
+ pub size: u32, // number of entries
+ pub bytes: u32, // total bytes (32 + size * 40)
+ pub entries: Vec<IndexEntry>,
+}
+
+impl MemDbIndex {
+ /// Create a new index from entries
+ ///
+ /// Entries are automatically sorted by inode for efficient comparison
+ /// and to match C implementation behavior.
+ pub fn new(
+ version: u64,
+ last_inode: u64,
+ writer: u32,
+ mtime: u32,
+ mut entries: Vec<IndexEntry>,
+ ) -> Self {
+ // Sort entries by inode (matching C implementation)
+ entries.sort_by_key(|e| e.inode);
+
+ let size = entries.len() as u32;
+ let bytes = MEMDB_INDEX_HEADER_SIZE + size * MEMDB_INDEX_ENTRY_SIZE;
+
+ Self {
+ version,
+ last_inode,
+ writer,
+ mtime,
+ size,
+ bytes,
+ entries,
+ }
+ }
+
+ /// Serialize to C-compatible wire format
+ pub fn serialize(&self) -> Vec<u8> {
+ let mut data = Vec::with_capacity(self.bytes as usize);
+
+ // Header (32 bytes)
+ data.extend_from_slice(&self.version.to_le_bytes());
+ data.extend_from_slice(&self.last_inode.to_le_bytes());
+ data.extend_from_slice(&self.writer.to_le_bytes());
+ data.extend_from_slice(&self.mtime.to_le_bytes());
+ data.extend_from_slice(&self.size.to_le_bytes());
+ data.extend_from_slice(&self.bytes.to_le_bytes());
+
+ // Entries (40 bytes each)
+ for entry in &self.entries {
+ data.extend_from_slice(&entry.serialize());
+ }
+
+ data
+ }
+
+ /// Deserialize from C-compatible wire format
+ pub fn deserialize(data: &[u8]) -> Result<Self> {
+ if data.len() < 32 {
+ anyhow::bail!(
+ "MemDbIndex too short: {} bytes (need at least 32)",
+ data.len()
+ );
+ }
+
+ // Parse header
+ let version = u64::from_le_bytes(data[0..8].try_into().unwrap());
+ let last_inode = u64::from_le_bytes(data[8..16].try_into().unwrap());
+ let writer = u32::from_le_bytes(data[16..20].try_into().unwrap());
+ let mtime = u32::from_le_bytes(data[20..24].try_into().unwrap());
+ let size = u32::from_le_bytes(data[24..28].try_into().unwrap());
+ let bytes = u32::from_le_bytes(data[28..32].try_into().unwrap());
+
+        // Validate size (widen to u64 so a corrupt `size` cannot overflow u32)
+        let expected_bytes =
+            u64::from(MEMDB_INDEX_HEADER_SIZE) + u64::from(size) * u64::from(MEMDB_INDEX_ENTRY_SIZE);
+        if u64::from(bytes) != expected_bytes {
+            anyhow::bail!("MemDbIndex bytes mismatch: got {bytes}, expected {expected_bytes}");
+        }
+
+ if data.len() < bytes as usize {
+ anyhow::bail!(
+ "MemDbIndex data too short: {} bytes (need {})",
+ data.len(),
+ bytes
+ );
+ }
+
+ // Parse entries
+ let mut entries = Vec::with_capacity(size as usize);
+ let mut offset = 32;
+ for _ in 0..size {
+ let entry = IndexEntry::deserialize(&data[offset..offset + 40])?;
+ entries.push(entry);
+ offset += 40;
+ }
+
+ Ok(Self {
+ version,
+ last_inode,
+ writer,
+ mtime,
+ size,
+ bytes,
+ entries,
+ })
+ }
+
+ /// Compute SHA256 digest of a tree entry for the index
+ ///
+ /// Matches C's memdb_encode_index() digest computation (memdb.c:1497-1507)
+ /// CRITICAL: Order and fields must match exactly:
+ /// 1. version, 2. writer, 3. mtime, 4. size, 5. type, 6. parent, 7. name, 8. data
+ ///
+ /// NOTE: inode is NOT included in the digest (it is only the index key)
+ #[allow(clippy::too_many_arguments)]
+ pub fn compute_entry_digest(
+ parent: u64,
+ version: u64,
+ writer: u32,
+ mtime: u32,
+ size: usize,
+ entry_type: u8,
+ name: &str,
+ data: &[u8],
+ ) -> [u8; 32] {
+ let mut hasher = Sha256::new();
+
+ // Hash entry metadata in C's exact order (memdb.c:1497-1503)
+ // C uses native endian (in-memory representation), so we use to_ne_bytes()
+ hasher.update(version.to_ne_bytes());
+ hasher.update(writer.to_ne_bytes());
+ hasher.update(mtime.to_ne_bytes());
+ hasher.update((size as u32).to_ne_bytes()); // C uses u32 for te->size
+ hasher.update([entry_type]);
+ hasher.update(parent.to_ne_bytes());
+ hasher.update(name.as_bytes());
+
+ // Hash data only for regular files with non-zero size (memdb.c:1505-1507)
+ if entry_type == crate::types::DT_REG && size > 0 {
+ hasher.update(data);
+ }
+
+ hasher.finalize().into()
+ }
+}
+
+/// Implement comparison for MemDbIndex
+///
+/// Matches C's dcdb_choose_leader_with_highest_index() logic:
+/// - If same version, higher mtime wins
+/// - If different version, higher version wins
+impl PartialOrd for MemDbIndex {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for MemDbIndex {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ // First compare by version (higher version wins)
+ // Then by mtime (higher mtime wins) if versions are equal
+ self.version
+ .cmp(&other.version)
+ .then_with(|| self.mtime.cmp(&other.mtime))
+ }
+}
+
+impl MemDbIndex {
+ /// Find entries that differ from another index
+ ///
+ /// Returns the set of inodes that need to be sent as updates.
+ /// Matches C's dcdb_create_and_send_updates() comparison logic.
+ pub fn find_differences(&self, other: &MemDbIndex) -> Vec<u64> {
+ let mut differences = Vec::new();
+
+ // Walk through master index, comparing with slave
+ let mut j = 0; // slave position
+
+        for master_entry in &self.entries {
+            let inode = master_entry.inode;
+
+ // Advance slave pointer to matching or higher inode
+ while j < other.entries.len() && other.entries[j].inode < inode {
+ j += 1;
+ }
+
+ // Check if entries match
+ if j < other.entries.len() {
+ let slave_entry = &other.entries[j];
+ if slave_entry.inode == inode && slave_entry.digest == master_entry.digest {
+ // Entries match - skip
+ continue;
+ }
+ }
+
+ // Entry differs or missing - needs update
+ differences.push(inode);
+ }
+
+ differences
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ //! Unit tests for index serialization and synchronization
+ //!
+ //! This test module covers:
+ //! - Index serialization/deserialization (round-trip verification)
+ //! - Leader election logic (version-based, mtime tiebreaker)
+ //! - Difference detection (finding sync deltas between indices)
+ //! - TreeEntry serialization (files, directories, empty files)
+ //! - Digest computation (determinism, sorted entries)
+ //! - Large index handling (100+ entry stress tests)
+ //!
+ //! ## Serialization Format
+ //!
+ //! - IndexEntry: 40 bytes (8-byte inode + 32-byte digest)
+    //! - MemDbIndex: 32-byte header (version, last_inode, writer, mtime, size, bytes) + entries
+ //! - TreeEntry: Type-specific format (regular file, directory, symlink)
+ //!
+ //! ## Leader Election
+ //!
+ //! Leader election follows these rules:
+ //! 1. Higher version wins
+ //! 2. If versions equal, higher mtime wins
+ //! 3. If both equal, indices are considered equal
+
+ use super::*;
+
+ #[test]
+ fn test_index_entry_roundtrip() {
+ let entry = IndexEntry {
+ inode: 0x123456789ABCDEF0,
+ digest: [42u8; 32],
+ };
+
+ let serialized = entry.serialize();
+ assert_eq!(serialized.len(), 40);
+
+ let deserialized = IndexEntry::deserialize(&serialized).unwrap();
+ assert_eq!(deserialized, entry);
+ }
+
+ #[test]
+ fn test_memdb_index_roundtrip() {
+ let entries = vec![
+ IndexEntry {
+ inode: 1,
+ digest: [1u8; 32],
+ },
+ IndexEntry {
+ inode: 2,
+ digest: [2u8; 32],
+ },
+ ];
+
+ let index = MemDbIndex::new(100, 1000, 1, 123456, entries);
+
+ let serialized = index.serialize();
+ assert_eq!(serialized.len(), 32 + 2 * 40);
+
+ let deserialized = MemDbIndex::deserialize(&serialized).unwrap();
+ assert_eq!(deserialized.version, 100);
+ assert_eq!(deserialized.last_inode, 1000);
+ assert_eq!(deserialized.size, 2);
+ assert_eq!(deserialized.entries.len(), 2);
+ }
+
+ #[test]
+ fn test_index_comparison() {
+ let idx1 = MemDbIndex::new(100, 0, 1, 1000, vec![]);
+ let idx2 = MemDbIndex::new(100, 0, 1, 2000, vec![]);
+ let idx3 = MemDbIndex::new(101, 0, 1, 500, vec![]);
+
+ // Same version, lower mtime
+ assert!(idx1 < idx2);
+ assert_eq!(idx1.cmp(&idx2), std::cmp::Ordering::Less);
+
+ // Same version, higher mtime
+ assert!(idx2 > idx1);
+ assert_eq!(idx2.cmp(&idx1), std::cmp::Ordering::Greater);
+
+ // Higher version wins even with lower mtime
+ assert!(idx3 > idx2);
+ assert_eq!(idx3.cmp(&idx2), std::cmp::Ordering::Greater);
+
+ // Test equality
+ let idx4 = MemDbIndex::new(100, 0, 1, 1000, vec![]);
+ assert_eq!(idx1, idx4);
+ assert_eq!(idx1.cmp(&idx4), std::cmp::Ordering::Equal);
+ }
+
+ #[test]
+ fn test_find_differences() {
+ let master_entries = vec![
+ IndexEntry {
+ inode: 1,
+ digest: [1u8; 32],
+ },
+ IndexEntry {
+ inode: 2,
+ digest: [2u8; 32],
+ },
+ IndexEntry {
+ inode: 3,
+ digest: [3u8; 32],
+ },
+ ];
+
+ let slave_entries = vec![
+ IndexEntry {
+ inode: 1,
+ digest: [1u8; 32], // same
+ },
+ IndexEntry {
+ inode: 2,
+ digest: [99u8; 32], // different digest
+ },
+ // missing inode 3
+ ];
+
+ let master = MemDbIndex::new(100, 3, 1, 1000, master_entries);
+ let slave = MemDbIndex::new(100, 2, 1, 900, slave_entries);
+
+ let diffs = master.find_differences(&slave);
+ assert_eq!(diffs, vec![2, 3]); // inode 2 changed, inode 3 missing
+ }
+
+ #[test]
+ fn test_tree_entry_update_serialization() {
+ use crate::types::TreeEntry;
+
+ // Create a TreeEntry
+ let entry = TreeEntry {
+ inode: 42,
+ parent: 1,
+ version: 100,
+ writer: 2,
+ mtime: 12345,
+ size: 11,
+ entry_type: 8, // DT_REG
+ name: "test.conf".to_string(),
+ data: b"hello world".to_vec(),
+ };
+
+ // Serialize for update
+ let serialized = entry.serialize_for_update();
+
+ // Expected size: 41-byte header + 10 bytes (name + null) + 11 bytes (data)
+ // = 62 bytes
+ assert_eq!(serialized.len(), 41 + 10 + 11);
+
+ // Deserialize
+ let deserialized = TreeEntry::deserialize_from_update(&serialized).unwrap();
+
+ // Verify all fields
+ assert_eq!(deserialized.inode, entry.inode);
+ assert_eq!(deserialized.parent, entry.parent);
+ assert_eq!(deserialized.version, entry.version);
+ assert_eq!(deserialized.writer, entry.writer);
+ assert_eq!(deserialized.mtime, entry.mtime);
+ assert_eq!(deserialized.size, entry.size);
+ assert_eq!(deserialized.entry_type, entry.entry_type);
+ assert_eq!(deserialized.name, entry.name);
+ assert_eq!(deserialized.data, entry.data);
+ }
+
+ #[test]
+ fn test_tree_entry_directory_serialization() {
+ use crate::types::TreeEntry;
+
+ // Create a directory entry (no data)
+ let entry = TreeEntry {
+ inode: 10,
+ parent: 1,
+ version: 50,
+ writer: 1,
+ mtime: 10000,
+ size: 0,
+ entry_type: 4, // DT_DIR
+ name: "configs".to_string(),
+ data: Vec::new(),
+ };
+
+ // Serialize
+ let serialized = entry.serialize_for_update();
+
+ // Expected size: 41-byte header + 8 bytes (name + null) + 0 bytes (no data)
+ assert_eq!(serialized.len(), 41 + 8);
+
+ // Deserialize
+ let deserialized = TreeEntry::deserialize_from_update(&serialized).unwrap();
+
+ assert_eq!(deserialized.inode, entry.inode);
+ assert_eq!(deserialized.name, entry.name);
+ assert_eq!(deserialized.entry_type, 4); // DT_DIR
+ assert_eq!(deserialized.data.len(), 0);
+ }
+
+ #[test]
+ fn test_tree_entry_empty_file_serialization() {
+ use crate::types::TreeEntry;
+
+ // Create an empty file
+ let entry = TreeEntry {
+ inode: 20,
+ parent: 1,
+ version: 75,
+ writer: 3,
+ mtime: 20000,
+ size: 0,
+ entry_type: 8, // DT_REG
+ name: "empty.txt".to_string(),
+ data: Vec::new(),
+ };
+
+ // Serialize
+ let serialized = entry.serialize_for_update();
+
+ // Expected size: 41-byte header + 10 bytes (name + null) + 0 bytes (no data)
+ assert_eq!(serialized.len(), 41 + 10);
+
+ // Deserialize
+ let deserialized = TreeEntry::deserialize_from_update(&serialized).unwrap();
+
+ assert_eq!(deserialized.inode, entry.inode);
+ assert_eq!(deserialized.name, entry.name);
+ assert_eq!(deserialized.size, 0);
+ assert_eq!(deserialized.data.len(), 0);
+ }
+
+ #[test]
+ fn test_index_digest_computation() {
+ let digest1 = MemDbIndex::compute_entry_digest(0, 100, 1, 1000, 0, 4, "dir1", &[]);
+
+ // Same parameters should produce same digest
+ let digest2 = MemDbIndex::compute_entry_digest(0, 100, 1, 1000, 0, 4, "dir1", &[]);
+ assert_eq!(digest1, digest2);
+
+ // Different metadata should produce different digests
+ let digest3 = MemDbIndex::compute_entry_digest(0, 100, 1, 1000, 0, 4, "dir2", &[]);
+ assert_ne!(digest1, digest3);
+
+ // Different data should produce different digest
+ let digest4 = MemDbIndex::compute_entry_digest(0, 100, 1, 1000, 5, 8, "file", b"hello");
+ let digest5 = MemDbIndex::compute_entry_digest(0, 100, 1, 1000, 5, 8, "file", b"world");
+ assert_ne!(digest4, digest5);
+ }
+
+ #[test]
+ fn test_index_sorted_entries() {
+ // Create entries in unsorted order
+ let entries = vec![
+ IndexEntry {
+ inode: 5,
+ digest: [5u8; 32],
+ },
+ IndexEntry {
+ inode: 2,
+ digest: [2u8; 32],
+ },
+ IndexEntry {
+ inode: 8,
+ digest: [8u8; 32],
+ },
+ IndexEntry {
+ inode: 1,
+ digest: [1u8; 32],
+ },
+ ];
+
+ let index = MemDbIndex::new(100, 8, 1, 1000, entries);
+
+ // Verify entries are stored sorted by inode
+ assert_eq!(index.entries[0].inode, 1);
+ assert_eq!(index.entries[1].inode, 2);
+ assert_eq!(index.entries[2].inode, 5);
+ assert_eq!(index.entries[3].inode, 8);
+ }
+
+ #[test]
+ fn test_large_index_serialization() {
+ // Test with a larger number of entries
+ let mut entries = Vec::new();
+ for i in 1..=100 {
+ entries.push(IndexEntry {
+ inode: i,
+ digest: [(i % 256) as u8; 32],
+ });
+ }
+
+ let index = MemDbIndex::new(1000, 100, 1, 50000, entries);
+
+ // Serialize and deserialize
+ let serialized = index.serialize();
+ let deserialized =
+ MemDbIndex::deserialize(&serialized).expect("Failed to deserialize large index");
+
+ // Verify
+ assert_eq!(deserialized.version, index.version);
+ assert_eq!(deserialized.size, 100);
+ assert_eq!(deserialized.entries.len(), 100);
+
+ for i in 0..100 {
+ assert_eq!(deserialized.entries[i].inode, (i + 1) as u64);
+ }
+ }
+}
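For reviewers, the single-pass comparison in `find_differences` above is a two-pointer merge over two inode-sorted indices. A standalone sketch of the same algorithm, with entries reduced to `(inode, digest)` pairs purely for illustration:

```rust
/// Return inodes present in `master` whose digest is missing or differs in `slave`.
/// Both slices must be sorted by inode, as MemDbIndex::new guarantees.
fn find_differences(master: &[(u64, u8)], slave: &[(u64, u8)]) -> Vec<u64> {
    let mut diffs = Vec::new();
    let mut j = 0; // slave cursor, only ever advances

    for &(inode, digest) in master {
        // Advance the slave cursor to the first entry at or past this inode
        while j < slave.len() && slave[j].0 < inode {
            j += 1;
        }
        // Matching inode with matching digest: entry is in sync, skip it
        if j < slave.len() && slave[j].0 == inode && slave[j].1 == digest {
            continue;
        }
        // Entry differs or is missing on the slave: needs an update
        diffs.push(inode);
    }
    diffs
}

fn main() {
    let master = [(1, 0xaa), (2, 0xbb), (3, 0xcc)];
    let slave = [(1, 0xaa), (2, 0xff)]; // inode 2 differs, inode 3 missing
    assert_eq!(find_differences(&master, &slave), vec![2, 3]);
    println!("ok");
}
```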
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/lib.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/lib.rs
new file mode 100644
index 000000000..667665e38
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/lib.rs
@@ -0,0 +1,26 @@
+//! In-memory database with SQLite persistence
+//!
+//! This crate provides a cluster-synchronized in-memory database with SQLite persistence.
+//! The implementation is organized into focused submodules:
+//!
+//! - `types`: Type definitions and constants
+//! - `database`: Core MemDb struct and CRUD operations
+//! - `locks`: Resource locking functionality
+//! - `sync`: State synchronization and serialization
+//! - `index`: C-compatible memdb index structures for efficient state comparison
+//! - `traits`: Trait abstractions for dependency injection and testing
+//! - `vmlist`: VM list generation and config path parsing
+mod database;
+mod index;
+mod locks;
+mod sync;
+mod traits;
+mod types;
+mod vmlist;
+
+// Re-export public types
+pub use database::MemDb;
+pub use index::{IndexEntry, MemDbIndex};
+pub use locks::{is_lock_path, lock_cache_key};
+pub use traits::MemDbOps;
+pub use types::{LOCK_DIR_PATH, ROOT_INODE, TreeEntry};
+pub use vmlist::{is_valid_nodename, parse_vm_config_name, parse_vm_config_path, recreate_vmlist};
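The re-exported `is_lock_path` and `lock_cache_key` helpers (defined in locks.rs below) normalize lock identifiers into relative `priv/lock/...` keys. A standalone copy showing the expected behavior, for reviewers who want to poke at the edge cases without building the crate:

```rust
const LOCK_DIR_PATH: &str = "priv/lock";

/// True for "priv/lock/<name>" with or without a leading slash,
/// but false for the lock directory itself.
fn is_lock_path(path: &str) -> bool {
    let path = path.trim_start_matches('/');
    const LOCK_PREFIX: &str = "priv/lock/";
    path.starts_with(LOCK_PREFIX) && path.len() > LOCK_PREFIX.len()
}

/// Normalize a lock identifier into a relative "priv/lock/..." cache key.
fn lock_cache_key(path: &str) -> String {
    let trimmed = path.trim_start_matches('/');
    if trimmed.starts_with(LOCK_DIR_PATH) {
        trimmed.to_string()
    } else {
        format!("{LOCK_DIR_PATH}/{trimmed}")
    }
}

fn main() {
    assert!(is_lock_path("/priv/lock/mylock"));
    assert!(is_lock_path("priv/lock/mylock"));
    assert!(!is_lock_path("/priv/lock/")); // no lock name after the prefix
    assert!(!is_lock_path("/etc/pve"));
    assert_eq!(lock_cache_key("/priv/lock/mylock"), "priv/lock/mylock");
    assert_eq!(lock_cache_key("mylock"), "priv/lock/mylock");
    println!("ok");
}
```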
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/locks.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/locks.rs
new file mode 100644
index 000000000..4c268f091
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/locks.rs
@@ -0,0 +1,315 @@
+//! Lock management for memdb
+//!
+//! Locks in pmxcfs are implemented as directory entries stored in the database at
+//! `priv/lock/<lockname>`. This ensures locks are:
+//! 1. Persistent across restarts
+//! 2. Synchronized across the cluster via DFSM
+//! 3. Visible to both C and Rust nodes
+//!
+//! The in-memory lock table is a cache rebuilt from the database on startup
+//! and updated dynamically during runtime.
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use pmxcfs_api_types::errno;
+
+use super::database::MemDb;
+use super::types::{LOCK_DIR_PATH, LOCK_TIMEOUT, LockInfo, MODE_DIR_DEFAULT};
+
+/// Get current time as seconds since UNIX epoch, returning 0 on clock errors.
+pub(super) fn now_secs() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs()
+}
+
+/// Check if a path is in the lock directory
+///
+/// Matches C's path_is_lockdir() function (cfs-utils.c:306)
+/// Returns true if path is "{LOCK_DIR_PATH}/<something>" (with or without leading /)
+pub fn is_lock_path(path: &str) -> bool {
+ let path = path.trim_start_matches('/');
+ const LOCK_PREFIX: &str = "priv/lock/";
+ path.starts_with(LOCK_PREFIX) && path.len() > LOCK_PREFIX.len()
+}
+
+/// Normalize a lock identifier into the cache key used by the lock map.
+///
+/// This ensures the key is a relative path starting with the `priv/lock` prefix.
+pub fn lock_cache_key(path: &str) -> String {
+ let trimmed = path.trim_start_matches('/');
+ if trimmed.starts_with(LOCK_DIR_PATH) {
+ trimmed.to_string()
+ } else {
+ format!("{LOCK_DIR_PATH}/{trimmed}")
+ }
+}
+
+/// Return the lock cache key and absolute filesystem path for a lock entry.
+///
+/// The cache key is used for the in-memory lock map, while the filesystem path
+/// is used for database operations.
+fn lock_key_and_path(path: &str) -> (String, String) {
+ let lock_key = lock_cache_key(path);
+ let lock_path = format!("/{lock_key}");
+ (lock_key, lock_path)
+}
+
+impl MemDb {
+ fn collect_lock_entries(
+ &self,
+ path: &str,
+ old_locks: &std::collections::HashMap<String, LockInfo>,
+ now: u64,
+ new_locks: &mut std::collections::HashMap<String, LockInfo>,
+ ) {
+ let entries = match self.readdir(path) {
+ Ok(entries) => entries,
+ Err(e) => {
+ tracing::warn!("Failed to read {} directory: {}", path, e);
+ return;
+ }
+ };
+
+ for entry in entries {
+ if !entry.is_dir() {
+ continue;
+ }
+
+ let child_path = format!("{}/{}", path.trim_start_matches('/'), entry.name);
+ let lock_key = lock_cache_key(&child_path);
+ let csum = entry.compute_checksum();
+
+ let ltime = match old_locks.get(&lock_key) {
+ Some(old_lock) if old_lock.csum == csum => old_lock.ltime,
+ _ => now,
+ };
+
+ new_locks.insert(lock_key.clone(), LockInfo { ltime, csum });
+ self.collect_lock_entries(&child_path, old_locks, now, new_locks);
+ }
+ }
+
+ /// Check if a lock has expired (with side effects matching C semantics)
+ ///
+ /// This function implements the same behavior as the C version (memdb.c:330-358):
+ /// - If no lock exists in cache: Reads from database, creates cache entry, returns `false`
+ /// - If lock exists but csum mismatches: Updates csum, resets timeout, logs critical error, returns `false`
+ /// - If lock exists, csum matches, and time > LOCK_TIMEOUT: Returns `true` (expired)
+ /// - Otherwise: Returns `false` (not expired)
+ ///
+ /// This function is used for both checking AND managing locks, matching C semantics.
+ ///
+ /// # Current Usage
+ /// - Called from `database::create()` when creating lock directories (matching C memdb.c:928)
+ /// - Called from FUSE utimens operation (pmxcfs/src/fuse/filesystem.rs:717) for mtime=0 unlock requests
+ /// - Called from DFSM unlock message handlers (pmxcfs/src/memdb_callbacks.rs:142,161)
+ ///
+ /// Note: DFSM broadcasting of unlock messages to cluster nodes is not yet fully implemented.
+ /// See TODOs in filesystem.rs:723 and memdb_callbacks.rs:154 for remaining work.
+ pub fn lock_expired(&self, path: &str, csum: &[u8; 32]) -> bool {
+ let lock_key = lock_cache_key(path);
+
+ let mut locks = self.inner.locks.lock();
+ let now = now_secs();
+
+ match locks.get_mut(&lock_key) {
+ Some(lock_info) => {
+ // Lock exists in cache - check csum
+ if lock_info.csum != *csum {
+ // Wrong csum - update and reset timeout
+ lock_info.ltime = now;
+ lock_info.csum = *csum;
+ tracing::error!(
+ "Lock checksum mismatch for '{}' - resetting timeout",
+ lock_key
+ );
+ return false;
+ }
+
+ // Csum matches - check if expired
+ // Use saturating_sub to handle backward clock jumps
+ let elapsed = now.saturating_sub(lock_info.ltime);
+ if elapsed > LOCK_TIMEOUT {
+ tracing::debug!(path = lock_key, elapsed, "Lock expired");
+ return true; // Expired
+ }
+
+ false // Not expired
+ }
+ None => {
+ // No lock in cache - create new cache entry
+ locks.insert(
+ lock_key.clone(),
+ LockInfo {
+ ltime: now,
+ csum: *csum,
+ },
+ );
+ tracing::debug!(path = lock_key, "Created new lock cache entry");
+ false // Not expired (just created)
+ }
+ }
+ }
+
+ /// Acquire a lock on a path
+ ///
+ /// This creates a directory entry in the database at `priv/lock/<lockname>`
+ /// and broadcasts the operation to the cluster via DFSM.
+ pub fn acquire_lock(&self, path: &str, csum: &[u8; 32]) -> std::io::Result<()> {
+ let now = now_secs();
+
+ let (lock_key, lock_path) = lock_key_and_path(path);
+
+ let locks = self.inner.locks.lock();
+
+ // Check if there's an existing valid lock in cache
+ if let Some(existing_lock) = locks.get(&lock_key) {
+ // Use saturating_sub to handle backward clock jumps
+ let lock_age = now.saturating_sub(existing_lock.ltime);
+ if lock_age <= LOCK_TIMEOUT && existing_lock.csum != *csum {
+ return Err(errno::eacces());
+ }
+ }
+
+ // Extract lock name from path like "priv/lock/foo.lock" or "priv/lock/qemu-server/103.conf"
+ let lock_name = lock_key
+ .strip_prefix(LOCK_DIR_PATH)
+ .and_then(|s| s.strip_prefix('/'))
+ .unwrap_or("");
+
+ if lock_name.is_empty() {
+ return Err(errno::einval());
+ }
+
+ // Validate lock name to prevent path traversal
+ if lock_name.contains("..") {
+ return Err(errno::einval());
+ }
+
+ // Release locks mutex before database operations to avoid deadlock
+ drop(locks);
+
+ // Ensure lock directory hierarchy exists.
+ // Ignore EEXIST — another thread may have created entries concurrently;
+ // create() holds write_guard internally so actual creation is atomic.
+ let mtime = now as u32;
+ let lock_dir_full = format!("/{LOCK_DIR_PATH}");
+ if let Err(e) = self.create(&lock_dir_full, MODE_DIR_DEFAULT, 0, mtime) {
+ if e.kind() != std::io::ErrorKind::AlreadyExists {
+ return Err(e);
+ }
+ }
+ match self.create(&lock_path, MODE_DIR_DEFAULT, 0, mtime) {
+ Ok(()) => {
+ tracing::debug!("Created lock directory in database: {}", lock_path);
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
+ Err(e) => return Err(e),
+ }
+
+ // Update in-memory cache (use normalized path without leading slash)
+ let mut locks = self.inner.locks.lock();
+ locks.insert(
+ lock_key,
+ LockInfo {
+ ltime: now,
+ csum: *csum,
+ },
+ );
+
+ tracing::debug!("Lock acquired on path: {}", lock_path);
+ Ok(())
+ }
+
+ /// Release a lock on a path
+ ///
+ /// This deletes the directory entry from the database and broadcasts
+ /// the delete operation to the cluster via DFSM.
+ pub fn release_lock(&self, path: &str, csum: &[u8; 32]) -> std::io::Result<()> {
+ let (lock_key, lock_path) = lock_key_and_path(path);
+
+ let locks = self.inner.locks.lock();
+
+ if let Some(lock_info) = locks.get(&lock_key) {
+ // Only release if checksum matches
+ if lock_info.csum != *csum {
+ return Err(errno::eacces());
+ }
+ } else {
+ return Err(errno::enoent());
+ }
+
+ // Release locks mutex before database operations
+ drop(locks);
+
+ // Delete lock directory from database.
+ // Ignore ENOENT — entry may have already been removed by cluster sync.
+ match self.delete(&lock_path, 0, now_secs() as u32) {
+ Ok(()) => tracing::debug!("Deleted lock directory from database: {}", lock_path),
+ Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {}
+ Err(e) => return Err(e),
+ }
+
+ // Remove from in-memory cache
+ let mut locks = self.inner.locks.lock();
+ locks.remove(&lock_key);
+
+ tracing::debug!("Lock released on path: {}", lock_path);
+ Ok(())
+ }
+
+ /// Update lock cache by scanning the priv/lock directory in database
+ ///
+    /// This implements the C version's behavior (memdb.c:360-389):
+ /// - Scans the `priv/lock` directory in the database
+ /// - Rebuilds the entire lock hash table from database state
+ /// - Preserves `ltime` from old entries if csum matches
+ /// - Is called on database open and after synchronization
+ ///
+ /// This ensures locks are visible across C/Rust nodes and survive restarts.
+ pub(crate) fn update_locks(&self) {
+ // Check if lock directory exists
+ let _lock_dir = match self.lookup_path(LOCK_DIR_PATH) {
+ Some(entry) if entry.is_dir() => entry,
+ _ => {
+ tracing::debug!(
+ "{} directory does not exist, initializing empty lock table",
+ LOCK_DIR_PATH
+ );
+ self.inner.locks.lock().clear();
+ return;
+ }
+ };
+
+ let now = now_secs();
+
+ // Get old locks table for preserving ltimes
+ let old_locks = {
+ let locks = self.inner.locks.lock();
+ locks.clone()
+ };
+
+ // Build new locks table from database
+ let mut new_locks = std::collections::HashMap::new();
+
+ self.collect_lock_entries(LOCK_DIR_PATH, &old_locks, now, &mut new_locks);
+
+ // Replace lock table
+ let mut locks = self.inner.locks.lock();
+ *locks = new_locks;
+
+ tracing::debug!("Updated lock table from database: {} locks", locks.len());
+ }
+
+ /// Check if a path is locked
+ pub fn is_locked(&self, path: &str) -> bool {
+ let lock_key = lock_cache_key(path);
+
+ let locks = self.inner.locks.lock();
+ locks
+ .get(&lock_key)
+ // Use saturating_sub to handle backward clock jumps
+ .is_some_and(|info| now_secs().saturating_sub(info.ltime) <= LOCK_TIMEOUT)
+ }
+}
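[Editor's note: the expiration test shared by `is_locked` and `lock_expired` reduces to a small pure function. The sketch below isolates it, assuming the crate's 120-second `LOCK_TIMEOUT`; note how `saturating_sub` makes a backward clock jump read as elapsed time 0, i.e. the lock stays live rather than spuriously expiring.]

```rust
// Illustrative sketch of the lock expiration check: saturating_sub
// guards against backward clock jumps. LOCK_TIMEOUT mirrors the crate
// constant (120 s); this is not the crate API itself.
const LOCK_TIMEOUT: u64 = 120;

fn lock_is_live(now: u64, ltime: u64) -> bool {
    now.saturating_sub(ltime) <= LOCK_TIMEOUT
}

fn main() {
    assert!(lock_is_live(1_000, 900)); // 100 s old: still live
    assert!(!lock_is_live(1_000, 800)); // 200 s old: expired
    assert!(lock_is_live(500, 1_000)); // clock jumped backwards: elapsed saturates to 0
}
```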
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/sync.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/sync.rs
new file mode 100644
index 000000000..27c1297fa
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/sync.rs
@@ -0,0 +1,720 @@
+//! State synchronization and serialization for memdb
+use anyhow::{Context, Result};
+use rusqlite::params;
+use sha2::{Digest, Sha256};
+use std::collections::HashSet;
+use std::fs;
+use std::io::Write;
+use std::os::fd::AsRawFd;
+use std::os::unix::fs::PermissionsExt;
+use std::path::Path;
+use std::process::{Command, Stdio};
+use std::sync::atomic::Ordering;
+use std::thread;
+use std::time::Duration;
+
+use super::database::MemDb;
+use super::index::{IndexEntry, MemDbIndex};
+use super::types::{ROOT_INODE, TreeEntry, VERSION_FILENAME};
+
+fn cluster_config_version(config_data: &[u8]) -> u64 {
+ const KEY: &[u8] = b"config_version";
+
+ let mut offset = 0;
+ while offset < config_data.len() {
+ let Some(pos) = config_data[offset..]
+ .windows(KEY.len())
+ .position(|window| window == KEY)
+ else {
+ break;
+ };
+
+ let mut idx = offset + pos + KEY.len();
+
+ while matches!(config_data.get(idx), Some(b' ' | b'\t' | b'\r' | b'\n')) {
+ idx += 1;
+ }
+
+ if config_data.get(idx) != Some(&b':') {
+ offset += pos + 1;
+ continue;
+ }
+
+ idx += 1;
+ while matches!(config_data.get(idx), Some(b' ' | b'\t' | b'\r' | b'\n')) {
+ idx += 1;
+ }
+
+ let start = idx;
+ while matches!(config_data.get(idx), Some(b'0'..=b'9')) {
+ idx += 1;
+ }
+
+ if idx > start {
+ return std::str::from_utf8(&config_data[start..idx])
+ .ok()
+ .and_then(|version| version.parse::<u64>().ok())
+ .unwrap_or(0);
+ }
+
+ offset += pos + 1;
+ }
+
+ 0
+}
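[Editor's note: the byte-wise scan above tolerates whitespace around the colon and skips occurrences of the key that are not followed by `: <digits>`. A simplified string-based sketch (assuming UTF-8 input, unlike the byte-safe original) shows the same extraction behavior:]

```rust
// Simplified, string-based sketch of the config_version extraction;
// the real code scans raw bytes so it never assumes valid UTF-8.
fn cluster_config_version(config: &str) -> u64 {
    for (pos, _) in config.match_indices("config_version") {
        let rest = config[pos + "config_version".len()..].trim_start();
        if let Some(value) = rest.strip_prefix(':') {
            let value = value.trim_start();
            let digits: String =
                value.chars().take_while(|c| c.is_ascii_digit()).collect();
            if !digits.is_empty() {
                return digits.parse().unwrap_or(0);
            }
        }
        // No colon after the key: keep scanning for a later occurrence.
    }
    0 // 0 signals "could not parse", matching the original's sentinel
}

fn main() {
    assert_eq!(cluster_config_version("totem {\n  config_version: 42\n}\n"), 42);
    assert_eq!(cluster_config_version("totem {\n  version: 2\n}\n"), 0); // key missing
    assert_eq!(cluster_config_version("config_version = 7"), 0); // no colon
}
```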
+
+fn atomic_write_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
+ let parent = path.parent().unwrap_or_else(|| Path::new("."));
+ let file_name = path
+ .file_name()
+ .map(|name| name.to_string_lossy().into_owned())
+ .unwrap_or_else(|| "corosync.conf".to_string());
+ let mut temp_file = tempfile::Builder::new()
+ .prefix(&format!("{file_name}."))
+ .tempfile_in(parent)?;
+
+ let fd = temp_file.as_file().as_raw_fd();
+ let fchown_result = unsafe { libc::fchown(fd, 0, 0) };
+ if fchown_result != 0 {
+ let error = std::io::Error::last_os_error();
+ if error.kind() != std::io::ErrorKind::PermissionDenied {
+ return Err(error);
+ }
+ }
+
+ temp_file
+ .as_file()
+ .set_permissions(fs::Permissions::from_mode(0o644))?;
+ temp_file.as_file_mut().write_all(data)?;
+ temp_file.as_file_mut().flush()?;
+ temp_file.persist(path).map_err(|error| error.error)?;
+
+ Ok(())
+}
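[Editor's note: `atomic_write_file` follows the classic write-to-temp-then-rename pattern. The minimal std-only sketch below shows the core idea; the real code additionally uses the `tempfile` crate for a collision-safe temp name, sets ownership via `fchown`, and fixes permissions, none of which this sketch attempts.]

```rust
// Minimal sketch of write-temp-then-rename: rename() within one
// filesystem is atomic, so readers never see a half-written file.
// A fixed ".tmp" name is NOT collision-safe; the real code uses
// the tempfile crate for that.
use std::fs;
use std::io::Write;
use std::path::Path;

fn atomic_write(path: &Path, data: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    {
        let mut file = fs::File::create(&tmp)?;
        file.write_all(data)?;
        file.sync_all()?; // push data to disk before the rename is visible
    }
    fs::rename(&tmp, path) // atomic replacement on the same filesystem
}

fn main() -> std::io::Result<()> {
    let target = std::env::temp_dir().join("atomic_write_demo.conf");
    atomic_write(&target, b"totem {\n  config_version: 2\n}\n")?;
    assert_eq!(fs::read(&target)?, b"totem {\n  config_version: 2\n}\n");
    fs::remove_file(&target)?;
    Ok(())
}
```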
+
+fn notify_corosync_reload() {
+ thread::sleep(Duration::from_secs(1));
+
+ tracing::debug!("run corosync-cfgtool -R");
+ match Command::new("corosync-cfgtool")
+ .arg("-R")
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .status()
+ {
+ Ok(status) if status.success() => {}
+ Ok(status) => {
+ tracing::error!(
+ "corosync-cfgtool -R failed with exit code {:?}",
+ status.code()
+ );
+ }
+ Err(error) => {
+ tracing::error!("failed to run corosync-cfgtool -R: {error}");
+ }
+ }
+ tracing::debug!("end corosync-cfgtool -R");
+}
+
+impl MemDb {
+ /// Encode database index for C-compatible state synchronization
+ ///
+ /// This creates a memdb_index_t structure matching the C implementation,
+ /// containing metadata and a sorted list of (inode, digest) pairs.
+ /// This is sent as the "state" during DFSM synchronization.
+ pub fn encode_index(&self) -> Result<MemDbIndex> {
+ // Acquire locks in consistent order: conn, then index
+ // This prevents races where version changes between read and root update
+ let conn = self.inner.conn.lock();
+ let mut index = self.inner.index.lock();
+
+ // Read global version once under both locks to ensure consistency
+ // No other operation can modify version counter while we hold both locks
+ let global_version = self.inner.version.load(Ordering::SeqCst);
+
+ let root_entry = index
+ .get_mut(&ROOT_INODE)
+ .ok_or_else(|| anyhow::anyhow!("Root entry not found in index"))?;
+
+ // If root version is stale, update both in-memory and on-disk atomically
+ if root_entry.version != global_version {
+ root_entry.version = global_version;
+
+ let tx = conn
+ .unchecked_transaction()
+ .context("Failed to begin transaction for root version update")?;
+
+ tx.execute(
+ "UPDATE tree SET version = ? WHERE inode = ?",
+ rusqlite::params![global_version as i64, ROOT_INODE as i64],
+ )
+ .context("Failed to update root version in database")?;
+
+ tx.commit()
+ .context("Failed to commit root version update")?;
+ }
+
+ drop(conn);
+
+ // Collect ALL entries including root (inode 0), sorted by inode.
+ //
+ // C's memdb->index hash table ALWAYS includes root (inode 0): bdb_backend_load_index
+ // adds root to the hash table via g_hash_table_replace(index, &root->inode, root)
+ // before loading, and never removes it. memdb_encode_index iterates ALL hash table
+ // entries including root.
+ //
+ // This is critical for bdb_backend_commit_update correctness: when C is a follower,
+ // it passes its encoded index as `slave`. If master (Rust) excludes root from
+ // entries but slave (C) includes it, the delete loop sees inode 0 in slave but not
+ // in master and deletes root from the DB. After reload, root->version = 0 and the
+ // byte-level comparison fails.
+ //
+ // C also sends Update for root when versions differ (dcdb_create_and_send_updates
+ // iterates master entries including inode 0). We must do the same.
+ //
+ // Build IndexEntry values directly while collecting to avoid a second Vec allocation.
+ let mut index_entries: Vec<IndexEntry> = index
+ .values()
+ .map(|te| {
+ let digest = MemDbIndex::compute_entry_digest(
+ te.parent,
+ te.version,
+ te.writer,
+ te.mtime,
+ te.size,
+ te.entry_type,
+ &te.name,
+ &te.data,
+ );
+ IndexEntry {
+ inode: te.inode,
+ digest,
+ }
+ })
+ .collect();
+ index_entries.sort_by_key(|e| e.inode);
+
+ // Root is guaranteed to exist: we validated it above and hold the lock.
+ // unwrap_or(0) is unreachable but matches C's g_malloc0 zero-initialization.
+ let root_entry = &index[&ROOT_INODE];
+ let last_inode = index.keys().max().copied().unwrap_or(0);
+
+ let result = MemDbIndex::new(
+ global_version,
+ last_inode,
+ root_entry.writer,
+ root_entry.mtime,
+ index_entries,
+ );
+
+ drop(index);
+
+ Ok(result)
+ }
+
+ /// Encode the entire database state into a byte array
+ /// Matches C version's memdb_encode() function
+ pub fn encode_database(&self) -> std::io::Result<Vec<u8>> {
+ let index = self.inner.index.lock();
+
+ // Collect all entries sorted by inode for consistent ordering
+ // This matches the C implementation's memdb_tree_compare function
+ let mut entries: Vec<&TreeEntry> = index.values().collect();
+ entries.sort_by_key(|e| e.inode);
+
+ // Serialize using bincode (compatible with C struct layout)
+ let encoded = bincode::serialize(&entries)
+ .map_err(|e| std::io::Error::other(format!("Failed to encode database: {e}")))?;
+
+ tracing::debug!(
+ "Encoded database: {} entries, {} bytes",
+ entries.len(),
+ encoded.len()
+ );
+
+ Ok(encoded)
+ }
+
+ /// Compute checksum of the entire database state
+ /// Used for DFSM state verification.
+ ///
+ /// This matches C's `memdb_compute_checksum()` implementation and hashes the
+ /// in-memory tree entries directly in inode order.
+ ///
+ /// NOTE: The per-field hashing loop below intentionally duplicates the field
+ /// order of `TreeEntry::compute_checksum()`. Do NOT replace it with calls to
+ /// `entry.compute_checksum()`: that method returns a per-entry `[u8; 32]`
+ /// digest, and feeding those digests into the outer hasher would produce a
+ /// completely different value from C's `memdb_compute_checksum()`. In a mixed
+ /// Rust+C cluster the leader sends a verify request and all nodes must return
+ /// the same checksum, so the computation here must stay byte-for-byte
+ /// compatible with the C implementation.
+ pub fn compute_database_checksum(&self) -> std::io::Result<[u8; 32]> {
+ let mut hasher = Sha256::new();
+
+ let index = self.inner.index.lock();
+ let mut entries: Vec<&TreeEntry> = index.values().collect();
+ entries.sort_by_key(|entry| entry.inode);
+
+ for entry in entries {
+ hasher.update(entry.inode.to_ne_bytes());
+ hasher.update(entry.version.to_ne_bytes());
+ hasher.update(entry.writer.to_ne_bytes());
+ hasher.update(entry.mtime.to_ne_bytes());
+ hasher.update((entry.size as u32).to_ne_bytes());
+ hasher.update([entry.entry_type]);
+ hasher.update(entry.parent.to_ne_bytes());
+ hasher.update(entry.name.as_bytes());
+
+ if entry.is_file() && entry.size > 0 {
+ hasher.update(&entry.data);
+ }
+ }
+
+ Ok(hasher.finalize().into())
+ }
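[Editor's note: the NOTE above warns that hashing per-entry digests is not the same as hashing the raw fields in one pass. The toy below demonstrates the difference with std's `DefaultHasher` standing in for SHA-256, purely for illustration of why the duplication must stay.]

```rust
// Illustration of flat vs nested hashing (DefaultHasher stands in for
// SHA-256 here): feeding per-item digests into an outer hasher yields a
// different value than hashing the raw bytes directly, which is why the
// checksum loop must not be replaced with entry.compute_checksum() calls.
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

fn flat(fields: &[&[u8]]) -> u64 {
    let mut h = DefaultHasher::new();
    for f in fields {
        h.write(f); // hash raw bytes directly, like the C checksum
    }
    h.finish()
}

fn nested(fields: &[&[u8]]) -> u64 {
    let mut outer = DefaultHasher::new();
    for f in fields {
        let mut inner = DefaultHasher::new();
        inner.write(f);
        outer.write(&inner.finish().to_ne_bytes()); // feed per-field digests
    }
    outer.finish()
}

fn main() {
    let fields: [&[u8]; 2] = [b"inode", b"version"];
    // The two schemes disagree, so swapping one in breaks C compatibility.
    assert_ne!(flat(&fields), nested(&fields));
}
```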
+
+ /// Decode database state from a byte array
+ /// Used during DFSM state synchronization
+ pub fn decode_database(data: &[u8]) -> Result<Vec<TreeEntry>> {
+ let entries: Vec<TreeEntry> = bincode::deserialize(data)
+ .map_err(|e| anyhow::anyhow!("Failed to decode database: {e}"))?;
+
+ tracing::debug!("Decoded database: {} entries", entries.len());
+
+ Ok(entries)
+ }
+
+ /// Synchronize corosync configuration from MemDb to filesystem
+ ///
+ /// Reads corosync.conf from memdb and writes to system file if changed.
+ /// This syncs the cluster configuration from the distributed database
+ /// to the local filesystem.
+ ///
+ /// # Arguments
+ /// * `system_path` - Path to write the corosync.conf file (default: /etc/corosync/corosync.conf)
+ /// * `notify_corosync` - Notify the local corosync daemon after replacing the file
+ pub fn sync_corosync_conf(
+ &self,
+ system_path: Option<&str>,
+ notify_corosync: bool,
+ ) -> Result<()> {
+ let system_path = Path::new(system_path.unwrap_or("/etc/corosync/corosync.conf"));
+ tracing::info!(
+ "Syncing corosync configuration to {} (notify_corosync={})",
+ system_path.display(),
+ notify_corosync
+ );
+
+ // Path in memdb for corosync.conf
+ let memdb_path = "/corosync.conf";
+
+ // Try to read from memdb
+ let memdb_data = match self.lookup_path(memdb_path) {
+ Some(entry) if entry.is_file() => entry.data,
+ Some(_) => {
+ tracing::error!("{memdb_path} exists but is not a file");
+ return Ok(());
+ }
+ None => {
+ tracing::debug!("{} not found in memdb, nothing to sync", memdb_path);
+ return Ok(());
+ }
+ };
+
+ if memdb_data.is_empty() {
+ tracing::debug!("corosync.conf in memdb is empty, skipping host sync");
+ return Ok(());
+ }
+
+ let new_version = cluster_config_version(&memdb_data);
+ if new_version == 0 {
+ tracing::error!("unable to parse cluster config_version");
+ return Ok(());
+ }
+
+ let mut old_version = 0;
+ let system_data = match fs::read(system_path) {
+ Ok(data) => {
+ if !data.is_empty() {
+ old_version = cluster_config_version(&data);
+ }
+ Some(data)
+ }
+ Err(error) if error.kind() == std::io::ErrorKind::NotFound => None,
+ Err(error) => {
+ tracing::error!(
+ "unable to read cluster config file '{}': {}",
+ system_path.display(),
+ error
+ );
+ None
+ }
+ };
+
+ if system_data.as_deref() == Some(memdb_data.as_slice()) {
+ tracing::debug!("Corosync configuration unchanged, skipping write");
+ return Ok(());
+ }
+
+ if new_version < old_version {
+ tracing::error!("local corosync.conf is newer");
+ return Ok(());
+ }
+
+ if let Err(error) = atomic_write_file(system_path, &memdb_data) {
+ if error.kind() == std::io::ErrorKind::PermissionDenied {
+ tracing::warn!(
+ "Permission denied writing {}: {}. Run as root to enable corosync sync.",
+ system_path.display(),
+ error
+ );
+ } else {
+ tracing::error!("Failed to write {}: {}", system_path.display(), error);
+ }
+ return Ok(());
+ }
+
+ tracing::info!(
+ "wrote new corosync config '{}' (version = {})",
+ system_path.display(),
+ new_version
+ );
+
+ if notify_corosync && old_version != 0 {
+ notify_corosync_reload();
+ }
+
+ Ok(())
+ }
+
+ /// Converge the database to match the given index by deleting stale inodes
+ ///
+ /// Returns the list of inodes that were deleted.
+ pub fn converge_to_index(
+ &self,
+ target: &super::index::MemDbIndex,
+ ) -> std::io::Result<Vec<u64>> {
+ let target_inodes: HashSet<u64> = target.entries.iter().map(|entry| entry.inode).collect();
+ let stale_inodes: Vec<u64> = {
+ let index = self.inner.index.lock();
+ index
+ .keys()
+ .copied()
+ // The root guard (*inode != root_inode) is redundant: since master now
+ // includes root in entries[], root is always in target_inodes and won't
+ // be deleted by !target_inodes.contains(inode). Kept as defense-in-depth.
+ .filter(|inode| *inode != ROOT_INODE && !target_inodes.contains(inode))
+ .collect()
+ };
+
+ if stale_inodes.is_empty() {
+ self.update_locks();
+ return Ok(stale_inodes);
+ }
+
+ let conn = self.inner.conn.lock();
+ let tx = conn
+ .unchecked_transaction()
+ .map_err(std::io::Error::other)?;
+ for inode in &stale_inodes {
+ tx.execute("DELETE FROM tree WHERE inode = ?1", params![inode])
+ .map_err(std::io::Error::other)?;
+ }
+ tx.commit().map_err(std::io::Error::other)?;
+ drop(conn);
+
+ self.reload_runtime_state_from_db()
+ .map_err(std::io::Error::other)?;
+
+ Ok(stale_inodes)
+ }
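[Editor's note: the stale-inode selection in `converge_to_index` is a set difference with a root guard. The sketch below isolates that computation over plain inode slices for illustration; it is not the crate API.]

```rust
// Sketch of the stale-inode computation: any inode in the local index
// that the target index no longer lists is deleted. Root is guarded
// explicitly as defense-in-depth, as the comment above notes.
use std::collections::HashSet;

const ROOT_INODE: u64 = 0;

fn stale_inodes(local: &[u64], target: &[u64]) -> Vec<u64> {
    let target: HashSet<u64> = target.iter().copied().collect();
    let mut stale: Vec<u64> = local
        .iter()
        .copied()
        .filter(|inode| *inode != ROOT_INODE && !target.contains(inode))
        .collect();
    stale.sort_unstable(); // deterministic order for the delete loop
    stale
}

fn main() {
    // Inodes 1 and 3 exist locally but not in the target index.
    assert_eq!(stale_inodes(&[0, 1, 2, 3], &[0, 2]), vec![1, 3]);
}
```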
+
+ pub fn get_all_entries(&self) -> std::io::Result<Vec<TreeEntry>> {
+ let index = self.inner.index.lock();
+ let entries: Vec<TreeEntry> = index.values().cloned().collect();
+ Ok(entries)
+ }
+
+ /// Replace all entries (for full state synchronization)
+ pub fn replace_all_entries(&self, entries: Vec<TreeEntry>) -> std::io::Result<()> {
+ tracing::info!(
+ "Replacing all database entries with {} new entries",
+ entries.len()
+ );
+
+ let conn = self.inner.conn.lock();
+ let tx = conn
+ .unchecked_transaction()
+ .map_err(std::io::Error::other)?;
+
+ tx.execute("DELETE FROM tree", [])
+ .map_err(std::io::Error::other)?;
+
+ let mut max_version = 0u64;
+ for entry in &entries {
+ max_version = max_version.max(entry.version);
+ tx.execute(
+ "INSERT INTO tree (inode, parent, version, writer, mtime, type, name, data) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
+ params![
+ entry.inode,
+ entry.parent,
+ entry.version,
+ entry.writer,
+ entry.mtime,
+ entry.entry_type,
+ entry.name,
+ if entry.is_dir() { None::<&[u8]> } else { Some(entry.data.as_slice()) }
+ ],
+ )
+ .map_err(std::io::Error::other)?;
+ }
+
+ tx.commit().map_err(std::io::Error::other)?;
+ drop(conn);
+
+ let mut index = self.inner.index.lock();
+ let mut tree = self.inner.tree.lock();
+
+ index.clear();
+ tree.clear();
+
+ for entry in entries {
+ tree.entry(entry.parent)
+ .or_default()
+ .insert(entry.name.clone(), entry.inode);
+
+ if entry.is_dir() {
+ tree.entry(entry.inode).or_default();
+ }
+
+ index.insert(entry.inode, entry);
+ }
+
+ self.inner.version.store(max_version, Ordering::SeqCst);
+ self.update_locks();
+
+ tracing::info!(
+ "Database state replaced successfully, version now: {}",
+ max_version
+ );
+ Ok(())
+ }
+
+ /// Apply a single TreeEntry during incremental synchronization
+ ///
+ /// This is used when receiving Update messages from the leader.
+ /// It directly inserts or updates the entry in the database without
+ /// going through the path-based API.
+ pub fn apply_tree_entry(&self, entry: TreeEntry) -> std::io::Result<()> {
+ tracing::debug!(
+ "Applying TreeEntry: inode={}, parent={}, name='{}', version={}",
+ entry.inode,
+ entry.parent,
+ entry.name,
+ entry.version
+ );
+
+ // Acquire locks in consistent order: conn, then index, then tree
+ // This prevents DB-memory divergence by updating both atomically
+ let conn = self.inner.conn.lock();
+ let mut index = self.inner.index.lock();
+ let mut tree = self.inner.tree.lock();
+
+ // Begin transaction for atomicity
+ let tx = conn
+ .unchecked_transaction()
+ .map_err(std::io::Error::other)?;
+
+ // Handle root inode specially (inode 0 is __version__)
+ let db_name = if entry.inode == ROOT_INODE {
+ VERSION_FILENAME
+ } else {
+ entry.name.as_str()
+ };
+
+ // Insert or replace the entry in database
+ tx.execute(
+ "INSERT OR REPLACE INTO tree (inode, parent, version, writer, mtime, type, name, data) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
+ params![
+ entry.inode,
+ entry.parent,
+ entry.version,
+ entry.writer,
+ entry.mtime,
+ entry.entry_type,
+ db_name,
+ if entry.is_dir() { None::<&[u8]> } else { Some(entry.data.as_slice()) }
+ ],
+ )
+ .map_err(std::io::Error::other)?;
+
+ // Update __version__ entry with the same metadata (matching C in database.c:275-278)
+ // Only do this if we're not already writing __version__ itself
+ if entry.inode != ROOT_INODE {
+ Self::update_version_entry(&tx, entry.version, entry.writer, entry.mtime)?;
+ }
+
+ // Update in-memory structures BEFORE committing transaction
+ // This ensures DB and memory are atomically updated together
+
+ // Check if this entry already exists
+ let old_entry = index.get(&entry.inode).cloned();
+
+ // If entry exists with different parent or name, update tree structure
+ if let Some(old) = old_entry {
+ if old.parent != entry.parent || old.name != entry.name {
+ // Remove from old parent's children
+ if let Some(old_parent_children) = tree.get_mut(&old.parent) {
+ old_parent_children.remove(&old.name);
+ }
+
+ // Add to new parent's children
+ tree.entry(entry.parent)
+ .or_default()
+ .insert(entry.name.clone(), entry.inode);
+ }
+ } else {
+ // New entry - add to parent's children
+ tree.entry(entry.parent)
+ .or_default()
+ .insert(entry.name.clone(), entry.inode);
+ }
+
+ // If this is a directory, ensure it has an entry in the tree map
+ if entry.is_dir() {
+ tree.entry(entry.inode).or_default();
+ }
+
+ // Extract Copy fields before consuming entry via index.insert
+ let entry_inode = entry.inode;
+ let entry_version = entry.version;
+ let entry_writer = entry.writer;
+ let entry_mtime = entry.mtime;
+
+ // Update index — consumes entry (no clone needed)
+ index.insert(entry_inode, entry);
+
+ // Update root entry's metadata to match __version__ (if we wrote a non-root entry)
+ if entry_inode != ROOT_INODE {
+ Self::update_root_metadata(
+ &mut index,
+ ROOT_INODE,
+ entry_version,
+ entry_writer,
+ entry_mtime,
+ );
+ tracing::debug!(
+ version = entry_version,
+ writer = entry_writer,
+ mtime = entry_mtime,
+ "Updated root entry metadata"
+ );
+ }
+
+ // Update version counter if this entry has a higher version
+ self.inner
+ .version
+ .fetch_max(entry_version, Ordering::SeqCst);
+
+ // Commit transaction after memory is updated
+ // Both DB and memory are now consistent
+ tx.commit().map_err(std::io::Error::other)?;
+
+ tracing::debug!("TreeEntry applied successfully");
+ Ok(())
+ }
+}
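[Editor's note: `apply_tree_entry` and `encode_index` both rely on a fixed lock acquisition order (conn, then index, then tree) to prevent deadlock. The toy below sketches that discipline with plain `std::sync::Mutex` fields; the struct and function names are illustrative, not the crate's.]

```rust
// Sketch of the consistent lock-ordering rule: every code path acquires
// the mutexes in the same fixed order (conn -> index -> tree), which
// rules out circular wait and therefore deadlock between threads.
use std::sync::Mutex;

struct Inner {
    conn: Mutex<()>,        // stands in for the SQLite connection
    index: Mutex<Vec<u64>>, // stands in for the inode index
    tree: Mutex<Vec<u64>>,  // stands in for the directory tree map
}

fn apply(inner: &Inner, inode: u64) {
    let _conn = inner.conn.lock().unwrap(); // 1st: connection
    let mut index = inner.index.lock().unwrap(); // 2nd: index
    let mut tree = inner.tree.lock().unwrap(); // 3rd: tree
    index.push(inode);
    tree.push(inode);
    // All three guards drop together at end of scope.
}

fn main() {
    let inner = Inner {
        conn: Mutex::new(()),
        index: Mutex::new(Vec::new()),
        tree: Mutex::new(Vec::new()),
    };
    apply(&inner, 7);
    assert_eq!(*inner.index.lock().unwrap(), vec![7]);
}
```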
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use tempfile::TempDir;
+
+ fn create_test_db() -> Result<(MemDb, TempDir)> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("test.db");
+ let memdb = MemDb::open(&db_path, true)?;
+ Ok((memdb, temp_dir))
+ }
+
+ fn write_corosync_conf(memdb: &MemDb, data: &[u8], mtime: u32) -> Result<()> {
+ if !memdb.exists("/corosync.conf")? {
+ memdb.create("/corosync.conf", 0, 0, mtime)?;
+ }
+ memdb.write("/corosync.conf", 0, 0, mtime, data, true)?;
+ Ok(())
+ }
+
+ #[test]
+ fn test_sync_corosync_conf_skips_empty_memdb_file() -> Result<()> {
+ let (memdb, temp_dir) = create_test_db()?;
+ let system_path = temp_dir.path().join("corosync.conf");
+ fs::write(&system_path, b"totem {\n config_version: 2\n}\n")?;
+
+ memdb.create("/corosync.conf", 0, 0, 100)?;
+ memdb.sync_corosync_conf(system_path.to_str(), false)?;
+
+ assert_eq!(
+ fs::read(&system_path)?,
+ b"totem {\n config_version: 2\n}\n"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_sync_corosync_conf_rejects_missing_config_version() -> Result<()> {
+ let (memdb, temp_dir) = create_test_db()?;
+ let system_path = temp_dir.path().join("corosync.conf");
+ fs::write(&system_path, b"totem {\n config_version: 3\n}\n")?;
+
+ write_corosync_conf(&memdb, b"totem {\n version: 2\n}\n", 101)?;
+ memdb.sync_corosync_conf(system_path.to_str(), false)?;
+
+ assert_eq!(
+ fs::read(&system_path)?,
+ b"totem {\n config_version: 3\n}\n"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_sync_corosync_conf_keeps_newer_host_file() -> Result<()> {
+ let (memdb, temp_dir) = create_test_db()?;
+ let system_path = temp_dir.path().join("corosync.conf");
+ fs::write(&system_path, b"totem {\n config_version: 5\n}\n")?;
+
+ write_corosync_conf(&memdb, b"totem {\n config_version: 4\n}\n", 102)?;
+ memdb.sync_corosync_conf(system_path.to_str(), false)?;
+
+ assert_eq!(
+ fs::read(&system_path)?,
+ b"totem {\n config_version: 5\n}\n"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_sync_corosync_conf_replaces_host_file_without_temp_leaks() -> Result<()> {
+ let (memdb, temp_dir) = create_test_db()?;
+ let system_path = temp_dir.path().join("corosync.conf");
+ fs::write(&system_path, b"totem {\n config_version: 1\n}\n")?;
+
+ write_corosync_conf(&memdb, b"totem {\n config_version: 2\n}\n", 103)?;
+ memdb.sync_corosync_conf(system_path.to_str(), false)?;
+
+ assert_eq!(
+ fs::read(&system_path)?,
+ b"totem {\n config_version: 2\n}\n"
+ );
+
+ let leaked_temp_files = fs::read_dir(temp_dir.path())?
+ .filter_map(Result::ok)
+ .map(|entry| entry.file_name().to_string_lossy().into_owned())
+ .filter(|name| name.starts_with("corosync.conf.") && name != "corosync.conf")
+ .count();
+ assert_eq!(leaked_temp_files, 0);
+
+ Ok(())
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/traits.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/traits.rs
new file mode 100644
index 000000000..528e052d9
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/traits.rs
@@ -0,0 +1,102 @@
+//! Traits for MemDb operations
+//!
+//! This module provides the `MemDbOps` trait which abstracts MemDb operations
+//! for dependency injection and testing. Similar to `StatusOps` in pmxcfs-status.
+
+use crate::types::TreeEntry;
+use std::io::Result;
+
+/// Trait abstracting MemDb operations for dependency injection and mocking
+///
+/// This trait enables:
+/// - Dependency injection of MemDb into components
+/// - Testing with MockMemDb instead of real database
+/// - Trait objects for runtime polymorphism
+///
+/// # Example
+/// ```no_run
+/// use pmxcfs_memdb::{MemDb, MemDbOps};
+/// use std::sync::Arc;
+///
+/// fn use_database(db: Arc<dyn MemDbOps>) {
+/// // Can work with real MemDb or MockMemDb
+/// let exists = db.exists("/test").unwrap();
+/// }
+/// ```
+pub trait MemDbOps: Send + Sync {
+ // ===== Basic File Operations =====
+
+ /// Create a new file or directory
+ fn create(&self, path: &str, mode: u32, writer: u32, mtime: u32) -> Result<()>;
+
+ /// Read data from a file
+ fn read(&self, path: &str, offset: u64, size: usize) -> Result<Vec<u8>>;
+
+ /// Write data to a file
+ fn write(
+ &self,
+ path: &str,
+ offset: u64,
+ writer: u32,
+ mtime: u32,
+ data: &[u8],
+ truncate: bool,
+ ) -> Result<usize>;
+
+ /// Delete a file or directory
+ fn delete(&self, path: &str, writer: u32, mtime: u32) -> Result<()>;
+
+ /// Rename a file or directory
+ fn rename(&self, old_path: &str, new_path: &str, writer: u32, mtime: u32) -> Result<()>;
+
+ /// Check if a path exists
+ fn exists(&self, path: &str) -> Result<bool>;
+
+ /// List directory contents
+ fn readdir(&self, path: &str) -> Result<Vec<TreeEntry>>;
+
+ /// Set modification time
+ fn set_mtime(&self, path: &str, writer: u32, mtime: u32) -> Result<()>;
+
+ // ===== Path Lookup =====
+
+ /// Look up a path and return its entry
+ fn lookup_path(&self, path: &str) -> Option<TreeEntry>;
+
+ /// Get entry by inode number
+ fn get_entry_by_inode(&self, inode: u64) -> Option<TreeEntry>;
+
+ // ===== Lock Operations =====
+
+ /// Acquire a lock on a path
+ fn acquire_lock(&self, path: &str, csum: &[u8; 32]) -> Result<()>;
+
+ /// Release a lock on a path
+ fn release_lock(&self, path: &str, csum: &[u8; 32]) -> Result<()>;
+
+ /// Check if a path is locked
+ fn is_locked(&self, path: &str) -> bool;
+
+ /// Check if a lock has expired
+ fn lock_expired(&self, path: &str, csum: &[u8; 32]) -> bool;
+
+ // ===== Database Operations =====
+
+ /// Get the current database version
+ fn get_version(&self) -> u64;
+
+ /// Get all entries in the database
+ fn get_all_entries(&self) -> Result<Vec<TreeEntry>>;
+
+ /// Replace all entries (for synchronization)
+ fn replace_all_entries(&self, entries: Vec<TreeEntry>) -> Result<()>;
+
+ /// Apply a single tree entry update
+ fn apply_tree_entry(&self, entry: TreeEntry) -> Result<()>;
+
+ /// Encode the entire database for network transmission
+ fn encode_database(&self) -> Result<Vec<u8>>;
+
+ /// Compute database checksum
+ fn compute_database_checksum(&self) -> Result<[u8; 32]>;
+}
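[Editor's note: the dependency-injection pattern `MemDbOps` enables can be shown with a reduced trait. The sketch below is illustrative only; the real trait has the full method surface listed above, and `KvOps`/`MockDb` are hypothetical names.]

```rust
// Reduced illustration of trait-object dependency injection: callers
// take Arc<dyn Trait>, so tests can pass a mock in place of the real
// database. KvOps is a stand-in, not the real MemDbOps surface.
use std::sync::Arc;

trait KvOps: Send + Sync {
    fn exists(&self, path: &str) -> bool;
}

struct MockDb;

impl KvOps for MockDb {
    fn exists(&self, path: &str) -> bool {
        path == "/test" // canned answer for testing
    }
}

fn use_database(db: Arc<dyn KvOps>) -> bool {
    // Works with the real database or a mock, unchanged.
    db.exists("/test")
}

fn main() {
    assert!(use_database(Arc::new(MockDb)));
}
```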
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/types.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/types.rs
new file mode 100644
index 000000000..68c53dd5f
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/types.rs
@@ -0,0 +1,344 @@
+//! Type definitions for memdb module
+use sha2::{Digest, Sha256};
+use std::collections::HashMap;
+
+pub(super) const MEMDB_MAX_FILE_SIZE: usize = 1024 * 1024; // 1 MiB (matches C version)
+pub(super) const MEMDB_MAX_FSSIZE: usize = 128 * 1024 * 1024; // 128 MiB (matches C version)
+pub(super) const MEMDB_MAX_INODES: usize = 256 * 1024; // 256k inodes (matches C version)
+pub(super) const LOCK_TIMEOUT: u64 = 120; // Lock timeout in seconds
+pub(super) const DT_DIR: u8 = 4; // Directory type
+pub(super) const DT_REG: u8 = 8; // Regular file type
+
+/// Default file mode for directories (rwxr-xr-x)
+pub(super) const MODE_DIR_DEFAULT: u32 = libc::S_IFDIR | 0o755;
+
+/// Root inode number (matches C implementation's memdb root inode)
+/// IMPORTANT: This is the MEMDB root inode, which is 0 in both C and Rust.
+/// The FUSE layer exposes this as inode 1 to the filesystem (FUSE_ROOT_ID).
+/// See pmxcfs/src/fuse.rs for the inode mapping logic between memdb and FUSE.
+pub const ROOT_INODE: u64 = 0;
+
+/// Version file name (matches C VERSIONFILENAME)
+/// Used to store root metadata as inode ROOT_INODE in the database
+pub const VERSION_FILENAME: &str = "__version__";
+
+/// Lock directory path (where cluster resource locks are stored)
+/// Locks are implemented as directory entries stored at `priv/lock/<lockname>`
+pub const LOCK_DIR_PATH: &str = "priv/lock";
+
+/// Top-level directory that contains one subdirectory per cluster node
+pub(super) const NODES_DIR: &str = "nodes";
+
+/// Lock information for resource locking
+///
+/// In the C version (memdb.h:71-74), the lock info struct includes a `path` field
+/// that serves as the hash table key. In Rust, we use `HashMap<String, LockInfo>`
+/// where the path is stored as the HashMap key, so we don't duplicate it here.
+#[derive(Clone, Debug)]
+pub(crate) struct LockInfo {
+ /// Lock timestamp (seconds since UNIX epoch)
+ pub(crate) ltime: u64,
+
+ /// Checksum of the locked resource (used to detect changes)
+ pub(crate) csum: [u8; 32],
+}
+
+/// Tree entry representing a file or directory
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct TreeEntry {
+ pub inode: u64,
+ pub parent: u64,
+ pub version: u64,
+ pub writer: u32,
+ pub mtime: u32,
+ pub size: usize,
+ pub entry_type: u8, // DT_DIR or DT_REG
+ pub name: String,
+ pub data: Vec<u8>, // File data (empty for directories)
+}
+
+impl TreeEntry {
+ pub fn is_dir(&self) -> bool {
+ self.entry_type == DT_DIR
+ }
+
+ pub fn is_file(&self) -> bool {
+ self.entry_type == DT_REG
+ }
+
+ /// Serialize TreeEntry to C-compatible wire format for Update messages
+ ///
+ /// Wire format (matches dcdb_send_update_inode):
+ /// ```c
+ /// [parent: u64][inode: u64][version: u64][writer: u32][mtime: u32]
+ /// [size: u32][namelen: u32][type: u8][name: namelen bytes][data: size bytes]
+ /// ```
+ pub fn serialize_for_update(&self) -> Vec<u8> {
+ let namelen = (self.name.len() + 1) as u32; // Include null terminator
+ let header_size = 8 + 8 + 8 + 4 + 4 + 4 + 4 + 1; // 41 bytes
+ let total_size = header_size + namelen as usize + self.data.len();
+
+ let mut buf = Vec::with_capacity(total_size);
+
+ // Header fields
+ buf.extend_from_slice(&self.parent.to_le_bytes());
+ buf.extend_from_slice(&self.inode.to_le_bytes());
+ buf.extend_from_slice(&self.version.to_le_bytes());
+ buf.extend_from_slice(&self.writer.to_le_bytes());
+ buf.extend_from_slice(&self.mtime.to_le_bytes());
+ buf.extend_from_slice(&(self.size as u32).to_le_bytes());
+ buf.extend_from_slice(&namelen.to_le_bytes());
+ buf.push(self.entry_type);
+
+ // Name (null-terminated)
+ buf.extend_from_slice(self.name.as_bytes());
+ buf.push(0); // null terminator
+
+ // Data (only for files)
+ if self.entry_type == DT_REG && !self.data.is_empty() {
+ buf.extend_from_slice(&self.data);
+ }
+
+ buf
+ }
+
+ /// Deserialize TreeEntry from C-compatible wire format
+ ///
+ /// Matches dcdb_parse_update_inode
+ pub fn deserialize_from_update(data: &[u8]) -> anyhow::Result<Self> {
+ if data.len() < 41 {
+ anyhow::bail!(
+ "Update message too short: {} bytes (need at least 41)",
+ data.len()
+ );
+ }
+
+ let mut offset = 0;
+
+ // Parse header
+ let parent = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
+ offset += 8;
+ let inode = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
+ offset += 8;
+ let version = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
+ offset += 8;
+ let writer = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
+ offset += 4;
+ let mtime = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
+ offset += 4;
+ let size = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize;
+ offset += 4;
+ let namelen = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize;
+ offset += 4;
+ let entry_type = data[offset];
+ offset += 1;
+
+ // Validate type
+ if entry_type != DT_REG && entry_type != DT_DIR {
+ anyhow::bail!("Invalid entry type: {entry_type}");
+ }
+
+ // Validate lengths
+ if data.len() < offset + namelen + size {
+ anyhow::bail!(
+ "Update message too short: {} bytes (need {})",
+ data.len(),
+ offset + namelen + size
+ );
+ }
+
+ // Parse name (null-terminated)
+ let name_bytes = &data[offset..offset + namelen];
+ if name_bytes.is_empty() || name_bytes[namelen - 1] != 0 {
+ anyhow::bail!("Name not null-terminated");
+ }
+ let name = std::str::from_utf8(&name_bytes[..namelen - 1])
+ .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in name: {e}"))?
+ .to_string();
+ offset += namelen;
+
+ // Parse data
+ let data_vec = if entry_type == DT_REG && size > 0 {
+ data[offset..offset + size].to_vec()
+ } else {
+ Vec::new()
+ };
+
+ Ok(TreeEntry {
+ inode,
+ parent,
+ version,
+ writer,
+ mtime,
+ size,
+ entry_type,
+ name,
+ data: data_vec,
+ })
+ }
+
+ /// Compute SHA-256 checksum of this tree entry
+ ///
+ /// This checksum is used by the lock system to detect changes to lock directory entries.
+ /// Matches C version's memdb_tree_entry_csum() function (memdb.c:1389).
+ ///
+ /// The checksum includes all entry metadata (inode, version, writer, mtime, size,
+ /// entry_type, parent, name) and data (for files). This ensures any modification to a lock
+ /// directory entry is detected, triggering lock timeout reset.
+ ///
+ /// CRITICAL: Field order and byte representation must match C exactly:
+ /// 1. inode (u64, native endian)
+ /// 2. version (u64, native endian)
+ /// 3. writer (u32, native endian)
+ /// 4. mtime (u32, native endian)
+ /// 5. size (u32, native endian - C uses guint32)
+ /// 6. entry_type (u8)
+ /// 7. parent (u64, native endian)
+ /// 8. name (bytes)
+ /// 9. data (if present)
+ pub fn compute_checksum(&self) -> [u8; 32] {
+ let mut hasher = Sha256::new();
+
+ // Hash entry metadata in C's exact order (memdb.c:1389-1397)
+ hasher.update(self.inode.to_ne_bytes()); // 1. inode
+ hasher.update(self.version.to_ne_bytes()); // 2. version
+ hasher.update(self.writer.to_ne_bytes()); // 3. writer
+ hasher.update(self.mtime.to_ne_bytes()); // 4. mtime
+ hasher.update((self.size as u32).to_ne_bytes()); // 5. size (C uses guint32)
+ hasher.update([self.entry_type]); // 6. type
+ hasher.update(self.parent.to_ne_bytes()); // 7. parent
+ hasher.update(self.name.as_bytes()); // 8. name
+
+ // Hash data if present (memdb.c:1399-1400)
+ if !self.data.is_empty() {
+ hasher.update(&self.data);
+ }
+
+ hasher.finalize().into()
+ }
+}
+
+/// Return type for load_from_db: (index, tree, root_inode, max_version)
+pub(super) type LoadDbResult = (
+ HashMap<u64, TreeEntry>,
+ HashMap<u64, HashMap<String, u64>>,
+ u64,
+ u64,
+);
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ // ===== TreeEntry Serialization Tests =====
+
+ #[test]
+ fn test_tree_entry_serialize_file_with_data() {
+ let data = b"test file content".to_vec();
+ let entry = TreeEntry {
+ inode: 42,
+ parent: 0,
+ version: 1,
+ writer: 100,
+ name: "testfile.txt".to_string(),
+ mtime: 1234567890,
+ size: data.len(),
+ entry_type: DT_REG,
+ data: data.clone(),
+ };
+
+ let serialized = entry.serialize_for_update();
+
+ // Should have: 41 bytes header + name + null + data
+ let expected_size = 41 + entry.name.len() + 1 + data.len();
+ assert_eq!(serialized.len(), expected_size);
+
+ // Verify roundtrip
+ let deserialized = TreeEntry::deserialize_from_update(&serialized).unwrap();
+ assert_eq!(deserialized.inode, entry.inode);
+ assert_eq!(deserialized.name, entry.name);
+ assert_eq!(deserialized.size, entry.size);
+ assert_eq!(deserialized.data, entry.data);
+ }
+
+ #[test]
+ fn test_tree_entry_serialize_directory() {
+ let entry = TreeEntry {
+ inode: 10,
+ parent: 0,
+ version: 1,
+ writer: 50,
+ name: "mydir".to_string(),
+ mtime: 1234567890,
+ size: 0,
+ entry_type: DT_DIR,
+ data: Vec::new(),
+ };
+
+ let serialized = entry.serialize_for_update();
+
+ // Should have: 41 bytes header + name + null (no data for directories)
+ let expected_size = 41 + entry.name.len() + 1;
+ assert_eq!(serialized.len(), expected_size);
+
+ // Verify roundtrip
+ let deserialized = TreeEntry::deserialize_from_update(&serialized).unwrap();
+ assert_eq!(deserialized.inode, entry.inode);
+ assert_eq!(deserialized.name, entry.name);
+ assert_eq!(deserialized.entry_type, DT_DIR);
+ assert!(
+ deserialized.data.is_empty(),
+ "Directories should have no data"
+ );
+ }
+
+ #[test]
+ fn test_tree_entry_deserialize_truncated_header() {
+ // Only 40 bytes instead of required 41
+ let data = vec![0u8; 40];
+
+ let result = TreeEntry::deserialize_from_update(&data);
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("too short"));
+ }
+
+ #[test]
+ fn test_tree_entry_deserialize_invalid_type() {
+ let mut data = vec![0u8; 100];
+ // Set entry type to invalid value (not DT_REG or DT_DIR)
+ data[40] = 99; // Invalid type
+
+ let result = TreeEntry::deserialize_from_update(&data);
+ assert!(result.is_err());
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("Invalid entry type")
+ );
+ }
+
+ #[test]
+ fn test_tree_entry_deserialize_missing_name_terminator() {
+ let mut data = vec![0u8; 100];
+
+ // Set valid header fields
+ data[40] = DT_REG; // entry_type at offset 40
+
+ // Set namelen = 5 (at offset 36-39; offset 32-35 is the size field)
+ data[36..40].copy_from_slice(&5u32.to_le_bytes());
+
+ // Put name bytes WITHOUT null terminator
+ data[41..46].copy_from_slice(b"test!");
+ // Note: data[45] should be 0 for null terminator but we set it to '!'
+
+ let result = TreeEntry::deserialize_from_update(&data);
+ assert!(result.is_err());
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("not null-terminated")
+ );
+ }
+}
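The update wire format above is a fixed-layout, little-endian 41-byte header followed by the name and data. A standalone sketch (independent of the crate, not its own code) of the header layout, handy for cross-checking field offsets against the C side:

```rust
// Encode the 41-byte update header described in serialize_for_update():
// [parent u64][inode u64][version u64][writer u32][mtime u32]
// [size u32][namelen u32][type u8], all little-endian.
fn encode_header(
    parent: u64, inode: u64, version: u64,
    writer: u32, mtime: u32, size: u32, namelen: u32,
    entry_type: u8,
) -> Vec<u8> {
    let mut buf = Vec::with_capacity(41);
    buf.extend_from_slice(&parent.to_le_bytes());  // bytes 0..8
    buf.extend_from_slice(&inode.to_le_bytes());   // bytes 8..16
    buf.extend_from_slice(&version.to_le_bytes()); // bytes 16..24
    buf.extend_from_slice(&writer.to_le_bytes());  // bytes 24..28
    buf.extend_from_slice(&mtime.to_le_bytes());   // bytes 28..32
    buf.extend_from_slice(&size.to_le_bytes());    // bytes 32..36
    buf.extend_from_slice(&namelen.to_le_bytes()); // bytes 36..40
    buf.push(entry_type);                          // byte 40
    buf
}

fn decode_inode(header: &[u8]) -> u64 {
    u64::from_le_bytes(header[8..16].try_into().unwrap())
}

fn decode_namelen(header: &[u8]) -> u32 {
    u32::from_le_bytes(header[36..40].try_into().unwrap())
}
```

The offset comments make the layout explicit: in particular, `size` sits at bytes 32..36 and `namelen` at 36..40, which is what the truncation and terminator tests above rely on.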
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/src/vmlist.rs b/src/pmxcfs-rs/pmxcfs-memdb/src/vmlist.rs
new file mode 100644
index 000000000..d0039cc2f
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/src/vmlist.rs
@@ -0,0 +1,239 @@
+//! VM list recreation from memdb structure
+//!
+//! This module implements memdb_recreate_vmlist() from the C version (memdb.c:415),
+//! which scans the nodes/*/qemu-server/ and nodes/*/lxc/ directories to build
+//! a complete VM/CT registry.
+use super::database::MemDb;
+use super::types::NODES_DIR;
+use anyhow::Result;
+use pmxcfs_api_types::{VmEntry, VmType};
+use std::collections::HashMap;
+
+/// Recreate VM list by scanning memdb structure
+///
+/// Equivalent to C's `memdb_recreate_vmlist()` (memdb.c:415)
+///
+/// Scans the memdb tree structure:
+/// - `nodes/*/qemu-server/*.conf` - QEMU VMs
+/// - `nodes/*/lxc/*.conf` - LXC containers
+///
+/// Returns a HashMap of vmid -> VmEntry with node ownership information.
+///
+/// # Errors
+///
+/// Returns an error if duplicate VMIDs are found across different nodes.
+pub fn recreate_vmlist(memdb: &MemDb) -> Result<HashMap<u32, VmEntry>> {
+ let mut vmlist = HashMap::new();
+ let mut duplicates = Vec::new();
+
+ // Check if nodes directory exists
+ let Ok(nodes_entries) = memdb.readdir(NODES_DIR) else {
+ // No nodes directory, return empty vmlist
+ tracing::debug!("No '{}' directory found, returning empty vmlist", NODES_DIR);
+ return Ok(vmlist);
+ };
+
+ // Iterate through each node directory
+ for node_entry in &nodes_entries {
+ if !node_entry.is_dir() {
+ continue;
+ }
+
+ let node_name = node_entry.name.clone();
+
+ // Validate node name (simple check for valid hostname)
+ if !is_valid_nodename(&node_name) {
+ tracing::warn!("Skipping invalid node name: {}", node_name);
+ continue;
+ }
+
+ tracing::debug!("Scanning node: {}", node_name);
+
+ // Scan both qemu-server and lxc directories for VM/CT configs
+ for vmtype in [VmType::Qemu, VmType::Lxc] {
+ let dir_path = format!("nodes/{node_name}/{}", vmtype.config_dir());
+ let Ok(dir_entries) = memdb.readdir(&dir_path) else {
+ continue;
+ };
+
+ for config_entry in dir_entries {
+ let Some(vmid) = parse_vm_config_name(&config_entry.name) else {
+ continue;
+ };
+
+ if let Some(existing) = vmlist.get(&vmid) {
+ tracing::error!(
+ vmid,
+ node = %node_name,
+ vmtype = %vmtype,
+ existing_node = %existing.node,
+ existing_type = %existing.vm_type,
+ "Duplicate VMID found"
+ );
+ duplicates.push(vmid);
+ } else {
+ vmlist.insert(
+ vmid,
+ VmEntry {
+ vm_id: vmid,
+ vm_type: vmtype,
+ node: node_name.clone(),
+ version: config_entry.version as u32,
+ },
+ );
+ tracing::debug!(vmid, node = %node_name, %vmtype, "Found VM/CT");
+ }
+ }
+ }
+ }
+
+ if !duplicates.is_empty() {
+ tracing::warn!(
+ count = duplicates.len(),
+ ?duplicates,
+ "Found duplicate VMIDs"
+ );
+ }
+
+ tracing::info!(
+ vms = vmlist.len(),
+ nodes = nodes_entries.len(),
+ "VM list recreation complete"
+ );
+
+ Ok(vmlist)
+}
+
+/// Parse VM config filename to extract VMID
+///
+/// Expects format: "{vmid}.conf"
+/// Returns Some(vmid) if valid, None otherwise
+pub fn parse_vm_config_name(name: &str) -> Option<u32> {
+ let vmid_str = name.strip_suffix(".conf")?;
+ // Reject vmid=0 and zero-padded IDs (memdb.c:189 requires the first digit to be '1'..'9')
+ if vmid_str.starts_with('0') {
+ return None;
+ }
+ vmid_str.parse::<u32>().ok()
+}
+
+/// Validate node name (LDH rule - Letters, Digits, Hyphens)
+///
+/// Matches C version's valid_nodename() check (memdb.c:222-228)
+/// - Only ASCII letters, digits, and hyphens
+/// - Cannot start or end with hyphen
+/// - No dots allowed, so FQDNs are rejected (unlike earlier revisions of this module)
+pub fn is_valid_nodename(name: &str) -> bool {
+ if name.is_empty() || name.len() > 255 {
+ return false;
+ }
+
+ // Cannot start or end with hyphen
+ if name.starts_with('-') || name.ends_with('-') {
+ return false;
+ }
+
+ // All characters must be alphanumeric or hyphen (no dots)
+ name.chars().all(|c| c.is_ascii_alphanumeric() || c == '-')
+}
+
+/// Parse a path to check if it contains a VM config
+///
+/// Returns (nodename, vmtype, vmid) if the path is a VM config, None otherwise
+/// Matches C's path_contain_vm_config() (memdb.c:267)
+pub fn parse_vm_config_path(path: &str) -> Option<(String, VmType, u32)> {
+ // Path format: nodes/{nodename}/qemu-server/{vmid}.conf
+ // or nodes/{nodename}/lxc/{vmid}.conf
+ let path = path.trim_start_matches('/');
+
+ let parts: Vec<&str> = path.split('/').collect();
+ if parts.len() != 4 || parts[0] != NODES_DIR {
+ return None;
+ }
+
+ let nodename = parts[1];
+ let vmtype_dir = parts[2];
+ let filename = parts[3];
+
+ if !is_valid_nodename(nodename) {
+ return None;
+ }
+
+ let vmtype = match vmtype_dir {
+ "qemu-server" => VmType::Qemu,
+ "lxc" => VmType::Lxc,
+ _ => return None,
+ };
+
+ let vmid = parse_vm_config_name(filename)?;
+
+ Some((nodename.to_string(), vmtype, vmid))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_vm_config_name() {
+ assert_eq!(parse_vm_config_name("100.conf"), Some(100));
+ assert_eq!(parse_vm_config_name("999.conf"), Some(999));
+ assert_eq!(parse_vm_config_name("123"), None);
+ assert_eq!(parse_vm_config_name("abc.conf"), None);
+ assert_eq!(parse_vm_config_name(""), None);
+ // Reject vmid=0
+ assert_eq!(parse_vm_config_name("0.conf"), None);
+ assert_eq!(parse_vm_config_name("00.conf"), None);
+ assert_eq!(parse_vm_config_name("001.conf"), None);
+ }
+
+ #[test]
+ fn test_is_valid_nodename() {
+ // Valid names
+ assert!(is_valid_nodename("node1"));
+ assert!(is_valid_nodename("pve-node-01"));
+ assert!(is_valid_nodename("a"));
+ assert!(is_valid_nodename("node123"));
+
+ // Invalid names
+ assert!(!is_valid_nodename("")); // empty
+ assert!(!is_valid_nodename("-invalid")); // starts with hyphen
+ assert!(!is_valid_nodename("invalid-")); // ends with hyphen
+ assert!(!is_valid_nodename("node_1")); // underscore not allowed
+ // Dots not allowed (LDH rule)
+ assert!(!is_valid_nodename("server.example.com"));
+ assert!(!is_valid_nodename(".invalid")); // starts with dot
+ }
+
+ #[test]
+ fn test_parse_vm_config_path() {
+ // Valid paths
+ assert_eq!(
+ parse_vm_config_path("/nodes/node1/qemu-server/100.conf"),
+ Some(("node1".to_string(), VmType::Qemu, 100))
+ );
+ assert_eq!(
+ parse_vm_config_path("nodes/node1/lxc/200.conf"),
+ Some(("node1".to_string(), VmType::Lxc, 200))
+ );
+
+ // Invalid paths
+ assert_eq!(
+ parse_vm_config_path("/nodes/node1/qemu-server/0.conf"),
+ None
+ ); // vmid=0
+ assert_eq!(
+ parse_vm_config_path("/nodes/node1/qemu-server/abc.conf"),
+ None
+ ); // non-numeric
+ assert_eq!(parse_vm_config_path("/nodes/node1/other/100.conf"), None); // wrong dir
+ assert_eq!(
+ parse_vm_config_path("/other/node1/qemu-server/100.conf"),
+ None
+ ); // not under nodes
+ assert_eq!(
+ parse_vm_config_path("/nodes/node1/qemu-server/100.txt"),
+ None
+ ); // wrong extension
+ }
+}
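The path walk performed by `parse_vm_config_path` composes the three checks above (the `nodes/` prefix, the LDH nodename rule, the `<vmid>.conf` filename rule). A standalone sketch of the same walk, without the crate's `VmType` enum (the `classify` name and the plain string kind are illustrative only):

```rust
// Classify a path of the form nodes/<node>/<type-dir>/<vmid>.conf,
// mirroring the checks in parse_vm_config_path().
fn classify(path: &str) -> Option<(String, &'static str, u32)> {
    let parts: Vec<&str> = path.trim_start_matches('/').split('/').collect();
    let &[top, node, typedir, file] = parts.as_slice() else {
        return None; // must be exactly four components
    };
    if top != "nodes" {
        return None;
    }
    // LDH rule: ASCII letters, digits, hyphens; no leading/trailing hyphen
    let ldh = !node.is_empty()
        && !node.starts_with('-')
        && !node.ends_with('-')
        && node.chars().all(|c| c.is_ascii_alphanumeric() || c == '-');
    if !ldh {
        return None;
    }
    let kind = match typedir {
        "qemu-server" => "qemu",
        "lxc" => "lxc",
        _ => return None,
    };
    let stem = file.strip_suffix(".conf")?;
    if stem.starts_with('0') {
        return None; // VMID must not start with '0'
    }
    Some((node.to_string(), kind, stem.parse().ok()?))
}
```

Any failed check short-circuits to `None`, matching the behavior the tests above pin down for wrong directories, bad nodenames, and `0.conf`.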
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/tests/checksum_test.rs b/src/pmxcfs-rs/pmxcfs-memdb/tests/checksum_test.rs
new file mode 100644
index 000000000..dd6529049
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/tests/checksum_test.rs
@@ -0,0 +1,208 @@
+//! Regression tests for the C-compatible memdb verification checksum.
+
+use anyhow::{Context, Result};
+use pmxcfs_memdb::{MemDb, TreeEntry};
+use pmxcfs_test_utils::create_minimal_test_db;
+use sha2::{Digest, Sha256};
+
+const WRITER: u32 = 7;
+const INITIAL_MTIME: u32 = 1_700_000_100;
+
+fn reference_database_checksum(entries: &[TreeEntry]) -> [u8; 32] {
+ let mut sorted = entries.to_vec();
+ sorted.sort_by_key(|entry| entry.inode);
+
+ let mut hasher = Sha256::new();
+
+ for entry in sorted {
+ hasher.update(entry.inode.to_ne_bytes());
+ hasher.update(entry.version.to_ne_bytes());
+ hasher.update(entry.writer.to_ne_bytes());
+ hasher.update(entry.mtime.to_ne_bytes());
+ hasher.update((entry.size as u32).to_ne_bytes());
+ hasher.update([entry.entry_type]);
+ hasher.update(entry.parent.to_ne_bytes());
+ hasher.update(entry.name.as_bytes());
+
+ if entry.is_file() && entry.size > 0 {
+ hasher.update(&entry.data);
+ }
+ }
+
+ hasher.finalize().into()
+}
+
+fn digest_hex(digest: [u8; 32]) -> String {
+ use std::fmt::Write;
+ let mut s = String::with_capacity(64);
+ for byte in digest {
+ write!(s, "{byte:02x}").unwrap();
+ }
+ s
+}
+
+fn get_entry(db: &MemDb, path: &str) -> Result<TreeEntry> {
+ db.lookup_path(path)
+ .with_context(|| format!("missing entry at {path}"))
+}
+
+#[test]
+fn test_checksum_matches_c_reference_for_empty_database() -> Result<()> {
+ let (_temp_dir, db) = create_minimal_test_db()?;
+
+ let checksum = db.compute_database_checksum()?;
+ let expected = reference_database_checksum(&[get_entry(&db, "/")?]);
+
+ assert_eq!(
+ checksum, expected,
+ "empty database checksum must match C reference"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_checksum_matches_c_reference_for_nested_tree() -> Result<()> {
+ let (_temp_dir, db) = create_minimal_test_db()?;
+
+ db.create("/nodes", libc::S_IFDIR, WRITER, INITIAL_MTIME)?;
+ db.create("/nodes/node1", libc::S_IFDIR, WRITER, INITIAL_MTIME + 1)?;
+ db.create(
+ "/nodes/node1/config.cfg",
+ libc::S_IFREG,
+ WRITER,
+ INITIAL_MTIME + 2,
+ )?;
+ db.write(
+ "/nodes/node1/config.cfg",
+ 0,
+ WRITER,
+ INITIAL_MTIME + 3,
+ b"totem {\n version: 2\n}\n",
+ false,
+ )?;
+
+ let entries = vec![
+ get_entry(&db, "/")?,
+ get_entry(&db, "/nodes")?,
+ get_entry(&db, "/nodes/node1")?,
+ get_entry(&db, "/nodes/node1/config.cfg")?,
+ ];
+
+ let checksum = db.compute_database_checksum()?;
+ let expected = reference_database_checksum(&entries);
+
+ assert_eq!(
+ checksum, expected,
+ "nested tree checksum must match C reference"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_checksum_tracks_content_and_metadata_updates() -> Result<()> {
+ let (_temp_dir, db) = create_minimal_test_db()?;
+
+ db.create("/config.cfg", libc::S_IFREG, WRITER, INITIAL_MTIME)?;
+ db.write(
+ "/config.cfg",
+ 0,
+ WRITER,
+ INITIAL_MTIME + 1,
+ b"keyboard: en\n",
+ false,
+ )?;
+
+ let initial_entries = vec![get_entry(&db, "/")?, get_entry(&db, "/config.cfg")?];
+ let initial_checksum = db.compute_database_checksum()?;
+ let initial_expected = reference_database_checksum(&initial_entries);
+ assert_eq!(initial_checksum, initial_expected);
+
+ db.write(
+ "/config.cfg",
+ 0,
+ WRITER + 1,
+ INITIAL_MTIME + 2,
+ b"keyboard: de\n",
+ false,
+ )?;
+
+ let updated_entries = vec![get_entry(&db, "/")?, get_entry(&db, "/config.cfg")?];
+ let updated_checksum = db.compute_database_checksum()?;
+ let updated_expected = reference_database_checksum(&updated_entries);
+
+ assert_eq!(updated_checksum, updated_expected);
+ assert_ne!(initial_checksum, updated_checksum);
+
+ Ok(())
+}
+
+#[test]
+fn test_checksum_preserves_history_sensitive_versions() -> Result<()> {
+ let (_temp_a, db_a) = create_minimal_test_db()?;
+ let (_temp_b, db_b) = create_minimal_test_db()?;
+
+ for (path, content) in [("/a.txt", b"a"), ("/b.txt", b"b"), ("/c.txt", b"c")] {
+ db_a.create(path, libc::S_IFREG, WRITER, INITIAL_MTIME)?;
+ db_a.write(path, 0, WRITER, INITIAL_MTIME + 1, content, false)?;
+ }
+
+ for (path, content) in [("/c.txt", b"c"), ("/b.txt", b"b"), ("/a.txt", b"a")] {
+ db_b.create(path, libc::S_IFREG, WRITER, INITIAL_MTIME)?;
+ db_b.write(path, 0, WRITER, INITIAL_MTIME + 1, content, false)?;
+ }
+
+ let entries_a = vec![
+ get_entry(&db_a, "/")?,
+ get_entry(&db_a, "/a.txt")?,
+ get_entry(&db_a, "/b.txt")?,
+ get_entry(&db_a, "/c.txt")?,
+ ];
+ let entries_b = vec![
+ get_entry(&db_b, "/")?,
+ get_entry(&db_b, "/a.txt")?,
+ get_entry(&db_b, "/b.txt")?,
+ get_entry(&db_b, "/c.txt")?,
+ ];
+
+ let checksum_a = db_a.compute_database_checksum()?;
+ let checksum_b = db_b.compute_database_checksum()?;
+
+ assert_eq!(checksum_a, reference_database_checksum(&entries_a));
+ assert_eq!(checksum_b, reference_database_checksum(&entries_b));
+ assert_ne!(
+ checksum_a, checksum_b,
+ "different creation history must keep producing different checksums"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_checksum_regression_literal_for_single_file_tree() -> Result<()> {
+ let (_temp_dir, db) = create_minimal_test_db()?;
+
+ db.create("/literal.txt", libc::S_IFREG, WRITER, INITIAL_MTIME)?;
+ db.write(
+ "/literal.txt",
+ 0,
+ WRITER,
+ INITIAL_MTIME + 1,
+ b"literal regression\n",
+ false,
+ )?;
+
+ let checksum = db.compute_database_checksum()?;
+ let expected =
+ reference_database_checksum(&[get_entry(&db, "/")?, get_entry(&db, "/literal.txt")?]);
+
+ assert_eq!(checksum, expected);
+ assert_eq!(
+ digest_hex(checksum),
+ "38c0e4a51b63cc14996a9a5e428d1eef67d3865740449a561a31208477baad37",
+ "literal checksum guards against field/order regressions"
+ );
+
+ Ok(())
+}
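The reference checksum above sorts entries by inode before hashing because a running hash is order-sensitive: both sides must feed fields in the same sequence to agree. A toy std-only illustration of that point, using FNV-1a purely as a stand-in for SHA-256:

```rust
// FNV-1a over a byte stream; illustrative stand-in for the SHA-256 digest.
fn fnv1a(bytes: impl IntoIterator<Item = u8>) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for b in bytes {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

// Hash (inode, data) pairs in canonical order, as the reference function does:
// sorting by inode first makes the digest independent of insertion order.
fn digest(entries: &[(u64, &[u8])]) -> u64 {
    let mut sorted = entries.to_vec();
    sorted.sort_by_key(|(inode, _)| *inode);
    let bytes = sorted
        .iter()
        .flat_map(|(inode, data)| inode.to_ne_bytes().into_iter().chain(data.iter().copied()));
    fnv1a(bytes)
}
```

The same reasoning explains why the "history-sensitive versions" test above still produces different digests for the two databases: sorting canonicalizes entry order, but the per-entry `version` fields differ, so the hashed byte streams differ.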
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/tests/database_tests.rs b/src/pmxcfs-rs/pmxcfs-memdb/tests/database_tests.rs
new file mode 100644
index 000000000..b21581bc3
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/tests/database_tests.rs
@@ -0,0 +1,461 @@
+//! Integration tests for MemDb that use only the public API.
+//!
+//! Tests that require access to private fields or #[cfg(test)]-only helpers
+//! (e.g. test_set_lock_timestamp) remain in database.rs.
+use anyhow::Result;
+use pmxcfs_memdb::MemDb;
+use pmxcfs_test_utils::{TEST_MTIME, create_minimal_test_db};
+use tempfile::TempDir;
+
+#[test]
+fn test_memdb_file_size_limit() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ // Create a file
+ let now = TEST_MTIME;
+
+ db.create("/test.bin", libc::S_IFREG, 0, now)?;
+
+ // Try to write exactly 1 MiB (should succeed)
+ let data_1mb = vec![0u8; 1024 * 1024];
+ let result = db.write("/test.bin", 0, 0, now, &data_1mb, false);
+ assert!(result.is_ok(), "1 MiB file should be accepted");
+
+ // Try to write 1 MiB + 1 byte (should fail)
+ let data_too_large = vec![0u8; 1024 * 1024 + 1];
+ db.create("/test2.bin", libc::S_IFREG, 0, now)?;
+ let result = db.write("/test2.bin", 0, 0, now, &data_too_large, false);
+ assert!(result.is_err(), "File larger than 1 MiB should be rejected");
+
+ Ok(())
+}
+
+#[test]
+fn test_memdb_basic_operations() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Test directory creation
+ db.create("/testdir", libc::S_IFDIR, 0, now)?;
+ assert!(db.exists("/testdir")?, "Directory should exist");
+
+ // Test file creation
+ db.create("/testdir/file.txt", libc::S_IFREG, 0, now)?;
+ assert!(db.exists("/testdir/file.txt")?, "File should exist");
+
+ // Test write
+ let data = b"Hello, pmxcfs!";
+ db.write("/testdir/file.txt", 0, 0, now, data, false)?;
+
+ // Test read
+ let read_data = db.read("/testdir/file.txt", 0, 1024)?;
+ assert_eq!(&read_data[..], data, "Read data should match written data");
+
+ // Test readdir
+ let entries = db.readdir("/testdir")?;
+ assert_eq!(entries.len(), 1, "Directory should have 1 entry");
+ assert_eq!(entries[0].name, "file.txt");
+
+ // Test rename
+ db.rename("/testdir/file.txt", "/testdir/renamed.txt", 0, now)?;
+ assert!(
+ !db.exists("/testdir/file.txt")?,
+ "Old path should not exist"
+ );
+ assert!(db.exists("/testdir/renamed.txt")?, "New path should exist");
+
+ // Test delete
+ db.delete("/testdir/renamed.txt", 0, now)?;
+ assert!(
+ !db.exists("/testdir/renamed.txt")?,
+ "Deleted file should not exist"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_checksum_operations() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Create some test data
+ db.create("/file1.txt", libc::S_IFREG, 0, now)?;
+ db.write("/file1.txt", 0, 0, now, b"test data 1", false)?;
+
+ db.create("/file2.txt", libc::S_IFREG, 0, now)?;
+ db.write("/file2.txt", 0, 0, now, b"test data 2", false)?;
+
+ // Test database encoding
+ let encoded = db.encode_database()?;
+ assert!(!encoded.is_empty(), "Encoded database should not be empty");
+
+ // Test database checksum
+ let checksum1 = db.compute_database_checksum()?;
+ assert_ne!(checksum1, [0u8; 32], "Checksum should not be all zeros");
+
+ // Compute checksum again - should be the same
+ let checksum2 = db.compute_database_checksum()?;
+ assert_eq!(checksum1, checksum2, "Checksum should be deterministic");
+
+ // Modify database and verify checksum changes
+ db.write("/file1.txt", 0, 0, now, b"modified data", false)?;
+ let checksum3 = db.compute_database_checksum()?;
+ assert_ne!(
+ checksum1, checksum3,
+ "Checksum should change after modification"
+ );
+
+ // Test entry checksum
+ if let Some(entry) = db.lookup_path("/file1.txt") {
+ let entry_csum = entry.compute_checksum();
+ assert_ne!(
+ entry_csum, [0u8; 32],
+ "Entry checksum should not be all zeros"
+ );
+ } else {
+ panic!("File should exist");
+ }
+
+ Ok(())
+}
+
+#[test]
+fn test_regular_file_mtime_update() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Create a regular file
+ db.create("/testfile.txt", libc::S_IFREG, 0, now)?;
+
+ let entry = db.lookup_path("/testfile.txt").unwrap();
+ let writer_id = entry.writer;
+
+ // Should be able to set both older and newer mtime on regular files
+ let older_mtime = now - 10;
+ let result = db.set_mtime("/testfile.txt", writer_id, older_mtime);
+ assert!(result.is_ok(), "Regular files should allow older mtime");
+
+ let newer_mtime = now + 10;
+ let result = db.set_mtime("/testfile.txt", writer_id, newer_mtime);
+ assert!(result.is_ok(), "Regular files should allow newer mtime");
+
+ Ok(())
+}
+
+#[test]
+fn test_invalid_path_traversal() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Test path traversal attempts
+ let invalid_paths = vec![
+ "/../etc/passwd", // Absolute path traversal
+ "/test/../../../etc/passwd", // Multiple parent references
+ "//etc//passwd", // Double slashes
+ "/test/./file", // Current directory reference
+ ];
+
+ for invalid_path in invalid_paths {
+ // The DB splits on '/' and filters empty segments — it does NOT normalize
+ // `.` or `..` as special components. They are treated as literal directory
+ // names, none of which exist. All paths here must fail because intermediate
+ // directories (`..`, `.`, `test`) don't exist in a fresh database.
+ let result = db.create(invalid_path, libc::S_IFREG, 0, now);
+ assert!(
+ result.is_err(),
+ "Path '{invalid_path}' should be rejected (parent directory does not exist)"
+ );
+ }
+
+ Ok(())
+}
+
+#[test]
+fn test_operations_on_nonexistent_paths() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Try to read non-existent file
+ let result = db.read("/nonexistent.txt", 0, 100);
+ assert!(result.is_err(), "Reading non-existent file should fail");
+
+ // Try to write to non-existent file
+ let result = db.write("/nonexistent.txt", 0, 0, now, b"data", false);
+ assert!(result.is_err(), "Writing to non-existent file should fail");
+
+ // Try to delete non-existent file
+ let result = db.delete("/nonexistent.txt", 0, now);
+ assert!(result.is_err(), "Deleting non-existent file should fail");
+
+ // Try to rename non-existent file
+ let result = db.rename("/nonexistent.txt", "/new.txt", 0, now);
+ assert!(result.is_err(), "Renaming non-existent file should fail");
+
+ // Try to check if non-existent file is locked
+ assert!(
+ !db.is_locked("/nonexistent.txt"),
+ "Non-existent file should not be locked"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_file_type_mismatches() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Create a directory
+ db.create("/testdir", libc::S_IFDIR, 0, now)?;
+
+ // Try to write to a directory (should fail)
+ let result = db.write("/testdir", 0, 0, now, b"data", false);
+ assert!(result.is_err(), "Writing to a directory should fail");
+
+ // Try to read from a directory (readdir should work, but read should fail)
+ let result = db.read("/testdir", 0, 100);
+ assert!(result.is_err(), "Reading from a directory should fail");
+
+ // Create a file
+ db.create("/testfile.txt", libc::S_IFREG, 0, now)?;
+
+ // Try to readdir on a file (should fail)
+ let result = db.readdir("/testfile.txt");
+ assert!(result.is_err(), "Readdir on a file should fail");
+
+ Ok(())
+}
+
+#[test]
+fn test_duplicate_creation() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Create a file
+ db.create("/duplicate.txt", libc::S_IFREG, 0, now)?;
+
+ // Try to create the same file again
+ let result = db.create("/duplicate.txt", libc::S_IFREG, 0, now);
+ assert!(result.is_err(), "Creating duplicate file should fail");
+
+ // Create a directory
+ db.create("/dupdir", libc::S_IFDIR, 0, now)?;
+
+ // Try to create the same directory again
+ let result = db.create("/dupdir", libc::S_IFDIR, 0, now);
+ assert!(result.is_err(), "Creating duplicate directory should fail");
+
+ Ok(())
+}
+
+#[test]
+fn test_rename_target_exists() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Create source and target files
+ db.create("/source.txt", libc::S_IFREG, 0, now)?;
+ db.write("/source.txt", 0, 0, now, b"source data", false)?;
+
+ db.create("/target.txt", libc::S_IFREG, 0, now)?;
+ db.write("/target.txt", 0, 0, now, b"target data", false)?;
+
+ // Rename source to existing target (should succeed with atomic replacement - POSIX semantics)
+ let result = db.rename("/source.txt", "/target.txt", 0, now);
+ assert!(
+ result.is_ok(),
+ "Renaming to existing target should succeed (POSIX semantics)"
+ );
+
+ // Source should no longer exist
+ assert!(
+ !db.exists("/source.txt")?,
+ "Source should not exist after rename"
+ );
+
+ // Target should exist with source's data (atomic replacement)
+ assert!(db.exists("/target.txt")?, "Target should exist");
+ let data = db.read("/target.txt", 0, 100)?;
+ assert_eq!(
+ &data[..],
+ b"source data",
+ "Target should have source's data after atomic replacement"
+ );
+
+ Ok(())
+}
+
+#[test]
+fn test_delete_nonempty_directory() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Create a directory with a file
+ db.create("/parent", libc::S_IFDIR, 0, now)?;
+ db.create("/parent/child.txt", libc::S_IFREG, 0, now)?;
+
+ // Deleting a non-empty directory must always fail with "not empty".
+ let result = db.delete("/parent", 0, now);
+ assert!(result.is_err(), "Deleting non-empty directory must fail");
+ let err_msg = result.unwrap_err().to_string();
+ assert!(
+ err_msg.contains("not empty") || err_msg.contains("ENOTEMPTY"),
+ "Error should indicate directory is not empty, got: {err_msg}"
+ );
+
+ Ok(())
+}
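The "not empty" rule being asserted is the standard POSIX one (ENOTEMPTY from rmdir(2)); `std::fs::remove_dir` surfaces it the same way, as a minimal std-only sketch shows (the temp-dir path is illustrative):

```rust
use std::fs;
use std::io::ErrorKind;

/// Shows the POSIX rule the test encodes: removing a non-empty
/// directory fails (ENOTEMPTY), and succeeds only once it is empty.
fn try_remove_nonempty() -> std::io::Result<ErrorKind> {
    let parent = std::env::temp_dir().join("notempty-demo").join("parent");
    fs::create_dir_all(&parent)?;
    fs::write(parent.join("child.txt"), b"child")?;

    // Must fail: the directory still contains child.txt.
    let err = fs::remove_dir(&parent).expect_err("must fail: directory not empty");
    let kind = err.kind();

    // Cleanup: removing contents first makes the directory removable.
    fs::remove_dir_all(std::env::temp_dir().join("notempty-demo"))?;
    Ok(kind)
}

fn main() {
    let kind = try_remove_nonempty().unwrap();
    println!("remove_dir on non-empty dir failed with: {kind:?}");
}
```

(The exact `ErrorKind` reported varies by Rust version, so only the failure itself is asserted, mirroring the hedged error-message check in the test.)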
+
+#[test]
+fn test_write_offset_beyond_file_size() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Create a file with some data
+ db.create("/offset-test.txt", libc::S_IFREG, 0, now)?;
+ db.write("/offset-test.txt", 0, 0, now, b"hello", false)?;
+
+ // Write at offset beyond current file size (sparse file)
+ let result = db.write("/offset-test.txt", 100, 0, now, b"world", false);
+
+ // Check if sparse writes are supported
+ if result.is_ok() {
+ let data = db.read("/offset-test.txt", 0, 200)?;
+ // Should have zeros between offset 5 and 100
+ assert_eq!(&data[0..5], b"hello", "Initial data should be preserved");
+ assert_eq!(
+ &data[100..105],
+ b"world",
+ "Data at offset should be written"
+ );
+ }
+
+ Ok(())
+}
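If sparse writes are supported, the layout the assertions expect is "zero-fill the hole, then place the data". One plausible way an in-memory store could implement that — a hypothetical sketch, not the actual MemDb code — is:

```rust
/// Hypothetical sketch of a sparse-style write: extend the buffer with
/// zeros up to `offset`, then copy `buf` in — producing exactly the
/// layout the test's assertions check ("hello", zeros, "world").
fn write_at(file: &mut Vec<u8>, offset: usize, buf: &[u8]) {
    if file.len() < offset {
        file.resize(offset, 0); // zero-fill the hole between EOF and offset
    }
    let end = offset + buf.len();
    if file.len() < end {
        file.resize(end, 0);
    }
    file[offset..end].copy_from_slice(buf);
}

fn main() {
    let mut file = Vec::new();
    write_at(&mut file, 0, b"hello");
    write_at(&mut file, 100, b"world");

    assert_eq!(&file[0..5], b"hello");
    assert!(file[5..100].iter().all(|&b| b == 0)); // the hole reads as zeros
    assert_eq!(&file[100..105], b"world");
    assert_eq!(file.len(), 105);
    println!("file len = {}", file.len());
}
```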
+
+#[test]
+fn test_empty_path_handling() -> Result<()> {
+ let (_dir, db) = create_minimal_test_db()?;
+
+ let now = TEST_MTIME;
+
+ // Test empty path for create (should be rejected)
+ let result = db.create("", libc::S_IFREG, 0, now);
+ assert!(result.is_err(), "Empty path should be rejected for create");
+
+ // Note: exists("") behavior is implementation-specific (may return true for root)
+ // so we don't test it here
+
+ Ok(())
+}
+
+#[test]
+fn test_database_persistence() -> Result<()> {
+ let dir = TempDir::new()?;
+ let db_path = dir.path().join("test.db");
+
+ let now = TEST_MTIME;
+
+ // Create database and write data
+ {
+ let db = MemDb::open(&db_path, true)?;
+ db.create("/persistent.txt", libc::S_IFREG, 0, now)?;
+ db.write("/persistent.txt", 0, 0, now, b"persistent data", false)?;
+ }
+
+ // Reopen database and verify data persists
+ {
+ let db = MemDb::open(&db_path, false)?;
+ assert!(
+ db.exists("/persistent.txt")?,
+ "File should persist across reopens"
+ );
+
+ let data = db.read("/persistent.txt", 0, 1024)?;
+ assert_eq!(&data[..], b"persistent data", "Data should persist");
+ }
+
+ Ok(())
+}
+
+#[test]
+fn test_persistence_with_multiple_files() -> Result<()> {
+ let dir = TempDir::new()?;
+ let db_path = dir.path().join("test.db");
+
+ let now = TEST_MTIME;
+
+ // Create database with multiple files
+ {
+ let db = MemDb::open(&db_path, true)?;
+
+ // Create directory
+ db.create("/config", libc::S_IFDIR, 0, now)?;
+
+ // Create files in root
+ db.create("/file1.txt", libc::S_IFREG, 0, now)?;
+ db.write("/file1.txt", 0, 0, now, b"content 1", false)?;
+
+ // Create files in directory
+ db.create("/config/file2.txt", libc::S_IFREG, 0, now)?;
+ db.write("/config/file2.txt", 0, 0, now, b"content 2", false)?;
+ }
+
+ // Reopen and verify all data persists
+ {
+ let db = MemDb::open(&db_path, false)?;
+
+ assert!(db.exists("/config")?, "Directory should persist");
+ assert!(db.exists("/file1.txt")?, "File 1 should persist");
+ assert!(db.exists("/config/file2.txt")?, "File 2 should persist");
+
+ let data1 = db.read("/file1.txt", 0, 1024)?;
+ assert_eq!(&data1[..], b"content 1", "File 1 content should persist");
+
+ let data2 = db.read("/config/file2.txt", 0, 1024)?;
+ assert_eq!(&data2[..], b"content 2", "File 2 content should persist");
+ }
+
+ Ok(())
+}
+
+#[test]
+fn test_persistence_after_updates() -> Result<()> {
+ let dir = TempDir::new()?;
+ let db_path = dir.path().join("test.db");
+
+ let now = TEST_MTIME;
+
+ // Create database and write initial data
+ {
+ let db = MemDb::open(&db_path, true)?;
+ db.create("/mutable.txt", libc::S_IFREG, 0, now)?;
+ db.write("/mutable.txt", 0, 0, now, b"initial", false)?;
+ }
+
+ // Reopen and update data
+ {
+ let db = MemDb::open(&db_path, false)?;
+ db.write("/mutable.txt", 0, 0, now + 1, b"updated", false)?;
+ }
+
+ // Reopen again and verify updated data persists
+ {
+ let db = MemDb::open(&db_path, false)?;
+ let data = db.read("/mutable.txt", 0, 1024)?;
+ assert_eq!(&data[..], b"updated", "Updated data should persist");
+ }
+
+ Ok(())
+}
diff --git a/src/pmxcfs-rs/pmxcfs-memdb/tests/sync_integration_tests.rs b/src/pmxcfs-rs/pmxcfs-memdb/tests/sync_integration_tests.rs
new file mode 100644
index 000000000..a298b2076
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-memdb/tests/sync_integration_tests.rs
@@ -0,0 +1,390 @@
+//! Integration tests for MemDb synchronization operations
+//!
+//! Tests the apply_tree_entry and encode_index functionality used during
+//! cluster state synchronization.
+
+use anyhow::Result;
+use pmxcfs_memdb::{MemDb, ROOT_INODE, TreeEntry};
+use pmxcfs_test_utils::create_minimal_test_db;
+use tempfile::TempDir;
+
+#[test]
+fn test_encode_index_empty_db() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Encode index from empty database (only root entry)
+ let index = memdb.encode_index()?;
+
+ // C's memdb->index hash table always includes root (inode 0). memdb_encode_index
+ // iterates the full hash table, so root appears in entries[].
+ // An empty database has 1 entry: root (inode 0).
+ assert_eq!(index.version, 1); // Root created with version 1
+ assert_eq!(index.size, 1);
+ assert_eq!(index.entries.len(), 1);
+ assert_eq!(index.entries[0].inode, 0); // First (and only) entry is root
+
+ Ok(())
+}
+
+#[test]
+fn test_encode_index_with_entries() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Create some entries
+ memdb.create("/file1.txt", 0, 0, 1000)?;
+ memdb.create("/dir1", libc::S_IFDIR, 0, 1001)?;
+ memdb.create("/dir1/file2.txt", 0, 0, 1002)?;
+
+ // Encode index
+ let index = memdb.encode_index()?;
+
+ // Should have 4 entries: root (inode 0), file1.txt, dir1, dir1/file2.txt
+ // Root is included because C's memdb->index always contains inode 0.
+ assert_eq!(index.size, 4);
+ assert_eq!(index.entries.len(), 4);
+
+ // Entries should be sorted by inode
+ for i in 1..index.entries.len() {
+ assert!(
+ index.entries[i].inode > index.entries[i - 1].inode,
+ "Entries not sorted"
+ );
+ }
+
+ // Version should be incremented
+ assert!(index.version >= 4); // At least 4 operations
+
+ Ok(())
+}
+
+#[test]
+fn test_apply_tree_entry_new() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Create a new TreeEntry
+ let entry = TreeEntry {
+ inode: 10,
+ parent: ROOT_INODE,
+ version: 100,
+ writer: 2,
+ mtime: 5000,
+ size: 13,
+ entry_type: 8, // DT_REG
+ name: "applied.txt".to_string(),
+ data: b"applied data!".to_vec(),
+ };
+
+ // Apply it
+ memdb.apply_tree_entry(entry.clone())?;
+
+ // Verify it was added
+ let retrieved = memdb.lookup_path("/applied.txt");
+ assert!(retrieved.is_some());
+ let retrieved = retrieved.unwrap();
+
+ assert_eq!(retrieved.inode, 10);
+ assert_eq!(retrieved.name, "applied.txt");
+ assert_eq!(retrieved.version, 100);
+ assert_eq!(retrieved.writer, 2);
+ assert_eq!(retrieved.mtime, 5000);
+ assert_eq!(retrieved.data, b"applied data!");
+
+ // Verify database version was updated
+ assert!(memdb.get_version() >= 100);
+
+ Ok(())
+}
+
+#[test]
+fn test_apply_tree_entry_update() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Create an initial entry
+ memdb.create("/update.txt", 0, 0, 1000)?;
+ memdb.write("/update.txt", 0, 0, 1001, b"original", false)?;
+
+ let initial = memdb.lookup_path("/update.txt").unwrap();
+ let initial_inode = initial.inode;
+
+ // Apply an updated version
+ let updated = TreeEntry {
+ inode: initial_inode,
+ parent: ROOT_INODE,
+ version: 200,
+ writer: 3,
+ mtime: 2000,
+ size: 7,
+ entry_type: 8,
+ name: "update.txt".to_string(),
+ data: b"updated".to_vec(),
+ };
+
+ memdb.apply_tree_entry(updated)?;
+
+ // Verify it was updated
+ let retrieved = memdb.lookup_path("/update.txt").unwrap();
+ assert_eq!(retrieved.inode, initial_inode); // Same inode
+ assert_eq!(retrieved.version, 200); // Updated version
+ assert_eq!(retrieved.writer, 3); // Updated writer
+ assert_eq!(retrieved.mtime, 2000); // Updated mtime
+ assert_eq!(retrieved.data, b"updated"); // Updated data
+
+ Ok(())
+}
+
+#[test]
+fn test_apply_tree_entry_directory() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Apply a directory entry
+ let dir_entry = TreeEntry {
+ inode: 20,
+ parent: ROOT_INODE,
+ version: 50,
+ writer: 1,
+ mtime: 3000,
+ size: 0,
+ entry_type: 4, // DT_DIR
+ name: "newdir".to_string(),
+ data: Vec::new(),
+ };
+
+ memdb.apply_tree_entry(dir_entry)?;
+
+ // Verify directory was created
+ let retrieved = memdb.lookup_path("/newdir").unwrap();
+ assert_eq!(retrieved.inode, 20);
+ assert!(retrieved.is_dir());
+ assert_eq!(retrieved.name, "newdir");
+
+ Ok(())
+}
+
+#[test]
+fn test_apply_tree_entry_move() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Create initial structure
+ memdb.create("/olddir", libc::S_IFDIR, 0, 1000)?;
+ memdb.create("/newdir", libc::S_IFDIR, 0, 1001)?;
+ memdb.create("/olddir/file.txt", 0, 0, 1002)?;
+
+ let file = memdb.lookup_path("/olddir/file.txt").unwrap();
+ let file_inode = file.inode;
+ let newdir = memdb.lookup_path("/newdir").unwrap();
+
+ // Apply entry that moves file to newdir
+ let moved = TreeEntry {
+ inode: file_inode,
+ parent: newdir.inode, // New parent
+ version: 100,
+ writer: 2,
+ mtime: 2000,
+ size: 0,
+ entry_type: 8,
+ name: "file.txt".to_string(),
+ data: Vec::new(),
+ };
+
+ memdb.apply_tree_entry(moved)?;
+
+ // Verify file moved
+ assert!(memdb.lookup_path("/olddir/file.txt").is_none());
+ assert!(memdb.lookup_path("/newdir/file.txt").is_some());
+ let retrieved = memdb.lookup_path("/newdir/file.txt").unwrap();
+ assert_eq!(retrieved.inode, file_inode);
+
+ Ok(())
+}
+
+#[test]
+fn test_apply_multiple_entries() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Apply multiple entries simulating a sync
+ let entries = vec![
+ TreeEntry {
+ inode: 10,
+ parent: ROOT_INODE,
+ version: 100,
+ writer: 2,
+ mtime: 5000,
+ size: 0,
+ entry_type: 4, // Dir
+ name: "configs".to_string(),
+ data: Vec::new(),
+ },
+ TreeEntry {
+ inode: 11,
+ parent: 10,
+ version: 101,
+ writer: 2,
+ mtime: 5001,
+ size: 12,
+ entry_type: 8, // File
+ name: "config1.txt".to_string(),
+ data: b"config data1".to_vec(),
+ },
+ TreeEntry {
+ inode: 12,
+ parent: 10,
+ version: 102,
+ writer: 2,
+ mtime: 5002,
+ size: 12,
+ entry_type: 8,
+ name: "config2.txt".to_string(),
+ data: b"config data2".to_vec(),
+ },
+ ];
+
+ // Apply all entries
+ for entry in entries {
+ memdb.apply_tree_entry(entry)?;
+ }
+
+ // Verify all were applied correctly
+ assert!(memdb.lookup_path("/configs").is_some());
+ assert!(memdb.lookup_path("/configs/config1.txt").is_some());
+ assert!(memdb.lookup_path("/configs/config2.txt").is_some());
+
+ let config1 = memdb.lookup_path("/configs/config1.txt").unwrap();
+ assert_eq!(config1.data, b"config data1");
+
+ let config2 = memdb.lookup_path("/configs/config2.txt").unwrap();
+ assert_eq!(config2.data, b"config data2");
+
+ // Verify database version
+ assert_eq!(memdb.get_version(), 102);
+
+ Ok(())
+}
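The final assertion encodes the version-tracking rule the sync path appears to rely on: applying a remote entry raises the database version to at least that entry's version, so after applying versions 100..102 the database reports 102. A tiny sketch of that monotonic-max rule (the `apply` helper is hypothetical, not the MemDb API):

```rust
/// Hypothetical version-tracking rule: applying a remote entry bumps the
/// local database version to the maximum seen so far (never lowers it).
fn apply(db_version: &mut u64, entry_version: u64) {
    *db_version = (*db_version).max(entry_version);
}

fn main() {
    let mut version = 1u64; // e.g. a fresh database with only root
    for entry_version in [100, 101, 102] {
        apply(&mut version, entry_version);
    }
    assert_eq!(version, 102); // matches the test's get_version() expectation

    // Applying an older entry must not roll the version back.
    apply(&mut version, 50);
    assert_eq!(version, 102);
    println!("final version = {version}");
}
```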
+
+#[test]
+fn test_encode_decode_round_trip() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Create some entries
+ memdb.create("/file1.txt", 0, 0, 1000)?;
+ memdb.write("/file1.txt", 0, 0, 1001, b"data1", false)?;
+ memdb.create("/dir1", libc::S_IFDIR, 0, 1002)?;
+ memdb.create("/dir1/file2.txt", 0, 0, 1003)?;
+ memdb.write("/dir1/file2.txt", 0, 0, 1004, b"data2", false)?;
+
+ // Encode index
+ let index = memdb.encode_index()?;
+ let serialized = index.serialize();
+
+ // Deserialize
+ let deserialized = pmxcfs_memdb::MemDbIndex::deserialize(&serialized)?;
+
+ // Verify roundtrip
+ assert_eq!(deserialized.version, index.version);
+ assert_eq!(deserialized.last_inode, index.last_inode);
+ assert_eq!(deserialized.writer, index.writer);
+ assert_eq!(deserialized.mtime, index.mtime);
+ assert_eq!(deserialized.size, index.size);
+ assert_eq!(deserialized.entries.len(), index.entries.len());
+
+ for (orig, deser) in index.entries.iter().zip(deserialized.entries.iter()) {
+ assert_eq!(deser.inode, orig.inode);
+ assert_eq!(deser.digest, orig.digest);
+ }
+
+ Ok(())
+}
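The round-trip property being tested — serialize, deserialize, compare per-entry `inode` and `digest` — can be illustrated with a simplified fixed-width little-endian record layout. This is NOT the actual pmxcfs wire format (field layout and digest width here are assumptions for illustration only):

```rust
/// Illustrative index record: a 64-bit inode plus a 32-byte digest
/// (e.g. SHA-256 over the entry's metadata and data).
#[derive(Debug, Clone, PartialEq)]
struct IndexEntry {
    inode: u64,
    digest: [u8; 32],
}

/// Encode entries as fixed-width little-endian records (40 bytes each).
fn serialize(entries: &[IndexEntry]) -> Vec<u8> {
    let mut out = Vec::with_capacity(entries.len() * 40);
    for e in entries {
        out.extend_from_slice(&e.inode.to_le_bytes());
        out.extend_from_slice(&e.digest);
    }
    out
}

/// Decode, rejecting truncated or corrupt input.
fn deserialize(bytes: &[u8]) -> Option<Vec<IndexEntry>> {
    if bytes.len() % 40 != 0 {
        return None;
    }
    Some(
        bytes
            .chunks_exact(40)
            .map(|c| IndexEntry {
                inode: u64::from_le_bytes(c[0..8].try_into().unwrap()),
                digest: c[8..40].try_into().unwrap(),
            })
            .collect(),
    )
}

fn main() {
    let entries = vec![
        IndexEntry { inode: 0, digest: [0xaa; 32] }, // root
        IndexEntry { inode: 7, digest: [0xbb; 32] },
    ];
    let bytes = serialize(&entries);
    let decoded = deserialize(&bytes).expect("well-formed input");
    assert_eq!(decoded, entries); // lossless round trip, as in the test
    assert!(deserialize(&bytes[..39]).is_none()); // truncation detected
    println!("round-tripped {} entries", decoded.len());
}
```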
+
+#[test]
+fn test_apply_tree_entry_persistence() -> Result<()> {
+ let temp_dir = TempDir::new()?;
+ let db_path = temp_dir.path().join("persist.db");
+
+ // Create database and apply entry
+ {
+ let memdb = MemDb::open(&db_path, true)?;
+ let entry = TreeEntry {
+ inode: 15,
+ parent: ROOT_INODE,
+ version: 75,
+ writer: 3,
+ mtime: 7000,
+ size: 9,
+ entry_type: 8,
+ name: "persist.txt".to_string(),
+ data: b"persisted".to_vec(),
+ };
+ memdb.apply_tree_entry(entry)?;
+ }
+
+ // Reopen database and verify entry persisted
+ {
+ let memdb = MemDb::open(&db_path, false)?;
+ let retrieved = memdb.lookup_path("/persist.txt");
+ assert!(retrieved.is_some());
+ let retrieved = retrieved.unwrap();
+ assert_eq!(retrieved.inode, 15);
+ assert_eq!(retrieved.version, 75);
+ assert_eq!(retrieved.data, b"persisted");
+ }
+
+ Ok(())
+}
+
+#[test]
+fn test_index_digest_stability() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Create entry
+ memdb.create("/stable.txt", 0, 0, 1000)?;
+ memdb.write("/stable.txt", 0, 0, 1001, b"stable data", false)?;
+
+ // Encode index twice
+ let index1 = memdb.encode_index()?;
+ let index2 = memdb.encode_index()?;
+
+ // Digests should be identical
+ assert_eq!(index1.entries.len(), index2.entries.len());
+ for (e1, e2) in index1.entries.iter().zip(index2.entries.iter()) {
+ assert_eq!(e1.inode, e2.inode);
+ assert_eq!(e1.digest, e2.digest, "Digests should be stable");
+ }
+
+ Ok(())
+}
+
+#[test]
+fn test_index_digest_changes_on_modification() -> Result<()> {
+ let (_temp_dir, memdb) = create_minimal_test_db()?;
+
+ // Create entry
+ memdb.create("/change.txt", 0, 0, 1000)?;
+ memdb.write("/change.txt", 0, 0, 1001, b"original", false)?;
+
+ // Get initial digest
+ let index1 = memdb.encode_index()?;
+ let original_digest = index1
+ .entries
+ .iter()
+ .find(|e| e.inode != 0) // Not root
+ .unwrap()
+ .digest;
+
+ // Modify the file
+ memdb.write("/change.txt", 0, 0, 1002, b"modified", false)?;
+
+ // Get new digest
+ let index2 = memdb.encode_index()?;
+ let modified_digest = index2
+ .entries
+ .iter()
+ .find(|e| e.inode != 0) // Not root
+ .unwrap()
+ .digest;
+
+ // Digest should change
+ assert_ne!(
+ original_digest, modified_digest,
+ "Digest should change after modification"
+ );
+
+ Ok(())
+}
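The last two tests pin down the two properties any content digest must have: stability (same input, same digest) and sensitivity (modified input, different digest). The crate uses sha2 per the commit message; a std-only stand-in using `DefaultHasher` — purely illustrative, not the real digest — shows both properties:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in for the real SHA-based digest: hash the fields that identify
/// an entry's state. Stable for identical input, different when the
/// version or data changes — the two properties the tests assert.
fn entry_digest(inode: u64, version: u64, data: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    inode.hash(&mut h);
    version.hash(&mut h);
    data.hash(&mut h);
    h.finish()
}

fn main() {
    // Stability: encoding the index twice yields identical digests.
    let original = entry_digest(2, 3, b"original");
    assert_eq!(original, entry_digest(2, 3, b"original"));

    // Sensitivity: a write (new version, new data) changes the digest.
    let modified = entry_digest(2, 4, b"modified");
    assert_ne!(original, modified);
    println!("digest changed after modification: {}", original != modified);
}
```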
--
2.47.3