From mboxrd@z Thu Jan 1 00:00:00 1970 From: Kefu Chai <k.chai@proxmox.com> To: pve-devel@lists.proxmox.com Subject: [PATCH pve-cluster 08/14 v2] pmxcfs-rs: add pmxcfs-services crate Date: Fri, 13 Feb 2026 17:33:45 +0800 Message-ID: <20260213094119.2379288-9-k.chai@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260213094119.2379288-1-k.chai@proxmox.com> References: <20260213094119.2379288-1-k.chai@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit List-Id: Proxmox VE development discussion <pve-devel.lists.proxmox.com> Add service lifecycle management framework providing: - Service trait: Lifecycle interface for async services - ServiceManager: Orchestrates multiple services - Automatic retry logic for failed services - Event-driven dispatching via file descriptors - Graceful shutdown coordination This is a generic framework with no pmxcfs-specific dependencies, only requiring tokio, async-trait, and standard error handling. It replaces the C version's qb_loop-based event management.
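Example usage (a sketch of the API added by this patch; MyService stands in for any Service implementation):

    let mut manager = ServiceManager::new();
    manager.add_service(Box::new(MyService::new()))?;
    let shutdown = manager.shutdown_token();
    let handle = manager.spawn();
    // ... later, to stop all services gracefully:
    shutdown.cancel();
    handle.await?;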
Signed-off-by: Kefu Chai <k.chai@proxmox.com> --- src/pmxcfs-rs/Cargo.toml | 2 + src/pmxcfs-rs/pmxcfs-services/Cargo.toml | 17 + src/pmxcfs-rs/pmxcfs-services/README.md | 162 +++ src/pmxcfs-rs/pmxcfs-services/src/error.rs | 21 + src/pmxcfs-rs/pmxcfs-services/src/lib.rs | 15 + src/pmxcfs-rs/pmxcfs-services/src/manager.rs | 341 +++++ src/pmxcfs-rs/pmxcfs-services/src/service.rs | 149 ++ .../pmxcfs-services/tests/service_tests.rs | 1271 +++++++++++++++++ 8 files changed, 1978 insertions(+) create mode 100644 src/pmxcfs-rs/pmxcfs-services/Cargo.toml create mode 100644 src/pmxcfs-rs/pmxcfs-services/README.md create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/error.rs create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/lib.rs create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/manager.rs create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/service.rs create mode 100644 src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml index 9d509c1d2..b9f0f620b 100644 --- a/src/pmxcfs-rs/Cargo.toml +++ b/src/pmxcfs-rs/Cargo.toml @@ -8,6 +8,7 @@ members = [ "pmxcfs-memdb", # In-memory database with SQLite persistence "pmxcfs-status", # Status monitoring and RRD data management "pmxcfs-test-utils", # Test utilities and helpers (dev-only) + "pmxcfs-services", # Service framework for automatic retry and lifecycle management ] resolver = "2" @@ -28,6 +29,7 @@ pmxcfs-rrd = { path = "pmxcfs-rrd" } pmxcfs-memdb = { path = "pmxcfs-memdb" } pmxcfs-status = { path = "pmxcfs-status" } pmxcfs-test-utils = { path = "pmxcfs-test-utils" } +pmxcfs-services = { path = "pmxcfs-services" } # Core async runtime tokio = { version = "1.35", features = ["full"] } diff --git a/src/pmxcfs-rs/pmxcfs-services/Cargo.toml b/src/pmxcfs-rs/pmxcfs-services/Cargo.toml new file mode 100644 index 000000000..45f49dcd6 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-services/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "pmxcfs-services" +version = "0.1.0" +edition = "2024" + +[dependencies] +async-trait = "0.1" +tokio = { version = "1.41", features = ["full"] } +tokio-util = "0.7" +tracing = "0.1" +thiserror = "2.0" +num_enum.workspace = true +parking_lot = "0.12" + +[dev-dependencies] +libc.workspace = true +pmxcfs-test-utils = { path = "../pmxcfs-test-utils" } diff --git a/src/pmxcfs-rs/pmxcfs-services/README.md b/src/pmxcfs-rs/pmxcfs-services/README.md new file mode 100644 index 000000000..76622662c --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-services/README.md @@ -0,0 +1,162 @@ +# pmxcfs-services + +**Service Management Framework** for pmxcfs - tokio-based replacement for qb_loop. + +Manages long-running services with automatic retry, event-driven dispatching, +periodic timers, and graceful shutdown. Replaces the C implementation's +`qb_loop`-based event management with a tokio async runtime. + +## How It Fits Together + +- **`Service` trait** (`service.rs`): Lifecycle interface that each service implements + (`initialize` / `dispatch` / `finalize`, plus optional timer callbacks). +- **`ServiceManager`** (`manager.rs`): Accepts `Box<dyn Service>` via `add_service()`, + then `spawn()` launches one background task per service that drives it through its lifecycle.
+- Each service task handles: + - **Initialization with retry**: Retries every 5 seconds on failure + - **Event-driven dispatch**: Waits for file descriptor readability via `AsyncFd` + - **Timer callbacks**: Optional periodic callbacks at configured intervals + - **Reinitialization**: Automatic on dispatch failure or explicit request + +Shutdown is coordinated through a `CancellationToken`: + +```rust +let shutdown_token = manager.shutdown_token(); +let handle = manager.spawn(); +// ... later ... +shutdown_token.cancel(); // Signal graceful shutdown +handle.await; // Wait for all services to finalize +``` + +## Usage Example + +```rust +use async_trait::async_trait; +use pmxcfs_services::{Result, Service, ServiceManager}; +use std::os::unix::io::RawFd; +use std::time::Duration; + +struct MyService { + fd: Option<RawFd>, +} + +#[async_trait] +impl Service for MyService { + fn name(&self) -> &str { "my-service" } + + async fn initialize(&mut self) -> Result<RawFd> { + let fd = connect_to_external_service()?; + self.fd = Some(fd); + Ok(fd) // Return fd for event monitoring + } + + async fn dispatch(&mut self) -> Result<bool> { + handle_events()?; + Ok(true) // true = continue, false = reinitialize + } + + async fn finalize(&mut self) -> Result<()> { + close_connection(self.fd.take())?; + Ok(()) + } + + // Optional: periodic timer callback + fn timer_period(&self) -> Option<Duration> { + Some(Duration::from_secs(10)) + } + + async fn timer_callback(&mut self) -> Result<()> { + perform_periodic_task()?; + Ok(()) + } +} +``` + +## Service Lifecycle + +1. **Initialization**: The manager calls the service's `initialize()`, which returns a file descriptor + - On failure: Retries every 5 seconds indefinitely + - On success: Registers fd with tokio's `AsyncFd` and enters running state + +2. **Running**: The service task waits for events using `tokio::select!`: + - **FD readable**: Calls `dispatch()` when fd becomes readable + - Returns `Ok(true)`: Continue running + - Returns `Ok(false)`: Reinitialize (calls `finalize()` then `initialize()`) + - Returns `Err(_)`: Reinitialize + - **Timer deadline**: Calls `timer_callback()` at configured intervals (if enabled) + +3.
**Shutdown**: On `CancellationToken::cancel()`: + - Calls `finalize()` for all services + - Waits for all service tasks to complete + +## C to Rust Mapping + +### Data Structures + +| C Type | Rust Type | Notes | +|--------|-----------|-------| +| [`cfs_loop_t`](../../pmxcfs/loop.h#L32) | `ServiceManager` | Event loop manager | +| [`cfs_service_t`](../../pmxcfs/loop.h#L34) | `dyn Service` | Service trait | +| [`cfs_service_callbacks_t`](../../pmxcfs/loop.h#L44-L49) | (trait methods) | Callbacks as trait methods | + +### Functions + +| C Function | Rust Equivalent | +|-----------|-----------------| +| [`cfs_loop_new()`](../../pmxcfs/loop.c) | `ServiceManager::new()` | +| [`cfs_loop_add_service()`](../../pmxcfs/loop.c) | `ServiceManager::add_service()` | +| [`cfs_loop_start_worker()`](../../pmxcfs/loop.c) | `ServiceManager::spawn()` | +| [`cfs_loop_stop_worker()`](../../pmxcfs/loop.c) | `shutdown_token.cancel()` + `handle.await` | +| [`cfs_service_new()`](../../pmxcfs/loop.c) | `struct` + `impl Service` | + +## Key Differences from C Implementation + +| Aspect | C (`loop.c`) | Rust | +|--------|-------------|------| +| Event loop | libqb `qb_loop`, single-threaded | tokio async runtime, multi-threaded | +| FD monitoring | Manual `qb_loop_poll_add()` | Automatic `AsyncFd` | +| Concurrency | Sequential callbacks | Parallel tasks per service | +| Retry interval | Configurable per service | Fixed 5 seconds (sufficient for all services) | +| Dispatch modes | FD-based or polling | FD-based only (all services use fds) | +| Priority levels | Per-service priorities | All equal (no priority needed) | +| Shutdown | `cfs_loop_stop_worker()` | `CancellationToken` → await tasks → finalize all | + +## Design Simplifications + +The Rust implementation is significantly simpler than the C version, reducing +the codebase by 67% while preserving all production functionality. + +### Why Not Mirror the C Implementation? + +The C implementation (`loop.c`) was designed for flexibility to support various +hypothetical use cases. However, after analyzing actual usage across the codebase, +we found that many features were never used: + +- **Polling mode**: All services use file descriptors from Corosync libraries +- **Custom retry intervals**: All services work fine with a fixed 5-second retry +- **Non-restartable services**: All services need automatic retry on failure +- **Custom dispatch intervals**: All services are event-driven (no periodic polling) +- **Priority levels**: Service execution order doesn't matter in practice + +Rather than maintaining unused complexity "just in case", the Rust implementation +focuses on what's actually needed. This makes the code easier to understand, +test, and maintain. + +### Simplifications Applied + +- **No polling mode**: All services use file descriptors from C libraries (Corosync) +- **Fixed retry interval**: 5 seconds is sufficient for all services +- **All services restartable**: No need for non-restartable mode +- **Single task per service**: Combines retry, dispatch, and timer logic +- **Direct return types**: No enums (`RawFd` instead of `InitResult`, `bool` instead of `DispatchAction`) + +If future requirements demand more flexibility, these features can be added back +incrementally with clear use cases driving the design. 
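+## End-to-End Example
+
+A minimal, self-contained sketch (not part of the crate) showing the full
+lifecycle with a pipe-backed service. It assumes only the API above plus
+`libc` for the pipe, mirroring what the integration tests do:
+
+```rust
+use async_trait::async_trait;
+use pmxcfs_services::{Result, Service, ServiceError, ServiceManager};
+use std::os::unix::io::RawFd;
+
+#[derive(Default)]
+struct PipeService {
+    read_fd: Option<RawFd>,
+    write_fd: Option<RawFd>,
+}
+
+#[async_trait]
+impl Service for PipeService {
+    fn name(&self) -> &str {
+        "pipe-service"
+    }
+
+    async fn initialize(&mut self) -> Result<RawFd> {
+        // Create a pipe; the read end is handed to the manager for monitoring.
+        let mut fds = [0i32; 2];
+        if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
+            return Err(ServiceError::InitializationFailed("pipe() failed".into()));
+        }
+        // AsyncFd requires the monitored fd to be non-blocking.
+        unsafe {
+            let flags = libc::fcntl(fds[0], libc::F_GETFL);
+            libc::fcntl(fds[0], libc::F_SETFL, flags | libc::O_NONBLOCK);
+        }
+        self.read_fd = Some(fds[0]);
+        self.write_fd = Some(fds[1]);
+        Ok(fds[0])
+    }
+
+    async fn dispatch(&mut self) -> Result<bool> {
+        // Drain the pipe so the fd does not report readable forever.
+        if let Some(fd) = self.read_fd {
+            let mut buf = [0u8; 64];
+            unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
+        }
+        Ok(true) // keep running
+    }
+
+    async fn finalize(&mut self) -> Result<()> {
+        // The service owns both fds and must close them here (idempotent).
+        for fd in self.read_fd.take().into_iter().chain(self.write_fd.take()) {
+            unsafe { libc::close(fd) };
+        }
+        Ok(())
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut manager = ServiceManager::new();
+    manager.add_service(Box::new(PipeService::default()))?;
+    let shutdown = manager.shutdown_token();
+    let handle = manager.spawn();
+    // ... run until some external stop condition ...
+    shutdown.cancel(); // graceful shutdown: finalize() runs for every service
+    let _ = handle.await;
+    Ok(())
+}
+```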
+ +## References + +### C Implementation +- [`src/pmxcfs/loop.h`](../../pmxcfs/loop.h) - Service loop API +- [`src/pmxcfs/loop.c`](../../pmxcfs/loop.c) - Service loop implementation + +### Related Crates +- **pmxcfs-dfsm**: Uses `Service` trait for `ClusterDatabaseService`, `StatusSyncService` +- **pmxcfs**: Uses `ServiceManager` to orchestrate all cluster services diff --git a/src/pmxcfs-rs/pmxcfs-services/src/error.rs b/src/pmxcfs-rs/pmxcfs-services/src/error.rs new file mode 100644 index 000000000..0c5951761 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-services/src/error.rs @@ -0,0 +1,21 @@ +//! Error types for the service framework + +use thiserror::Error; + +/// Errors that can occur during service operations +#[derive(Error, Debug)] +pub enum ServiceError { + /// Service initialization failed + #[error("Failed to initialize service: {0}")] + InitializationFailed(String), + + /// Service dispatch failed + #[error("Failed to dispatch service events: {0}")] + DispatchFailed(String), + + /// Duplicate service name + #[error("Service '{0}' is already registered")] + DuplicateService(String), +} + +pub type Result<T> = std::result::Result<T, ServiceError>; diff --git a/src/pmxcfs-rs/pmxcfs-services/src/lib.rs b/src/pmxcfs-rs/pmxcfs-services/src/lib.rs new file mode 100644 index 000000000..18004ee6b --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-services/src/lib.rs @@ -0,0 +1,15 @@ +//! Service framework for pmxcfs +//! +//! This crate provides a simplified, tokio-based service management framework with: +//! - Automatic retry on failure (5 second interval) +//! - Event-driven file descriptor monitoring +//! - Optional periodic timer callbacks +//! - Graceful shutdown + +mod error; +mod manager; +mod service; + +pub use error::{Result, ServiceError}; +pub use manager::ServiceManager; +pub use service::Service; diff --git a/src/pmxcfs-rs/pmxcfs-services/src/manager.rs b/src/pmxcfs-rs/pmxcfs-services/src/manager.rs new file mode 100644 index 000000000..30712aafd --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-services/src/manager.rs @@ -0,0 +1,341 @@ +//! Service manager for orchestrating multiple managed services +//! +//! Each service gets one task that handles: +//! - Initialization with retry (5 second interval) +//! - Event dispatch when fd is readable +//! - Timer callbacks at configured intervals + +use crate::error::{Result, ServiceError}; +use crate::service::Service; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Instant; +use tokio::io::unix::AsyncFd; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; + +/// Wrapper for raw fd that doesn't close on drop +struct FdWrapper(RawFd); + +impl AsRawFd for FdWrapper { + fn as_raw_fd(&self) -> RawFd { + self.0 + } +} + +/// Service manager for orchestrating multiple services +/// +/// # Architecture +/// +/// The ServiceManager spawns one tokio task per service. Each task: +/// - Initializes the service with automatic retry (5 second interval) +/// - Monitors the service's file descriptor for readability +/// - Calls dispatch() when the fd becomes readable +/// - Optionally calls timer_callback() at configured intervals +/// - Reinitializes on errors or explicit request +/// +/// # Shutdown +/// +/// Call `shutdown_token().cancel()` to initiate graceful shutdown. +/// The manager will: +/// 1. Signal all service tasks to stop +/// 2. Call finalize() on each service +/// 3.
Wait up to 30 seconds for each service to stop /// 4. Continue shutdown even if services timeout /// /// # Thread Safety /// /// Services run in separate tokio tasks and can execute concurrently. /// Each service's operations (initialize, dispatch, finalize) are /// serialized within its own task. /// /// # Example /// /// ```ignore /// use pmxcfs_services::ServiceManager; /// /// let mut manager = ServiceManager::new(); /// manager.add_service(Box::new(MyService::new()))?; /// manager.add_service(Box::new(AnotherService::new()))?; /// /// let shutdown_token = manager.shutdown_token(); /// let handle = manager.spawn(); /// /// // ... later ... /// shutdown_token.cancel(); // Signal graceful shutdown /// handle.await?; // Wait for all services to stop /// ``` +pub struct ServiceManager { + services: HashMap<String, Box<dyn Service>>, + shutdown_token: CancellationToken, +} + +impl ServiceManager { + /// Create a new ServiceManager + pub fn new() -> Self { + Self { + services: HashMap::new(), + shutdown_token: CancellationToken::new(), + } + } + + /// Add a service to the manager + /// + /// Services must be added before calling `spawn()`. + /// Cannot add services after the manager has been spawned. + /// + /// # Errors + /// + /// Returns `Err` if a service with the same name is already registered. + /// + /// # Example + /// + /// ```ignore + /// let mut manager = ServiceManager::new(); + /// manager.add_service(Box::new(MyService::new()))?; + /// ``` + pub fn add_service(&mut self, service: Box<dyn Service>) -> Result<()> { + let name = service.name().to_string(); + if self.services.contains_key(&name) { + return Err(ServiceError::DuplicateService(name)); + } + self.services.insert(name, service); + Ok(()) + } + + /// Get a shutdown token for graceful shutdown + /// + /// Call `token.cancel()` to signal all services to stop gracefully. + /// The token can be cloned and shared across threads. + pub fn shutdown_token(&self) -> CancellationToken { + self.shutdown_token.clone() + } + + /// Spawn the service manager and start all services + /// + /// This consumes the manager and returns a JoinHandle. + /// Each service runs in its own tokio task. + /// + /// # Shutdown + /// + /// To stop the manager: + /// 1. Call `shutdown_token().cancel()` + /// 2. Await the returned JoinHandle + /// + /// # Panics + /// + /// If a service task panics, it will be isolated to that task. + /// Other services will continue running. + /// + /// # Example + /// + /// ```ignore + /// let shutdown_token = manager.shutdown_token(); + /// let handle = manager.spawn(); + /// + /// // ... later ...
+ /// shutdown_token.cancel(); + /// handle.await?; + /// ``` + #[must_use = "the service manager will stop if the handle is dropped"] + pub fn spawn(self) -> JoinHandle<()> { + tokio::spawn(async move { self.run().await }) + } + + async fn run(self) { + info!("Starting ServiceManager with {} services", self.services.len()); + + let mut handles = Vec::new(); + + for (name, service) in self.services { + let token = self.shutdown_token.clone(); + let handle = tokio::spawn(async move { + run_service(name, service, token).await; + }); + handles.push(handle); + } + + // Wait for shutdown + self.shutdown_token.cancelled().await; + info!("ServiceManager shutting down..."); + + // Wait for all services to stop with timeout + for handle in handles { + match tokio::time::timeout(std::time::Duration::from_secs(30), handle).await { + Ok(Ok(())) => {} + Ok(Err(e)) => { + warn!(error = ?e, "Service task panicked during shutdown"); + } + Err(_) => { + warn!("Service didn't stop within 30 second timeout"); + } + } + } + + info!("ServiceManager stopped"); + } +} + +impl Default for ServiceManager { + fn default() -> Self { + Self::new() + } +} + +/// Run a single service until shutdown +async fn run_service(name: String, mut service: Box<dyn Service>, token: CancellationToken) { + // Service state + let running = Arc::new(AtomicBool::new(false)); + let async_fd: Arc<Mutex<Option<Arc<AsyncFd<FdWrapper>>>>> = Arc::new(Mutex::new(None)); + let last_timer = Arc::new(Mutex::new(None::<Instant>)); + let mut last_init_attempt = None::<Instant>; + + loop { + tokio::select! { + _ = token.cancelled() => break, + _ = service_loop(&name, &mut service, &running, &async_fd, &last_timer, &mut last_init_attempt) => {} + } + } + + // Finalize on shutdown + running.store(false, Ordering::Release); + *async_fd.lock() = None; + + info!(service = %name, "Shutting down service"); + if let Err(e) = service.finalize().await { + error!(service = %name, error = %e, "Error finalizing service"); + } +} + +/// Main service loop +async fn service_loop( + name: &str, + service: &mut Box<dyn Service>, + running: &Arc<AtomicBool>, + async_fd: &Arc<Mutex<Option<Arc<AsyncFd<FdWrapper>>>>>, + last_timer: &Arc<Mutex<Option<Instant>>>, + last_init_attempt: &mut Option<Instant>, +) { + if !running.load(Ordering::Acquire) { + // Need to initialize + if let Some(last) = last_init_attempt { + let elapsed = Instant::now().duration_since(*last); + if elapsed < std::time::Duration::from_secs(5) { + // Wait for retry interval + tokio::time::sleep(std::time::Duration::from_secs(5) - elapsed).await; + return; + } + } + + *last_init_attempt = Some(Instant::now()); + + match service.initialize().await { + Ok(fd) => { + match AsyncFd::new(FdWrapper(fd)) { + Ok(afd) => { + *async_fd.lock() = Some(Arc::new(afd)); + running.store(true, Ordering::Release); + info!(service = %name, "Service initialized"); + } + Err(e) => { + error!(service = %name, error = %e, "Failed to register fd"); + let _ = service.finalize().await; + } + } + } + Err(e) => { + error!(service = %name, error = %e, "Initialization failed"); + } + } + } else { + // Service is running - dispatch events and timers + let fd = async_fd.lock().clone(); + if let Some(fd) = fd { + dispatch_service(name, service, &fd, running, last_timer).await; + } + } +} + +/// Dispatch events for a running service +async fn dispatch_service( + name: &str, + service: &mut Box<dyn Service>, + async_fd: &Arc<AsyncFd<FdWrapper>>, + running: &Arc<AtomicBool>, + last_timer: &Arc<Mutex<Option<Instant>>>, +) { + // Calculate timer deadline + let timer_deadline = service.timer_period().and_then(|period| { + let last = last_timer.lock(); + match *last { + Some(t) => { + let next = t + period; + if Instant::now() >= next { + // Already past
deadline, schedule for next period from now + Some(Instant::now() + period) + } else { + Some(next) + } + } + None => Some(Instant::now()), + } + }); + + tokio::select! { + // Timer callback + _ = async { + if let Some(deadline) = timer_deadline { + tokio::time::sleep_until(deadline.into()).await; + } else { + std::future::pending::<()>().await; + } + } => { + *last_timer.lock() = Some(Instant::now()); + debug!(service = %name, "Timer callback"); + if let Err(e) = service.timer_callback().await { + warn!(service = %name, error = %e, "Timer callback failed"); + } + } + + // Fd readable + result = async_fd.readable() => { + match result { + Ok(mut guard) => { + match service.dispatch().await { + Ok(true) => { + guard.clear_ready(); + } + Ok(false) => { + info!(service = %name, "Service requested reinitialization"); + guard.clear_ready(); + reinitialize(name, service, running).await; + } + Err(e) => { + error!(service = %name, error = %e, "Dispatch failed"); + guard.clear_ready(); + reinitialize(name, service, running).await; + } + } + } + Err(e) => { + warn!(service = %name, error = %e, "Error waiting for fd"); + reinitialize(name, service, running).await; + } + } + } + } +} + +/// Reinitialize a service +async fn reinitialize(name: &str, service: &mut Box<dyn Service>, running: &Arc<AtomicBool>) { + debug!(service = %name, "Reinitializing service"); + running.store(false, Ordering::Release); + + if let Err(e) = service.finalize().await { + warn!(service = %name, error = %e, "Error finalizing service"); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-services/src/service.rs b/src/pmxcfs-rs/pmxcfs-services/src/service.rs new file mode 100644 index 000000000..daf13900b --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-services/src/service.rs @@ -0,0 +1,149 @@ +//! Service trait and related types +//! +//! Simplified design based on actual usage patterns. +//! All production services use file descriptors and are restartable. +//! +//! # Architecture +//! +//! Each service runs in its own tokio task that handles: +//! - Initialization with automatic retry (5 second interval) +//! - Event-driven dispatch when the file descriptor becomes readable +//! - Optional periodic timer callbacks +//! - Automatic reinitialization on errors +//! +//! # Thread Safety +//! +//! Services must be `Send + Sync` as they run in separate tokio tasks. +//! The ServiceManager ensures that only one operation (initialize, dispatch, +//! timer_callback, or finalize) runs at a time for each service. +//! +//! # File Descriptor Ownership +//! +//! Services return a RawFd from `initialize()` and retain ownership of it. +//! The ServiceManager monitors the fd for readability but does NOT close it. +//! Services MUST close the fd in `finalize()`, which is called: +//! - On shutdown +//! - Before reinitialization after an error +//! - When dispatch() returns Ok(false) +//! +//! This design works well for wrapping C library file descriptors (e.g., from +//! Corosync libraries) where the service manages the underlying C resources.
+ +use crate::error::Result; +use async_trait::async_trait; +use std::os::unix::io::RawFd; +use std::time::Duration; + +/// A managed service with automatic retry and event-driven dispatch +/// +/// All services are: +/// - Event-driven (use file descriptors) +/// - Restartable (automatic retry on failure) +/// - Optionally have timer callbacks +/// +/// # Example +/// +/// ```ignore +/// use async_trait::async_trait; +/// use pmxcfs_services::{Result, Service, ServiceManager}; +/// use std::os::unix::io::RawFd; +/// +/// struct MyService { +/// fd: Option<RawFd>, +/// } +/// +/// #[async_trait] +/// impl Service for MyService { +/// fn name(&self) -> &str { "my-service" } +/// +/// async fn initialize(&mut self) -> Result<RawFd> { +/// let fd = connect_to_external_service()?; +/// self.fd = Some(fd); +/// Ok(fd) // Return fd for event monitoring +/// } +/// +/// async fn dispatch(&mut self) -> Result<bool> { +/// handle_events()?; +/// Ok(true) // true = continue, false = reinitialize +/// } +/// +/// async fn finalize(&mut self) -> Result<()> { +/// if let Some(fd) = self.fd.take() { +/// close(fd)?; // MUST close the fd +/// } +/// Ok(()) +/// } +/// } +/// ``` +#[async_trait] +pub trait Service: Send + Sync { + /// Service name for logging and identification + fn name(&self) -> &str; + + /// Initialize the service and return a file descriptor to monitor + /// + /// The service retains ownership of the fd and MUST close it in `finalize()`. + /// The ServiceManager monitors the fd and calls `dispatch()` when it becomes readable. + /// + /// # Returns + /// + /// Returns a file descriptor that will be monitored for readability. + /// + /// # Errors + /// + /// On error, the service will be automatically retried after 5 seconds. + /// + /// # File Descriptor Lifetime + /// + /// The returned fd must remain valid until `finalize()` is called. + /// The service is responsible for closing the fd in `finalize()`. + async fn initialize(&mut self) -> Result<RawFd>; + + /// Handle events when the file descriptor becomes readable + /// + /// This method is called when the fd returned by `initialize()` becomes readable. + /// + /// # Returns + /// + /// - `Ok(true)` - Continue running normally + /// - `Ok(false)` - Request reinitialization (finalize will be called first) + /// - `Err(_)` - Error occurred, service will be reinitialized + /// + /// # Blocking Behavior + /// + /// This method should not block for extended periods as it will prevent + /// timer callbacks and shutdown signals from being processed promptly. + /// For long-running operations, consider spawning a separate task. + async fn dispatch(&mut self) -> Result<bool>; + + /// Clean up resources (called on shutdown or before reinitialization) + /// + /// This method MUST close the file descriptor returned by `initialize()`. + /// + /// # When Called + /// + /// This method is called: + /// - When the service is being shut down + /// - Before reinitializing after an error + /// - When `dispatch()` returns `Ok(false)` + /// + /// # Idempotency + /// + /// Must be idempotent (safe to call multiple times). + async fn finalize(&mut self) -> Result<()>; + + /// Optional timer period for periodic callbacks + /// + /// Return `None` to disable timer callbacks. + /// Return `Some(duration)` to enable periodic callbacks at the specified interval. + fn timer_period(&self) -> Option<Duration> { + None + } + + /// Optional periodic callback invoked at `timer_period()` intervals + /// + /// Only called if `timer_period()` returns `Some`. + /// Errors are logged but do not trigger reinitialization.
+ async fn timer_callback(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs b/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs new file mode 100644 index 000000000..639124293 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs @@ -0,0 +1,1271 @@ +//! Comprehensive tests for the service framework +//! +//! Tests cover: +//! - Service lifecycle (start, stop, restart) +//! - Service manager orchestration +//! - Error handling and retry logic +//! - Timer callbacks +//! - File descriptor (event-driven) dispatch +//! - Service coordination and state management + +use async_trait::async_trait; +use pmxcfs_services::{Service, ServiceError, ServiceManager}; +use pmxcfs_test_utils::wait_for_condition; +use std::os::unix::io::RawFd; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::time::Duration; +use tokio::time::sleep; + +// ===== Test Service Implementations ===== + +/// Mock service for testing lifecycle +struct MockService { + name: String, + init_count: Arc<AtomicU32>, + dispatch_count: Arc<AtomicU32>, + finalize_count: Arc<AtomicU32>, + timer_count: Arc<AtomicU32>, + should_fail_init: Arc<AtomicBool>, + should_fail_dispatch: Arc<AtomicBool>, + should_reinit: Arc<AtomicBool>, + timer_period: Option<Duration>, + read_fd: Option<RawFd>, + write_fd: Arc<std::sync::atomic::AtomicI32>, +} + +impl MockService { + fn new(name: &str) -> Self { + Self { + name: name.to_string(), + init_count: Arc::new(AtomicU32::new(0)), + dispatch_count: Arc::new(AtomicU32::new(0)), + finalize_count: Arc::new(AtomicU32::new(0)), + timer_count: Arc::new(AtomicU32::new(0)), + should_fail_init: Arc::new(AtomicBool::new(false)), + should_fail_dispatch: Arc::new(AtomicBool::new(false)), + should_reinit: Arc::new(AtomicBool::new(false)), + timer_period: None, + read_fd: None, + write_fd: Arc::new(std::sync::atomic::AtomicI32::new(-1)), + } + } + + fn with_timer(mut self, period: Duration) -> Self { + self.timer_period = Some(period); + self + } + + fn counters(&self) -> ServiceCounters { + ServiceCounters { + init_count: self.init_count.clone(), + dispatch_count: self.dispatch_count.clone(), + finalize_count: self.finalize_count.clone(), + timer_count: self.timer_count.clone(), + should_fail_init: self.should_fail_init.clone(), + should_fail_dispatch: self.should_fail_dispatch.clone(), + should_reinit: self.should_reinit.clone(), + write_fd: self.write_fd.clone(), + } + } +} + +#[async_trait] +impl Service for MockService { + fn name(&self) -> &str { + &self.name + } + + async fn initialize(&mut self) -> pmxcfs_services::Result<RawFd> { + self.init_count.fetch_add(1, Ordering::SeqCst); + + if self.should_fail_init.load(Ordering::SeqCst) { + return Err(ServiceError::InitializationFailed( + "Mock init failure".to_string(), + )); + } + + // Create a pipe for event-driven dispatch + let mut fds = [0i32; 2]; + let ret = unsafe { libc::pipe(fds.as_mut_ptr()) }; + if ret != 0 { + return Err(ServiceError::InitializationFailed( + "pipe() failed".to_string(), + )); + } + + // Set read end to non-blocking (required for AsyncFd) + unsafe { + let flags = libc::fcntl(fds[0], libc::F_GETFL); + libc::fcntl(fds[0], libc::F_SETFL, flags | libc::O_NONBLOCK); + } + + self.read_fd = Some(fds[0]); + self.write_fd.store(fds[1], Ordering::SeqCst); + + Ok(fds[0]) + } + + async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> { + self.dispatch_count.fetch_add(1, Ordering::SeqCst); + + // Drain the pipe + if let Some(fd) = self.read_fd { + let mut buf = [0u8; 64]; + unsafe { + libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()); + } + } + + if
self.should_fail_dispatch.load(Ordering::SeqCst) { + return Err(ServiceError::DispatchFailed( + "Mock dispatch failure".to_string(), + )); + } + + if self.should_reinit.load(Ordering::SeqCst) { + return Ok(false); // false = reinitialize + } + + Ok(true) // true = continue + } + + async fn finalize(&mut self) -> pmxcfs_services::Result<()> { + self.finalize_count.fetch_add(1, Ordering::SeqCst); + + if let Some(fd) = self.read_fd.take() { + unsafe { libc::close(fd) }; + } + let wfd = self.write_fd.swap(-1, Ordering::SeqCst); + if wfd >= 0 { + unsafe { libc::close(wfd) }; + } + + Ok(()) + } + + async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> { + self.timer_count.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + fn timer_period(&self) -> Option<Duration> { + self.timer_period + } +} + +/// Helper struct to access service counters from tests +#[derive(Clone)] +struct ServiceCounters { + init_count: Arc<AtomicU32>, + dispatch_count: Arc<AtomicU32>, + finalize_count: Arc<AtomicU32>, + timer_count: Arc<AtomicU32>, + should_fail_init: Arc<AtomicBool>, + should_fail_dispatch: Arc<AtomicBool>, + should_reinit: Arc<AtomicBool>, + write_fd: Arc<std::sync::atomic::AtomicI32>, +} + +impl ServiceCounters { + fn init_count(&self) -> u32 { + self.init_count.load(Ordering::SeqCst) + } + + fn dispatch_count(&self) -> u32 { + self.dispatch_count.load(Ordering::SeqCst) + } + + fn finalize_count(&self) -> u32 { + self.finalize_count.load(Ordering::SeqCst) + } + + fn timer_count(&self) -> u32 { + self.timer_count.load(Ordering::SeqCst) + } + + fn set_fail_init(&self, fail: bool) { + self.should_fail_init.store(fail, Ordering::SeqCst); + } + + fn set_fail_dispatch(&self, fail: bool) { + self.should_fail_dispatch.store(fail, Ordering::SeqCst); + } + + fn set_reinit(&self, reinit: bool) { + self.should_reinit.store(reinit, Ordering::SeqCst); + } + + fn trigger_event(&self) { + let wfd = self.write_fd.load(Ordering::SeqCst); + if wfd >= 0 { + unsafe { + libc::write(wfd, b"x".as_ptr() as *const _, 1); + } + } + } +} + +// ===== FD-based Mock Service ===== + +extern crate libc; + +// ===== Lifecycle Tests ===== + +#[tokio::test] +async fn test_service_lifecycle_basic() { + let service = MockService::new("test_service"); + let counters = service.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should initialize within 5 seconds" + ); + + // Trigger a dispatch event + counters.trigger_event(); + + // Wait for dispatch + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should dispatch within 5 seconds after event" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; + + // Service should be finalized + assert_eq!( + counters.finalize_count(), + 1, + "Service should be finalized exactly once" + ); +} + +#[tokio::test] +async fn test_service_with_file_descriptor() { + let service = MockService::new("fd_service"); + let counters = service.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() == 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await,
"Service should initialize once within 5 seconds" + ); + + // Trigger a dispatch event + counters.trigger_event(); + + // Wait for dispatch + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should dispatch within 5 seconds after event" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; + + assert_eq!(counters.finalize_count(), 1, "Service should finalize once"); +} + +#[tokio::test] +async fn test_service_initialization_failure() { + let service = MockService::new("failing_service"); + let counters = service.counters(); + + // Make initialization fail + counters.set_fail_init(true); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for several retry attempts (retry interval is 5 seconds) + assert!( + wait_for_condition( + || counters.init_count() >= 3, + Duration::from_secs(15), + Duration::from_millis(10), + ) + .await, + "Service should retry initialization at least 3 times within 15 seconds" + ); + + // Dispatch should not run if init fails + assert_eq!( + counters.dispatch_count(), + 0, + "Service should not dispatch if init fails" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +#[tokio::test] +async fn test_service_initialization_recovery() { + let service = MockService::new("recovering_service"); + let counters = service.counters(); + + // Start with failing initialization + counters.set_fail_init(true); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for some failed attempts (retry interval is 5 seconds) + assert!( + wait_for_condition( + || counters.init_count() >= 2, + Duration::from_secs(12), + Duration::from_millis(10), + ) + .await, + "Should have at least 2 failed initialization attempts within 12 seconds" + ); + + let failed_attempts = counters.init_count(); + + // Allow initialization to succeed + counters.set_fail_init(false); + + // Wait for recovery + assert!( + wait_for_condition( + || counters.init_count() > failed_attempts, + Duration::from_secs(7), + Duration::from_millis(10), + ) + .await, + "Service should recover within 7 seconds" + ); + + // Trigger a dispatch event + counters.trigger_event(); + + // Wait for dispatch + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should dispatch after recovery" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +// ===== Dispatch Tests ===== + +#[tokio::test] +async fn test_service_dispatch_failure_triggers_reinit() { + let service = MockService::new("dispatch_fail_service"); + let counters = service.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() == 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should initialize once within 5 seconds" + ); + + // Trigger a dispatch event + counters.trigger_event(); + + // Wait for first dispatch + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + 
Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should dispatch within 5 seconds" + ); + + // Make dispatch fail + counters.set_fail_dispatch(true); + + // Trigger another dispatch event + counters.trigger_event(); + + // Wait for dispatch failure and reinitialization + assert!( + wait_for_condition( + || counters.init_count() >= 2 && counters.finalize_count() >= 1, + Duration::from_secs(10), + Duration::from_millis(10), + ) + .await, + "Service should reinitialize after dispatch failure within 10 seconds" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +#[tokio::test] +async fn test_service_dispatch_requests_reinit() { + let service = MockService::new("reinit_request_service"); + let counters = service.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() == 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should initialize once within 5 seconds" + ); + + // Request reinitialization from dispatch + counters.set_reinit(true); + + // Trigger a dispatch event + counters.trigger_event(); + + // Wait for reinitialization + assert!( + wait_for_condition( + || counters.init_count() >= 2 && counters.finalize_count() >= 1, + Duration::from_secs(10), + Duration::from_millis(10), + ) + .await, + "Service should reinitialize and finalize when dispatch requests it within 10 seconds" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +// ===== FD-based Dispatch Tests ===== + +#[tokio::test] +async fn test_fd_dispatch_basic() { + let (service, counters) = SharedFdService::new("fd_service"); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should initialize within 5 seconds" + ); + + // Verify no dispatch happens without data on the pipe + sleep(Duration::from_millis(200)).await; + assert_eq!( + counters.dispatch_count(), + 0, + "FD service should not dispatch without data on pipe" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +/// FD service that shares write_fd via Arc so tests can trigger events +struct SharedFdService { + name: String, + read_fd: Option<RawFd>, + write_fd: Arc<std::sync::atomic::AtomicI32>, + init_count: Arc<AtomicU32>, + dispatch_count: Arc<AtomicU32>, + finalize_count: Arc<AtomicU32>, + should_fail_dispatch: Arc<AtomicBool>, + should_reinit: Arc<AtomicBool>, +} + +impl SharedFdService { + fn new(name: &str) -> (Self, SharedFdCounters) { + let write_fd = Arc::new(std::sync::atomic::AtomicI32::new(-1)); + let init_count = Arc::new(AtomicU32::new(0)); + let dispatch_count = Arc::new(AtomicU32::new(0)); + let finalize_count = Arc::new(AtomicU32::new(0)); + let should_fail_dispatch = Arc::new(AtomicBool::new(false)); + let should_reinit = Arc::new(AtomicBool::new(false)); + + let counters = SharedFdCounters { + write_fd: write_fd.clone(), + init_count: init_count.clone(), + dispatch_count: dispatch_count.clone(), + finalize_count: finalize_count.clone(), + should_fail_dispatch: should_fail_dispatch.clone(), + should_reinit: should_reinit.clone(), + }; + + let service = Self { + name: name.to_string(),
read_fd: None, + write_fd, + init_count, + dispatch_count, + finalize_count, + should_fail_dispatch, + should_reinit, + }; + + (service, counters) + } +} + +#[derive(Clone)] +struct SharedFdCounters { + write_fd: Arc<std::sync::atomic::AtomicI32>, + init_count: Arc<AtomicU32>, + dispatch_count: Arc<AtomicU32>, + finalize_count: Arc<AtomicU32>, + should_fail_dispatch: Arc<AtomicBool>, + should_reinit: Arc<AtomicBool>, +} + +impl SharedFdCounters { + fn init_count(&self) -> u32 { + self.init_count.load(Ordering::SeqCst) + } + fn dispatch_count(&self) -> u32 { + self.dispatch_count.load(Ordering::SeqCst) + } + fn finalize_count(&self) -> u32 { + self.finalize_count.load(Ordering::SeqCst) + } + fn trigger_event(&self) { + let fd = self.write_fd.load(Ordering::SeqCst); + if fd >= 0 { + unsafe { + libc::write(fd, b"x".as_ptr() as *const _, 1); + } + } + } + fn set_fail_dispatch(&self, fail: bool) { + self.should_fail_dispatch.store(fail, Ordering::SeqCst); + } + fn set_reinit(&self, reinit: bool) { + self.should_reinit.store(reinit, Ordering::SeqCst); + } +} + +#[async_trait] +impl Service for SharedFdService { + fn name(&self) -> &str { + &self.name + } + + async fn initialize(&mut self) -> pmxcfs_services::Result<RawFd> { + self.init_count.fetch_add(1, Ordering::SeqCst); + + let mut fds = [0i32; 2]; + let ret = unsafe { libc::pipe(fds.as_mut_ptr()) }; + if ret != 0 { + return Err(ServiceError::InitializationFailed( + "pipe() failed".to_string(), + )); + } + + // Set read end to non-blocking (required for AsyncFd) + unsafe { + let flags = libc::fcntl(fds[0], libc::F_GETFL); + libc::fcntl(fds[0], libc::F_SETFL, flags | libc::O_NONBLOCK); + } + + self.read_fd = Some(fds[0]); + self.write_fd.store(fds[1], Ordering::SeqCst); + + Ok(fds[0]) + } + + async fn dispatch(&mut self) -> pmxcfs_services::Result<bool> { + self.dispatch_count.fetch_add(1, Ordering::SeqCst); + + // Drain the pipe + if let Some(fd) = self.read_fd { + let mut buf = [0u8; 64]; + unsafe { + libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()); + } + } + + if self.should_fail_dispatch.load(Ordering::SeqCst) { + return Err(ServiceError::DispatchFailed( + "Mock fd dispatch failure".to_string(), + )); + } + + if self.should_reinit.load(Ordering::SeqCst) { + return Ok(false); // false = reinitialize + } + + Ok(true) // true = continue + } + + async fn finalize(&mut self) -> pmxcfs_services::Result<()> { + self.finalize_count.fetch_add(1, Ordering::SeqCst); + + if let Some(fd) = self.read_fd.take() { + unsafe { libc::close(fd) }; + } + let wfd = self.write_fd.swap(-1, Ordering::SeqCst); + if wfd >= 0 { + unsafe { libc::close(wfd) }; + } + + Ok(()) + } +} + +#[tokio::test] +async fn test_fd_dispatch_event_driven() { + let (service, counters) = SharedFdService::new("fd_event_service"); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should initialize within 5 seconds" + ); + + // No dispatch should happen without data + sleep(Duration::from_millis(200)).await; + assert_eq!( + counters.dispatch_count(), + 0, + "FD service should not dispatch without data" + ); + + // Trigger an event by writing to the pipe + counters.trigger_event(); + + // Wait for dispatch + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should dispatch after
data is written to pipe" + ); + + // Trigger more events + counters.trigger_event(); + counters.trigger_event(); + + assert!( + wait_for_condition( + || counters.dispatch_count() >= 2, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should handle multiple events" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; + + assert!( + counters.finalize_count() >= 1, + "FD service should be finalized" + ); +} + +#[tokio::test] +async fn test_fd_dispatch_failure_triggers_reinit() { + let (service, counters) = SharedFdService::new("fd_fail_service"); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should initialize" + ); + + // Trigger an event and verify dispatch works + counters.trigger_event(); + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should dispatch" + ); + + // Make dispatch fail, then trigger event + counters.set_fail_dispatch(true); + counters.trigger_event(); + + // Wait for finalize + reinit + assert!( + wait_for_condition( + || counters.finalize_count() >= 1 && counters.init_count() >= 2, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should finalize and reinitialize after dispatch failure" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +#[tokio::test] +async fn test_fd_dispatch_reinit_request() { + let (service, counters) = SharedFdService::new("fd_reinit_service"); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should initialize" + ); + + // Request reinit from dispatch + counters.set_reinit(true); + counters.trigger_event(); + + // Wait for reinit + assert!( + wait_for_condition( + || counters.finalize_count() >= 1 && counters.init_count() >= 2, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "FD service should finalize and reinitialize on reinit request" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +// ===== Timer Callback Tests ===== + +#[tokio::test] +async fn test_service_timer_callback() { + let service = MockService::new("timer_service").with_timer(Duration::from_millis(300)); + let counters = service.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization plus several timer periods + assert!( + wait_for_condition( + || counters.timer_count() >= 3, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Timer should fire at least 3 times within 5 seconds" + ); + + let timer_count = counters.timer_count(); + + // Wait for more timer invocations + assert!( + wait_for_condition( + || counters.timer_count() > timer_count, + Duration::from_secs(2), + Duration::from_millis(10), + ) + .await, + "Timer should 
continue firing" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +#[tokio::test] +async fn test_service_timer_callback_not_invoked_when_failed() { + let service = MockService::new("failed_timer_service").with_timer(Duration::from_millis(100)); + let counters = service.counters(); + + // Make initialization fail + counters.set_fail_init(true); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for several timer periods + sleep(Duration::from_millis(2000)).await; + + // Timer should NOT fire if service is not running + assert_eq!( + counters.timer_count(), + 0, + "Timer should not fire when service is not running" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +// ===== Service Manager Tests ===== + +#[tokio::test] +async fn test_manager_multiple_services() { + let service1 = MockService::new("service1"); + let service2 = MockService::new("service2"); + let service3 = MockService::new("service3"); + + let counters1 = service1.counters(); + let counters2 = service2.counters(); + let counters3 = service3.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service1)).unwrap(); + manager.add_service(Box::new(service2)).unwrap(); + manager.add_service(Box::new(service3)); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters1.init_count() == 1 + && counters2.init_count() == 1 + && counters3.init_count() == 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "All services should initialize within 5 seconds" + ); + + // Trigger dispatch events for all services + counters1.trigger_event(); + counters2.trigger_event(); + counters3.trigger_event(); + + // Wait for dispatch + assert!( + wait_for_condition( + || counters1.dispatch_count() >= 1 + && counters2.dispatch_count() >= 1 + && counters3.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "All services should dispatch within 5 seconds after events" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; + + // All services should be finalized + assert_eq!(counters1.finalize_count(), 1, "Service1 should finalize"); + assert_eq!(counters2.finalize_count(), 1, "Service2 should finalize"); + assert_eq!(counters3.finalize_count(), 1, "Service3 should finalize"); +} + +#[tokio::test] +async fn test_manager_duplicate_service_name() { + let service1 = MockService::new("duplicate"); + let service2 = MockService::new("duplicate"); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service1)).unwrap(); + let result = manager.add_service(Box::new(service2)); + assert!(result.is_err(), "Should return error for duplicate service"); +} + +#[tokio::test] +async fn test_manager_partial_service_failure() { + let service1 = MockService::new("working_service"); + let service2 = MockService::new("failing_service"); + + let counters1 = service1.counters(); + let counters2 = service2.counters(); + + // Make service2 fail + counters2.set_fail_init(true); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service1)).unwrap(); + manager.add_service(Box::new(service2)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for service1 initialization + assert!( 
+ wait_for_condition( + || counters1.init_count() == 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service1 should initialize within 5 seconds" + ); + + // Trigger event for service1 + counters1.trigger_event(); + + // Wait for service1 dispatch and service2 retries + assert!( + wait_for_condition( + || counters1.dispatch_count() >= 1 && counters2.init_count() >= 2, + Duration::from_secs(12), + Duration::from_millis(10), + ) + .await, + "Service1 should work normally and Service2 should retry within 12 seconds" + ); + + // Service2 should not dispatch when failing + assert_eq!( + counters2.dispatch_count(), + 0, + "Service2 should not dispatch when failing" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; + + // Service1 should finalize + assert_eq!(counters1.finalize_count(), 1, "Service1 should finalize"); + // Service2 is also finalized unconditionally during shutdown (matching C behavior) + assert_eq!( + counters2.finalize_count(), + 1, + "Service2 should also be finalized during shutdown (idempotent finalize)" + ); +} + +// ===== Error Handling Tests ===== + +#[tokio::test] +async fn test_service_error_count_tracking() { + let service = MockService::new("error_tracking_service"); + let counters = service.counters(); + + // Make initialization fail + counters.set_fail_init(true); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for multiple failures (retry interval is 5 seconds) + assert!( + wait_for_condition( + || counters.init_count() >= 3, + Duration::from_secs(15), + Duration::from_millis(10), + ) + .await, + "Should accumulate at least 3 failures within 15 seconds" + ); + + // Allow recovery + counters.set_fail_init(false); + + // Wait for successful initialization + assert!( + wait_for_condition( + || counters.init_count() >= 4, + Duration::from_secs(7), + Duration::from_millis(10), + ) + .await, + "Service should recover within 7 seconds" + ); + + // Trigger a dispatch event + counters.trigger_event(); + + // Wait for dispatch + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should dispatch after recovery" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +#[tokio::test] +async fn test_service_graceful_shutdown() { + let service = MockService::new("shutdown_test"); + let counters = service.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should initialize within 5 seconds" + ); + + // Trigger a dispatch event + counters.trigger_event(); + + // Wait for service to be running + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should be running within 5 seconds" + ); + + // Graceful shutdown + shutdown_token.cancel(); + let _ = handle.await; + + // Service should be properly finalized + assert_eq!( + counters.finalize_count(), + 1, + "Service should finalize during shutdown" + ); +} + +// ===== Concurrency Tests ===== + +#[tokio::test] +async fn 
test_service_concurrent_operations() { + let service = MockService::new("concurrent_service").with_timer(Duration::from_millis(200)); + let counters = service.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should initialize within 5 seconds" + ); + + // Trigger multiple dispatch events + for _ in 0..5 { + counters.trigger_event(); + sleep(Duration::from_millis(50)).await; + } + + // Wait for service to run with both dispatch and timer + assert!( + wait_for_condition( + || counters.dispatch_count() >= 3 && counters.timer_count() >= 3, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should handle concurrent dispatch and timer events within 5 seconds" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} + +#[tokio::test] +async fn test_service_state_consistency_after_reinit() { + let service = MockService::new("consistency_service"); + let counters = service.counters(); + + let mut manager = ServiceManager::new(); + manager.add_service(Box::new(service)).unwrap(); + + let shutdown_token = manager.shutdown_token(); + let handle = manager.spawn(); + + // Wait for initialization + assert!( + wait_for_condition( + || counters.init_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should initialize within 5 seconds" + ); + + // Trigger reinitialization + counters.set_reinit(true); + counters.trigger_event(); + + // Wait for reinit + assert!( + wait_for_condition( + || counters.init_count() >= 2, + Duration::from_secs(10), + Duration::from_millis(10), + ) + .await, + "Service should reinitialize within 10 seconds" + ); + + // Clear reinit flag + counters.set_reinit(false); + + // Trigger a dispatch event + counters.trigger_event(); + + // Wait for dispatch + assert!( + wait_for_condition( + || counters.dispatch_count() >= 1, + Duration::from_secs(5), + Duration::from_millis(10), + ) + .await, + "Service should dispatch after reinit" + ); + + // Shutdown + shutdown_token.cancel(); + let _ = handle.await; +} -- 2.47.3