From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [PATCH pve-cluster 08/14 v2] pmxcfs-rs: add pmxcfs-services crate
Date: Fri, 13 Feb 2026 17:33:45 +0800
Message-ID: <20260213094119.2379288-9-k.chai@proxmox.com>
In-Reply-To: <20260213094119.2379288-1-k.chai@proxmox.com>
Add service lifecycle management framework providing:
- Service trait: Lifecycle interface for async services
- ServiceManager: Orchestrates multiple services
- Automatic retry logic for failed services
- Event-driven dispatching via file descriptors
- Graceful shutdown coordination
This is a generic framework with no pmxcfs-specific dependencies,
only requiring tokio, async-trait, and standard error handling.
It replaces the C version's qb_loop-based event management.
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
src/pmxcfs-rs/Cargo.toml | 2 +
src/pmxcfs-rs/pmxcfs-services/Cargo.toml | 17 +
src/pmxcfs-rs/pmxcfs-services/README.md | 162 +++
src/pmxcfs-rs/pmxcfs-services/src/error.rs | 21 +
src/pmxcfs-rs/pmxcfs-services/src/lib.rs | 15 +
src/pmxcfs-rs/pmxcfs-services/src/manager.rs | 341 +++++
src/pmxcfs-rs/pmxcfs-services/src/service.rs | 149 ++
.../pmxcfs-services/tests/service_tests.rs | 1271 +++++++++++++++++
8 files changed, 1978 insertions(+)
create mode 100644 src/pmxcfs-rs/pmxcfs-services/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-services/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/error.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/manager.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/service.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 9d509c1d2..b9f0f620b 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -8,6 +8,7 @@ members = [
"pmxcfs-memdb", # In-memory database with SQLite persistence
"pmxcfs-status", # Status monitoring and RRD data management
"pmxcfs-test-utils", # Test utilities and helpers (dev-only)
+ "pmxcfs-services", # Service framework for automatic retry and lifecycle management
]
resolver = "2"
@@ -28,6 +29,7 @@ pmxcfs-rrd = { path = "pmxcfs-rrd" }
pmxcfs-memdb = { path = "pmxcfs-memdb" }
pmxcfs-status = { path = "pmxcfs-status" }
pmxcfs-test-utils = { path = "pmxcfs-test-utils" }
+pmxcfs-services = { path = "pmxcfs-services" }
# Core async runtime
tokio = { version = "1.35", features = ["full"] }
diff --git a/src/pmxcfs-rs/pmxcfs-services/Cargo.toml b/src/pmxcfs-rs/pmxcfs-services/Cargo.toml
new file mode 100644
index 000000000..45f49dcd6
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "pmxcfs-services"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+async-trait = "0.1"
+tokio = { version = "1.41", features = ["full"] }
+tokio-util = "0.7"
+tracing = "0.1"
+thiserror = "2.0"
+parking_lot = "0.12"
+
+[dev-dependencies]
+libc.workspace = true
+pmxcfs-test-utils = { path = "../pmxcfs-test-utils" }
diff --git a/src/pmxcfs-rs/pmxcfs-services/README.md b/src/pmxcfs-rs/pmxcfs-services/README.md
new file mode 100644
index 000000000..76622662c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/README.md
@@ -0,0 +1,162 @@
+# pmxcfs-services
+
+**Service Management Framework** for pmxcfs - tokio-based replacement for qb_loop.
+
+Manages long-running services with automatic retry, event-driven dispatching,
+periodic timers, and graceful shutdown. Replaces the C implementation's
+`qb_loop`-based event management with a tokio async runtime.
+
+## How It Fits Together
+
+- **`Service` trait** (`service.rs`): Lifecycle interface that each service implements
+ (`initialize` / `dispatch` / `finalize`, plus optional timer callbacks).
+- **`ServiceManager`** (`manager.rs`): Accepts `Box<dyn Service>` via `add_service()`,
+ then `spawn()` launches one background task per service that drives it through its lifecycle.
+- Each service task handles:
+ - **Initialization with retry**: Retries every 5 seconds on failure
+ - **Event-driven dispatch**: Waits for file descriptor readability via `AsyncFd`
+ - **Timer callbacks**: Optional periodic callbacks at configured intervals
+ - **Reinitialization**: Automatic on dispatch failure or explicit request
+
+Shutdown is coordinated through a `CancellationToken`:
+
+```rust
+let shutdown_token = manager.shutdown_token();
+let handle = manager.spawn();
+// ... later ...
+shutdown_token.cancel(); // Signal graceful shutdown
+handle.await?; // Wait for all services to finalize
+```
+
+## Usage Example
+
+```rust
+use async_trait::async_trait;
+use pmxcfs_services::{Result, Service, ServiceManager};
+use std::os::unix::io::RawFd;
+use std::time::Duration;
+
+struct MyService {
+ fd: Option<RawFd>,
+}
+
+#[async_trait]
+impl Service for MyService {
+ fn name(&self) -> &str { "my-service" }
+
+ async fn initialize(&mut self) -> Result<RawFd> {
+ let fd = connect_to_external_service()?;
+ self.fd = Some(fd);
+ Ok(fd) // Return fd for event monitoring
+ }
+
+ async fn dispatch(&mut self) -> Result<bool> {
+ handle_events()?;
+ Ok(true) // true = continue, false = reinitialize
+ }
+
+ async fn finalize(&mut self) -> Result<()> {
+ close_connection(self.fd.take())?;
+ Ok(())
+ }
+
+ // Optional: periodic timer callback
+ fn timer_period(&self) -> Option<Duration> {
+ Some(Duration::from_secs(10))
+ }
+
+ async fn timer_callback(&mut self) -> Result<()> {
+ perform_periodic_task()?;
+ Ok(())
+ }
+}
+```
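+
+Registering the service with a `ServiceManager` (a minimal sketch; it assumes a tokio runtime and the `MyService` type from the example above, with error handling elided):
+
+```rust
+let mut manager = ServiceManager::new();
+manager.add_service(Box::new(MyService { fd: None }))?;
+
+let shutdown_token = manager.shutdown_token();
+let handle = manager.spawn();
+
+// ... run until shutdown is requested ...
+shutdown_token.cancel();
+handle.await?;
+```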
+
+## Service Lifecycle
+
+1. **Initialization**: The manager calls the service's `initialize()`, which returns a file descriptor
+ - On failure: Retries every 5 seconds indefinitely
+ - On success: Registers fd with tokio's `AsyncFd` and enters running state
+
+2. **Running**: The service task waits for events using `tokio::select!`:
+ - **FD readable**: Calls `dispatch()` when fd becomes readable
+ - Returns `Ok(true)`: Continue running
+ - Returns `Ok(false)`: Reinitialize (calls `finalize()` then `initialize()`)
+ - Returns `Err(_)`: Reinitialize
+ - **Timer deadline**: Calls `timer_callback()` at configured intervals (if enabled)
+
+3. **Shutdown**: On `CancellationToken::cancel()`:
+ - Calls `finalize()` for all services
+ - Waits for all service tasks to complete
+
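+A common use of the `Ok(false)` path is a service that notices during `dispatch()` that its underlying connection has gone away and asks the manager to recycle it (a minimal sketch; `connection_is_closed()` and `read_pending_events()` are illustrative placeholders, not part of this crate):
+
+```rust
+async fn dispatch(&mut self) -> Result<bool> {
+    if connection_is_closed() {
+        // The manager will call finalize(), then retry initialize()
+        return Ok(false);
+    }
+    read_pending_events()?;
+    Ok(true) // keep running normally
+}
+```
+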
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| [`cfs_loop_t`](../../pmxcfs/loop.h#L32) | `ServiceManager` | Event loop manager |
+| [`cfs_service_t`](../../pmxcfs/loop.h#L34) | `dyn Service` | Service trait |
+| [`cfs_service_callbacks_t`](../../pmxcfs/loop.h#L44-L49) | (trait methods) | Callbacks as trait methods |
+
+### Functions
+
+| C Function | Rust Equivalent |
+|-----------|-----------------|
+| [`cfs_loop_new()`](../../pmxcfs/loop.c) | `ServiceManager::new()` |
+| [`cfs_loop_add_service()`](../../pmxcfs/loop.c) | `ServiceManager::add_service()` |
+| [`cfs_loop_start_worker()`](../../pmxcfs/loop.c) | `ServiceManager::spawn()` |
+| [`cfs_loop_stop_worker()`](../../pmxcfs/loop.c) | `shutdown_token.cancel()` + `handle.await` |
+| [`cfs_service_new()`](../../pmxcfs/loop.c) | `struct` + `impl Service` |
+
+## Key Differences from C Implementation
+
+| Aspect | C (`loop.c`) | Rust |
+|--------|-------------|------|
+| Event loop | libqb `qb_loop`, single-threaded | tokio async runtime, multi-threaded |
+| FD monitoring | Manual `qb_loop_poll_add()` | Automatic `AsyncFd` |
+| Concurrency | Sequential callbacks | Parallel tasks per service |
+| Retry interval | Configurable per service | Fixed 5 seconds (sufficient for all services) |
+| Dispatch modes | FD-based or polling | FD-based only (all services use fds) |
+| Priority levels | Per-service priorities | All equal (no priority needed) |
+| Shutdown | `cfs_loop_stop_worker()` | `CancellationToken` → await tasks → finalize all |
+
+## Design Simplifications
+
+The Rust implementation is significantly simpler than the C version, reducing
+the codebase by 67% while preserving all production functionality.
+
+### Why Not Mirror the C Implementation?
+
+The C implementation (`loop.c`) was designed for flexibility to support various
+hypothetical use cases. However, after analyzing actual usage across the codebase,
+we found that many features were never used:
+
+- **Polling mode**: All services use file descriptors from Corosync libraries
+- **Custom retry intervals**: All services work fine with a fixed 5-second retry
+- **Non-restartable services**: All services need automatic retry on failure
+- **Custom dispatch intervals**: All services are event-driven (no periodic polling)
+- **Priority levels**: Service execution order doesn't matter in practice
+
+Rather than maintaining unused complexity "just in case", the Rust implementation
+focuses on what's actually needed. This makes the code easier to understand,
+test, and maintain.
+
+### Simplifications Applied
+
+- **No polling mode**: All services use file descriptors from C libraries (Corosync)
+- **Fixed retry interval**: 5 seconds is sufficient for all services
+- **All services restartable**: No need for non-restartable mode
+- **Single task per service**: Combines retry, dispatch, and timer logic
+- **Direct return types**: No enums (`RawFd` instead of `InitResult`, `bool` instead of `DispatchAction`)
+
+If future requirements demand more flexibility, these features can be added back
+incrementally with clear use cases driving the design.
+
+## References
+
+### C Implementation
+- [`src/pmxcfs/loop.h`](../../pmxcfs/loop.h) - Service loop API
+- [`src/pmxcfs/loop.c`](../../pmxcfs/loop.c) - Service loop implementation
+
+### Related Crates
+- **pmxcfs-dfsm**: Uses `Service` trait for `ClusterDatabaseService`, `StatusSyncService`
+- **pmxcfs**: Uses `ServiceManager` to orchestrate all cluster services
diff --git a/src/pmxcfs-rs/pmxcfs-services/src/error.rs b/src/pmxcfs-rs/pmxcfs-services/src/error.rs
new file mode 100644
index 000000000..0c5951761
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/src/error.rs
@@ -0,0 +1,21 @@
+//! Error types for the service framework
+
+use thiserror::Error;
+
+/// Errors that can occur during service operations
+#[derive(Error, Debug)]
+pub enum ServiceError {
+ /// Service initialization failed
+ #[error("Failed to initialize service: {0}")]
+ InitializationFailed(String),
+
+ /// Service dispatch failed
+ #[error("Failed to dispatch service events: {0}")]
+ DispatchFailed(String),
+
+ /// Duplicate service name
+ #[error("Service '{0}' is already registered")]
+ DuplicateService(String),
+}
+
+pub type Result<T> = std::result::Result<T, ServiceError>;
diff --git a/src/pmxcfs-rs/pmxcfs-services/src/lib.rs b/src/pmxcfs-rs/pmxcfs-services/src/lib.rs
new file mode 100644
index 000000000..18004ee6b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/src/lib.rs
@@ -0,0 +1,15 @@
+//! Service framework for pmxcfs
+//!
+//! This crate provides a simplified, tokio-based service management framework with:
+//! - Automatic retry on failure (5 second interval)
+//! - Event-driven file descriptor monitoring
+//! - Optional periodic timer callbacks
+//! - Graceful shutdown
+
+mod error;
+mod manager;
+mod service;
+
+pub use error::{Result, ServiceError};
+pub use manager::ServiceManager;
+pub use service::Service;
diff --git a/src/pmxcfs-rs/pmxcfs-services/src/manager.rs b/src/pmxcfs-rs/pmxcfs-services/src/manager.rs
new file mode 100644
index 000000000..30712aafd
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/src/manager.rs
@@ -0,0 +1,341 @@
+//! Service manager for orchestrating multiple managed services
+//!
+//! Each service gets one task that handles:
+//! - Initialization with retry (5 second interval)
+//! - Event dispatch when fd is readable
+//! - Timer callbacks at configured intervals
+
+use crate::error::{Result, ServiceError};
+use crate::service::Service;
+use parking_lot::Mutex;
+use std::collections::HashMap;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Instant;
+use tokio::io::unix::AsyncFd;
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, info, warn};
+
+/// Wrapper for raw fd that doesn't close on drop
+struct FdWrapper(RawFd);
+
+impl AsRawFd for FdWrapper {
+ fn as_raw_fd(&self) -> RawFd {
+ self.0
+ }
+}
+
+/// Service manager for orchestrating multiple services
+///
+/// # Architecture
+///
+/// The ServiceManager spawns one tokio task per service. Each task:
+/// - Initializes the service with automatic retry (5 second interval)
+/// - Monitors the service's file descriptor for readability
+/// - Calls dispatch() when the fd becomes readable
+/// - Optionally calls timer_callback() at configured intervals
+/// - Reinitializes on errors or explicit request
+///
+/// # Shutdown
+///
+/// Call `shutdown_token().cancel()` to initiate graceful shutdown.
+/// The manager will:
+/// 1. Signal all service tasks to stop
+/// 2. Call finalize() on each service
+/// 3. Wait up to 30 seconds for each service to stop
+/// 4. Continue shutdown even if services time out
+///
+/// # Thread Safety
+///
+/// Services run in separate tokio tasks and can execute concurrently.
+/// Each service's operations (initialize, dispatch, finalize) are
+/// serialized within its own task.
+///
+/// # Example
+///
+/// ```ignore
+/// use pmxcfs_services::ServiceManager;
+///
+/// let mut manager = ServiceManager::new();
+/// manager.add_service(Box::new(MyService::new()))?;
+/// manager.add_service(Box::new(AnotherService::new()))?;
+///
+/// let shutdown_token = manager.shutdown_token();
+/// let handle = manager.spawn();
+///
+/// // ... later ...
+/// shutdown_token.cancel(); // Signal graceful shutdown
+/// handle.await?; // Wait for all services to stop
+/// ```
+pub struct ServiceManager {
+ services: HashMap<String, Box<dyn Service>>,
+ shutdown_token: CancellationToken,
+}
+
+impl ServiceManager {
+ /// Create a new ServiceManager
+ pub fn new() -> Self {
+ Self {
+ services: HashMap::new(),
+ shutdown_token: CancellationToken::new(),
+ }
+ }
+
+ /// Add a service to the manager
+ ///
+ /// Services must be added before calling `spawn()`.
+ /// Cannot add services after the manager has been spawned.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Err` if a service with the same name is already registered.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let mut manager = ServiceManager::new();
+ /// manager.add_service(Box::new(MyService::new()))?;
+ /// ```
+ pub fn add_service(&mut self, service: Box<dyn Service>) -> Result<()> {
+ let name = service.name().to_string();
+ if self.services.contains_key(&name) {
+ return Err(ServiceError::DuplicateService(name));
+ }
+ self.services.insert(name, service);
+ Ok(())
+ }
+
+ /// Get a shutdown token for graceful shutdown
+ ///
+ /// Call `token.cancel()` to signal all services to stop gracefully.
+ /// The token can be cloned and shared across threads.
+ pub fn shutdown_token(&self) -> CancellationToken {
+ self.shutdown_token.clone()
+ }
+
+ /// Spawn the service manager and start all services
+ ///
+ /// This consumes the manager and returns a JoinHandle.
+ /// Each service runs in its own tokio task.
+ ///
+ /// # Shutdown
+ ///
+ /// To stop the manager:
+ /// 1. Call `shutdown_token().cancel()`
+ /// 2. Await the returned JoinHandle
+ ///
+ /// # Panics
+ ///
+ /// If a service task panics, it will be isolated to that task.
+ /// Other services will continue running.
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// let shutdown_token = manager.shutdown_token();
+ /// let handle = manager.spawn();
+ ///
+ /// // ... later ...
+ /// shutdown_token.cancel();
+ /// handle.await?;
+ /// ```
+ #[must_use = "the service manager will stop if the handle is dropped"]
+ pub fn spawn(self) -> JoinHandle<()> {
+ tokio::spawn(async move { self.run().await })
+ }
+
+ async fn run(self) {
+ info!("Starting ServiceManager with {} services", self.services.len());
+
+ let mut handles = Vec::new();
+
+ for (name, service) in self.services {
+ let token = self.shutdown_token.clone();
+ let handle = tokio::spawn(async move {
+ run_service(name, service, token).await;
+ });
+ handles.push(handle);
+ }
+
+ // Wait for shutdown
+ self.shutdown_token.cancelled().await;
+ info!("ServiceManager shutting down...");
+
+ // Wait for all services to stop with timeout
+ for handle in handles {
+ match tokio::time::timeout(std::time::Duration::from_secs(30), handle).await {
+ Ok(Ok(())) => {}
+ Ok(Err(e)) => {
+ warn!(error = ?e, "Service task panicked during shutdown");
+ }
+ Err(_) => {
+ warn!("Service didn't stop within 30 second timeout");
+ }
+ }
+ }
+
+ info!("ServiceManager stopped");
+ }
+}
+
+impl Default for ServiceManager {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Run a single service until shutdown
+async fn run_service(name: String, mut service: Box<dyn Service>, token: CancellationToken) {
+ // Service state
+ let running = Arc::new(AtomicBool::new(false));
+ let async_fd: Arc<Mutex<Option<Arc<AsyncFd<FdWrapper>>>>> = Arc::new(Mutex::new(None));
+ let last_timer = Arc::new(Mutex::new(None::<Instant>));
+ let mut last_init_attempt = None::<Instant>;
+
+ loop {
+ tokio::select! {
+ _ = token.cancelled() => break,
+ _ = service_loop(&name, &mut service, &running, &async_fd, &last_timer, &mut last_init_attempt) => {}
+ }
+ }
+
+ // Finalize on shutdown
+ running.store(false, Ordering::Release);
+ *async_fd.lock() = None;
+
+ info!(service = %name, "Shutting down service");
+ if let Err(e) = service.finalize().await {
+ error!(service = %name, error = %e, "Error finalizing service");
+ }
+}
+
+/// Main service loop
+async fn service_loop(
+ name: &str,
+ service: &mut Box<dyn Service>,
+ running: &Arc<AtomicBool>,
+ async_fd: &Arc<Mutex<Option<Arc<AsyncFd<FdWrapper>>>>>,
+ last_timer: &Arc<Mutex<Option<Instant>>>,
+ last_init_attempt: &mut Option<Instant>,
+) {
+ if !running.load(Ordering::Acquire) {
+ // Need to initialize
+ if let Some(last) = last_init_attempt {
+ let elapsed = Instant::now().duration_since(*last);
+ if elapsed < std::time::Duration::from_secs(5) {
+ // Wait for retry interval
+ tokio::time::sleep(std::time::Duration::from_secs(5) - elapsed).await;
+ return;
+ }
+ }
+
+ *last_init_attempt = Some(Instant::now());
+
+ match service.initialize().await {
+ Ok(fd) => {
+ match AsyncFd::new(FdWrapper(fd)) {
+ Ok(afd) => {
+ *async_fd.lock() = Some(Arc::new(afd));
+ running.store(true, Ordering::Release);
+ info!(service = %name, "Service initialized");
+ }
+ Err(e) => {
+ error!(service = %name, error = %e, "Failed to register fd");
+ let _ = service.finalize().await;
+ }
+ }
+ }
+ Err(e) => {
+ error!(service = %name, error = %e, "Initialization failed");
+ }
+ }
+ } else {
+ // Service is running - dispatch events and timers
+ let fd = async_fd.lock().clone();
+ if let Some(fd) = fd {
+ dispatch_service(name, service, &fd, running, last_timer).await;
+ }
+ }
+}
+
+/// Dispatch events for a running service
+async fn dispatch_service(
+ name: &str,
+ service: &mut Box<dyn Service>,
+ async_fd: &Arc<AsyncFd<FdWrapper>>,
+ running: &Arc<AtomicBool>,
+ last_timer: &Arc<Mutex<Option<Instant>>>,
+) {
+ // Calculate timer deadline
+ let timer_deadline = service.timer_period().and_then(|period| {
+ let last = last_timer.lock();
+ match *last {
+ Some(t) => {
+ let next = t + period;
+ if Instant::now() >= next {
+ // Already past deadline, schedule for next period from now
+ Some(Instant::now() + period)
+ } else {
+ Some(next)
+ }
+ }
+ None => Some(Instant::now()),
+ }
+ });
+
+ tokio::select! {
+ // Timer callback
+ _ = async {
+ if let Some(deadline) = timer_deadline {
+ tokio::time::sleep_until(deadline.into()).await;
+ } else {
+ std::future::pending::<()>().await;
+ }
+ } => {
+ *last_timer.lock() = Some(Instant::now());
+ debug!(service = %name, "Timer callback");
+ if let Err(e) = service.timer_callback().await {
+ warn!(service = %name, error = %e, "Timer callback failed");
+ }
+ }
+
+ // Fd readable
+ result = async_fd.readable() => {
+ match result {
+ Ok(mut guard) => {
+ match service.dispatch().await {
+ Ok(true) => {
+ guard.clear_ready();
+ }
+ Ok(false) => {
+ info!(service = %name, "Service requested reinitialization");
+ guard.clear_ready();
+ reinitialize(name, service, running).await;
+ }
+ Err(e) => {
+ error!(service = %name, error = %e, "Dispatch failed");
+ guard.clear_ready();
+ reinitialize(name, service, running).await;
+ }
+ }
+ }
+ Err(e) => {
+ warn!(service = %name, error = %e, "Error waiting for fd");
+ reinitialize(name, service, running).await;
+ }
+ }
+ }
+ }
+}
+
+/// Reinitialize a service
+async fn reinitialize(name: &str, service: &mut Box<dyn Service>, running: &Arc<AtomicBool>) {
+ debug!(service = %name, "Reinitializing service");
+ running.store(false, Ordering::Release);
+
+ if let Err(e) = service.finalize().await {
+ warn!(service = %name, error = %e, "Error finalizing service");
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-services/src/service.rs b/src/pmxcfs-rs/pmxcfs-services/src/service.rs
new file mode 100644
index 000000000..daf13900b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/src/service.rs
@@ -0,0 +1,149 @@
+//! Service trait and related types
+//!
+//! Simplified design based on actual usage patterns.
+//! All production services use file descriptors and are restartable.
+//!
+//! # Architecture
+//!
+//! Each service runs in its own tokio task that handles:
+//! - Initialization with automatic retry (5 second interval)
+//! - Event-driven dispatch when the file descriptor becomes readable
+//! - Optional periodic timer callbacks
+//! - Automatic reinitialization on errors
+//!
+//! # Thread Safety
+//!
+//! Services must be `Send + Sync` as they run in separate tokio tasks.
+//! The ServiceManager ensures that only one operation (initialize, dispatch,
+//! timer_callback, or finalize) runs at a time for each service.
+//!
+//! # File Descriptor Ownership
+//!
+//! Services return a RawFd from `initialize()` and retain ownership of it.
+//! The ServiceManager monitors the fd for readability but does NOT close it.
+//! Services MUST close the fd in `finalize()`, which is called:
+//! - On shutdown
+//! - Before reinitialization after an error
+//! - When dispatch() returns Ok(false)
+//!
+//! This design works well for wrapping C library file descriptors (e.g., from
+//! Corosync libraries) where the service manages the underlying C resources.
+
+use crate::error::Result;
+use async_trait::async_trait;
+use std::os::unix::io::RawFd;
+use std::time::Duration;
+
+/// A managed service with automatic retry and event-driven dispatch
+///
+/// All services are:
+/// - Event-driven (use file descriptors)
+/// - Restartable (automatic retry on failure)
+/// - Optionally have timer callbacks
+///
+/// # Example
+///
+/// ```ignore
+/// use async_trait::async_trait;
+/// use pmxcfs_services::{Result, Service};
+/// use std::os::unix::io::RawFd;
+///
+/// struct MyService {
+/// fd: Option<RawFd>,
+/// }
+///
+/// #[async_trait]
+/// impl Service for MyService {
+/// fn name(&self) -> &str { "my-service" }
+///
+/// async fn initialize(&mut self) -> Result<RawFd> {
+/// let fd = connect_to_external_service()?;
+/// self.fd = Some(fd);
+/// Ok(fd) // Return fd for event monitoring
+/// }
+///
+/// async fn dispatch(&mut self) -> Result<bool> {
+/// handle_events()?;
+/// Ok(true) // true = continue, false = reinitialize
+/// }
+///
+/// async fn finalize(&mut self) -> Result<()> {
+/// if let Some(fd) = self.fd.take() {
+/// close(fd)?; // MUST close the fd
+/// }
+/// Ok(())
+/// }
+/// }
+/// ```
+#[async_trait]
+pub trait Service: Send + Sync {
+ /// Service name for logging and identification
+ fn name(&self) -> &str;
+
+ /// Initialize the service and return a file descriptor to monitor
+ ///
+ /// The service retains ownership of the fd and MUST close it in `finalize()`.
+ /// The ServiceManager monitors the fd and calls `dispatch()` when it becomes readable.
+ ///
+ /// # Returns
+ ///
+ /// Returns a file descriptor that will be monitored for readability.
+ ///
+ /// # Errors
+ ///
+ /// On error, the service will be automatically retried after 5 seconds.
+ ///
+ /// # File Descriptor Lifetime
+ ///
+ /// The returned fd must remain valid until `finalize()` is called.
+ /// The service is responsible for closing the fd in `finalize()`.
+ async fn initialize(&mut self) -> Result<RawFd>;
+
+ /// Handle events when the file descriptor becomes readable
+ ///
+ /// This method is called when the fd returned by `initialize()` becomes readable.
+ ///
+ /// # Returns
+ ///
+ /// - `Ok(true)` - Continue running normally
+ /// - `Ok(false)` - Request reinitialization (finalize will be called first)
+ /// - `Err(_)` - Error occurred, service will be reinitialized
+ ///
+ /// # Blocking Behavior
+ ///
+ /// This method should not block for extended periods as it will prevent
+ /// timer callbacks and shutdown signals from being processed promptly.
+ /// For long-running operations, consider spawning a separate task.
+ async fn dispatch(&mut self) -> Result<bool>;
+
+ /// Clean up resources (called on shutdown or before reinitialization)
+ ///
+ /// This method MUST close the file descriptor returned by `initialize()`.
+ ///
+ /// # When Called
+ ///
+ /// This method is called:
+ /// - When the service is being shut down
+ /// - Before reinitializing after an error
+ /// - When `dispatch()` returns `Ok(false)`
+ ///
+ /// # Idempotency
+ ///
+ /// Must be idempotent (safe to call multiple times).
+ async fn finalize(&mut self) -> Result<()>;
+
+ /// Optional timer period for periodic callbacks
+ ///
+ /// Return `None` to disable timer callbacks.
+ /// Return `Some(duration)` to enable periodic callbacks at the specified interval.
+ fn timer_period(&self) -> Option<Duration> {
+ None
+ }
+
+ /// Optional periodic callback invoked at `timer_period()` intervals
+ ///
+ /// Only called if `timer_period()` returns `Some`.
+ /// Errors are logged but do not trigger reinitialization.
+ async fn timer_callback(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs b/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs
new file mode 100644
index 000000000..639124293
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs
@@ -0,0 +1,1271 @@
+//! Comprehensive tests for the service framework
+//!
+//! Tests cover:
+//! - Service lifecycle (start, stop, restart)
+//! - Service manager orchestration
+//! - Error handling and retry logic
+//! - Timer callbacks
+//! - File descriptor-based event dispatch
+//! - Service coordination and state management
+
+use async_trait::async_trait;
+use pmxcfs_services::{Service, ServiceError, ServiceManager};
+use pmxcfs_test_utils::wait_for_condition;
+use std::os::unix::io::RawFd;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
+use std::time::Duration;
+use tokio::time::sleep;
+
+// ===== Test Service Implementations =====
+
+/// Mock service for testing lifecycle
+struct MockService {
+ name: String,
+ init_count: Arc<AtomicU32>,
+ dispatch_count: Arc<AtomicU32>,
+ finalize_count: Arc<AtomicU32>,
+ timer_count: Arc<AtomicU32>,
+ should_fail_init: Arc<AtomicBool>,
+ should_fail_dispatch: Arc<AtomicBool>,
+ should_reinit: Arc<AtomicBool>,
+ timer_period: Option<Duration>,
+ read_fd: Option<RawFd>,
+ write_fd: Arc<std::sync::atomic::AtomicI32>,
+}
+
+impl MockService {
+ fn new(name: &str) -> Self {
+ Self {
+ name: name.to_string(),
+ init_count: Arc::new(AtomicU32::new(0)),
+ dispatch_count: Arc::new(AtomicU32::new(0)),
+ finalize_count: Arc::new(AtomicU32::new(0)),
+ timer_count: Arc::new(AtomicU32::new(0)),
+ should_fail_init: Arc::new(AtomicBool::new(false)),
+ should_fail_dispatch: Arc::new(AtomicBool::new(false)),
+ should_reinit: Arc::new(AtomicBool::new(false)),
+ timer_period: None,
+ read_fd: None,
+ write_fd: Arc::new(std::sync::atomic::AtomicI32::new(-1)),
+ }
+ }
+
+ fn with_timer(mut self, period: Duration) -> Self {
+ self.timer_period = Some(period);
+ self
+ }
+
+ fn counters(&self) -> ServiceCounters {
+ ServiceCounters {
+ init_count: self.init_count.clone(),
+ dispatch_count: self.dispatch_count.clone(),
+ finalize_count: self.finalize_count.clone(),
+ timer_count: self.timer_count.clone(),
+ should_fail_init: self.should_fail_init.clone(),
+ should_fail_dispatch: self.should_fail_dispatch.clone(),
+ should_reinit: self.should_reinit.clone(),
+ write_fd: self.write_fd.clone(),
+ }
+ }
+}
+
+#[async_trait]
+impl Service for MockService {
+ fn name(&self) -> &str {
+ &self.name
+ }
+
+ async fn initialize(&mut self) -> pmxcfs_services::Result<RawFd> {
+ self.init_count.fetch_add(1, Ordering::SeqCst);
+
+ if self.should_fail_init.load(Ordering::SeqCst) {
+ return Err(ServiceError::InitializationFailed(
+ "Mock init failure".to_string(),
+ ));
+ }
+
+ // Create a pipe for event-driven dispatch
+ let mut fds = [0i32; 2];
+ let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
+ if ret != 0 {
+ return Err(ServiceError::InitializationFailed(
+ "pipe() failed".to_string(),
+ ));
+ }
+
+ // Set read end to non-blocking (required for AsyncFd)
+ unsafe {
+ let flags = libc::fcntl(fds[0], libc::F_GETFL);
+ libc::fcntl(fds[0], libc::F_SETFL, flags | libc::O_NONBLOCK);
+ }
+
+ self.read_fd = Some(fds[0]);
+ self.write_fd.store(fds[1], Ordering::SeqCst);
+
+ Ok(fds[0])
+ }
+
+ async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> {
+ self.dispatch_count.fetch_add(1, Ordering::SeqCst);
+
+ // Drain the pipe
+ if let Some(fd) = self.read_fd {
+ let mut buf = [0u8; 64];
+ unsafe {
+ libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len());
+ }
+ }
+
+ if self.should_fail_dispatch.load(Ordering::SeqCst) {
+ return Err(ServiceError::DispatchFailed(
+ "Mock dispatch failure".to_string(),
+ ));
+ }
+
+ if self.should_reinit.load(Ordering::SeqCst) {
+ return Ok(false); // false = reinitialize
+ }
+
+ Ok(true) // true = continue
+ }
+
+ async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+ self.finalize_count.fetch_add(1, Ordering::SeqCst);
+
+ if let Some(fd) = self.read_fd.take() {
+ unsafe { libc::close(fd) };
+ }
+ let wfd = self.write_fd.swap(-1, Ordering::SeqCst);
+ if wfd >= 0 {
+ unsafe { libc::close(wfd) };
+ }
+
+ Ok(())
+ }
+
+ async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+ self.timer_count.fetch_add(1, Ordering::SeqCst);
+ Ok(())
+ }
+
+ fn timer_period(&self) -> Option<Duration> {
+ self.timer_period
+ }
+}
+
+/// Helper struct to access service counters from tests
+#[derive(Clone)]
+struct ServiceCounters {
+ init_count: Arc<AtomicU32>,
+ dispatch_count: Arc<AtomicU32>,
+ finalize_count: Arc<AtomicU32>,
+ timer_count: Arc<AtomicU32>,
+ should_fail_init: Arc<AtomicBool>,
+ should_fail_dispatch: Arc<AtomicBool>,
+ should_reinit: Arc<AtomicBool>,
+ write_fd: Arc<std::sync::atomic::AtomicI32>,
+}
+
+impl ServiceCounters {
+ fn init_count(&self) -> u32 {
+ self.init_count.load(Ordering::SeqCst)
+ }
+
+ fn dispatch_count(&self) -> u32 {
+ self.dispatch_count.load(Ordering::SeqCst)
+ }
+
+ fn finalize_count(&self) -> u32 {
+ self.finalize_count.load(Ordering::SeqCst)
+ }
+
+ fn timer_count(&self) -> u32 {
+ self.timer_count.load(Ordering::SeqCst)
+ }
+
+ fn set_fail_init(&self, fail: bool) {
+ self.should_fail_init.store(fail, Ordering::SeqCst);
+ }
+
+ fn set_fail_dispatch(&self, fail: bool) {
+ self.should_fail_dispatch.store(fail, Ordering::SeqCst);
+ }
+
+ fn set_reinit(&self, reinit: bool) {
+ self.should_reinit.store(reinit, Ordering::SeqCst);
+ }
+
+ fn trigger_event(&self) {
+ let wfd = self.write_fd.load(Ordering::SeqCst);
+ if wfd >= 0 {
+ unsafe {
+ libc::write(wfd, b"x".as_ptr() as *const _, 1);
+ }
+ }
+ }
+}
+
+// ===== Lifecycle Tests =====
+
+#[tokio::test]
+async fn test_service_lifecycle_basic() {
+ let service = MockService::new("test_service");
+ let counters = service.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should initialize within 5 seconds"
+ );
+
+ // Trigger a dispatch event
+ counters.trigger_event();
+
+ // Wait for dispatch
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should dispatch within 5 seconds after event"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+
+ // Service should be finalized
+ assert_eq!(
+ counters.finalize_count(),
+ 1,
+ "Service should be finalized exactly once"
+ );
+}
+
+#[tokio::test]
+async fn test_service_with_file_descriptor() {
+ let service = MockService::new("fd_service");
+ let counters = service.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() == 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should initialize once within 5 seconds"
+ );
+
+ // Trigger a dispatch event
+ counters.trigger_event();
+
+ // Wait for dispatch
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should dispatch within 5 seconds after event"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+
+ assert_eq!(counters.finalize_count(), 1, "Service should finalize once");
+}
+
+#[tokio::test]
+async fn test_service_initialization_failure() {
+ let service = MockService::new("failing_service");
+ let counters = service.counters();
+
+ // Make initialization fail
+ counters.set_fail_init(true);
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for several retry attempts (retry interval is 5 seconds)
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 3,
+ Duration::from_secs(15),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should retry initialization at least 3 times within 15 seconds"
+ );
+
+ // Dispatch should not run if init fails
+ assert_eq!(
+ counters.dispatch_count(),
+ 0,
+ "Service should not dispatch if init fails"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_initialization_recovery() {
+ let service = MockService::new("recovering_service");
+ let counters = service.counters();
+
+ // Start with failing initialization
+ counters.set_fail_init(true);
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for some failed attempts (retry interval is 5 seconds)
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 2,
+ Duration::from_secs(12),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Should have at least 2 failed initialization attempts within 12 seconds"
+ );
+
+ let failed_attempts = counters.init_count();
+
+ // Allow initialization to succeed
+ counters.set_fail_init(false);
+
+ // Wait for recovery
+ assert!(
+ wait_for_condition(
+ || counters.init_count() > failed_attempts,
+ Duration::from_secs(7),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should recover within 7 seconds"
+ );
+
+ // Trigger a dispatch event
+ counters.trigger_event();
+
+ // Wait for dispatch
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should dispatch after recovery"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+// ===== Dispatch Tests =====
+
+#[tokio::test]
+async fn test_service_dispatch_failure_triggers_reinit() {
+ let service = MockService::new("dispatch_fail_service");
+ let counters = service.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() == 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should initialize once within 5 seconds"
+ );
+
+ // Trigger a dispatch event
+ counters.trigger_event();
+
+ // Wait for first dispatch
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should dispatch within 5 seconds"
+ );
+
+ // Make dispatch fail
+ counters.set_fail_dispatch(true);
+
+ // Trigger another dispatch event
+ counters.trigger_event();
+
+ // Wait for dispatch failure and reinitialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 2 && counters.finalize_count() >= 1,
+ Duration::from_secs(10),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should reinitialize after dispatch failure within 10 seconds"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_dispatch_requests_reinit() {
+ let service = MockService::new("reinit_request_service");
+ let counters = service.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() == 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should initialize once within 5 seconds"
+ );
+
+ // Request reinitialization from dispatch
+ counters.set_reinit(true);
+
+ // Trigger a dispatch event
+ counters.trigger_event();
+
+ // Wait for reinitialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 2 && counters.finalize_count() >= 1,
+ Duration::from_secs(10),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should reinitialize and finalize when dispatch requests it within 10 seconds"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+// ===== FD-based Dispatch Tests =====
+
+#[tokio::test]
+async fn test_fd_dispatch_basic() {
+ let (service, counters) = SharedFdService::new("fd_service");
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should initialize within 5 seconds"
+ );
+
+ // Verify no dispatch happens without data on the pipe
+ sleep(Duration::from_millis(200)).await;
+ assert_eq!(
+ counters.dispatch_count(),
+ 0,
+ "FD service should not dispatch without data on pipe"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+/// FD service that shares write_fd via Arc<AtomicI32> so tests can trigger events
+struct SharedFdService {
+ name: String,
+ read_fd: Option<RawFd>,
+ write_fd: Arc<std::sync::atomic::AtomicI32>,
+ init_count: Arc<AtomicU32>,
+ dispatch_count: Arc<AtomicU32>,
+ finalize_count: Arc<AtomicU32>,
+ should_fail_dispatch: Arc<AtomicBool>,
+ should_reinit: Arc<AtomicBool>,
+}
+
+impl SharedFdService {
+ fn new(name: &str) -> (Self, SharedFdCounters) {
+ let write_fd = Arc::new(std::sync::atomic::AtomicI32::new(-1));
+ let init_count = Arc::new(AtomicU32::new(0));
+ let dispatch_count = Arc::new(AtomicU32::new(0));
+ let finalize_count = Arc::new(AtomicU32::new(0));
+ let should_fail_dispatch = Arc::new(AtomicBool::new(false));
+ let should_reinit = Arc::new(AtomicBool::new(false));
+
+ let counters = SharedFdCounters {
+ write_fd: write_fd.clone(),
+ init_count: init_count.clone(),
+ dispatch_count: dispatch_count.clone(),
+ finalize_count: finalize_count.clone(),
+ should_fail_dispatch: should_fail_dispatch.clone(),
+ should_reinit: should_reinit.clone(),
+ };
+
+ let service = Self {
+ name: name.to_string(),
+ read_fd: None,
+ write_fd,
+ init_count,
+ dispatch_count,
+ finalize_count,
+ should_fail_dispatch,
+ should_reinit,
+ };
+
+ (service, counters)
+ }
+}
+
+#[derive(Clone)]
+struct SharedFdCounters {
+ write_fd: Arc<std::sync::atomic::AtomicI32>,
+ init_count: Arc<AtomicU32>,
+ dispatch_count: Arc<AtomicU32>,
+ finalize_count: Arc<AtomicU32>,
+ should_fail_dispatch: Arc<AtomicBool>,
+ should_reinit: Arc<AtomicBool>,
+}
+
+impl SharedFdCounters {
+ fn init_count(&self) -> u32 {
+ self.init_count.load(Ordering::SeqCst)
+ }
+ fn dispatch_count(&self) -> u32 {
+ self.dispatch_count.load(Ordering::SeqCst)
+ }
+ fn finalize_count(&self) -> u32 {
+ self.finalize_count.load(Ordering::SeqCst)
+ }
+ fn trigger_event(&self) {
+ let fd = self.write_fd.load(Ordering::SeqCst);
+ if fd >= 0 {
+ unsafe {
+ libc::write(fd, b"x".as_ptr() as *const _, 1);
+ }
+ }
+ }
+ fn set_fail_dispatch(&self, fail: bool) {
+ self.should_fail_dispatch.store(fail, Ordering::SeqCst);
+ }
+ fn set_reinit(&self, reinit: bool) {
+ self.should_reinit.store(reinit, Ordering::SeqCst);
+ }
+}
+
+#[async_trait]
+impl Service for SharedFdService {
+ fn name(&self) -> &str {
+ &self.name
+ }
+
+ async fn initialize(&mut self) -> pmxcfs_services::Result<RawFd> {
+ self.init_count.fetch_add(1, Ordering::SeqCst);
+
+ let mut fds = [0i32; 2];
+ let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
+ if ret != 0 {
+ return Err(ServiceError::InitializationFailed(
+ "pipe() failed".to_string(),
+ ));
+ }
+
+ // Set read end to non-blocking (required for AsyncFd)
+ unsafe {
+ let flags = libc::fcntl(fds[0], libc::F_GETFL);
+ libc::fcntl(fds[0], libc::F_SETFL, flags | libc::O_NONBLOCK);
+ }
+
+ self.read_fd = Some(fds[0]);
+ self.write_fd.store(fds[1], Ordering::SeqCst);
+
+ Ok(fds[0])
+ }
+
+ async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> {
+ self.dispatch_count.fetch_add(1, Ordering::SeqCst);
+
+ // Drain the pipe
+ if let Some(fd) = self.read_fd {
+ let mut buf = [0u8; 64];
+ unsafe {
+ libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len());
+ }
+ }
+
+ if self.should_fail_dispatch.load(Ordering::SeqCst) {
+ return Err(ServiceError::DispatchFailed(
+ "Mock fd dispatch failure".to_string(),
+ ));
+ }
+
+ if self.should_reinit.load(Ordering::SeqCst) {
+ return Ok(false); // false = reinitialize
+ }
+
+ Ok(true) // true = continue
+ }
+
+ async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+ self.finalize_count.fetch_add(1, Ordering::SeqCst);
+
+ if let Some(fd) = self.read_fd.take() {
+ unsafe { libc::close(fd) };
+ }
+ let wfd = self.write_fd.swap(-1, Ordering::SeqCst);
+ if wfd >= 0 {
+ unsafe { libc::close(wfd) };
+ }
+
+ Ok(())
+ }
+}
+
+#[tokio::test]
+async fn test_fd_dispatch_event_driven() {
+ let (service, counters) = SharedFdService::new("fd_event_service");
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should initialize within 5 seconds"
+ );
+
+ // No dispatch should happen without data
+ sleep(Duration::from_millis(200)).await;
+ assert_eq!(
+ counters.dispatch_count(),
+ 0,
+ "FD service should not dispatch without data"
+ );
+
+ // Trigger an event by writing to the pipe
+ counters.trigger_event();
+
+ // Wait for dispatch
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should dispatch after data is written to pipe"
+ );
+
+ // Trigger more events
+ counters.trigger_event();
+ counters.trigger_event();
+
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 2,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should handle multiple events"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+
+ assert!(
+ counters.finalize_count() >= 1,
+ "FD service should be finalized"
+ );
+}
+
+#[tokio::test]
+async fn test_fd_dispatch_failure_triggers_reinit() {
+ let (service, counters) = SharedFdService::new("fd_fail_service");
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should initialize"
+ );
+
+ // Trigger an event and verify dispatch works
+ counters.trigger_event();
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should dispatch"
+ );
+
+ // Make dispatch fail, then trigger event
+ counters.set_fail_dispatch(true);
+ counters.trigger_event();
+
+ // Wait for finalize + reinit
+ assert!(
+ wait_for_condition(
+ || counters.finalize_count() >= 1 && counters.init_count() >= 2,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should finalize and reinitialize after dispatch failure"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_fd_dispatch_reinit_request() {
+ let (service, counters) = SharedFdService::new("fd_reinit_service");
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should initialize"
+ );
+
+ // Request reinit from dispatch
+ counters.set_reinit(true);
+ counters.trigger_event();
+
+ // Wait for reinit
+ assert!(
+ wait_for_condition(
+ || counters.finalize_count() >= 1 && counters.init_count() >= 2,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "FD service should finalize and reinitialize on reinit request"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+// ===== Timer Callback Tests =====
+
+#[tokio::test]
+async fn test_service_timer_callback() {
+ let service = MockService::new("timer_service").with_timer(Duration::from_millis(300));
+ let counters = service.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization plus several timer periods
+ assert!(
+ wait_for_condition(
+ || counters.timer_count() >= 3,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Timer should fire at least 3 times within 5 seconds"
+ );
+
+ let timer_count = counters.timer_count();
+
+ // Wait for more timer invocations
+ assert!(
+ wait_for_condition(
+ || counters.timer_count() > timer_count,
+ Duration::from_secs(2),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Timer should continue firing"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_timer_callback_not_invoked_when_failed() {
+ let service = MockService::new("failed_timer_service").with_timer(Duration::from_millis(100));
+ let counters = service.counters();
+
+ // Make initialization fail
+ counters.set_fail_init(true);
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for several timer periods
+ sleep(Duration::from_millis(2000)).await;
+
+ // Timer should NOT fire if service is not running
+ assert_eq!(
+ counters.timer_count(),
+ 0,
+ "Timer should not fire when service is not running"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+// ===== Service Manager Tests =====
+
+#[tokio::test]
+async fn test_manager_multiple_services() {
+ let service1 = MockService::new("service1");
+ let service2 = MockService::new("service2");
+ let service3 = MockService::new("service3");
+
+ let counters1 = service1.counters();
+ let counters2 = service2.counters();
+ let counters3 = service3.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service1)).unwrap();
+ manager.add_service(Box::new(service2)).unwrap();
+ manager.add_service(Box::new(service3)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters1.init_count() == 1
+ && counters2.init_count() == 1
+ && counters3.init_count() == 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "All services should initialize within 5 seconds"
+ );
+
+ // Trigger dispatch events for all services
+ counters1.trigger_event();
+ counters2.trigger_event();
+ counters3.trigger_event();
+
+ // Wait for dispatch
+ assert!(
+ wait_for_condition(
+ || counters1.dispatch_count() >= 1
+ && counters2.dispatch_count() >= 1
+ && counters3.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "All services should dispatch within 5 seconds after events"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+
+ // All services should be finalized
+ assert_eq!(counters1.finalize_count(), 1, "Service1 should finalize");
+ assert_eq!(counters2.finalize_count(), 1, "Service2 should finalize");
+ assert_eq!(counters3.finalize_count(), 1, "Service3 should finalize");
+}
+
+#[tokio::test]
+async fn test_manager_duplicate_service_name() {
+ let service1 = MockService::new("duplicate");
+ let service2 = MockService::new("duplicate");
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service1)).unwrap();
+ let result = manager.add_service(Box::new(service2));
+ assert!(result.is_err(), "Should return error for duplicate service");
+}
+
+#[tokio::test]
+async fn test_manager_partial_service_failure() {
+ let service1 = MockService::new("working_service");
+ let service2 = MockService::new("failing_service");
+
+ let counters1 = service1.counters();
+ let counters2 = service2.counters();
+
+ // Make service2 fail
+ counters2.set_fail_init(true);
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service1)).unwrap();
+ manager.add_service(Box::new(service2)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for service1 initialization
+ assert!(
+ wait_for_condition(
+ || counters1.init_count() == 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service1 should initialize within 5 seconds"
+ );
+
+ // Trigger event for service1
+ counters1.trigger_event();
+
+ // Wait for service1 dispatch and service2 retries
+ assert!(
+ wait_for_condition(
+ || counters1.dispatch_count() >= 1 && counters2.init_count() >= 2,
+ Duration::from_secs(12),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service1 should work normally and Service2 should retry within 12 seconds"
+ );
+
+ // Service2 should not dispatch when failing
+ assert_eq!(
+ counters2.dispatch_count(),
+ 0,
+ "Service2 should not dispatch when failing"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+
+ // Service1 should finalize
+ assert_eq!(counters1.finalize_count(), 1, "Service1 should finalize");
+ // Service2 is also finalized unconditionally during shutdown (matching C behavior)
+ assert_eq!(
+ counters2.finalize_count(),
+ 1,
+ "Service2 should also be finalized during shutdown (idempotent finalize)"
+ );
+}
+
+// ===== Error Handling Tests =====
+
+#[tokio::test]
+async fn test_service_error_count_tracking() {
+ let service = MockService::new("error_tracking_service");
+ let counters = service.counters();
+
+ // Make initialization fail
+ counters.set_fail_init(true);
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for multiple failures (retry interval is 5 seconds)
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 3,
+ Duration::from_secs(15),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Should accumulate at least 3 failures within 15 seconds"
+ );
+
+ // Allow recovery
+ counters.set_fail_init(false);
+
+ // Wait for successful initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 4,
+ Duration::from_secs(7),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should recover within 7 seconds"
+ );
+
+ // Trigger a dispatch event
+ counters.trigger_event();
+
+ // Wait for dispatch
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should dispatch after recovery"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_graceful_shutdown() {
+ let service = MockService::new("shutdown_test");
+ let counters = service.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should initialize within 5 seconds"
+ );
+
+ // Trigger a dispatch event
+ counters.trigger_event();
+
+ // Wait for service to be running
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should be running within 5 seconds"
+ );
+
+ // Graceful shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+
+ // Service should be properly finalized
+ assert_eq!(
+ counters.finalize_count(),
+ 1,
+ "Service should finalize during shutdown"
+ );
+}
+
+// ===== Concurrency Tests =====
+
+#[tokio::test]
+async fn test_service_concurrent_operations() {
+ let service = MockService::new("concurrent_service").with_timer(Duration::from_millis(200));
+ let counters = service.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should initialize within 5 seconds"
+ );
+
+ // Trigger multiple dispatch events
+ for _ in 0..5 {
+ counters.trigger_event();
+ sleep(Duration::from_millis(50)).await;
+ }
+
+ // Wait for service to run with both dispatch and timer
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 3 && counters.timer_count() >= 3,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should handle concurrent dispatch and timer events within 5 seconds"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
+
+#[tokio::test]
+async fn test_service_state_consistency_after_reinit() {
+ let service = MockService::new("consistency_service");
+ let counters = service.counters();
+
+ let mut manager = ServiceManager::new();
+ manager.add_service(Box::new(service)).unwrap();
+
+ let shutdown_token = manager.shutdown_token();
+ let handle = manager.spawn();
+
+ // Wait for initialization
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should initialize within 5 seconds"
+ );
+
+ // Trigger reinitialization
+ counters.set_reinit(true);
+ counters.trigger_event();
+
+ // Wait for reinit
+ assert!(
+ wait_for_condition(
+ || counters.init_count() >= 2,
+ Duration::from_secs(10),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should reinitialize within 10 seconds"
+ );
+
+ // Clear reinit flag
+ counters.set_reinit(false);
+
+ // Trigger a dispatch event
+ counters.trigger_event();
+
+ // Wait for dispatch
+ assert!(
+ wait_for_condition(
+ || counters.dispatch_count() >= 1,
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ )
+ .await,
+ "Service should dispatch after reinit"
+ );
+
+ // Shutdown
+ shutdown_token.cancel();
+ let _ = handle.await;
+}
--
2.47.3