From: Samuel Rufinatscha <s.rufinatscha@proxmox.com>
To: Proxmox VE development discussion <pve-devel@lists.proxmox.com>,
Kefu Chai <k.chai@proxmox.com>
Cc: Kefu Chai <tchaikov@gmail.com>
Subject: Re: [pve-devel] [PATCH pve-cluster 07/15] pmxcfs-rs: add pmxcfs-test-utils infrastructure crate
Date: Tue, 3 Feb 2026 18:03:46 +0100 [thread overview]
Message-ID: <8c0d71c9-63b8-4f16-9298-5be7db393f9a@proxmox.com> (raw)
In-Reply-To: <20260106142440.2368585-8-k.chai@proxmox.com>
Thanks for the patch, having shared test utilities in a dedicated crate
makes a lot of sense.
Comments inline.
On 1/6/26 3:25 PM, Kefu Chai wrote:
> From: Kefu Chai <tchaikov@gmail.com>
>
> This commit introduces a dedicated testing infrastructure crate to support
> comprehensive unit and integration testing across the pmxcfs-rs workspace.
>
> Why a dedicated crate?
> - Provides shared test utilities without creating circular dependencies
> - Enables consistent test patterns across all pmxcfs crates
> - Centralizes mock implementations for dependency injection
>
> What this crate provides:
> 1. MockMemDb: Fast, in-memory implementation of MemDbOps trait
> - Eliminates SQLite I/O overhead in unit tests (~100x faster)
> - Enables isolated testing without filesystem dependencies
> - Uses HashMap for storage instead of SQLite persistence
>
> 2. MockStatus: Re-exported mock implementation for StatusOps trait
> - Allows testing without global singleton state
> - Enables parallel test execution
>
> 3. TestEnv builder: Fluent interface for test environment setup
> - Standardizes test configuration across different test types
> - Provides common directory structures and test data
>
> 4. Async helpers: Condition polling utilities (wait_for_condition)
> - Replaces sleep-based synchronization with active polling
>
> This crate is marked as dev-only in the workspace and is used by other
> crates through [dev-dependencies] to avoid circular dependencies.
>
> Signed-off-by: Kefu Chai <k.chai@proxmox.com>
> ---
> src/pmxcfs-rs/Cargo.toml | 2 +
> src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml | 34 +
> src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs | 526 +++++++++++++++
> .../pmxcfs-test-utils/src/mock_memdb.rs | 636 ++++++++++++++++++
> 4 files changed, 1198 insertions(+)
> create mode 100644 src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml
> create mode 100644 src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-test-utils/src/mock_memdb.rs
>
> diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
> index b5191c31..8fe06b88 100644
> --- a/src/pmxcfs-rs/Cargo.toml
> +++ b/src/pmxcfs-rs/Cargo.toml
> @@ -7,6 +7,7 @@ members = [
> "pmxcfs-rrd", # RRD (Round-Robin Database) persistence
> "pmxcfs-memdb", # In-memory database with SQLite persistence
> "pmxcfs-status", # Status monitoring and RRD data management
> + "pmxcfs-test-utils", # Test utilities and helpers (dev-only)
> ]
> resolver = "2"
>
> @@ -29,6 +30,7 @@ pmxcfs-status = { path = "pmxcfs-status" }
> pmxcfs-ipc = { path = "pmxcfs-ipc" }
> pmxcfs-services = { path = "pmxcfs-services" }
> pmxcfs-logger = { path = "pmxcfs-logger" }
> +pmxcfs-test-utils = { path = "pmxcfs-test-utils" }
>
> # Core async runtime
> tokio = { version = "1.35", features = ["full"] }
> diff --git a/src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml b/src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml
> new file mode 100644
> index 00000000..41cdce64
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-test-utils/Cargo.toml
> @@ -0,0 +1,34 @@
> +[package]
> +name = "pmxcfs-test-utils"
> +version.workspace = true
> +edition.workspace = true
> +authors.workspace = true
> +license.workspace = true
> +repository.workspace = true
> +rust-version.workspace = true
> +
> +[lib]
> +name = "pmxcfs_test_utils"
> +path = "src/lib.rs"
> +
> +[dependencies]
> +# Internal workspace dependencies
> +pmxcfs-api-types.workspace = true
> +pmxcfs-config.workspace = true
> +pmxcfs-memdb.workspace = true
> +pmxcfs-status.workspace = true
> +
> +# Error handling
> +anyhow.workspace = true
> +
> +# Concurrency
> +parking_lot.workspace = true
> +
> +# System integration
> +libc.workspace = true
> +
> +# Development utilities
> +tempfile.workspace = true
> +
> +# Async runtime
> +tokio.workspace = true
> diff --git a/src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs b/src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs
> new file mode 100644
> index 00000000..a2b732a5
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-test-utils/src/lib.rs
> @@ -0,0 +1,526 @@
> +//! Test utilities for pmxcfs integration and unit tests
> +//!
> +//! This crate provides:
> +//! - Common test setup and helper functions
> +//! - TestEnv builder for standard test configurations
> +//! - Mock implementations (MockStatus, MockMemDb for isolated testing)
> +//! - Test constants and utilities
> +
> +use anyhow::Result;
> +use pmxcfs_config::Config;
> +use pmxcfs_memdb::MemDb;
> +use std::sync::Arc;
> +use std::time::{Duration, Instant};
> +use tempfile::TempDir;
> +
> +// Re-export MockStatus for easy test access
> +pub use pmxcfs_status::{MockStatus, StatusOps};
> +
> +// Mock implementations
> +mod mock_memdb;
> +pub use mock_memdb::MockMemDb;
> +
> +// Re-export MemDbOps for convenience in tests
> +pub use pmxcfs_memdb::MemDbOps;
> +
> +// Test constants
> +pub const TEST_MTIME: u32 = 1234567890;
> +pub const TEST_NODE_NAME: &str = "testnode";
> +pub const TEST_CLUSTER_NAME: &str = "test-cluster";
> +pub const TEST_WWW_DATA_GID: u32 = 33;
> +
> +/// Test environment builder for standard test setups
> +///
> +/// This builder provides a fluent interface for creating test environments
> +/// with optional components (database, status, config).
> +///
> +/// # Example
> +/// ```
> +/// use pmxcfs_test_utils::TestEnv;
> +///
> +/// # fn example() -> anyhow::Result<()> {
> +/// let env = TestEnv::new()
> +/// .with_database()?
> +/// .with_mock_status()
> +/// .build();
> +///
> +/// // Use env.db, env.status, etc.
> +/// # Ok(())
> +/// # }
> +/// ```
> +pub struct TestEnv {
> + pub config: Arc<Config>,
> + pub db: Option<MemDb>,
> + pub status: Option<Arc<dyn StatusOps>>,
these are pub, but we also have accessor functions
(which can panic)
> + pub temp_dir: Option<TempDir>,
> +}
> +
> +impl TestEnv {
> + /// Create a new test environment builder with default config
> + pub fn new() -> Self {
> + Self::new_with_config(false)
> + }
> +
> + /// Create a new test environment builder with local mode config
> + pub fn new_local() -> Self {
> + Self::new_with_config(true)
> + }
> +
> + /// Create a new test environment builder with custom local_mode setting
> + pub fn new_with_config(local_mode: bool) -> Self {
> + let config = create_test_config(local_mode);
> + Self {
> + config,
> + db: None,
> + status: None,
> + temp_dir: None,
> + }
> + }
> +
> + /// Add a database with standard directory structure
> + pub fn with_database(mut self) -> Result<Self> {
> + let (temp_dir, db) = create_test_db()?;
> + self.temp_dir = Some(temp_dir);
> + self.db = Some(db);
> + Ok(self)
> + }
> +
> + /// Add a minimal database (no standard directories)
> + pub fn with_minimal_database(mut self) -> Result<Self> {
> + let (temp_dir, db) = create_minimal_test_db()?;
> + self.temp_dir = Some(temp_dir);
> + self.db = Some(db);
> + Ok(self)
> + }
> +
> + /// Add a MockStatus instance for isolated testing
> + pub fn with_mock_status(mut self) -> Self {
> + self.status = Some(Arc::new(MockStatus::new()));
> + self
> + }
> +
> + /// Add the real Status instance (uses global singleton)
> + pub fn with_status(mut self) -> Self {
> + self.status = Some(pmxcfs_status::init());
> + self
> + }
> +
> + /// Build and return the test environment
> + pub fn build(self) -> Self {
> + self
> + }
this function seems redundant
> +
> + /// Get a reference to the database (panics if not configured)
> + pub fn db(&self) -> &MemDb {
> + self.db
> + .as_ref()
> + .expect("Database not configured. Call with_database() first")
> + }
> +
> + /// Get a reference to the status (panics if not configured)
> + pub fn status(&self) -> &Arc<dyn StatusOps> {
> + self.status
> + .as_ref()
> + .expect("Status not configured. Call with_status() or with_mock_status() first")
> + }
> +}
> +
> +impl Default for TestEnv {
> + fn default() -> Self {
> + Self::new()
> + }
> +}
> +
> +/// Creates a standard test configuration
> +///
> +/// # Arguments
> +/// * `local_mode` - Whether to run in local mode (no cluster)
> +///
> +/// # Returns
> +/// Arc-wrapped Config suitable for testing
> +pub fn create_test_config(local_mode: bool) -> Arc<Config> {
> + Config::new(
> + TEST_NODE_NAME.to_string(),
> + "127.0.0.1".to_string(),
> + TEST_WWW_DATA_GID,
> + false, // debug mode
> + local_mode,
> + TEST_CLUSTER_NAME.to_string(),
> + )
> +}
> +
> +/// Creates a test database with standard directory structure
> +///
> +/// Creates the following directories:
> +/// - /nodes/{nodename}/qemu-server
> +/// - /nodes/{nodename}/lxc
> +/// - /nodes/{nodename}/priv
> +/// - /priv/lock/qemu-server
> +/// - /priv/lock/lxc
> +/// - /qemu-server
> +/// - /lxc
> +///
> +/// # Returns
> +/// (TempDir, MemDb) - The temp directory must be kept alive for database to persist
> +pub fn create_test_db() -> Result<(TempDir, MemDb)> {
> + let temp_dir = TempDir::new()?;
> + let db_path = temp_dir.path().join("test.db");
> + let db = MemDb::open(&db_path, true)?;
> +
> + // Create standard directory structure
> + let now = TEST_MTIME;
> +
> + // Node-specific directories
> + db.create("/nodes", libc::S_IFDIR, now)?;
> + db.create(&format!("/nodes/{}", TEST_NODE_NAME), libc::S_IFDIR, now)?;
> + db.create(
> + &format!("/nodes/{}/qemu-server", TEST_NODE_NAME),
> + libc::S_IFDIR,
> + now,
> + )?;
> + db.create(
> + &format!("/nodes/{}/lxc", TEST_NODE_NAME),
> + libc::S_IFDIR,
> + now,
> + )?;
> + db.create(
> + &format!("/nodes/{}/priv", TEST_NODE_NAME),
> + libc::S_IFDIR,
> + now,
> + )?;
> +
> + // Global directories
> + db.create("/priv", libc::S_IFDIR, now)?;
> + db.create("/priv/lock", libc::S_IFDIR, now)?;
> + db.create("/priv/lock/qemu-server", libc::S_IFDIR, now)?;
> + db.create("/priv/lock/lxc", libc::S_IFDIR, now)?;
> + db.create("/qemu-server", libc::S_IFDIR, now)?;
> + db.create("/lxc", libc::S_IFDIR, now)?;
> +
> + Ok((temp_dir, db))
> +}
> +
> +/// Creates a minimal test database (no standard directories)
> +///
> +/// Use this when you want full control over database structure
> +///
> +/// # Returns
> +/// (TempDir, MemDb) - The temp directory must be kept alive for database to persist
> +pub fn create_minimal_test_db() -> Result<(TempDir, MemDb)> {
> + let temp_dir = TempDir::new()?;
> + let db_path = temp_dir.path().join("test.db");
> + let db = MemDb::open(&db_path, true)?;
> + Ok((temp_dir, db))
> +}
> +
> +/// Creates test VM configuration content
> +///
> +/// # Arguments
> +/// * `vmid` - VM ID
> +/// * `cores` - Number of CPU cores
> +/// * `memory` - Memory in MB
> +///
> +/// # Returns
> +/// Configuration file content as bytes
> +pub fn create_vm_config(vmid: u32, cores: u32, memory: u32) -> Vec<u8> {
> + format!(
> + "name: test-vm-{}\ncores: {}\nmemory: {}\nbootdisk: scsi0\n",
> + vmid, cores, memory
> + )
> + .into_bytes()
> +}
> +
> +/// Creates test CT (container) configuration content
> +///
> +/// # Arguments
> +/// * `vmid` - Container ID
> +/// * `cores` - Number of CPU cores
> +/// * `memory` - Memory in MB
> +///
> +/// # Returns
> +/// Configuration file content as bytes
> +pub fn create_ct_config(vmid: u32, cores: u32, memory: u32) -> Vec<u8> {
> + format!(
> + "cores: {}\nmemory: {}\nrootfs: local:100/vm-{}-disk-0.raw\n",
> + cores, memory, vmid
> + )
> + .into_bytes()
> +}
> +
> +/// Creates a test lock path for a VM config
> +///
> +/// # Arguments
> +/// * `vmid` - VM ID
> +/// * `vm_type` - "qemu" or "lxc"
> +///
> +/// # Returns
> +/// Lock path in format `/priv/lock/{vm_type}/{vmid}.conf`
> +pub fn create_lock_path(vmid: u32, vm_type: &str) -> String {
> + format!("/priv/lock/{}/{}.conf", vm_type, vmid)
> +}
> +
> +/// Creates a test config path for a VM
> +///
> +/// # Arguments
> +/// * `vmid` - VM ID
> +/// * `vm_type` - "qemu-server" or "lxc"
> +///
> +/// # Returns
> +/// Config path in format `/{vm_type}/{vmid}.conf`
> +pub fn create_config_path(vmid: u32, vm_type: &str) -> String {
> + format!("/{}/{}.conf", vm_type, vmid)
> +}
> +
> +/// Clears all VMs from a status instance
> +///
> +/// Useful for ensuring clean state before tests that register VMs.
> +///
> +/// # Arguments
> +/// * `status` - The status instance to clear
> +pub fn clear_test_vms(status: &dyn StatusOps) {
> + let existing_vms: Vec<u32> = status.get_vmlist().keys().copied().collect();
> + for vmid in existing_vms {
> + status.delete_vm(vmid);
> + }
> +}
> +
> +/// Wait for a condition to become true, polling at regular intervals
> +///
> +/// This is a replacement for sleep-based synchronization in integration tests.
> +/// Instead of sleeping for an arbitrary duration and hoping the condition is met,
> +/// this function polls the condition and returns as soon as it becomes true.
> +///
> +/// # Arguments
> +/// * `predicate` - Function that returns true when the condition is met
> +/// * `timeout` - Maximum time to wait for the condition
> +/// * `check_interval` - How often to check the condition
> +///
> +/// # Returns
> +/// * `true` if condition was met within timeout
> +/// * `false` if timeout was reached without condition being met
> +///
> +/// # Example
> +/// ```no_run
> +/// use pmxcfs_test_utils::wait_for_condition;
> +/// use std::time::Duration;
> +/// use std::sync::atomic::{AtomicBool, Ordering};
> +/// use std::sync::Arc;
> +///
> +/// # async fn example() {
> +/// let ready = Arc::new(AtomicBool::new(false));
> +///
> +/// // Wait for service to be ready (with timeout)
> +/// let result = wait_for_condition(
> +/// || ready.load(Ordering::SeqCst),
> +/// Duration::from_secs(5),
> +/// Duration::from_millis(10),
> +/// ).await;
> +///
> +/// assert!(result, "Service should be ready within 5 seconds");
> +/// # }
> +/// ```
> +pub async fn wait_for_condition<F>(
> + predicate: F,
> + timeout: Duration,
> + check_interval: Duration,
> +) -> bool
> +where
> + F: Fn() -> bool,
> +{
> + let start = Instant::now();
> + loop {
> + if predicate() {
> + return true;
> + }
> + if start.elapsed() >= timeout {
> + return false;
> + }
> + tokio::time::sleep(check_interval).await;
> + }
> +}
> +
> +/// Wait for a condition with a custom error message
> +///
> +/// Similar to `wait_for_condition`, but returns a Result with a custom error message
> +/// if the timeout is reached.
> +///
> +/// # Arguments
> +/// * `predicate` - Function that returns true when the condition is met
> +/// * `timeout` - Maximum time to wait for the condition
> +/// * `check_interval` - How often to check the condition
> +/// * `error_msg` - Error message to return if timeout is reached
> +///
> +/// # Returns
> +/// * `Ok(())` if condition was met within timeout
> +/// * `Err(anyhow::Error)` with custom message if timeout was reached
> +///
> +/// # Example
> +/// ```no_run
> +/// use pmxcfs_test_utils::wait_for_condition_or_fail;
> +/// use std::time::Duration;
> +/// use std::sync::atomic::{AtomicU64, Ordering};
> +/// use std::sync::Arc;
> +///
> +/// # async fn example() -> anyhow::Result<()> {
> +/// let counter = Arc::new(AtomicU64::new(0));
> +///
> +/// wait_for_condition_or_fail(
> +/// || counter.load(Ordering::SeqCst) >= 1,
> +/// Duration::from_secs(5),
> +/// Duration::from_millis(10),
> +/// "Service should initialize within 5 seconds",
> +/// ).await?;
> +///
> +/// # Ok(())
> +/// # }
> +/// ```
> +pub async fn wait_for_condition_or_fail<F>(
> + predicate: F,
> + timeout: Duration,
> + check_interval: Duration,
> + error_msg: &str,
> +) -> Result<()>
> +where
> + F: Fn() -> bool,
> +{
> + if wait_for_condition(predicate, timeout, check_interval).await {
> + Ok(())
> + } else {
> + anyhow::bail!("{}", error_msg)
> + }
> +}
> +
> +/// Blocking version of wait_for_condition for synchronous tests
> +///
> +/// Similar to `wait_for_condition`, but works in synchronous contexts.
> +/// Polls the condition and returns as soon as it becomes true or timeout is reached.
> +///
> +/// # Arguments
> +/// * `predicate` - Function that returns true when the condition is met
> +/// * `timeout` - Maximum time to wait for the condition
> +/// * `check_interval` - How often to check the condition
> +///
> +/// # Returns
> +/// * `true` if condition was met within timeout
> +/// * `false` if timeout was reached without condition being met
> +///
> +/// # Example
> +/// ```no_run
> +/// use pmxcfs_test_utils::wait_for_condition_blocking;
> +/// use std::time::Duration;
> +/// use std::sync::atomic::{AtomicBool, Ordering};
> +/// use std::sync::Arc;
> +///
> +/// let ready = Arc::new(AtomicBool::new(false));
> +///
> +/// // Wait for service to be ready (with timeout)
> +/// let result = wait_for_condition_blocking(
> +/// || ready.load(Ordering::SeqCst),
> +/// Duration::from_secs(5),
> +/// Duration::from_millis(10),
> +/// );
> +///
> +/// assert!(result, "Service should be ready within 5 seconds");
> +/// ```
> +pub fn wait_for_condition_blocking<F>(
> + predicate: F,
> + timeout: Duration,
> + check_interval: Duration,
> +) -> bool
> +where
> + F: Fn() -> bool,
> +{
> + let start = Instant::now();
> + loop {
> + if predicate() {
> + return true;
> + }
> + if start.elapsed() >= timeout {
> + return false;
> + }
> + std::thread::sleep(check_interval);
> + }
> +}
> +
> +#[cfg(test)]
> +mod tests {
> + use super::*;
> +
> + #[test]
> + fn test_create_test_config() {
> + let config = create_test_config(true);
> + assert_eq!(config.nodename, TEST_NODE_NAME);
> + assert_eq!(config.cluster_name, TEST_CLUSTER_NAME);
> + assert!(config.local_mode);
> + }
> +
> + #[test]
> + fn test_create_test_db() -> Result<()> {
> + let (_temp_dir, db) = create_test_db()?;
> +
> + // Verify standard directories exist
> + assert!(db.exists("/nodes")?, "Should have /nodes");
> + assert!(db.exists("/qemu-server")?, "Should have /qemu-server");
> + assert!(db.exists("/priv/lock")?, "Should have /priv/lock");
> +
> + Ok(())
> + }
> +
> + #[test]
> + fn test_path_helpers() {
> + assert_eq!(
> + create_lock_path(100, "qemu-server"),
The docs of create_lock_path say qemu or lxc, but we pass "qemu-server"
> + "/priv/lock/qemu-server/100.conf"
> + );
> + assert_eq!(
> + create_config_path(100, "qemu-server"),
> + "/qemu-server/100.conf"
> + );
> + }
> +
> + #[test]
> + fn test_env_builder_basic() {
> + let env = TestEnv::new().build();
> + assert_eq!(env.config.nodename, TEST_NODE_NAME);
> + assert!(env.db.is_none());
> + assert!(env.status.is_none());
> + }
> +
> + #[test]
> + fn test_env_builder_with_database() -> Result<()> {
> + let env = TestEnv::new().with_database()?.build();
> + assert!(env.db.is_some());
> + assert!(env.db().exists("/nodes")?);
> + Ok(())
> + }
> +
> + #[test]
> + fn test_env_builder_with_mock_status() {
> + let env = TestEnv::new().with_mock_status().build();
> + assert!(env.status.is_some());
> +
> + // Test that MockStatus works
> + let status = env.status();
> + status.set_quorate(true);
> + assert!(status.is_quorate());
> + }
> +
> + #[test]
> + fn test_env_builder_full() -> Result<()> {
> + let env = TestEnv::new().with_database()?.with_mock_status().build();
> +
> + assert!(env.db.is_some());
> + assert!(env.status.is_some());
> + assert!(env.config.nodename == TEST_NODE_NAME);
> +
> + Ok(())
> + }
> +
> + // NOTE: Tokio tests for wait_for_condition functions are REMOVED because they
> + // cause the test runner to hang when running `cargo test --lib --workspace`.
> + // Root cause: tokio multi-threaded runtime doesn't shut down properly when
> + // these async tests complete, blocking the entire test suite.
> + //
> + // These utility functions work correctly and are verified in integration tests
> + // that actually use them (e.g., integration-tests/).
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-test-utils/src/mock_memdb.rs b/src/pmxcfs-rs/pmxcfs-test-utils/src/mock_memdb.rs
> new file mode 100644
> index 00000000..c341f9eb
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-test-utils/src/mock_memdb.rs
> @@ -0,0 +1,636 @@
> +//! Mock in-memory database implementation for testing
> +//!
> +//! This module provides `MockMemDb`, a lightweight in-memory implementation
> +//! of the `MemDbOps` trait for use in unit tests.
> +
> +use anyhow::{Result, bail};
> +use parking_lot::RwLock;
> +use pmxcfs_memdb::{MemDbOps, ROOT_INODE, TreeEntry};
> +use std::collections::HashMap;
> +use std::sync::atomic::{AtomicU64, Ordering};
> +use std::time::{SystemTime, UNIX_EPOCH};
> +
> +// Directory and file type constants from dirent.h
> +const DT_DIR: u8 = 4;
> +const DT_REG: u8 = 8;
> +
> +/// Mock in-memory database for testing
> +///
> +/// Unlike the real `MemDb` which uses SQLite persistence, `MockMemDb` stores
> +/// everything in memory using HashMap. This makes it:
> +/// - Faster for unit tests (no disk I/O)
> +/// - Easier to inject failures for error testing
> +/// - Completely isolated (no shared state between tests)
> +///
> +/// # Example
> +/// ```
> +/// use pmxcfs_test_utils::MockMemDb;
> +/// use pmxcfs_memdb::MemDbOps;
> +/// use std::sync::Arc;
> +///
> +/// let db: Arc<dyn MemDbOps> = Arc::new(MockMemDb::new());
> +/// db.create("/test.txt", 0, 1234).unwrap();
> +/// assert!(db.exists("/test.txt").unwrap());
> +/// ```
> +pub struct MockMemDb {
> + /// Files and directories stored as path -> data
> + files: RwLock<HashMap<String, Vec<u8>>>,
> + /// Directory entries stored as path -> Vec<child_names>
> + directories: RwLock<HashMap<String, Vec<String>>>,
> + /// Metadata stored as path -> TreeEntry
> + entries: RwLock<HashMap<String, TreeEntry>>,
> + /// Lock state stored as path -> (timestamp, checksum)
> + locks: RwLock<HashMap<String, (u64, [u8; 32])>>,
> + /// Version counter
> + version: AtomicU64,
> + /// Inode counter
> + next_inode: AtomicU64,
> +}
> +
> +impl MockMemDb {
> + /// Create a new empty mock database
> + pub fn new() -> Self {
> + let mut directories = HashMap::new();
> + directories.insert("/".to_string(), Vec::new());
> +
> + let mut entries = HashMap::new();
> + let now = SystemTime::now()
> + .duration_since(UNIX_EPOCH)
> + .unwrap()
> + .as_secs() as u32;
> +
> + // Create root entry
> + entries.insert(
> + "/".to_string(),
> + TreeEntry {
> + inode: ROOT_INODE,
> + parent: 0,
> + version: 0,
> + writer: 1,
> + mtime: now,
> + size: 0,
> + entry_type: DT_DIR,
> + data: Vec::new(),
> + name: String::new(),
> + },
> + );
> +
> + Self {
> + files: RwLock::new(HashMap::new()),
> + directories: RwLock::new(directories),
> + entries: RwLock::new(entries),
> + locks: RwLock::new(HashMap::new()),
> + version: AtomicU64::new(1),
> + next_inode: AtomicU64::new(ROOT_INODE + 1),
> + }
> + }
> +
> + /// Helper to check if path is a directory
> + fn is_directory(&self, path: &str) -> bool {
> + self.directories.read().contains_key(path)
> + }
> +
> + /// Helper to get parent path
> + fn parent_path(path: &str) -> Option<String> {
> + if path == "/" {
> + return None;
> + }
> + let parent = path.rsplit_once('/')?.0;
> + if parent.is_empty() {
> + Some("/".to_string())
> + } else {
> + Some(parent.to_string())
> + }
> + }
> +
> + /// Helper to get file name from path
> + fn file_name(path: &str) -> String {
> + if path == "/" {
> + return String::new();
> + }
> + path.rsplit('/').next().unwrap_or("").to_string()
> + }
> +}
> +
> +impl Default for MockMemDb {
> + fn default() -> Self {
> + Self::new()
> + }
> +}
> +
> +impl MemDbOps for MockMemDb {
> + fn create(&self, path: &str, mode: u32, mtime: u32) -> Result<()> {
> + if path.is_empty() {
> + bail!("Empty path");
> + }
> +
> + if self.entries.read().contains_key(path) {
> + bail!("File exists: {}", path);
> + }
> +
> + let is_dir = (mode & libc::S_IFMT) == libc::S_IFDIR;
> + let entry_type = if is_dir { DT_DIR } else { DT_REG };
> + let inode = self.next_inode.fetch_add(1, Ordering::SeqCst);
> +
> + // Add to parent directory
> + if let Some(parent) = Self::parent_path(path) {
> + if !self.is_directory(&parent) {
> + bail!("Parent is not a directory: {}", parent);
> + }
> + let mut dirs = self.directories.write();
> + if let Some(children) = dirs.get_mut(&parent) {
> + children.push(Self::file_name(path));
> + }
> + }
> +
> + // Create entry
> + let entry = TreeEntry {
> + inode,
> + parent: 0, // Simplified
> + version: self.version.load(Ordering::SeqCst),
> + writer: 1,
> + mtime,
> + size: 0,
> + entry_type,
> + data: Vec::new(),
> + name: Self::file_name(path),
> + };
> +
> + self.entries.write().insert(path.to_string(), entry);
> +
> + if is_dir {
> + self.directories
> + .write()
> + .insert(path.to_string(), Vec::new());
> + } else {
> + self.files.write().insert(path.to_string(), Vec::new());
> + }
> +
> + self.version.fetch_add(1, Ordering::SeqCst);
> + Ok(())
> + }
> +
> + fn read(&self, path: &str, offset: u64, size: usize) -> Result<Vec<u8>> {
> + let files = self.files.read();
> + let data = files
> + .get(path)
> + .ok_or_else(|| anyhow::anyhow!("File not found: {}", path))?;
> +
> + let offset = offset as usize;
> + if offset >= data.len() {
> + return Ok(Vec::new());
> + }
> +
> + let end = std::cmp::min(offset + size, data.len());
> + Ok(data[offset..end].to_vec())
> + }
> +
> + fn write(
> + &self,
> + path: &str,
> + offset: u64,
> + mtime: u32,
> + data: &[u8],
> + truncate: bool,
> + ) -> Result<usize> {
> + let mut files = self.files.write();
> + let file_data = files
> + .get_mut(path)
> + .ok_or_else(|| anyhow::anyhow!("File not found: {}", path))?;
> +
> + let offset = offset as usize;
> +
> + if truncate {
> + file_data.clear();
> + }
> +
> + // Expand if needed
> + if offset + data.len() > file_data.len() {
> + file_data.resize(offset + data.len(), 0);
> + }
> +
> + file_data[offset..offset + data.len()].copy_from_slice(data);
> +
> + // Update entry
> + if let Some(entry) = self.entries.write().get_mut(path) {
> + entry.mtime = mtime;
> + entry.size = file_data.len();
> + }
> +
> + self.version.fetch_add(1, Ordering::SeqCst);
> + Ok(data.len())
> + }
> +
> + fn delete(&self, path: &str) -> Result<()> {
> + if !self.entries.read().contains_key(path) {
> + bail!("File not found: {}", path);
> + }
> +
> + // Check if directory is empty
> + if let Some(children) = self.directories.read().get(path) {
> + if !children.is_empty() {
> + bail!("Directory not empty: {}", path);
> + }
> + }
> +
> + self.entries.write().remove(path);
> + self.files.write().remove(path);
> + self.directories.write().remove(path);
> +
> + // Remove from parent
> + if let Some(parent) = Self::parent_path(path) {
> + if let Some(children) = self.directories.write().get_mut(&parent) {
> + children.retain(|name| name != &Self::file_name(path));
> + }
> + }
> +
> + self.version.fetch_add(1, Ordering::SeqCst);
> + Ok(())
> + }
> +
> + fn rename(&self, old_path: &str, new_path: &str) -> Result<()> {
> + // Check existence first with read locks (released immediately)
> + {
> + let entries = self.entries.read();
> + if !entries.contains_key(old_path) {
> + bail!("Source not found: {}", old_path);
> + }
> + if entries.contains_key(new_path) {
> + bail!("Destination already exists: {}", new_path);
> + }
> + }
We currently don't update parent children lists.
Also, if rename() can be used for directories: we likely need to
rewrite/move all descendant keys (/old/... -> /new/...) across
entries/files/directories to keep the tree consistent.
> +
> + // Move entry - hold write lock for entire operation
> + {
> + let mut entries = self.entries.write();
> + if let Some(mut entry) = entries.remove(old_path) {
> + entry.name = Self::file_name(new_path);
> + entries.insert(new_path.to_string(), entry);
> + }
> + }
Between the read and write lock we have a TOCTOU.
Coudlnt we just hold the write lock?
> +
> + // Move file data - hold write lock for entire operation
> + {
> + let mut files = self.files.write();
> + if let Some(data) = files.remove(old_path) {
> + files.insert(new_path.to_string(), data);
> + }
> + }
> +
> + // Move directory - hold write lock for entire operation
> + {
> + let mut directories = self.directories.write();
> + if let Some(children) = directories.remove(old_path) {
> + directories.insert(new_path.to_string(), children);
> + }
> + }
> +
> + self.version.fetch_add(1, Ordering::SeqCst);
> + Ok(())
> + }
> +
> + fn exists(&self, path: &str) -> Result<bool> {
> + Ok(self.entries.read().contains_key(path))
> + }
> +
> + fn readdir(&self, path: &str) -> Result<Vec<TreeEntry>> {
> + let directories = self.directories.read();
> + let children = directories
> + .get(path)
> + .ok_or_else(|| anyhow::anyhow!("Not a directory: {}", path))?;
> +
> + let entries = self.entries.read();
> + let mut result = Vec::new();
> +
> + for child_name in children {
> + let child_path = if path == "/" {
> + format!("/{}", child_name)
> + } else {
> + format!("{}/{}", path, child_name)
> + };
> +
> + if let Some(entry) = entries.get(&child_path) {
> + result.push(entry.clone());
> + }
> + }
> +
> + Ok(result)
> + }
> +
> + fn set_mtime(&self, path: &str, _writer: u32, mtime: u32) -> Result<()> {
> + let mut entries = self.entries.write();
> + let entry = entries
> + .get_mut(path)
> + .ok_or_else(|| anyhow::anyhow!("File not found: {}", path))?;
> + entry.mtime = mtime;
> + Ok(())
> + }
> +
> + fn lookup_path(&self, path: &str) -> Option<TreeEntry> {
> + self.entries.read().get(path).cloned()
> + }
> +
> + fn get_entry_by_inode(&self, inode: u64) -> Option<TreeEntry> {
> + self.entries
> + .read()
> + .values()
> + .find(|e| e.inode == inode)
> + .cloned()
> + }
> +
> + fn acquire_lock(&self, path: &str, csum: &[u8; 32]) -> Result<()> {
> + let mut locks = self.locks.write();
> + let now = SystemTime::now()
> + .duration_since(UNIX_EPOCH)
> + .unwrap()
> + .as_secs();
> +
> + if let Some((timestamp, existing_csum)) = locks.get(path) {
> + // Check if expired
> + if now - timestamp > 120 {
nit: magic number here, could we use a
const LOCK_TIMEOUT_SECS: u64 = 120; for example?
> + // Expired, can acquire
> + locks.insert(path.to_string(), (now, *csum));
> + return Ok(());
> + }
> +
> + // Not expired, check if same checksum (refresh)
> + if existing_csum == csum {
> + locks.insert(path.to_string(), (now, *csum));
> + return Ok(());
> + }
> +
> + bail!("Lock already held with different checksum");
> + }
> +
> + locks.insert(path.to_string(), (now, *csum));
> + Ok(())
> + }
> +
> + fn release_lock(&self, path: &str, csum: &[u8; 32]) -> Result<()> {
> + let mut locks = self.locks.write();
> + if let Some((_, existing_csum)) = locks.get(path) {
> + if existing_csum == csum {
> + locks.remove(path);
> + return Ok(());
> + }
> + bail!("Lock checksum mismatch");
> + }
> + bail!("No lock found");
> + }
> +
> + fn is_locked(&self, path: &str) -> bool {
> + if let Some((timestamp, _)) = self.locks.read().get(path) {
> + let now = SystemTime::now()
> + .duration_since(UNIX_EPOCH)
> + .unwrap()
> + .as_secs();
> + now - timestamp <= 120
> + } else {
> + false
> + }
> + }
> +
> + fn lock_expired(&self, path: &str, csum: &[u8; 32]) -> bool {
> + if let Some((timestamp, existing_csum)) = self.locks.read().get(path).cloned() {
> + let now = SystemTime::now()
> + .duration_since(UNIX_EPOCH)
> + .unwrap()
> + .as_secs();
> +
> + // Checksum mismatch - reset timeout
> + if &existing_csum != csum {
> + self.locks.write().insert(path.to_string(), (now, *csum));
can we please document this, why we are modifying state when
checksums mismatch?
> + return false;
> + }
> +
> + // Check expiration
> + now - timestamp > 120
> + } else {
> + false
> + }
> + }
> +
> + fn get_version(&self) -> u64 {
> + self.version.load(Ordering::SeqCst)
> + }
> +
> + fn get_all_entries(&self) -> Result<Vec<TreeEntry>> {
> + Ok(self.entries.read().values().cloned().collect())
> + }
> +
> + fn replace_all_entries(&self, entries: Vec<TreeEntry>) -> Result<()> {
Also replace_all_entries() / apply_tree_entry() don’t rebuild parent
directories[..] children lists
> + self.entries.write().clear();
Clears entries, so the root TreeEntry ("/") should be reinserted to
preserve invariants not? (similar to directories below).
> + self.files.write().clear();
> + self.directories.write().clear();
Clearing directories removes "/" but doesn’t reinsert "/"
If possible, we could acquire all write locks once (in the right order)
before the loop
> +
> + for entry in entries {
> + let path = format!("/{}", entry.name); // Simplified
> + self.entries.write().insert(path.clone(), entry.clone());
> +
> + if entry.size > 0 {
Use entry.entry_type == DT_DIR to distinguish directories from files.
The current entry.size > 0 check incorrectly classifies empty files
(size 0) as directories.
> + self.files.write().insert(path, entry.data.clone());
> + } else {
> + self.directories.write().insert(path, Vec::new());
> + }
> + }
> +
> + self.version.fetch_add(1, Ordering::SeqCst);
> + Ok(())
> + }
> +
> + fn apply_tree_entry(&self, entry: TreeEntry) -> Result<()> {
> + let path = format!("/{}", entry.name); // Simplified
> + self.entries.write().insert(path.clone(), entry.clone());
> +
> + if entry.size > 0 {
also here please use entry.entry_type == DT_DIR
> + self.files.write().insert(path, entry.data.clone());
> + }
> +
> + self.version.fetch_add(1, Ordering::SeqCst);
> + Ok(())
> + }
> +
> + fn encode_database(&self) -> Result<Vec<u8>> {
> + // Simplified - just return empty vec
> + Ok(Vec::new())
> + }
> +
> + fn compute_database_checksum(&self) -> Result<[u8; 32]> {
> + // Simplified - return deterministic checksum based on version
> + let version = self.version.load(Ordering::SeqCst);
> + let mut checksum = [0u8; 32];
> + checksum[0..8].copy_from_slice(&version.to_le_bytes());
> + Ok(checksum)
> + }
> +}
> +
> +#[cfg(test)]
> +mod tests {
> + use super::*;
> + use std::sync::Arc;
> +
> + #[test]
> + fn test_mock_memdb_basic_operations() {
> + let db = MockMemDb::new();
> +
> + // Create file
> + db.create("/test.txt", libc::S_IFREG, 1234).unwrap();
> + assert!(db.exists("/test.txt").unwrap());
> +
> + // Write data
> + let data = b"Hello, MockMemDb!";
> + db.write("/test.txt", 0, 1235, data, false).unwrap();
> +
> + // Read data
> + let read_data = db.read("/test.txt", 0, 100).unwrap();
> + assert_eq!(&read_data[..], data);
> +
> + // Check entry
> + let entry = db.lookup_path("/test.txt").unwrap();
> + assert_eq!(entry.size, data.len());
> + assert_eq!(entry.mtime, 1235);
> + }
> +
> + #[test]
> + fn test_mock_memdb_directory_operations() {
> + let db = MockMemDb::new();
> +
> + // Create directory
> + db.create("/mydir", libc::S_IFDIR, 1000).unwrap();
> + assert!(db.exists("/mydir").unwrap());
> +
> + // Create file in directory
> + db.create("/mydir/file.txt", libc::S_IFREG, 1001).unwrap();
> +
> + // Read directory
> + let entries = db.readdir("/mydir").unwrap();
> + assert_eq!(entries.len(), 1);
> + assert_eq!(entries[0].name, "file.txt");
> + }
> +
> + #[test]
> + fn test_mock_memdb_lock_operations() {
> + let db = MockMemDb::new();
> + let csum1 = [1u8; 32];
> + let csum2 = [2u8; 32];
> +
> + // Acquire lock
> + db.acquire_lock("/priv/lock/resource", &csum1).unwrap();
> + assert!(db.is_locked("/priv/lock/resource"));
> +
> + // Lock with same checksum should succeed (refresh)
> + assert!(db.acquire_lock("/priv/lock/resource", &csum1).is_ok());
> +
> + // Lock with different checksum should fail
> + assert!(db.acquire_lock("/priv/lock/resource", &csum2).is_err());
> +
> + // Release lock
> + db.release_lock("/priv/lock/resource", &csum1).unwrap();
> + assert!(!db.is_locked("/priv/lock/resource"));
> +
> + // Can acquire with different checksum now
> + db.acquire_lock("/priv/lock/resource", &csum2).unwrap();
> + assert!(db.is_locked("/priv/lock/resource"));
> + }
> +
> + #[test]
> + fn test_mock_memdb_rename() {
> + let db = MockMemDb::new();
> +
> + // Create file
> + db.create("/old.txt", libc::S_IFREG, 1000).unwrap();
> + db.write("/old.txt", 0, 1001, b"content", false).unwrap();
> +
> + // Rename
> + db.rename("/old.txt", "/new.txt").unwrap();
> +
> + // Old path should not exist
> + assert!(!db.exists("/old.txt").unwrap());
> +
> + // New path should exist with same content
> + assert!(db.exists("/new.txt").unwrap());
> + let data = db.read("/new.txt", 0, 100).unwrap();
> + assert_eq!(&data[..], b"content");
> + }
> +
> + #[test]
> + fn test_mock_memdb_delete() {
> + let db = MockMemDb::new();
> +
> + // Create and delete file
> + db.create("/delete-me.txt", libc::S_IFREG, 1000).unwrap();
> + assert!(db.exists("/delete-me.txt").unwrap());
> +
> + db.delete("/delete-me.txt").unwrap();
> + assert!(!db.exists("/delete-me.txt").unwrap());
> +
> + // Delete non-existent file should fail
> + assert!(db.delete("/nonexistent.txt").is_err());
> + }
> +
> + #[test]
> + fn test_mock_memdb_version_tracking() {
> + let db = MockMemDb::new();
> + let initial_version = db.get_version();
> +
> + // Version should increment on modifications
> + db.create("/file1.txt", libc::S_IFREG, 1000).unwrap();
> + assert!(db.get_version() > initial_version);
> +
> + let v1 = db.get_version();
> + db.write("/file1.txt", 0, 1001, b"data", false).unwrap();
> + assert!(db.get_version() > v1);
> +
> + let v2 = db.get_version();
> + db.delete("/file1.txt").unwrap();
> + assert!(db.get_version() > v2);
> + }
> +
> + #[test]
> + fn test_mock_memdb_isolation() {
> + // Each MockMemDb instance is completely isolated
> + let db1 = MockMemDb::new();
> + let db2 = MockMemDb::new();
> +
> + db1.create("/test.txt", libc::S_IFREG, 1000).unwrap();
> +
> + // db2 should not see db1's files
> + assert!(db1.exists("/test.txt").unwrap());
> + assert!(!db2.exists("/test.txt").unwrap());
> + }
> +
> + #[test]
> + fn test_mock_memdb_as_trait_object() {
> + // Demonstrate using MockMemDb through trait object
> + let db: Arc<dyn MemDbOps> = Arc::new(MockMemDb::new());
> +
> + db.create("/trait-test.txt", libc::S_IFREG, 2000).unwrap();
> + assert!(db.exists("/trait-test.txt").unwrap());
> +
> + db.write("/trait-test.txt", 0, 2001, b"via trait", false)
> + .unwrap();
> + let data = db.read("/trait-test.txt", 0, 100).unwrap();
> + assert_eq!(&data[..], b"via trait");
> + }
> +
> + #[test]
> + fn test_mock_memdb_error_cases() {
> + let db = MockMemDb::new();
> +
> + // Create duplicate should fail
> + db.create("/dup.txt", libc::S_IFREG, 1000).unwrap();
> + assert!(db.create("/dup.txt", libc::S_IFREG, 1000).is_err());
> +
> + // Read non-existent file should fail
> + assert!(db.read("/nonexistent.txt", 0, 100).is_err());
> +
> + // Write to non-existent file should fail
> + assert!(
> + db.write("/nonexistent.txt", 0, 1000, b"data", false)
> + .is_err()
> + );
> +
> + // Empty path should fail
> + assert!(db.create("", libc::S_IFREG, 1000).is_err());
> + }
> +}
next prev parent reply other threads:[~2026-02-03 17:03 UTC|newest]
Thread overview: 24+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-01-06 14:24 [pve-devel] [PATCH pve-cluster 00/15 v1] Rewrite pmxcfs with Rust Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 01/15] pmxcfs-rs: add workspace and pmxcfs-api-types crate Kefu Chai
2026-01-23 14:17 ` Samuel Rufinatscha
2026-01-26 9:00 ` Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 02/15] pmxcfs-rs: add pmxcfs-config crate Kefu Chai
2026-01-23 15:01 ` Samuel Rufinatscha
2026-01-26 9:43 ` Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 03/15] pmxcfs-rs: add pmxcfs-logger crate Kefu Chai
2026-01-27 13:16 ` Samuel Rufinatscha
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 04/15] pmxcfs-rs: add pmxcfs-rrd crate Kefu Chai
2026-01-29 14:44 ` Samuel Rufinatscha
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 05/15] pmxcfs-rs: add pmxcfs-memdb crate Kefu Chai
2026-01-30 15:35 ` Samuel Rufinatscha
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 06/15] pmxcfs-rs: add pmxcfs-status crate Kefu Chai
2026-02-02 16:07 ` Samuel Rufinatscha
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 07/15] pmxcfs-rs: add pmxcfs-test-utils infrastructure crate Kefu Chai
2026-02-03 17:03 ` Samuel Rufinatscha [this message]
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 08/15] pmxcfs-rs: add pmxcfs-services crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 09/15] pmxcfs-rs: add pmxcfs-ipc crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 10/15] pmxcfs-rs: add pmxcfs-dfsm crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 11/15] pmxcfs-rs: vendor patched rust-corosync for CPG compatibility Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 13/15] pmxcfs-rs: add integration and workspace tests Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 14/15] pmxcfs-rs: add Makefile for build automation Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 15/15] pmxcfs-rs: add project documentation Kefu Chai
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=8c0d71c9-63b8-4f16-9298-5be7db393f9a@proxmox.com \
--to=s.rufinatscha@proxmox.com \
--cc=k.chai@proxmox.com \
--cc=pve-devel@lists.proxmox.com \
--cc=tchaikov@gmail.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox