From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [PATCH pve-cluster 08/14 v2] pmxcfs-rs: add pmxcfs-services crate
Date: Fri, 13 Feb 2026 17:33:45 +0800
Message-ID: <20260213094119.2379288-9-k.chai@proxmox.com>
In-Reply-To: <20260213094119.2379288-1-k.chai@proxmox.com>

Add service lifecycle management framework providing:
- Service trait: Lifecycle interface for async services
- ServiceManager: Orchestrates multiple services
- Automatic retry logic for failed services
- Event-driven dispatching via file descriptors
- Graceful shutdown coordination

This is a generic framework with no pmxcfs-specific dependencies,
only requiring tokio, async-trait, and standard error handling.
It replaces the C version's qb_loop-based event management.

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
 src/pmxcfs-rs/Cargo.toml                      |    2 +
 src/pmxcfs-rs/pmxcfs-services/Cargo.toml      |   17 +
 src/pmxcfs-rs/pmxcfs-services/README.md       |  162 +++
 src/pmxcfs-rs/pmxcfs-services/src/error.rs    |   21 +
 src/pmxcfs-rs/pmxcfs-services/src/lib.rs      |   15 +
 src/pmxcfs-rs/pmxcfs-services/src/manager.rs  |  341 +++++
 src/pmxcfs-rs/pmxcfs-services/src/service.rs  |  149 ++
 .../pmxcfs-services/tests/service_tests.rs    | 1271 +++++++++++++++++
 8 files changed, 1978 insertions(+)
 create mode 100644 src/pmxcfs-rs/pmxcfs-services/Cargo.toml
 create mode 100644 src/pmxcfs-rs/pmxcfs-services/README.md
 create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/error.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/lib.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/manager.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 9d509c1d2..b9f0f620b 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -8,6 +8,7 @@ members = [
     "pmxcfs-memdb",      # In-memory database with SQLite persistence
     "pmxcfs-status",     # Status monitoring and RRD data management
     "pmxcfs-test-utils", # Test utilities and helpers (dev-only)
+    "pmxcfs-services",   # Service framework for automatic retry and lifecycle management
 ]
 resolver = "2"
 
@@ -28,6 +29,7 @@ pmxcfs-rrd = { path = "pmxcfs-rrd" }
 pmxcfs-memdb = { path = "pmxcfs-memdb" }
 pmxcfs-status = { path = "pmxcfs-status" }
 pmxcfs-test-utils = { path = "pmxcfs-test-utils" }
+pmxcfs-services = { path = "pmxcfs-services" }
 
 # Core async runtime
 tokio = { version = "1.35", features = ["full"] }
diff --git a/src/pmxcfs-rs/pmxcfs-services/Cargo.toml b/src/pmxcfs-rs/pmxcfs-services/Cargo.toml
new file mode 100644
index 000000000..45f49dcd6
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "pmxcfs-services"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+async-trait = "0.1"
+tokio = { version = "1.41", features = ["full"] }
+tokio-util = "0.7"
+tracing = "0.1"
+thiserror = "2.0"
+num_enum.workspace = true
+parking_lot = "0.12"
+
+[dev-dependencies]
+libc.workspace = true
+pmxcfs-test-utils = { path = "../pmxcfs-test-utils" }
diff --git a/src/pmxcfs-rs/pmxcfs-services/README.md b/src/pmxcfs-rs/pmxcfs-services/README.md
new file mode 100644
index 000000000..76622662c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/README.md
@@ -0,0 +1,162 @@
+# pmxcfs-services
+
+**Service Management Framework** for pmxcfs: a tokio-based replacement for `qb_loop`.
+
+Manages long-running services with automatic retry, event-driven dispatching,
+periodic timers, and graceful shutdown. Replaces the C implementation's
+`qb_loop`-based event management with a tokio async runtime.
+
+## How It Fits Together
+
+- **`Service` trait** (`service.rs`): Lifecycle interface that each service implements
+  (`initialize` / `dispatch` / `finalize`, plus optional timer callbacks).
+- **`ServiceManager`** (`manager.rs`): Accepts `Box<dyn Service>` via `add_service()`,
+  then `spawn()` launches one background task per service that drives it through its lifecycle.
+- Each service task handles:
+  - **Initialization with retry**: Retries every 5 seconds on failure
+  - **Event-driven dispatch**: Waits for file descriptor readability via `AsyncFd`
+  - **Timer callbacks**: Optional periodic callbacks at configured intervals
+  - **Reinitialization**: Automatic on dispatch failure or explicit request
+
+Shutdown is coordinated through a `CancellationToken`:
+
+```rust
+let shutdown_token = manager.shutdown_token();
+let handle = manager.spawn();
+// ... later ...
+shutdown_token.cancel();  // Signal graceful shutdown
+handle.await;             // Wait for all services to finalize
+```
+
+## Usage Example
+
+```rust
+use async_trait::async_trait;
+use pmxcfs_services::{Result, Service, ServiceManager};
+use std::os::unix::io::RawFd;
+use std::time::Duration;
+
+struct MyService {
+    fd: Option<RawFd>,
+}
+
+#[async_trait]
+impl Service for MyService {
+    fn name(&self) -> &str { "my-service" }
+
+    async fn initialize(&mut self) -> Result<RawFd> {
+        let fd = connect_to_external_service()?;
+        self.fd = Some(fd);
+        Ok(fd)  // Return fd for event monitoring
+    }
+
+    async fn dispatch(&mut self) -> Result<bool> {
+        handle_events()?;
+        Ok(true)  // true = continue, false = reinitialize
+    }
+
+    async fn finalize(&mut self) -> Result<()> {
+        if let Some(fd) = self.fd.take() {
+            close_connection(fd)?;  // MUST close the fd returned by initialize()
+        }
+        Ok(())
+    }
+
+    // Optional: periodic timer callback
+    fn timer_period(&self) -> Option<Duration> {
+        Some(Duration::from_secs(10))
+    }
+
+    async fn timer_callback(&mut self) -> Result<()> {
+        perform_periodic_task()?;
+        Ok(())
+    }
+}
+```
+
+## Service Lifecycle
+
+1. **Initialization**: Service calls `initialize()` which returns a file descriptor
+   - On failure: Retries every 5 seconds indefinitely
+   - On success: Registers fd with tokio's `AsyncFd` and enters running state
+
+2. **Running**: Service waits for events using `tokio::select!`:
+   - **FD readable**: Calls `dispatch()` when fd becomes readable
+     - Returns `Ok(true)`: Continue running
+     - Returns `Ok(false)`: Reinitialize (calls `finalize()` then `initialize()`)
+     - Returns `Err(_)`: Reinitialize
+   - **Timer deadline**: Calls `timer_callback()` at configured intervals (if enabled)
+
+3. **Shutdown**: On `CancellationToken::cancel()`:
+   - Calls `finalize()` for all services
+   - Waits for all service tasks to complete
+
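+A simplified sketch of the per-service task that drives this lifecycle (the
+real loop lives in `manager.rs`; `next_timer` and the commented-out calls are
+illustrative):
+
+```rust
+// Conceptual shape only: retry, error handling, and fd registration omitted.
+loop {
+    tokio::select! {
+        // Shutdown requested: leave the loop, then finalize below.
+        _ = shutdown_token.cancelled() => break,
+        // Fd readable: dispatch(); Ok(false) or Err(_) trigger reinitialization.
+        _ = async_fd.readable() => { /* service.dispatch().await */ }
+        // Timer deadline reached (only when timer_period() returns Some).
+        _ = tokio::time::sleep_until(next_timer) => { /* service.timer_callback().await */ }
+    }
+}
+// service.finalize().await;
+```
+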
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| [`cfs_loop_t`](../../pmxcfs/loop.h#L32) | `ServiceManager` | Event loop manager |
+| [`cfs_service_t`](../../pmxcfs/loop.h#L34) | `dyn Service` | Service trait |
+| [`cfs_service_callbacks_t`](../../pmxcfs/loop.h#L44-L49) | (trait methods) | Callbacks as trait methods |
+
+### Functions
+
+| C Function | Rust Equivalent |
+|-----------|-----------------|
+| [`cfs_loop_new()`](../../pmxcfs/loop.c) | `ServiceManager::new()` |
+| [`cfs_loop_add_service()`](../../pmxcfs/loop.c) | `ServiceManager::add_service()` |
+| [`cfs_loop_start_worker()`](../../pmxcfs/loop.c) | `ServiceManager::spawn()` |
+| [`cfs_loop_stop_worker()`](../../pmxcfs/loop.c) | `shutdown_token.cancel()` + `handle.await` |
+| [`cfs_service_new()`](../../pmxcfs/loop.c) | `struct` + `impl Service` |
+
+## Key Differences from C Implementation
+
+| Aspect | C (`loop.c`) | Rust |
+|--------|-------------|------|
+| Event loop | libqb `qb_loop`, single-threaded | tokio async runtime, multi-threaded |
+| FD monitoring | Manual `qb_loop_poll_add()` | Automatic `AsyncFd` |
+| Concurrency | Sequential callbacks | Parallel tasks per service |
+| Retry interval | Configurable per service | Fixed 5 seconds (sufficient for all services) |
+| Dispatch modes | FD-based or polling | FD-based only (all services use fds) |
+| Priority levels | Per-service priorities | All equal (no priority needed) |
+| Shutdown | `cfs_loop_stop_worker()` | `CancellationToken` → await tasks → finalize all |
+
+## Design Simplifications
+
+The Rust implementation is significantly simpler than the C version, reducing
+the codebase by 67% while preserving all production functionality.
+
+### Why Not Mirror the C Implementation?
+
+The C implementation (`loop.c`) was designed for flexibility to support various
+hypothetical use cases. However, after analyzing actual usage across the codebase,
+we found that many features were never used:
+
+- **Polling mode**: All services use file descriptors from Corosync libraries
+- **Custom retry intervals**: All services work fine with a fixed 5-second retry
+- **Non-restartable services**: All services need automatic retry on failure
+- **Custom dispatch intervals**: All services are event-driven (no periodic polling)
+- **Priority levels**: Service execution order doesn't matter in practice
+
+Rather than maintaining unused complexity "just in case", the Rust implementation
+focuses on what's actually needed. This makes the code easier to understand,
+test, and maintain.
+
+### Simplifications Applied
+
+- **No polling mode**: All services use file descriptors from C libraries (Corosync)
+- **Fixed retry interval**: 5 seconds is sufficient for all services
+- **All services restartable**: No need for non-restartable mode
+- **Single task per service**: Combines retry, dispatch, and timer logic
+- **Direct return types**: No enums (`RawFd` instead of `InitResult`, `bool` instead of `DispatchAction`)
+
+If future requirements demand more flexibility, these features can be added back
+incrementally with clear use cases driving the design.
+
+## References
+
+### C Implementation
+- [`src/pmxcfs/loop.h`](../../pmxcfs/loop.h) - Service loop API
+- [`src/pmxcfs/loop.c`](../../pmxcfs/loop.c) - Service loop implementation
+
+### Related Crates
+- **pmxcfs-dfsm**: Uses `Service` trait for `ClusterDatabaseService`, `StatusSyncService`
+- **pmxcfs**: Uses `ServiceManager` to orchestrate all cluster services
diff --git a/src/pmxcfs-rs/pmxcfs-services/src/error.rs b/src/pmxcfs-rs/pmxcfs-services/src/error.rs
new file mode 100644
index 000000000..0c5951761
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/src/error.rs
@@ -0,0 +1,21 @@
+//! Error types for the service framework
+
+use thiserror::Error;
+
+/// Errors that can occur during service operations
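+///
+/// Services typically wrap lower-level failures into these variants. A
+/// hedged sketch (`connect()` is a hypothetical fallible helper):
+///
+/// ```ignore
+/// let fd = connect().map_err(|e| ServiceError::InitializationFailed(e.to_string()))?;
+/// ```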
+#[derive(Error, Debug)]
+pub enum ServiceError {
+    /// Service initialization failed
+    #[error("Failed to initialize service: {0}")]
+    InitializationFailed(String),
+
+    /// Service dispatch failed
+    #[error("Failed to dispatch service events: {0}")]
+    DispatchFailed(String),
+
+    /// Duplicate service name
+    #[error("Service '{0}' is already registered")]
+    DuplicateService(String),
+}
+
+pub type Result<T> = std::result::Result<T, ServiceError>;
diff --git a/src/pmxcfs-rs/pmxcfs-services/src/lib.rs b/src/pmxcfs-rs/pmxcfs-services/src/lib.rs
new file mode 100644
index 000000000..18004ee6b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/src/lib.rs
@@ -0,0 +1,15 @@
+//! Service framework for pmxcfs
+//!
+//! This crate provides a simplified, tokio-based service management framework with:
+//! - Automatic retry on failure (5 second interval)
+//! - Event-driven file descriptor monitoring
+//! - Optional periodic timer callbacks
+//! - Graceful shutdown
+
+mod error;
+mod manager;
+mod service;
+
+pub use error::{Result, ServiceError};
+pub use manager::ServiceManager;
+pub use service::Service;
diff --git a/src/pmxcfs-rs/pmxcfs-services/src/manager.rs b/src/pmxcfs-rs/pmxcfs-services/src/manager.rs
new file mode 100644
index 000000000..30712aafd
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/src/manager.rs
@@ -0,0 +1,341 @@
+//! Service manager for orchestrating multiple managed services
+//!
+//! Each service gets one task that handles:
+//! - Initialization with retry (5 second interval)
+//! - Event dispatch when fd is readable
+//! - Timer callbacks at configured intervals
+
+use crate::error::{Result, ServiceError};
+use crate::service::Service;
+use parking_lot::Mutex;
+use std::collections::HashMap;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Instant;
+use tokio::io::unix::AsyncFd;
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, info, warn};
+
+/// Wrapper for raw fd that doesn't close on drop
+struct FdWrapper(RawFd);
+
+impl AsRawFd for FdWrapper {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0
+    }
+}
+
+/// Service manager for orchestrating multiple services
+///
+/// # Architecture
+///
+/// The ServiceManager spawns one tokio task per service. Each task:
+/// - Initializes the service with automatic retry (5 second interval)
+/// - Monitors the service's file descriptor for readability
+/// - Calls dispatch() when the fd becomes readable
+/// - Optionally calls timer_callback() at configured intervals
+/// - Reinitializes on errors or explicit request
+///
+/// # Shutdown
+///
+/// Call `shutdown_token().cancel()` to initiate graceful shutdown.
+/// The manager will:
+/// 1. Signal all service tasks to stop
+/// 2. Call finalize() on each service
+/// 3. Wait up to 30 seconds for each service to stop
+/// 4. Continue shutdown even if services time out
+///
+/// # Thread Safety
+///
+/// Services run in separate tokio tasks and can execute concurrently.
+/// Each service's operations (initialize, dispatch, finalize) are
+/// serialized within its own task.
+///
+/// # Example
+///
+/// ```ignore
+/// use pmxcfs_services::ServiceManager;
+///
+/// let mut manager = ServiceManager::new();
+/// manager.add_service(Box::new(MyService::new()))?;
+/// manager.add_service(Box::new(AnotherService::new()))?;
+///
+/// let shutdown_token = manager.shutdown_token();
+/// let handle = manager.spawn();
+///
+/// // ... later ...
+/// shutdown_token.cancel();  // Signal graceful shutdown
+/// handle.await?;            // Wait for all services to stop
+/// ```
+pub struct ServiceManager {
+    services: HashMap<String, Box<dyn Service>>,
+    shutdown_token: CancellationToken,
+}
+
+impl ServiceManager {
+    /// Create a new ServiceManager
+    pub fn new() -> Self {
+        Self {
+            services: HashMap::new(),
+            shutdown_token: CancellationToken::new(),
+        }
+    }
+
+    /// Add a service to the manager
+    ///
+    /// Services must be added before calling `spawn()`.
+    /// Cannot add services after the manager has been spawned.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Err` if a service with the same name is already registered.
+    ///
+    /// # Example
+    ///
+    /// ```ignore
+    /// let mut manager = ServiceManager::new();
+    /// manager.add_service(Box::new(MyService::new()))?;
+    /// ```
+    pub fn add_service(&mut self, service: Box<dyn Service>) -> Result<()> {
+        let name = service.name().to_string();
+        if self.services.contains_key(&name) {
+            return Err(ServiceError::DuplicateService(name));
+        }
+        self.services.insert(name, service);
+        Ok(())
+    }
+
+    /// Get a shutdown token for graceful shutdown
+    ///
+    /// Call `token.cancel()` to signal all services to stop gracefully.
+    /// The token can be cloned and shared across threads.
+    pub fn shutdown_token(&self) -> CancellationToken {
+        self.shutdown_token.clone()
+    }
+
+    /// Spawn the service manager and start all services
+    ///
+    /// This consumes the manager and returns a JoinHandle.
+    /// Each service runs in its own tokio task.
+    ///
+    /// # Shutdown
+    ///
+    /// To stop the manager:
+    /// 1. Call `shutdown_token().cancel()`
+    /// 2. Await the returned JoinHandle
+    ///
+    /// # Panics
+    ///
+    /// If a service task panics, it will be isolated to that task.
+    /// Other services will continue running.
+    ///
+    /// # Example
+    ///
+    /// ```ignore
+    /// let shutdown_token = manager.shutdown_token();
+    /// let handle = manager.spawn();
+    ///
+    /// // ... later ...
+    /// shutdown_token.cancel();
+    /// handle.await?;
+    /// ```
+    #[must_use = "dropping the handle detaches the manager task; keep it to await shutdown"]
+    pub fn spawn(self) -> JoinHandle<()> {
+        tokio::spawn(async move { self.run().await })
+    }
+
+    async fn run(self) {
+        info!("Starting ServiceManager with {} services", self.services.len());
+
+        let mut handles = Vec::new();
+
+        for (name, service) in self.services {
+            let token = self.shutdown_token.clone();
+            let handle = tokio::spawn(async move {
+                run_service(name, service, token).await;
+            });
+            handles.push(handle);
+        }
+
+        // Wait for shutdown
+        self.shutdown_token.cancelled().await;
+        info!("ServiceManager shutting down...");
+
+        // Wait for all services to stop with timeout
+        for handle in handles {
+            match tokio::time::timeout(std::time::Duration::from_secs(30), handle).await {
+                Ok(Ok(())) => {}
+                Ok(Err(e)) => {
+                    warn!(error = ?e, "Service task panicked during shutdown");
+                }
+                Err(_) => {
+                    warn!("Service didn't stop within 30 second timeout");
+                }
+            }
+        }
+
+        info!("ServiceManager stopped");
+    }
+}
+
+impl Default for ServiceManager {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Run a single service until shutdown
+async fn run_service(name: String, mut service: Box<dyn Service>, token: CancellationToken) {
+    // Service state
+    let running = Arc::new(AtomicBool::new(false));
+    let async_fd: Arc<Mutex<Option<Arc<AsyncFd<FdWrapper>>>>> = Arc::new(Mutex::new(None));
+    let last_timer = Arc::new(Mutex::new(None::<Instant>));
+    let mut last_init_attempt = None::<Instant>;
+
+    loop {
+        tokio::select! {
+            _ = token.cancelled() => break,
+            _ = service_loop(&name, &mut service, &running, &async_fd, &last_timer, &mut last_init_attempt) => {}
+        }
+    }
+
+    // Finalize on shutdown
+    running.store(false, Ordering::Release);
+    *async_fd.lock() = None;
+
+    info!(service = %name, "Shutting down service");
+    if let Err(e) = service.finalize().await {
+        error!(service = %name, error = %e, "Error finalizing service");
+    }
+}
+
+/// Main service loop
+async fn service_loop(
+    name: &str,
+    service: &mut Box<dyn Service>,
+    running: &Arc<AtomicBool>,
+    async_fd: &Arc<Mutex<Option<Arc<AsyncFd<FdWrapper>>>>>,
+    last_timer: &Arc<Mutex<Option<Instant>>>,
+    last_init_attempt: &mut Option<Instant>,
+) {
+    if !running.load(Ordering::Acquire) {
+        // Need to initialize
+        if let Some(last) = last_init_attempt {
+            let elapsed = Instant::now().duration_since(*last);
+            if elapsed < std::time::Duration::from_secs(5) {
+                // Wait for retry interval
+                tokio::time::sleep(std::time::Duration::from_secs(5) - elapsed).await;
+                return;
+            }
+        }
+
+        *last_init_attempt = Some(Instant::now());
+
+        match service.initialize().await {
+            Ok(fd) => {
+                match AsyncFd::new(FdWrapper(fd)) {
+                    Ok(afd) => {
+                        *async_fd.lock() = Some(Arc::new(afd));
+                        running.store(true, Ordering::Release);
+                        info!(service = %name, "Service initialized");
+                    }
+                    Err(e) => {
+                        error!(service = %name, error = %e, "Failed to register fd");
+                        let _ = service.finalize().await;
+                    }
+                }
+            }
+            Err(e) => {
+                error!(service = %name, error = %e, "Initialization failed");
+            }
+        }
+    } else {
+        // Service is running - dispatch events and timers
+        let fd = async_fd.lock().clone();
+        if let Some(fd) = fd {
+            dispatch_service(name, service, &fd, running, last_timer).await;
+        }
+    }
+}
+
+/// Dispatch events for a running service
+async fn dispatch_service(
+    name: &str,
+    service: &mut Box<dyn Service>,
+    async_fd: &Arc<AsyncFd<FdWrapper>>,
+    running: &Arc<AtomicBool>,
+    last_timer: &Arc<Mutex<Option<Instant>>>,
+) {
+    // Calculate timer deadline
+    let timer_deadline = service.timer_period().and_then(|period| {
+        let last = last_timer.lock();
+        match *last {
+            Some(t) => {
+                let next = t + period;
+                if Instant::now() >= next {
+                    // Already past deadline, schedule for next period from now
+                    Some(Instant::now() + period)
+                } else {
+                    Some(next)
+                }
+            }
+            None => Some(Instant::now()),
+        }
+    });
+
+    tokio::select! {
+        // Timer callback
+        _ = async {
+            if let Some(deadline) = timer_deadline {
+                tokio::time::sleep_until(deadline.into()).await;
+            } else {
+                std::future::pending::<()>().await;
+            }
+        } => {
+            *last_timer.lock() = Some(Instant::now());
+            debug!(service = %name, "Timer callback");
+            if let Err(e) = service.timer_callback().await {
+                warn!(service = %name, error = %e, "Timer callback failed");
+            }
+        }
+
+        // Fd readable
+        result = async_fd.readable() => {
+            match result {
+                Ok(mut guard) => {
+                    match service.dispatch().await {
+                        Ok(true) => {
+                            guard.clear_ready();
+                        }
+                        Ok(false) => {
+                            info!(service = %name, "Service requested reinitialization");
+                            guard.clear_ready();
+                            reinitialize(name, service, running).await;
+                        }
+                        Err(e) => {
+                            error!(service = %name, error = %e, "Dispatch failed");
+                            guard.clear_ready();
+                            reinitialize(name, service, running).await;
+                        }
+                    }
+                }
+                Err(e) => {
+                    warn!(service = %name, error = %e, "Error waiting for fd");
+                    reinitialize(name, service, running).await;
+                }
+            }
+        }
+    }
+}
+
+/// Reinitialize a service
+async fn reinitialize(name: &str, service: &mut Box<dyn Service>, running: &Arc<AtomicBool>) {
+    debug!(service = %name, "Reinitializing service");
+    running.store(false, Ordering::Release);
+
+    if let Err(e) = service.finalize().await {
+        warn!(service = %name, error = %e, "Error finalizing service");
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-services/src/service.rs b/src/pmxcfs-rs/pmxcfs-services/src/service.rs
new file mode 100644
index 000000000..daf13900b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/src/service.rs
@@ -0,0 +1,149 @@
+//! Service trait and related types
+//!
+//! Simplified design based on actual usage patterns.
+//! All production services use file descriptors and are restartable.
+//!
+//! # Architecture
+//!
+//! Each service runs in its own tokio task that handles:
+//! - Initialization with automatic retry (5 second interval)
+//! - Event-driven dispatch when the file descriptor becomes readable
+//! - Optional periodic timer callbacks
+//! - Automatic reinitialization on errors
+//!
+//! # Thread Safety
+//!
+//! Services must be `Send + Sync` as they run in separate tokio tasks.
+//! The ServiceManager ensures that only one operation (initialize, dispatch,
+//! timer_callback, or finalize) runs at a time for each service.
+//!
+//! # File Descriptor Ownership
+//!
+//! Services return a RawFd from `initialize()` and retain ownership of it.
+//! The ServiceManager monitors the fd for readability but does NOT close it.
+//! Services MUST close the fd in `finalize()`, which is called:
+//! - On shutdown
+//! - Before reinitialization after an error
+//! - When dispatch() returns Ok(false)
+//!
+//! This design works well for wrapping C library file descriptors (e.g., from
+//! Corosync libraries) where the service manages the underlying C resources.
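+//!
+//! A minimal sketch of that contract (`connect()` is a hypothetical helper;
+//! see the `Service` trait docs below for a fuller example):
+//!
+//! ```ignore
+//! async fn initialize(&mut self) -> Result<RawFd> {
+//!     let fd = connect()?;   // the service creates and owns the fd
+//!     self.fd = Some(fd);
+//!     Ok(fd)                 // the manager only monitors this fd
+//! }
+//!
+//! async fn finalize(&mut self) -> Result<()> {
+//!     if let Some(fd) = self.fd.take() {  // take() keeps finalize idempotent
+//!         unsafe { libc::close(fd) };     // the service closes it, never the manager
+//!     }
+//!     Ok(())
+//! }
+//! ```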
+
+use crate::error::Result;
+use async_trait::async_trait;
+use std::os::unix::io::RawFd;
+use std::time::Duration;
+
+/// A managed service with automatic retry and event-driven dispatch
+///
+/// All services are:
+/// - Event-driven (use file descriptors)
+/// - Restartable (automatic retry on failure)
+/// - Optionally have timer callbacks
+///
+/// # Example
+///
+/// ```ignore
+/// use pmxcfs_services::{Service, ServiceManager};
+/// use std::os::unix::io::RawFd;
+///
+/// struct MyService {
+///     fd: Option<RawFd>,
+/// }
+///
+/// #[async_trait]
+/// impl Service for MyService {
+///     fn name(&self) -> &str { "my-service" }
+///
+///     async fn initialize(&mut self) -> Result<RawFd> {
+///         let fd = connect_to_external_service()?;
+///         self.fd = Some(fd);
+///         Ok(fd)  // Return fd for event monitoring
+///     }
+///
+///     async fn dispatch(&mut self) -> Result<bool> {
+///         handle_events()?;
+///         Ok(true)  // true = continue, false = reinitialize
+///     }
+///
+///     async fn finalize(&mut self) -> Result<()> {
+///         if let Some(fd) = self.fd.take() {
+///             close(fd)?;  // MUST close the fd
+///         }
+///         Ok(())
+///     }
+/// }
+/// ```
+#[async_trait]
+pub trait Service: Send + Sync {
+    /// Service name for logging and identification
+    fn name(&self) -> &str;
+
+    /// Initialize the service and return a file descriptor to monitor
+    ///
+    /// The service retains ownership of the fd and MUST close it in `finalize()`.
+    /// The ServiceManager monitors the fd and calls `dispatch()` when it becomes readable.
+    ///
+    /// # Returns
+    ///
+    /// Returns a file descriptor that will be monitored for readability.
+    ///
+    /// # Errors
+    ///
+    /// On error, the service will be automatically retried after 5 seconds.
+    ///
+    /// # File Descriptor Lifetime
+    ///
+    /// The returned fd must remain valid until `finalize()` is called.
+    /// The service is responsible for closing the fd in `finalize()`.
+    async fn initialize(&mut self) -> Result<RawFd>;
+
+    /// Handle events when the file descriptor becomes readable
+    ///
+    /// This method is called when the fd returned by `initialize()` becomes readable.
+    ///
+    /// # Returns
+    ///
+    /// - `Ok(true)` - Continue running normally
+    /// - `Ok(false)` - Request reinitialization (finalize will be called first)
+    /// - `Err(_)` - Error occurred, service will be reinitialized
+    ///
+    /// # Blocking Behavior
+    ///
+    /// This method should not block for extended periods as it will prevent
+    /// timer callbacks and shutdown signals from being processed promptly.
+    /// For long-running operations, consider spawning a separate task.
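+    ///
+    /// A hedged sketch of off-loading heavy work (`read_event` and `process`
+    /// are illustrative):
+    ///
+    /// ```ignore
+    /// async fn dispatch(&mut self) -> Result<bool> {
+    ///     let event = self.read_event()?;  // drain the fd quickly
+    ///     tokio::spawn(async move {
+    ///         process(event).await;        // heavy work runs off the service task
+    ///     });
+    ///     Ok(true)                         // keep running
+    /// }
+    /// ```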
+    async fn dispatch(&mut self) -> Result<bool>;
+
+    /// Clean up resources (called on shutdown or before reinitialization)
+    ///
+    /// This method MUST close the file descriptor returned by `initialize()`.
+    ///
+    /// # When Called
+    ///
+    /// This method is called:
+    /// - When the service is being shut down
+    /// - Before reinitializing after an error
+    /// - When `dispatch()` returns `Ok(false)`
+    ///
+    /// # Idempotency
+    ///
+    /// Must be idempotent (safe to call multiple times).
+    async fn finalize(&mut self) -> Result<()>;
+
+    /// Optional timer period for periodic callbacks
+    ///
+    /// Return `None` to disable timer callbacks.
+    /// Return `Some(duration)` to enable periodic callbacks at the specified interval.
+    fn timer_period(&self) -> Option<Duration> {
+        None
+    }
+
+    /// Optional periodic callback invoked at `timer_period()` intervals
+    ///
+    /// Only called if `timer_period()` returns `Some`.
+    /// Errors are logged but do not trigger reinitialization.
+    async fn timer_callback(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs b/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs
new file mode 100644
index 000000000..639124293
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs
@@ -0,0 +1,1271 @@
+//! Comprehensive tests for the service framework
+//!
+//! Tests cover:
+//! - Service lifecycle (start, stop, restart)
+//! - Service manager orchestration
+//! - Error handling and retry logic
+//! - Timer callbacks
+//! - Event-driven file descriptor dispatch
+//! - Service coordination and state management
+
+use async_trait::async_trait;
+use pmxcfs_services::{Service, ServiceError, ServiceManager};
+use pmxcfs_test_utils::wait_for_condition;
+use std::os::unix::io::RawFd;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
+use std::time::Duration;
+use tokio::time::sleep;
+
+// ===== Test Service Implementations =====
+
+/// Mock service for testing lifecycle
+struct MockService {
+    name: String,
+    init_count: Arc<AtomicU32>,
+    dispatch_count: Arc<AtomicU32>,
+    finalize_count: Arc<AtomicU32>,
+    timer_count: Arc<AtomicU32>,
+    should_fail_init: Arc<AtomicBool>,
+    should_fail_dispatch: Arc<AtomicBool>,
+    should_reinit: Arc<AtomicBool>,
+    timer_period: Option<Duration>,
+    read_fd: Option<RawFd>,
+    write_fd: Arc<std::sync::atomic::AtomicI32>,
+}
+
+impl MockService {
+    fn new(name: &str) -> Self {
+        Self {
+            name: name.to_string(),
+            init_count: Arc::new(AtomicU32::new(0)),
+            dispatch_count: Arc::new(AtomicU32::new(0)),
+            finalize_count: Arc::new(AtomicU32::new(0)),
+            timer_count: Arc::new(AtomicU32::new(0)),
+            should_fail_init: Arc::new(AtomicBool::new(false)),
+            should_fail_dispatch: Arc::new(AtomicBool::new(false)),
+            should_reinit: Arc::new(AtomicBool::new(false)),
+            timer_period: None,
+            read_fd: None,
+            write_fd: Arc::new(std::sync::atomic::AtomicI32::new(-1)),
+        }
+    }
+
+    fn with_timer(mut self, period: Duration) -> Self {
+        self.timer_period = Some(period);
+        self
+    }
+
+    fn counters(&self) -> ServiceCounters {
+        ServiceCounters {
+            init_count: self.init_count.clone(),
+            dispatch_count: self.dispatch_count.clone(),
+            finalize_count: self.finalize_count.clone(),
+            timer_count: self.timer_count.clone(),
+            should_fail_init: self.should_fail_init.clone(),
+            should_fail_dispatch: self.should_fail_dispatch.clone(),
+            should_reinit: self.should_reinit.clone(),
+            write_fd: self.write_fd.clone(),
+        }
+    }
+}
+
+#[async_trait]
+impl Service for MockService {
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    async fn initialize(&mut self) -> pmxcfs_services::Result<RawFd> {
+        self.init_count.fetch_add(1, Ordering::SeqCst);
+
+        if self.should_fail_init.load(Ordering::SeqCst) {
+            return Err(ServiceError::InitializationFailed(
+                "Mock init failure".to_string(),
+            ));
+        }
+
+        // Create a pipe for event-driven dispatch
+        let mut fds = [0i32; 2];
+        let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
+        if ret != 0 {
+            return Err(ServiceError::InitializationFailed(
+                "pipe() failed".to_string(),
+            ));
+        }
+
+        // Set read end to non-blocking (required for AsyncFd)
+        unsafe {
+            let flags = libc::fcntl(fds[0], libc::F_GETFL);
+            libc::fcntl(fds[0], libc::F_SETFL, flags | libc::O_NONBLOCK);
+        }
+
+        self.read_fd = Some(fds[0]);
+        self.write_fd.store(fds[1], Ordering::SeqCst);
+
+        Ok(fds[0])
+    }
+
+    async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> {
+        self.dispatch_count.fetch_add(1, Ordering::SeqCst);
+
+        // Drain the pipe
+        if let Some(fd) = self.read_fd {
+            let mut buf = [0u8; 64];
+            unsafe {
+                libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len());
+            }
+        }
+
+        if self.should_fail_dispatch.load(Ordering::SeqCst) {
+            return Err(ServiceError::DispatchFailed(
+                "Mock dispatch failure".to_string(),
+            ));
+        }
+
+        if self.should_reinit.load(Ordering::SeqCst) {
+            return Ok(false); // false = reinitialize
+        }
+
+        Ok(true) // true = continue
+    }
+
+    async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+        self.finalize_count.fetch_add(1, Ordering::SeqCst);
+
+        if let Some(fd) = self.read_fd.take() {
+            unsafe { libc::close(fd) };
+        }
+        let wfd = self.write_fd.swap(-1, Ordering::SeqCst);
+        if wfd >= 0 {
+            unsafe { libc::close(wfd) };
+        }
+
+        Ok(())
+    }
+
+    async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+        self.timer_count.fetch_add(1, Ordering::SeqCst);
+        Ok(())
+    }
+
+    fn timer_period(&self) -> Option<Duration> {
+        self.timer_period
+    }
+}
+
+/// Helper struct to access service counters from tests
+#[derive(Clone)]
+struct ServiceCounters {
+    init_count: Arc<AtomicU32>,
+    dispatch_count: Arc<AtomicU32>,
+    finalize_count: Arc<AtomicU32>,
+    timer_count: Arc<AtomicU32>,
+    should_fail_init: Arc<AtomicBool>,
+    should_fail_dispatch: Arc<AtomicBool>,
+    should_reinit: Arc<AtomicBool>,
+    write_fd: Arc<std::sync::atomic::AtomicI32>,
+}
+
+impl ServiceCounters {
+    fn init_count(&self) -> u32 {
+        self.init_count.load(Ordering::SeqCst)
+    }
+
+    fn dispatch_count(&self) -> u32 {
+        self.dispatch_count.load(Ordering::SeqCst)
+    }
+
+    fn finalize_count(&self) -> u32 {
+        self.finalize_count.load(Ordering::SeqCst)
+    }
+
+    fn timer_count(&self) -> u32 {
+        self.timer_count.load(Ordering::SeqCst)
+    }
+
+    fn set_fail_init(&self, fail: bool) {
+        self.should_fail_init.store(fail, Ordering::SeqCst);
+    }
+
+    fn set_fail_dispatch(&self, fail: bool) {
+        self.should_fail_dispatch.store(fail, Ordering::SeqCst);
+    }
+
+    fn set_reinit(&self, reinit: bool) {
+        self.should_reinit.store(reinit, Ordering::SeqCst);
+    }
+
+    fn trigger_event(&self) {
+        let wfd = self.write_fd.load(Ordering::SeqCst);
+        if wfd >= 0 {
+            unsafe {
+                libc::write(wfd, b"x".as_ptr() as *const _, 1);
+            }
+        }
+    }
+}
+
+// ===== Lifecycle Tests =====
+
+#[tokio::test]
+async fn test_service_lifecycle_basic() {
+    let service = MockService::new("test_service");
+    let counters = service.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should initialize within 5 seconds"
+    );
+
+    // Trigger a dispatch event
+    counters.trigger_event();
+
+    // Wait for dispatch
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should dispatch within 5 seconds after event"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+
+    // Service should be finalized
+    assert_eq!(
+        counters.finalize_count(),
+        1,
+        "Service should be finalized exactly once"
+    );
+}
+
+#[tokio::test]
+async fn test_service_with_file_descriptor() {
+    let service = MockService::new("fd_service");
+    let counters = service.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() == 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should initialize once within 5 seconds"
+    );
+
+    // Trigger a dispatch event
+    counters.trigger_event();
+
+    // Wait for dispatch
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should dispatch within 5 seconds after event"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+
+    assert_eq!(counters.finalize_count(), 1, "Service should finalize once");
+}
+
+#[tokio::test]
+async fn test_service_initialization_failure() {
+    let service = MockService::new("failing_service");
+    let counters = service.counters();
+
+    // Make initialization fail
+    counters.set_fail_init(true);
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for several retry attempts (retry interval is 5 seconds)
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 3,
+            Duration::from_secs(15),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should retry initialization at least 3 times within 15 seconds"
+    );
+
+    // Dispatch should not run if init fails
+    assert_eq!(
+        counters.dispatch_count(),
+        0,
+        "Service should not dispatch if init fails"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_initialization_recovery() {
+    let service = MockService::new("recovering_service");
+    let counters = service.counters();
+
+    // Start with failing initialization
+    counters.set_fail_init(true);
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for some failed attempts (retry interval is 5 seconds)
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 2,
+            Duration::from_secs(12),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Should have at least 2 failed initialization attempts within 12 seconds"
+    );
+
+    let failed_attempts = counters.init_count();
+
+    // Allow initialization to succeed
+    counters.set_fail_init(false);
+
+    // Wait for recovery
+    assert!(
+        wait_for_condition(
+            || counters.init_count() > failed_attempts,
+            Duration::from_secs(7),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should recover within 7 seconds"
+    );
+
+    // Trigger a dispatch event
+    counters.trigger_event();
+
+    // Wait for dispatch
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should dispatch after recovery"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+// ===== Dispatch Tests =====
+
+#[tokio::test]
+async fn test_service_dispatch_failure_triggers_reinit() {
+    let service = MockService::new("dispatch_fail_service");
+    let counters = service.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() == 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should initialize once within 5 seconds"
+    );
+
+    // Trigger a dispatch event
+    counters.trigger_event();
+
+    // Wait for first dispatch
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should dispatch within 5 seconds"
+    );
+
+    // Make dispatch fail
+    counters.set_fail_dispatch(true);
+
+    // Trigger another dispatch event
+    counters.trigger_event();
+
+    // Wait for dispatch failure and reinitialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 2 && counters.finalize_count() >= 1,
+            Duration::from_secs(10),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should reinitialize after dispatch failure within 10 seconds"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_dispatch_requests_reinit() {
+    let service = MockService::new("reinit_request_service");
+    let counters = service.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() == 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should initialize once within 5 seconds"
+    );
+
+    // Request reinitialization from dispatch
+    counters.set_reinit(true);
+
+    // Trigger a dispatch event
+    counters.trigger_event();
+
+    // Wait for reinitialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 2 && counters.finalize_count() >= 1,
+            Duration::from_secs(10),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should reinitialize and finalize when dispatch requests it within 10 seconds"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+// ===== FD-based Dispatch Tests =====
+
+#[tokio::test]
+async fn test_fd_dispatch_basic() {
+    let (service, counters) = SharedFdService::new("fd_service");
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should initialize within 5 seconds"
+    );
+
+    // Verify no dispatch happens without data on the pipe
+    sleep(Duration::from_millis(200)).await;
+    assert_eq!(
+        counters.dispatch_count(),
+        0,
+        "FD service should not dispatch without data on pipe"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+/// FD service that shares write_fd via Arc<AtomicI32> so tests can trigger events
+struct SharedFdService {
+    name: String,
+    read_fd: Option<RawFd>,
+    write_fd: Arc<std::sync::atomic::AtomicI32>,
+    init_count: Arc<AtomicU32>,
+    dispatch_count: Arc<AtomicU32>,
+    finalize_count: Arc<AtomicU32>,
+    should_fail_dispatch: Arc<AtomicBool>,
+    should_reinit: Arc<AtomicBool>,
+}
+
+impl SharedFdService {
+    fn new(name: &str) -> (Self, SharedFdCounters) {
+        let write_fd = Arc::new(std::sync::atomic::AtomicI32::new(-1));
+        let init_count = Arc::new(AtomicU32::new(0));
+        let dispatch_count = Arc::new(AtomicU32::new(0));
+        let finalize_count = Arc::new(AtomicU32::new(0));
+        let should_fail_dispatch = Arc::new(AtomicBool::new(false));
+        let should_reinit = Arc::new(AtomicBool::new(false));
+
+        let counters = SharedFdCounters {
+            write_fd: write_fd.clone(),
+            init_count: init_count.clone(),
+            dispatch_count: dispatch_count.clone(),
+            finalize_count: finalize_count.clone(),
+            should_fail_dispatch: should_fail_dispatch.clone(),
+            should_reinit: should_reinit.clone(),
+        };
+
+        let service = Self {
+            name: name.to_string(),
+            read_fd: None,
+            write_fd,
+            init_count,
+            dispatch_count,
+            finalize_count,
+            should_fail_dispatch,
+            should_reinit,
+        };
+
+        (service, counters)
+    }
+}
+
+#[derive(Clone)]
+struct SharedFdCounters {
+    write_fd: Arc<std::sync::atomic::AtomicI32>,
+    init_count: Arc<AtomicU32>,
+    dispatch_count: Arc<AtomicU32>,
+    finalize_count: Arc<AtomicU32>,
+    should_fail_dispatch: Arc<AtomicBool>,
+    should_reinit: Arc<AtomicBool>,
+}
+
+impl SharedFdCounters {
+    fn init_count(&self) -> u32 {
+        self.init_count.load(Ordering::SeqCst)
+    }
+    fn dispatch_count(&self) -> u32 {
+        self.dispatch_count.load(Ordering::SeqCst)
+    }
+    fn finalize_count(&self) -> u32 {
+        self.finalize_count.load(Ordering::SeqCst)
+    }
+    fn trigger_event(&self) {
+        let fd = self.write_fd.load(Ordering::SeqCst);
+        if fd >= 0 {
+            unsafe {
+                libc::write(fd, b"x".as_ptr() as *const _, 1);
+            }
+        }
+    }
+    fn set_fail_dispatch(&self, fail: bool) {
+        self.should_fail_dispatch.store(fail, Ordering::SeqCst);
+    }
+    fn set_reinit(&self, reinit: bool) {
+        self.should_reinit.store(reinit, Ordering::SeqCst);
+    }
+}
+
+#[async_trait]
+impl Service for SharedFdService {
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    async fn initialize(&mut self) -> pmxcfs_services::Result<RawFd> {
+        self.init_count.fetch_add(1, Ordering::SeqCst);
+
+        let mut fds = [0i32; 2];
+        let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
+        if ret != 0 {
+            return Err(ServiceError::InitializationFailed(
+                "pipe() failed".to_string(),
+            ));
+        }
+
+        // Set read end to non-blocking (required for AsyncFd)
+        unsafe {
+            let flags = libc::fcntl(fds[0], libc::F_GETFL);
+            libc::fcntl(fds[0], libc::F_SETFL, flags | libc::O_NONBLOCK);
+        }
+
+        self.read_fd = Some(fds[0]);
+        self.write_fd.store(fds[1], Ordering::SeqCst);
+
+        Ok(fds[0])
+    }
+
+    async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> {
+        self.dispatch_count.fetch_add(1, Ordering::SeqCst);
+
+        // Drain the pipe
+        if let Some(fd) = self.read_fd {
+            let mut buf = [0u8; 64];
+            unsafe {
+                libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len());
+            }
+        }
+
+        if self.should_fail_dispatch.load(Ordering::SeqCst) {
+            return Err(ServiceError::DispatchFailed(
+                "Mock fd dispatch failure".to_string(),
+            ));
+        }
+
+        if self.should_reinit.load(Ordering::SeqCst) {
+            return Ok(false); // false = reinitialize
+        }
+
+        Ok(true) // true = continue
+    }
+
+    async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+        self.finalize_count.fetch_add(1, Ordering::SeqCst);
+
+        if let Some(fd) = self.read_fd.take() {
+            unsafe { libc::close(fd) };
+        }
+        let wfd = self.write_fd.swap(-1, Ordering::SeqCst);
+        if wfd >= 0 {
+            unsafe { libc::close(wfd) };
+        }
+
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn test_fd_dispatch_event_driven() {
+    let (service, counters) = SharedFdService::new("fd_event_service");
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should initialize within 5 seconds"
+    );
+
+    // No dispatch should happen without data
+    sleep(Duration::from_millis(200)).await;
+    assert_eq!(
+        counters.dispatch_count(),
+        0,
+        "FD service should not dispatch without data"
+    );
+
+    // Trigger an event by writing to the pipe
+    counters.trigger_event();
+
+    // Wait for dispatch
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should dispatch after data is written to pipe"
+    );
+
+    // Trigger more events
+    counters.trigger_event();
+    counters.trigger_event();
+
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 2,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should handle multiple events"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+
+    assert!(
+        counters.finalize_count() >= 1,
+        "FD service should be finalized"
+    );
+}
+
+#[tokio::test]
+async fn test_fd_dispatch_failure_triggers_reinit() {
+    let (service, counters) = SharedFdService::new("fd_fail_service");
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should initialize"
+    );
+
+    // Trigger an event and verify dispatch works
+    counters.trigger_event();
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should dispatch"
+    );
+
+    // Make dispatch fail, then trigger event
+    counters.set_fail_dispatch(true);
+    counters.trigger_event();
+
+    // Wait for finalize + reinit
+    assert!(
+        wait_for_condition(
+            || counters.finalize_count() >= 1 && counters.init_count() >= 2,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should finalize and reinitialize after dispatch failure"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_fd_dispatch_reinit_request() {
+    let (service, counters) = SharedFdService::new("fd_reinit_service");
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should initialize"
+    );
+
+    // Request reinit from dispatch
+    counters.set_reinit(true);
+    counters.trigger_event();
+
+    // Wait for reinit
+    assert!(
+        wait_for_condition(
+            || counters.finalize_count() >= 1 && counters.init_count() >= 2,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "FD service should finalize and reinitialize on reinit request"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+// ===== Timer Callback Tests =====
+
+#[tokio::test]
+async fn test_service_timer_callback() {
+    let service = MockService::new("timer_service").with_timer(Duration::from_millis(300));
+    let counters = service.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization plus several timer periods
+    assert!(
+        wait_for_condition(
+            || counters.timer_count() >= 3,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Timer should fire at least 3 times within 5 seconds"
+    );
+
+    let timer_count = counters.timer_count();
+
+    // Wait for more timer invocations
+    assert!(
+        wait_for_condition(
+            || counters.timer_count() > timer_count,
+            Duration::from_secs(2),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Timer should continue firing"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_timer_callback_not_invoked_when_failed() {
+    let service = MockService::new("failed_timer_service").with_timer(Duration::from_millis(100));
+    let counters = service.counters();
+
+    // Make initialization fail
+    counters.set_fail_init(true);
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for several timer periods
+    sleep(Duration::from_millis(2000)).await;
+
+    // Timer should NOT fire if service is not running
+    assert_eq!(
+        counters.timer_count(),
+        0,
+        "Timer should not fire when service is not running"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+// ===== Service Manager Tests =====
+
+#[tokio::test]
+async fn test_manager_multiple_services() {
+    let service1 = MockService::new("service1");
+    let service2 = MockService::new("service2");
+    let service3 = MockService::new("service3");
+
+    let counters1 = service1.counters();
+    let counters2 = service2.counters();
+    let counters3 = service3.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service1)).unwrap();
+    manager.add_service(Box::new(service2)).unwrap();
+    manager.add_service(Box::new(service3)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters1.init_count() == 1
+                && counters2.init_count() == 1
+                && counters3.init_count() == 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "All services should initialize within 5 seconds"
+    );
+
+    // Trigger dispatch events for all services
+    counters1.trigger_event();
+    counters2.trigger_event();
+    counters3.trigger_event();
+
+    // Wait for dispatch
+    assert!(
+        wait_for_condition(
+            || counters1.dispatch_count() >= 1
+                && counters2.dispatch_count() >= 1
+                && counters3.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "All services should dispatch within 5 seconds after events"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+
+    // All services should be finalized
+    assert_eq!(counters1.finalize_count(), 1, "Service1 should finalize");
+    assert_eq!(counters2.finalize_count(), 1, "Service2 should finalize");
+    assert_eq!(counters3.finalize_count(), 1, "Service3 should finalize");
+}
+
+#[tokio::test]
+async fn test_manager_duplicate_service_name() {
+    let service1 = MockService::new("duplicate");
+    let service2 = MockService::new("duplicate");
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service1)).unwrap();
+    let result = manager.add_service(Box::new(service2));
+    assert!(result.is_err(), "Should return error for duplicate service");
+}
+
+#[tokio::test]
+async fn test_manager_partial_service_failure() {
+    let service1 = MockService::new("working_service");
+    let service2 = MockService::new("failing_service");
+
+    let counters1 = service1.counters();
+    let counters2 = service2.counters();
+
+    // Make service2 fail
+    counters2.set_fail_init(true);
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service1)).unwrap();
+    manager.add_service(Box::new(service2)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for service1 initialization
+    assert!(
+        wait_for_condition(
+            || counters1.init_count() == 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service1 should initialize within 5 seconds"
+    );
+
+    // Trigger event for service1
+    counters1.trigger_event();
+
+    // Wait for service1 dispatch and service2 retries
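+    // (init_count >= 2 means at least one retry attempt; with the 5-second retry interval, 12 seconds covers two.)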
+    assert!(
+        wait_for_condition(
+            || counters1.dispatch_count() >= 1 && counters2.init_count() >= 2,
+            Duration::from_secs(12),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service1 should work normally and Service2 should retry within 12 seconds"
+    );
+
+    // Service2 should not dispatch when failing
+    assert_eq!(
+        counters2.dispatch_count(),
+        0,
+        "Service2 should not dispatch when failing"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+
+    // Service1 should finalize
+    assert_eq!(counters1.finalize_count(), 1, "Service1 should finalize");
+    // Service2 is also finalized unconditionally during shutdown (matching C behavior)
+    assert_eq!(
+        counters2.finalize_count(),
+        1,
+        "Service2 should also be finalized during shutdown (idempotent finalize)"
+    );
+}
+
+// ===== Error Handling Tests =====
+
+#[tokio::test]
+async fn test_service_error_count_tracking() {
+    let service = MockService::new("error_tracking_service");
+    let counters = service.counters();
+
+    // Make initialization fail
+    counters.set_fail_init(true);
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for multiple failures (retry interval is 5 seconds)
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 3,
+            Duration::from_secs(15),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Should accumulate at least 3 failures within 15 seconds"
+    );
+
+    // Allow recovery
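+    // (The next retry, due within 5 seconds, should now succeed; the 7-second window leaves some slack.)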
+    counters.set_fail_init(false);
+
+    // Wait for successful initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 4,
+            Duration::from_secs(7),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should recover within 7 seconds"
+    );
+
+    // Trigger a dispatch event
+    counters.trigger_event();
+
+    // Wait for dispatch
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should dispatch after recovery"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_graceful_shutdown() {
+    let service = MockService::new("shutdown_test");
+    let counters = service.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should initialize within 5 seconds"
+    );
+
+    // Trigger a dispatch event
+    counters.trigger_event();
+
+    // Wait for the service to process the event (proving it is running)
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should be running within 5 seconds"
+    );
+
+    // Graceful shutdown
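+    // Awaiting the join handle ensures finalize has run before the assertion below.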
+    shutdown_token.cancel();
+    let _ = handle.await;
+
+    // Service should be properly finalized
+    assert_eq!(
+        counters.finalize_count(),
+        1,
+        "Service should finalize during shutdown"
+    );
+}
+
+// ===== Concurrency Tests =====
+
+#[tokio::test]
+async fn test_service_concurrent_operations() {
+    let service = MockService::new("concurrent_service").with_timer(Duration::from_millis(200));
+    let counters = service.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should initialize within 5 seconds"
+    );
+
+    // Trigger multiple dispatch events
+    for _ in 0..5 {
+        counters.trigger_event();
+        sleep(Duration::from_millis(50)).await;
+    }
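+    // Triggers fired in quick succession may coalesce into fewer dispatches, hence >= 3 below rather than == 5.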
+
+    // Wait for the service to handle both dispatch and timer events
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 3 && counters.timer_count() >= 3,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should handle concurrent dispatch and timer events within 5 seconds"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_state_consistency_after_reinit() {
+    let service = MockService::new("consistency_service");
+    let counters = service.counters();
+
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(service)).unwrap();
+
+    let shutdown_token = manager.shutdown_token();
+    let handle = manager.spawn();
+
+    // Wait for initialization
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should initialize within 5 seconds"
+    );
+
+    // Trigger reinitialization
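+    // With the reinit flag set, the next dispatch is expected to request reinitialization, bumping init_count again.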
+    counters.set_reinit(true);
+    counters.trigger_event();
+
+    // Wait for reinit
+    assert!(
+        wait_for_condition(
+            || counters.init_count() >= 2,
+            Duration::from_secs(10),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should reinitialize within 10 seconds"
+    );
+
+    // Clear reinit flag
+    counters.set_reinit(false);
+
+    // Trigger a dispatch event
+    counters.trigger_event();
+
+    // Wait for dispatch
+    assert!(
+        wait_for_condition(
+            || counters.dispatch_count() >= 1,
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        )
+        .await,
+        "Service should dispatch after reinit"
+    );
+
+    // Shutdown
+    shutdown_token.cancel();
+    let _ = handle.await;
+}
-- 
2.47.3