From: Samuel Rufinatscha <s.rufinatscha@proxmox.com>
To: Proxmox VE development discussion <pve-devel@lists.proxmox.com>,
Kefu Chai <k.chai@proxmox.com>
Subject: Re: [pve-devel] [PATCH pve-cluster 08/15] pmxcfs-rs: add pmxcfs-services crate
Date: Wed, 11 Feb 2026 12:52:48 +0100
Message-ID: <de71d3ea-c1c4-4024-abf2-360c47ff308c@proxmox.com>
In-Reply-To: <20260106142440.2368585-9-k.chai@proxmox.com>
Thanks for this patch, Kefu!
Comments inline.
On 1/7/26 10:16 AM, Kefu Chai wrote:
> Add service lifecycle management framework providing:
> - Service trait: Lifecycle interface for async services
> - ServiceManager: Orchestrates multiple services
> - Automatic retry logic for failed services
> - Event-driven dispatching via file descriptors
> - Graceful shutdown coordination
>
> This is a generic framework with no pmxcfs-specific dependencies,
> only requiring tokio, async-trait, and standard error handling.
> It replaces the C version's qb_loop-based event management.
>
> Signed-off-by: Kefu Chai <k.chai@proxmox.com>
> ---
> src/pmxcfs-rs/Cargo.lock | 1798 +----------------
> src/pmxcfs-rs/Cargo.toml | 1 +
> src/pmxcfs-rs/pmxcfs-services/Cargo.toml | 17 +
> src/pmxcfs-rs/pmxcfs-services/README.md | 167 ++
> src/pmxcfs-rs/pmxcfs-services/src/error.rs | 37 +
> src/pmxcfs-rs/pmxcfs-services/src/lib.rs | 16 +
> src/pmxcfs-rs/pmxcfs-services/src/manager.rs | 477 +++++
> src/pmxcfs-rs/pmxcfs-services/src/service.rs | 173 ++
> .../pmxcfs-services/tests/service_tests.rs | 808 ++++++++
> 9 files changed, 1778 insertions(+), 1716 deletions(-)
> create mode 100644 src/pmxcfs-rs/pmxcfs-services/Cargo.toml
> create mode 100644 src/pmxcfs-rs/pmxcfs-services/README.md
> create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/error.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/lib.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/manager.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-services/src/service.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs
>
> diff --git a/src/pmxcfs-rs/Cargo.lock b/src/pmxcfs-rs/Cargo.lock
> index 31a30e13..f0ec6231 100644
> --- a/src/pmxcfs-rs/Cargo.lock
> +++ b/src/pmxcfs-rs/Cargo.lock
[..]
> diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
> index 8fe06b88..b00ca68f 100644
> --- a/src/pmxcfs-rs/Cargo.toml
> +++ b/src/pmxcfs-rs/Cargo.toml
> @@ -8,6 +8,7 @@ members = [
> "pmxcfs-memdb", # In-memory database with SQLite persistence
> "pmxcfs-status", # Status monitoring and RRD data management
> "pmxcfs-test-utils", # Test utilities and helpers (dev-only)
> + "pmxcfs-services", # Service framework for automatic retry and lifecycle management
> ]
> resolver = "2"
>
> diff --git a/src/pmxcfs-rs/pmxcfs-services/Cargo.toml b/src/pmxcfs-rs/pmxcfs-services/Cargo.toml
> new file mode 100644
> index 00000000..7991b913
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-services/Cargo.toml
> @@ -0,0 +1,17 @@
> +[package]
> +name = "pmxcfs-services"
> +version = "0.1.0"
> +edition = "2024"
> +
> +[dependencies]
> +anyhow = "1.0"
> +async-trait = "0.1"
> +tokio = { version = "1.41", features = ["full"] }
> +tokio-util = "0.7"
> +tracing = "0.1"
> +thiserror = "2.0"
> +parking_lot = "0.12"
> +scopeguard = "1.2"
This dependency is unused.
> +
> +[dev-dependencies]
> +pmxcfs-test-utils = { path = "../pmxcfs-test-utils" }
> diff --git a/src/pmxcfs-rs/pmxcfs-services/README.md b/src/pmxcfs-rs/pmxcfs-services/README.md
The lifecycle overview, the C-to-Rust mapping, and the usage example are
great for reviewers coming from the C codebase.
I think the rest should move to rustdoc to avoid duplication and drift;
it's easier to keep a single source of truth in sync. The README would
benefit from being tighter and limited to:
- what the crate does and why
- how the pieces fit together (who calls what, and maybe when), as a
  brief summary that still gives readers a good first idea
- the C mapping, ideally with local links to the C files and, if
  possible, to the mentioned functions, so the implementations are
  easier to compare
- the usage example
- the differences from the C implementation
The same would apply to the other crates in the series.
> new file mode 100644
> index 00000000..ca17e3e9
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-services/README.md
> @@ -0,0 +1,167 @@
> +# pmxcfs-services
> +
> +**Service Management Framework** for pmxcfs - tokio-based replacement for qb_loop.
> +
> +This crate provides a robust, async service management framework with automatic retry, event-driven dispatching, periodic timers, and graceful shutdown. It replaces the C implementation's libqb loop with a modern tokio-based architecture.
> +
> +## Overview
> +
> +The service framework manages long-running services that need:
> +- **Automatic initialization retry** when connections fail
> +- **Event-driven dispatching** for file descriptor-based services (Corosync)
> +- **Periodic timers** for maintenance tasks
> +- **Error tracking** with throttled logging
> +- **Graceful shutdown** with resource cleanup
> +
> +## Key Concepts
> +
> +- **Service**: A trait implementing lifecycle methods (`initialize`, `dispatch`, `finalize`)
> +- **ServiceManager**: Orchestrates multiple services, handles retries, timers, and shutdown
> +- **ManagedService**: Internal wrapper that tracks state and handles recovery
> +
> +## Service Trait
> +
> +The `Service` trait defines the lifecycle of a managed service:
> +
> +```rust
> +#[async_trait]
> +pub trait Service: Send + Sync {
> + fn name(&self) -> &str;
> + async fn initialize(&mut self) -> Result<InitResult>;
> + async fn dispatch(&mut self) -> Result<DispatchAction>;
> + async fn finalize(&mut self) -> Result<()>;
> +
> + // Optional overrides:
> + fn timer_period(&self) -> Option<Duration> { None }
> + async fn timer_callback(&mut self) -> Result<()> { Ok(()) }
> + fn is_restartable(&self) -> bool { true }
> + fn retry_interval(&self) -> Duration { Duration::from_secs(5) }
> + fn dispatch_interval(&self) -> Duration { Duration::from_millis(100) }
> +}
> +```
> +
> +## InitResult
> +
> +Services return `InitResult` to indicate their dispatch mode:
> +
> +**WithFileDescriptor(fd)**:
> +- **Use case**: Corosync services (CPG, quorum, cmap)
> +- **Behavior**: `dispatch()` called when fd becomes readable
> +- **Efficiency**: Event-driven, no polling overhead
> +- **Example**: ClusterDatabaseService, QuorumService
> +
> +**NoFileDescriptor**:
> +- **Use case**: Services without external event sources
> +- **Behavior**: `dispatch()` called periodically at `dispatch_interval()`
> +- **Efficiency**: Polling overhead (default: 100ms interval)
> +
> +## ServiceManager
> +
> +Orchestrates multiple services with automatic management:
> +
> +```rust
> +let mut manager = ServiceManager::new();
> +manager.add_service(Box::new(MyService::new()));
> +manager.add_service(Box::new(AnotherService::new()));
> +let handle = manager.spawn(); // Returns JoinHandle for lifecycle control
> +// ... later ...
> +handle.abort(); // Gracefully shuts down all services
This does not shut down the services gracefully, because abort() never
runs the finalization code.
Please consider this approach instead:
let shutdown_token = manager.shutdown_token();
let handle = manager.spawn();
...
shutdown_token.cancel(); // Signal graceful shutdown
handle.await; // Wait for all services to finalize
> +```
> +
> +### Features
> +
> +1. **Automatic Retry**: Failed services automatically retry initialization
> +2. **Event-Driven**: Services with file descriptors use tokio AsyncFd (no polling)
> +3. **Timers**: Optional periodic callbacks for maintenance
> +4. **Error Tracking**: Counts consecutive failures, throttles error logs
> +5. **Graceful Shutdown**: Finalizes all services on exit
> +
> +## Usage Example
> +
> +```rust
> +use pmxcfs_services::{Service, InitResult, DispatchAction, ServiceManager};
> +
> +struct MyService {
> + fd: Option<i32>,
> +}
> +
> +#[async_trait]
> +impl Service for MyService {
> + fn name(&self) -> &str { "my-service" }
> +
> + async fn initialize(&mut self) -> Result<InitResult> {
> + let fd = connect_to_external_service()?;
> + self.fd = Some(fd);
> + Ok(InitResult::WithFileDescriptor(fd))
> + }
> +
> + async fn dispatch(&mut self) -> Result<DispatchAction> {
> + handle_events()?;
> + Ok(DispatchAction::Continue)
> + }
> +
> + async fn finalize(&mut self) -> Result<()> {
> + close_connection(self.fd.take())?;
> + Ok(())
> + }
> +}
> +```
> +
> +## C to Rust Mapping
> +
> +### Data Structures
> +
> +| C Type | Rust Type | Notes |
> +|--------|-----------|-------|
> +| `cfs_loop_t` | `ServiceManager` | Event loop manager |
> +| `cfs_service_t` | `dyn Service` | Service trait |
> +| `cfs_service_callbacks_t` | (trait methods) | Callbacks as trait methods |
> +
> +### Functions
> +
> +| C Function | Rust Equivalent | Location |
> +|-----------|-----------------|----------|
> +| `cfs_loop_new()` | `ServiceManager::new()` | manager.rs |
> +| `cfs_loop_add_service()` | `ServiceManager::add_service()` | manager.rs |
> +| `cfs_loop_start_worker()` | `ServiceManager::spawn()` | manager.rs |
> +| `cfs_loop_stop_worker()` | `handle.abort()` | Tokio abort |
The pattern should be shutdown_token.cancel() + handle.await
> +| `cfs_service_new()` | (struct + impl Service) | User code |
> +
> +## Key Differences from C Implementation
> +
> +### Event Loop Architecture
> +
> +**C Version (loop.c)**:
> +- Uses libqb's `qb_loop` event loop
> +- Manual fd registration with `qb_loop_poll_add()`
> +- Single-threaded callback-based model
> +- Priority levels for services
> +
> +**Rust Version**:
> +- Uses tokio async runtime
> +- Automatic fd monitoring with `AsyncFd`
> +- Concurrent task-based model
> +- No priority levels (all equal)
> +
> +### Concurrency
> +
> +**C Version**:
> +- Single-threaded qb_loop
> +- Callbacks run sequentially
> +
> +**Rust Version**:
> +- Multi-threaded tokio runtime
> +- Services can run in parallel
> +
> +## References
> +
> +### C Implementation
> +- `src/pmxcfs/loop.c` / `loop.h` - Service loop
> +
> +### Related Crates
> +- **pmxcfs-dfsm**: Uses Service trait for ClusterDatabaseService, StatusSyncService
> +- **pmxcfs**: Uses ServiceManager to orchestrate all cluster services
> +
> +### External Dependencies
> +- **tokio**: Async runtime and I/O
> +- **async-trait**: Async methods in traits
> diff --git a/src/pmxcfs-rs/pmxcfs-services/src/error.rs b/src/pmxcfs-rs/pmxcfs-services/src/error.rs
> new file mode 100644
> index 00000000..c0dde47b
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-services/src/error.rs
> @@ -0,0 +1,37 @@
> +//! Error types for the service framework
> +
> +use thiserror::Error;
> +
> +/// Errors that can occur during service operations
> +#[derive(Error, Debug)]
> +pub enum ServiceError {
Several variants are dead code, please remove if not needed.
> + /// Service initialization failed
> + #[error("Failed to initialize service: {0}")]
> + InitializationFailed(String),
> +
> + /// Service dispatch failed
> + #[error("Failed to dispatch service events: {0}")]
> + DispatchFailed(String),
> +
> + /// Service finalization failed
> + #[error("Failed to finalize service: {0}")]
> + FinalizationFailed(String),
> +
> + /// Timer callback failed
> + #[error("Timer callback failed: {0}")]
> + TimerFailed(String),
> +
> + /// Service is not running
> + #[error("Service is not running")]
> + NotRunning,
> +
> + /// Service is already running
> + #[error("Service is already running")]
> + AlreadyRunning,
> +
> + /// Generic error with context
> + #[error("{0}")]
> + Other(#[from] anyhow::Error),
> +}
> +
> +pub type Result<T> = std::result::Result<T, ServiceError>;
> diff --git a/src/pmxcfs-rs/pmxcfs-services/src/lib.rs b/src/pmxcfs-rs/pmxcfs-services/src/lib.rs
> new file mode 100644
> index 00000000..cf894cc5
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-services/src/lib.rs
> @@ -0,0 +1,16 @@
> +//! Service framework for pmxcfs
> +//!
> +//! This crate provides a robust, tokio-based service management framework with:
> +//! - Automatic retry on failure
> +//! - Event-driven file descriptor monitoring
> +//! - Periodic timer callbacks
> +//! - Error tracking and throttled logging
> +//! - Graceful shutdown
> +
> +mod error;
> +mod manager;
> +mod service;
> +
> +pub use error::{Result, ServiceError};
> +pub use manager::ServiceManager;
> +pub use service::{DispatchAction, InitResult, Service};
> diff --git a/src/pmxcfs-rs/pmxcfs-services/src/manager.rs b/src/pmxcfs-rs/pmxcfs-services/src/manager.rs
manager.rs is currently doing quite a lot.
Could we split it, for example like this?
manager/mod.rs
manager/retry.rs
manager/timer.rs
manager/dispatch.rs
manager/state.rs
> new file mode 100644
> index 00000000..48c09c15
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-services/src/manager.rs
> @@ -0,0 +1,477 @@
> +//! Service manager for orchestrating multiple managed services
> +//!
> +//! The ServiceManager handles automatic retry, error tracking, event dispatching,
> +//! and timer callbacks for all registered services. It uses tokio for async I/O
> +//! and provides graceful shutdown capabilities.
> +
> +use crate::service::{DispatchAction, InitResult, Service};
> +use parking_lot::RwLock;
> +use std::collections::HashMap;
> +use std::os::unix::io::{AsRawFd, RawFd};
> +use std::sync::Arc;
> +use std::time::{Duration, Instant};
> +use tokio::io::unix::AsyncFd;
> +use tokio::task::JoinHandle;
> +use tokio::time::{MissedTickBehavior, interval};
> +use tokio_util::sync::CancellationToken;
> +use tracing::{debug, error, info, warn};
> +
> +/// Shared state for a managed service
> +struct ManagedService {
> + /// The service implementation (wrapped in Mutex for interior mutability)
> + service: tokio::sync::Mutex<Box<dyn Service>>,
> + /// Current service state
> + state: RwLock<ServiceState>,
The fields guarded by parking_lot::RwLock are simple values that could
be atomics, which would also let you drop the parking_lot dependency.
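To illustrate what I have in mind, here is a minimal sketch for the state and the error counter, using only std. ServiceStatus and its methods are made-up names, not part of the patch, and the memory orderings are a conservative guess rather than a tuned choice.

```rust
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};

/// Hypothetical mirror of the patch's ServiceState with explicit discriminants,
/// so it can be stored in an AtomicU8.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ServiceState {
    Uninitialized = 0,
    Initializing = 1,
    Running = 2,
    Failed = 3,
}

impl ServiceState {
    fn from_u8(raw: u8) -> Self {
        match raw {
            1 => Self::Initializing,
            2 => Self::Running,
            3 => Self::Failed,
            _ => Self::Uninitialized,
        }
    }
}

/// Lock-free stand-in for the `RwLock<ServiceState>` and `RwLock<u64>` fields.
pub struct ServiceStatus {
    state: AtomicU8,
    error_count: AtomicU64,
}

impl ServiceStatus {
    pub fn new() -> Self {
        Self {
            state: AtomicU8::new(ServiceState::Uninitialized as u8),
            error_count: AtomicU64::new(0),
        }
    }

    pub fn state(&self) -> ServiceState {
        ServiceState::from_u8(self.state.load(Ordering::Acquire))
    }

    pub fn set_state(&self, new: ServiceState) {
        self.state.store(new as u8, Ordering::Release);
    }

    /// Increments the consecutive error count and returns the new value.
    pub fn record_error(&self) -> u64 {
        self.error_count.fetch_add(1, Ordering::Relaxed) + 1
    }

    pub fn reset_errors(&self) {
        self.error_count.store(0, Ordering::Relaxed);
    }
}
```

The Instant fields and async_fd do not map onto atomics this directly; they would probably need to be owned by the one task that uses them, or handled some other way.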
> + /// Consecutive error count (reset on successful initialization)
> + error_count: RwLock<u64>,
> + /// Last initialization attempt timestamp
> + last_init_attempt: RwLock<Option<Instant>>,
> + /// Async file descriptor for event monitoring (if applicable)
> + async_fd: RwLock<Option<Arc<AsyncFd<FdWrapper>>>>,
> + /// Last timer callback invocation
> + last_timer_invoke: RwLock<Option<Instant>>,
> +}
> +
> +/// Service state
> +#[derive(Debug, Clone, Copy, PartialEq, Eq)]
> +enum ServiceState {
> + /// Service not yet initialized
> + Uninitialized,
> + /// Service currently initializing
> + Initializing,
> + /// Service running successfully
> + Running,
> + /// Service failed, awaiting retry
> + Failed,
> +}
> +
> +/// Wrapper for raw file descriptor to implement AsRawFd
> +struct FdWrapper(RawFd);
> +
> +impl AsRawFd for FdWrapper {
> + fn as_raw_fd(&self) -> RawFd {
> + self.0
> + }
> +}
> +
> +impl Drop for FdWrapper {
> + fn drop(&mut self) {
> + // File descriptor ownership is managed by the service
> + // We just monitor it, so don't close it here
> + }
> +}
> +
> +/// Service manager for orchestrating multiple services
> +///
> +/// The ServiceManager provides:
> +/// - Automatic retry of failed initializations
> +/// - Event-driven dispatching for file descriptor-based services
> +/// - Periodic polling for services without file descriptors
> +/// - Timer callbacks for periodic maintenance
> +/// - Error tracking and throttled logging
> +/// - Graceful shutdown
> +pub struct ServiceManager {
> + /// Registered services by name
> + services: HashMap<String, Arc<ManagedService>>,
> + /// Cancellation token for graceful shutdown
> + shutdown_token: CancellationToken,
> +}
> +
> +impl ServiceManager {
> + /// Create a new service manager
> + pub fn new() -> Self {
> + Self {
> + services: HashMap::new(),
> + shutdown_token: CancellationToken::new(),
> + }
> + }
> +
> + /// Add a service to be managed
> + ///
> + /// Services will be started when `run()` is called.
> + ///
> + /// # Panics
> + ///
> + /// Panics if a service with the same name is already registered.
> + pub fn add_service(&mut self, service: Box<dyn Service>) {
> + let name = service.name().to_string();
> +
> + if self.services.contains_key(&name) {
> + panic!("Service '{name}' is already registered");
> + }
> +
> + let managed = Arc::new(ManagedService {
> + service: tokio::sync::Mutex::new(service),
> + state: RwLock::new(ServiceState::Uninitialized),
> + error_count: RwLock::new(0),
> + last_init_attempt: RwLock::new(None),
> + async_fd: RwLock::new(None),
> + last_timer_invoke: RwLock::new(None),
> + });
> +
> + self.services.insert(name, managed);
> + }
> +
> + /// Get a handle to trigger shutdown
> + ///
> + /// Call `cancel()` on the returned token to initiate graceful shutdown.
> + pub fn shutdown_token(&self) -> CancellationToken {
> + self.shutdown_token.clone()
> + }
> +
> + /// Spawn the service manager in a background task
> + ///
> + /// Returns a JoinHandle that can be used to await completion.
> + /// To gracefully shut down, call `.shutdown_token().cancel()` then await the handle.
> + ///
> + /// # Example
> + ///
> + /// ```ignore
> + /// let shutdown_token = manager.shutdown_token();
> + /// let handle = manager.spawn();
> + /// // ... later ...
> + /// shutdown_token.cancel(); // Trigger graceful shutdown
> + /// handle.await; // Wait for shutdown to complete
> + /// ```
> + pub fn spawn(self) -> JoinHandle<()> {
> + tokio::spawn(async move { self.run().await })
> + }
> +
> + /// Run the service manager (private - use spawn() instead)
> + ///
> + /// This starts all registered services and runs until shutdown is requested.
> + /// Services are automatically retried on failure according to their configuration.
> + async fn run(self) {
> + info!(
> + "Starting ServiceManager with {} services",
> + self.services.len()
> + );
> +
> + let services = Arc::new(self.services);
> +
> + // Spawn retry task for failed services
> + let retry_handle = Self::spawn_retry_task_static(Arc::clone(&services));
> +
> + // Spawn timer callback task
> + let timer_handle = Self::spawn_timer_task_static(Arc::clone(&services));
> +
> + // Spawn dispatch tasks for each service
> + let dispatch_handles = Self::spawn_dispatch_tasks_static(Arc::clone(&services));
> +
> + // Wait for shutdown signal
> + self.shutdown_token.cancelled().await;
> +
> + // Graceful shutdown sequence
> + info!("ServiceManager shutting down...");
> +
> + // Shutdown all services gracefully
> + Self::shutdown_all_services_static(&services).await;
Between finalize releasing the Mutex ...
> +
> + // Cancel background tasks
> + retry_handle.abort();
> + timer_handle.abort();
> + for handle in dispatch_handles {
> + handle.abort();
> + }
... and the abort() calls above, a re-initialization could still happen.
Please pass the CancellationToken to every spawned task and make each
loop select! on token.cancelled().
Also, please reverse the shutdown order: stop the tasks first and
finalize second, to align with the C implementation.
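The ordering I mean can be sketched with plain std threads. This is only an analogy: the AtomicBool stands in for the CancellationToken, join() for awaiting the JoinHandles, and Probe is a made-up type that merely records what happened.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Counters standing in for a real service (hypothetical, for illustration).
#[derive(Default)]
pub struct Probe {
    pub inits: AtomicUsize,
    pub finalized: AtomicBool,
    /// Set if a worker did work after finalization had already run.
    pub init_after_finalize: AtomicBool,
}

/// Worker loop: checks the cancel flag on every iteration, the way each
/// tokio task would select! on the token's cancelled() future.
fn worker(cancel: Arc<AtomicBool>, probe: Arc<Probe>) {
    while !cancel.load(Ordering::Acquire) {
        if probe.finalized.load(Ordering::Acquire) {
            probe.init_after_finalize.store(true, Ordering::Release);
        }
        probe.inits.fetch_add(1, Ordering::Relaxed);
        thread::sleep(Duration::from_millis(1));
    }
}

/// Shutdown order: signal, wait for the workers to stop, finalize last.
pub fn run_and_shutdown(workers: usize) -> Arc<Probe> {
    let cancel = Arc::new(AtomicBool::new(false));
    let probe = Arc::new(Probe::default());

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let (cancel, probe) = (Arc::clone(&cancel), Arc::clone(&probe));
            thread::spawn(move || worker(cancel, probe))
        })
        .collect();

    thread::sleep(Duration::from_millis(10));

    cancel.store(true, Ordering::Release); // 1. signal shutdown
    for handle in handles {
        handle.join().expect("worker panicked"); // 2. wait until tasks are gone
    }
    probe.finalized.store(true, Ordering::Release); // 3. finalize last
    probe
}
```

Because finalization only starts once every worker has returned, nothing is left that could re-initialize a service behind its back.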
> +
> + info!("ServiceManager stopped");
> + }
> +
> + /// Spawn task that retries failed service initializations
> + fn spawn_retry_task_static(
> + services: Arc<HashMap<String, Arc<ManagedService>>>,
> + ) -> JoinHandle<()> {
> + tokio::spawn(async move {
> + let mut retry_interval = interval(Duration::from_secs(1));
> + retry_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
> +
> + loop {
> + retry_interval.tick().await;
> + Self::retry_failed_services(&services).await;
> + }
> + })
> + }
> +
> + /// Retry initialization for failed services
> + async fn retry_failed_services(services: &HashMap<String, Arc<ManagedService>>) {
> + for (name, managed) in services {
> + // Check if service needs retry
> + let state = *managed.state.read();
> + if state != ServiceState::Uninitialized {
So this only retries services in the Uninitialized state?
But the function's doc comment says "for failed services", and the
ServiceState::Failed docs say it awaits retry.
Please either remove Failed or actually use it (and retry it). Right now
this is misleading.
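If Failed is kept, the decision could look roughly like the sketch below. should_attempt_init is a made-up helper, the enum is a local copy for illustration, and the sketch assumes that init errors set Failed, so that Uninitialized only ever means "never attempted". Throttling via retry_interval() is left out.

```rust
/// Local copy of the patch's ServiceState, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceState {
    Uninitialized,
    Initializing,
    Running,
    Failed,
}

/// Decides whether the retry task should call initialize() for a service.
/// Hypothetical helper, not part of the patch.
pub fn should_attempt_init(state: ServiceState, restartable: bool) -> bool {
    match state {
        // Never attempted so far: every service gets its first try.
        ServiceState::Uninitialized => true,
        // A previous attempt failed: retry only if the service allows it.
        ServiceState::Failed => restartable,
        // Nothing to do while an attempt is in flight or the service runs.
        ServiceState::Initializing | ServiceState::Running => false,
    }
}
```

This makes the meaning of each state explicit and removes the need for the separate is_first_attempt check.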
> + continue;
> + }
> +
> + let (is_restartable, retry_interval) = {
> + let service = managed.service.lock().await;
> + (service.is_restartable(), service.retry_interval())
> + };
> +
> + // Check if this is a retry or first attempt
> + let now = Instant::now();
> + let is_first_attempt = managed.last_init_attempt.read().is_none();
> +
> + // Allow first attempt for all services, but block retries for non-restartable services
> + if !is_first_attempt && !is_restartable {
> + continue;
> + }
> +
> + // Check retry throttle (only for retries)
> + if let Some(last) = *managed.last_init_attempt.read()
> + && now.duration_since(last) < retry_interval
> + {
> + continue;
> + }
> +
> + // Attempt initialization
> + *managed.last_init_attempt.write() = Some(now);
> + *managed.state.write() = ServiceState::Initializing;
> +
> + debug!(service = %name, "Attempting to initialize service");
> +
> + let mut service = managed.service.lock().await;
> +
> + match service.initialize().await {
Once this call succeeds, the service already holds resources ..
> + Ok(InitResult::WithFileDescriptor(fd)) => match AsyncFd::new(FdWrapper(fd)) {
> + Ok(async_fd) => {
> + *managed.async_fd.write() = Some(Arc::new(async_fd));
> + *managed.state.write() = ServiceState::Running;
> + *managed.error_count.write() = 0;
> + info!(service = %name, fd, "Service initialized successfully");
> + }
> + Err(e) => {
> + error!(service = %name, fd, error = %e, "Failed to register fd");
> + *managed.state.write() = ServiceState::Failed;
> + *managed.error_count.write() += 1;
.. but if registering the fd fails here afterwards, the service is
marked Failed and finalize() is never called, which would leak those
resources.
Shutdown doesn't finalize it either, as it currently skips non-Running
services.
To make sure the resources don't leak, call finalize() before marking
the service Failed.
Also, will this failed service ever be retried?
> + }
> + },
> + Ok(InitResult::NoFileDescriptor) => {
> + *managed.state.write() = ServiceState::Running;
> + *managed.error_count.write() = 0;
> + info!(service = %name, "Service initialized successfully (no fd)");
> + }
> + Err(e) => {
> + let err_count = {
> + let mut count = managed.error_count.write();
> + *count += 1;
> + *count
> + };
> +
> + // Only log first failure to avoid spam
> + if err_count == 1 {
> + error!(service = %name, error = %e, "Failed to initialize service");
> + } else {
> + debug!(service = %name, attempt = err_count, error = %e, "Service initialization failed");
> + }
> +
> + *managed.state.write() = ServiceState::Uninitialized;
> + }
> + }
> + }
> + }
> +
> + /// Spawn task that invokes timer callbacks
> + fn spawn_timer_task_static(
> + services: Arc<HashMap<String, Arc<ManagedService>>>,
> + ) -> JoinHandle<()> {
> + tokio::spawn(async move {
> + let mut timer_interval = interval(Duration::from_secs(1));
The timer task ticks once per second, but the API allows any Duration:
fn timer_period(&self) -> Option<Duration> {
None
}
A service returning Some(Duration::from_millis(200)) would still only
fire roughly once per second.
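One possible direction is to track a deadline per service and sleep until the earliest one instead of ticking at a fixed rate. A rough std-only sketch follows; TimerSlot, take_due and next_wakeup are made-up names, and in the manager the sleep itself would presumably use something like tokio's sleep_until.

```rust
use std::time::{Duration, Instant};

/// Per-service timer bookkeeping (hypothetical, not part of the patch).
pub struct TimerSlot {
    pub period: Duration,
    pub next_due: Instant,
}

/// Returns the indices of all timers that are due at `now` and re-arms them.
pub fn take_due(slots: &mut [TimerSlot], now: Instant) -> Vec<usize> {
    let mut due = Vec::new();
    for (idx, slot) in slots.iter_mut().enumerate() {
        if slot.next_due <= now {
            slot.next_due = now + slot.period;
            due.push(idx);
        }
    }
    due
}

/// Earliest pending deadline; the timer task would sleep until this point
/// rather than waking up once per second.
pub fn next_wakeup(slots: &[TimerSlot]) -> Option<Instant> {
    slots.iter().map(|slot| slot.next_due).min()
}
```

With this, a 200 ms period is honoured as such, and a manager without any timers never has to wake up at all.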
> + timer_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
> +
> + loop {
> + timer_interval.tick().await;
> + Self::invoke_timer_callbacks(&services).await;
> + }
> + })
> + }
> +
> + /// Invoke timer callbacks for running services
> + async fn invoke_timer_callbacks(services: &HashMap<String, Arc<ManagedService>>) {
> + let now = Instant::now();
> +
> + for (name, managed) in services {
> + // Check if service is running
> + if *managed.state.read() != ServiceState::Running {
> + continue;
> + }
> +
> + let Some(period) = ({ managed.service.lock().await.timer_period() }) else {
> + continue;
> + };
The mutex is acquired twice per service per tick, the first time just to
read timer_period(), which is likely constant.
The same pattern applies to dispatch_interval().
You could probably cache this config in ManagedService at registration
time?
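Roughly like this, assuming the values really are constant for the lifetime of a service. ServiceConfigSource is a cut-down, synchronous stand-in for the Service trait, and CachedConfig and Quorum are made-up names.

```rust
use std::time::Duration;

/// Cut-down stand-in for the config getters of the patch's Service trait
/// (hypothetical; defaults copied from the README).
pub trait ServiceConfigSource {
    fn timer_period(&self) -> Option<Duration> {
        None
    }
    fn is_restartable(&self) -> bool {
        true
    }
    fn retry_interval(&self) -> Duration {
        Duration::from_secs(5)
    }
    fn dispatch_interval(&self) -> Duration {
        Duration::from_millis(100)
    }
}

/// Snapshot taken once in add_service() and stored next to the service.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CachedConfig {
    pub timer_period: Option<Duration>,
    pub is_restartable: bool,
    pub retry_interval: Duration,
    pub dispatch_interval: Duration,
}

impl CachedConfig {
    pub fn capture(service: &dyn ServiceConfigSource) -> Self {
        Self {
            timer_period: service.timer_period(),
            is_restartable: service.is_restartable(),
            retry_interval: service.retry_interval(),
            dispatch_interval: service.dispatch_interval(),
        }
    }
}

/// Example service that only overrides the timer period.
pub struct Quorum;

impl ServiceConfigSource for Quorum {
    fn timer_period(&self) -> Option<Duration> {
        Some(Duration::from_millis(200))
    }
}
```

The retry, timer and dispatch tasks could then read the snapshot without touching the service mutex.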
> +
> + // Check if it's time to invoke timer
> + let should_invoke = match *managed.last_timer_invoke.read() {
> + Some(last) => now.duration_since(last) >= period,
> + None => true, // First invocation
> + };
> +
> + if !should_invoke {
> + continue;
> + }
> +
> + *managed.last_timer_invoke.write() = Some(now);
> +
> + debug!(service = %name, "Invoking timer callback");
> +
> + let mut service = managed.service.lock().await;
> +
> + if let Err(e) = service.timer_callback().await {
> + warn!(service = %name, error = %e, "Timer callback failed");
> + }
> + }
> + }
> +
> + /// Spawn dispatch tasks for all services
> + fn spawn_dispatch_tasks_static(
> + services: Arc<HashMap<String, Arc<ManagedService>>>,
> + ) -> Vec<JoinHandle<()>> {
> + let mut handles = Vec::new();
> +
> + for (name, managed) in services.iter() {
> + let name = name.clone();
> + let managed = Arc::clone(managed);
> +
> + let handle = tokio::spawn(async move {
> + loop {
> + // Wait for service to be running
> + loop {
> + tokio::time::sleep(Duration::from_millis(100)).await;
Optional: a Notify/watch would be cleaner than polling with sleep().
> + let state = *managed.state.read();
> + if state == ServiceState::Running {
> + break;
> + }
> + }
> +
> + // Dispatch based on service type
> + let async_fd = managed.async_fd.read().clone();
> +
> + if let Some(fd) = async_fd {
> + // Event-driven dispatch
> + Self::dispatch_with_fd(&name, &managed, &fd).await;
> + } else {
> + // Polling dispatch
> + Self::dispatch_polling(&name, &managed).await;
> + }
> + }
> + });
> +
> + handles.push(handle);
> + }
> +
> + handles
> + }
> +
> + /// Dispatch events for service with file descriptor
> + async fn dispatch_with_fd(
> + name: &str,
> + managed: &Arc<ManagedService>,
> + async_fd: &Arc<AsyncFd<FdWrapper>>,
> + ) {
> + loop {
> + let readable = match async_fd.readable().await {
> + Ok(r) => r,
> + Err(e) => {
> + warn!(service = %name, error = %e, "Error waiting for fd readability");
> + break;
This breaks out of the loop and returns without re-initializing the
service, while the error paths a few lines below do call
reinitialize_service(). Is that call missing here?
> + }
> + };
> +
> + let mut guard = readable;
> + let mut service = managed.service.lock().await;
> +
> + match service.dispatch().await {
> + Ok(DispatchAction::Continue) => {
> + guard.clear_ready();
> + }
> + Ok(DispatchAction::Reinitialize) => {
> + info!(service = %name, "Service requested reinitialization");
> + guard.clear_ready();
> + drop(service);
> + Self::reinitialize_service(name, managed).await;
> + break;
> + }
> + Err(e) => {
> + error!(service = %name, error = %e, "Service dispatch failed");
> + guard.clear_ready();
> + drop(service);
> + Self::reinitialize_service(name, managed).await;
> + break;
> + }
> + }
> + }
> + }
> +
> + /// Dispatch events for service without file descriptor (polling)
> + async fn dispatch_polling(name: &str, managed: &Arc<ManagedService>) {
> + let dispatch_interval = managed.service.lock().await.dispatch_interval();
> + let mut interval_timer = interval(dispatch_interval);
> + interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
> +
> + loop {
> + interval_timer.tick().await;
> +
> + // Check if still running
> + if *managed.state.read() != ServiceState::Running {
> + break;
> + }
> +
> + let mut service = managed.service.lock().await;
> +
> + match service.dispatch().await {
> + Ok(DispatchAction::Continue) => {}
> + Ok(DispatchAction::Reinitialize) => {
> + info!(service = %name, "Service requested reinitialization");
> + drop(service);
> + Self::reinitialize_service(name, managed).await;
> + break;
> + }
> + Err(e) => {
> + error!(service = %name, error = %e, "Service dispatch failed");
> + drop(service);
> + Self::reinitialize_service(name, managed).await;
This sets ServiceState::Uninitialized. After the first attempt,
retry_failed_services() refuses to re-init a service whose
is_restartable() returns false, so the service would become stuck
because of:
if !is_first_attempt && !is_restartable {
continue;
}
> + break;
> + }
> + }
> + }
> + }
> +
> + /// Reinitialize a service (finalize, then mark for retry)
> + async fn reinitialize_service(name: &str, managed: &Arc<ManagedService>) {
> + debug!(service = %name, "Reinitializing service");
> +
> + let mut service = managed.service.lock().await;
> +
> + if let Err(e) = service.finalize().await {
> + warn!(service = %name, error = %e, "Error finalizing service");
> + }
> +
> + drop(service);
At this point, after dropping the lock, the timer task can still see
Running, acquire the mutex, and call timer_callback() on the already
finalized service.
Maybe make the service invisible to the timer/dispatch tasks first?
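A std-only sketch of the ordering I mean: flip the state before finalizing, and let the timer path re-check it after taking the lock. Managed and Inner are made-up types, and std::sync::Mutex stands in for the tokio mutex.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Stand-in for the boxed service (hypothetical).
pub struct Inner {
    pub finalized: bool,
    pub timer_calls: u32,
}

pub struct Managed {
    running: AtomicBool,
    inner: Mutex<Inner>,
}

impl Managed {
    pub fn new_running() -> Self {
        Self {
            running: AtomicBool::new(true),
            inner: Mutex::new(Inner { finalized: false, timer_calls: 0 }),
        }
    }

    /// Timer path: returns true if the callback was actually invoked.
    pub fn timer_tick(&self) -> bool {
        if !self.running.load(Ordering::Acquire) {
            return false;
        }
        let mut inner = self.inner.lock().unwrap();
        // Re-check under the lock: the state may have changed while waiting.
        if !self.running.load(Ordering::Acquire) {
            return false;
        }
        inner.timer_calls += 1;
        true
    }

    /// Reinitialize path: hide the service first, finalize second.
    pub fn reinitialize(&self) {
        self.running.store(false, Ordering::Release);
        let mut inner = self.inner.lock().unwrap();
        inner.finalized = true;
    }

    pub fn timer_calls(&self) -> u32 {
        self.inner.lock().unwrap().timer_calls
    }
}
```

The second check in timer_tick() is what closes the window; without it a tick that was already waiting for the lock would still run the callback.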
> +
> + // Clear async fd and mark for retry
> + *managed.async_fd.write() = None;
> + *managed.state.write() = ServiceState::Uninitialized;
> + *managed.error_count.write() = 0;
> + }
> +
> + /// Shutdown all services gracefully
> + async fn shutdown_all_services_static(services: &HashMap<String, Arc<ManagedService>>) {
> + for (name, managed) in services {
> + if *managed.state.read() != ServiceState::Running {
In C, all services are finalized unconditionally, regardless of state.
Here, Failed, Uninitialized and Initializing services are skipped.
finalize() should be idempotent so that it can safely be called for
every service.
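An idempotent finalize() usually falls out of keeping the resource in an Option and take()-ing it. A minimal synchronous sketch, with DemoService and Connection as made-up types and a counter in place of the real cleanup:

```rust
/// Stand-in for a connection handle (hypothetical).
pub struct Connection {
    pub fd: i32,
}

pub struct DemoService {
    conn: Option<Connection>,
    /// Number of times a connection was actually released.
    pub releases: u32,
}

impl DemoService {
    pub fn new() -> Self {
        Self { conn: None, releases: 0 }
    }

    pub fn initialize(&mut self) {
        self.conn = Some(Connection { fd: 3 });
    }

    /// Safe to call in any state and any number of times.
    pub fn finalize(&mut self) {
        if let Some(conn) = self.conn.take() {
            // A real service would close the handle here.
            let _ = conn.fd;
            self.releases += 1;
        }
    }
}
```

With that in place, shutdown can drop the state check and simply finalize every registered service.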
> + continue;
> + }
> +
> + info!(service = %name, "Shutting down service");
> +
> + let mut service = managed.service.lock().await;
> +
> + if let Err(e) = service.finalize().await {
> + error!(service = %name, error = %e, "Error finalizing service");
> + }
The state is still Running at this point.
Don't we need to set it to ServiceState::Finalized or Uninitialized?
> + }
> + }
> +}
> +
> +impl Default for ServiceManager {
> + fn default() -> Self {
> + Self::new()
> + }
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-services/src/service.rs b/src/pmxcfs-rs/pmxcfs-services/src/service.rs
> new file mode 100644
> index 00000000..395ba67f
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-services/src/service.rs
> @@ -0,0 +1,173 @@
> +//! Service trait and related types
> +//!
> +//! This module provides the core abstraction for managed services that can
> +//! automatically retry initialization, handle errors gracefully, and provide
> +//! timer-based periodic callbacks.
> +
> +use crate::error::Result;
> +use async_trait::async_trait;
> +use std::time::Duration;
> +
> +/// A managed service that can be monitored and restarted automatically
> +///
> +/// This trait provides the core abstraction for services in the pmxcfs daemon.
> +/// Services implementing this trait gain automatic retry on failure, graceful
> +/// error handling, and optional periodic timer callbacks.
> +///
> +/// ## Lifecycle
> +///
> +/// 1. **Uninitialized** - Service created but not yet initialized
> +/// 2. **Initializing** - `initialize()` in progress
> +/// 3. **Running** - Service initialized successfully, dispatching events
> +/// 4. **Failed** - Service encountered an error, will retry if restartable
> +#[async_trait]
> +pub trait Service: Send + Sync {
> + /// Service name for logging and identification
> + ///
> + /// Should be a short, descriptive identifier (e.g., "quorum", "dfsm", "confdb")
> + fn name(&self) -> &str;
> +
> + /// Initialize the service
> + ///
> + /// Called when the service is first started or after a failure (if restartable).
> + /// Returns an `InitResult` indicating whether the service needs file descriptor
> + /// monitoring.
> + ///
> + /// # Errors
> + ///
> + /// Returns an error if initialization fails. The ServiceManager will automatically
> + /// retry initialization based on `retry_interval()` if `is_restartable()` returns true.
> + ///
> + /// # Implementation Notes
> + ///
> + /// - Initialize connections to external services (Corosync, CPG, etc.)
> + /// - Set up internal state
> + /// - Return file descriptor if the service needs event-driven dispatching
> + /// - Keep initialization lightweight - heavy work should be in `dispatch()`
> + async fn initialize(&mut self) -> Result<InitResult>;
> +
> + /// Handle events for this service
> + ///
> + /// Called when:
> + /// - The file descriptor returned by `initialize()` becomes readable (if WithFileDescriptor)
> + /// - Periodically for services without file descriptors (if NoFileDescriptor)
> + ///
> + /// # Returns
> + ///
> + /// - `DispatchAction::Continue` - Continue normal operation
> + /// - `DispatchAction::Reinitialize` - Request reinitialization (triggers `finalize()` then `initialize()`)
> + ///
> + /// # Errors
> + ///
> + /// Errors automatically trigger reinitialization if the service is restartable.
> + /// The service will be finalized and reinitialized according to `retry_interval()`.
> + async fn dispatch(&mut self) -> Result<DispatchAction>;
> +
> + /// Clean up service resources
> + ///
> + /// Called when:
> + /// - Service is being shut down
> + /// - Service is being reinitialized after dispatch failure
> + /// - ServiceManager is shutting down
> + ///
> + /// # Implementation Notes
> + ///
> + /// - Close connections
> + /// - Release resources
> + /// - Should not fail - log errors but return Ok(())
> + async fn finalize(&mut self) -> Result<()>;
> +
> + /// Optional periodic callback
> + ///
> + /// Called at the interval specified by `timer_period()` if the service is running.
> + /// Useful for periodic maintenance tasks like state verification or cleanup.
> + ///
> + /// # Default Implementation
> + ///
> + /// Does nothing by default. Override to implement periodic behavior.
> + async fn timer_callback(&mut self) -> Result<()> {
> + Ok(())
> + }
> +
> + /// Timer period for periodic callbacks
> + ///
> + /// If `Some(duration)`, `timer_callback()` will be invoked every `duration`.
> + /// If `None`, timer callbacks are disabled.
> + ///
> + /// # Default
> + ///
> + /// Returns `None` (no timer callbacks)
> + fn timer_period(&self) -> Option<Duration> {
> + None
> + }
> +
> + /// Whether to automatically retry initialization after failure
> + ///
> + /// If `true`, the ServiceManager will automatically retry `initialize()`
> + /// after failures using the interval specified by `retry_interval()`.
> + ///
> + /// If `false`, the service will remain in a failed state after the first
> + /// initialization failure.
> + ///
> + /// # Default
> + ///
> + /// Returns `true` (auto-retry enabled)
> + fn is_restartable(&self) -> bool {
> + true
> + }
> +
> + /// Minimum interval between retry attempts
> + ///
> + /// When `initialize()` fails, the ServiceManager will wait at least this
> + /// long before attempting to reinitialize.
> + ///
> + /// # Default
> + ///
> + /// Returns 5 seconds (matching C implementation)
> + fn retry_interval(&self) -> Duration {
> + Duration::from_secs(5)
> + }
> +
> + /// Dispatch interval for services without file descriptors
> + ///
> + /// For services that return `InitResult::NoFileDescriptor`, this determines
> + /// how often `dispatch()` is called.
> + ///
> + /// # Default
> + ///
> + /// Returns 100ms (matching current Rust implementation)
> + fn dispatch_interval(&self) -> Duration {
> + Duration::from_millis(100)
> + }
> +}
> +
> +/// Result of service initialization
> +#[derive(Debug, Clone, Copy)]
> +pub enum InitResult {
> + /// Service uses a file descriptor for event notification
> + ///
> + /// The ServiceManager will use tokio's AsyncFd to monitor this file descriptor
> + /// and call `dispatch()` when it becomes readable. This is the most efficient
> + /// mode for services that interact with Corosync (quorum, CPG, cmap).
> + WithFileDescriptor(i32),
This should use RawFd (std::os::fd::RawFd, a type alias for i32 on
Unix) to better reflect the intent.
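
A minimal sketch of what the RawFd variant could look like. The
raw_fd() helper is hypothetical and only there for illustration, it is
not part of the patch:

```rust
use std::os::fd::RawFd;

/// Result of service initialization (sketch: same variants, RawFd payload)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitResult {
    /// Service uses a file descriptor for event notification
    WithFileDescriptor(RawFd),
    /// Service does not use a file descriptor
    NoFileDescriptor,
}

impl InitResult {
    /// Hypothetical helper (not in the patch): the fd to monitor, if any
    pub fn raw_fd(&self) -> Option<RawFd> {
        match self {
            InitResult::WithFileDescriptor(fd) => Some(*fd),
            InitResult::NoFileDescriptor => None,
        }
    }
}
```

Since RawFd is an alias for i32 on Unix, existing callers such as
WithFileDescriptor(2) should keep compiling unchanged.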
> +
> + /// Service does not use a file descriptor
> + ///
> + /// The ServiceManager will call `dispatch()` periodically at the interval
> + /// specified by `dispatch_interval()`. Use this for services that poll
> + /// or have no external event source.
> + NoFileDescriptor,
> +}
> +
> +/// Action requested by service dispatch
> +#[derive(Debug, Clone, Copy, PartialEq, Eq)]
> +pub enum DispatchAction {
> + /// Continue normal operation
> + Continue,
> +
> + /// Request reinitialization
> + ///
> + /// The service will be finalized and reinitialized. This is useful when
> + /// the underlying connection is lost or becomes invalid.
> + Reinitialize,
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs b/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs
> new file mode 100644
> index 00000000..4574a8d6
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-services/tests/service_tests.rs
> @@ -0,0 +1,808 @@
> +//! Comprehensive tests for the service framework
> +//!
> +//! Tests cover:
> +//! - Service lifecycle (start, stop, restart)
> +//! - Service manager orchestration
> +//! - Error handling and retry logic
> +//! - Timer callbacks
> +//! - File descriptor and polling dispatch modes
> +//! - Service coordination and state management
> +
> +use async_trait::async_trait;
> +use pmxcfs_services::{DispatchAction, InitResult, Service, ServiceError, ServiceManager};
> +use pmxcfs_test_utils::wait_for_condition;
> +use std::sync::Arc;
> +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
> +use std::time::Duration;
> +use tokio::time::sleep;
> +
> +// ===== Test Service Implementations =====
> +
> +/// Mock service for testing lifecycle
> +struct MockService {
> + name: String,
> + init_count: Arc<AtomicU32>,
> + dispatch_count: Arc<AtomicU32>,
> + finalize_count: Arc<AtomicU32>,
> + timer_count: Arc<AtomicU32>,
> + should_fail_init: Arc<AtomicBool>,
> + should_fail_dispatch: Arc<AtomicBool>,
> + should_reinit: Arc<AtomicBool>,
> + use_fd: bool,
> + timer_period: Option<Duration>,
> + restartable: bool,
> +}
> +
> +impl MockService {
> + fn new(name: &str) -> Self {
> + Self {
> + name: name.to_string(),
> + init_count: Arc::new(AtomicU32::new(0)),
> + dispatch_count: Arc::new(AtomicU32::new(0)),
> + finalize_count: Arc::new(AtomicU32::new(0)),
> + timer_count: Arc::new(AtomicU32::new(0)),
> + should_fail_init: Arc::new(AtomicBool::new(false)),
> + should_fail_dispatch: Arc::new(AtomicBool::new(false)),
> + should_reinit: Arc::new(AtomicBool::new(false)),
> + use_fd: false,
> + timer_period: None,
> + restartable: true,
> + }
> + }
> +
> + fn with_timer(mut self, period: Duration) -> Self {
> + self.timer_period = Some(period);
> + self
> + }
> +
> + fn with_restartable(mut self, restartable: bool) -> Self {
> + self.restartable = restartable;
> + self
> + }
> +
> + fn counters(&self) -> ServiceCounters {
> + ServiceCounters {
> + init_count: self.init_count.clone(),
> + dispatch_count: self.dispatch_count.clone(),
> + finalize_count: self.finalize_count.clone(),
> + timer_count: self.timer_count.clone(),
> + should_fail_init: self.should_fail_init.clone(),
> + should_fail_dispatch: self.should_fail_dispatch.clone(),
> + should_reinit: self.should_reinit.clone(),
> + }
> + }
> +}
> +
> +#[async_trait]
> +impl Service for MockService {
> + fn name(&self) -> &str {
> + &self.name
> + }
> +
> + async fn initialize(&mut self) -> pmxcfs_services::Result<InitResult> {
> + self.init_count.fetch_add(1, Ordering::SeqCst);
> +
> + if self.should_fail_init.load(Ordering::SeqCst) {
> + return Err(ServiceError::InitializationFailed(
> + "Mock init failure".to_string(),
> + ));
> + }
> +
> + if self.use_fd {
> + // Return a dummy fd (stderr is always available)
> + Ok(InitResult::WithFileDescriptor(2))
> + } else {
> + Ok(InitResult::NoFileDescriptor)
> + }
> + }
> +
> + async fn dispatch(&mut self) -> pmxcfs_services::Result<DispatchAction> {
> + self.dispatch_count.fetch_add(1, Ordering::SeqCst);
> +
> + if self.should_fail_dispatch.load(Ordering::SeqCst) {
> + return Err(ServiceError::DispatchFailed(
> + "Mock dispatch failure".to_string(),
> + ));
> + }
> +
> + if self.should_reinit.load(Ordering::SeqCst) {
> + return Ok(DispatchAction::Reinitialize);
> + }
> +
> + Ok(DispatchAction::Continue)
> + }
> +
> + async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
> + self.finalize_count.fetch_add(1, Ordering::SeqCst);
> + Ok(())
> + }
> +
> + async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
> + self.timer_count.fetch_add(1, Ordering::SeqCst);
> + Ok(())
> + }
> +
> + fn timer_period(&self) -> Option<Duration> {
> + self.timer_period
> + }
> +
> + fn is_restartable(&self) -> bool {
> + self.restartable
> + }
> +
> + fn retry_interval(&self) -> Duration {
> + Duration::from_millis(100) // Fast retry for tests
> + }
> +
> + fn dispatch_interval(&self) -> Duration {
> + Duration::from_millis(50) // Fast polling for tests
> + }
> +}
> +
> +/// Helper struct to access service counters from tests
> +#[derive(Clone)]
> +struct ServiceCounters {
> + init_count: Arc<AtomicU32>,
> + dispatch_count: Arc<AtomicU32>,
> + finalize_count: Arc<AtomicU32>,
> + timer_count: Arc<AtomicU32>,
> + should_fail_init: Arc<AtomicBool>,
> + should_fail_dispatch: Arc<AtomicBool>,
> + should_reinit: Arc<AtomicBool>,
> +}
> +
> +impl ServiceCounters {
> + fn init_count(&self) -> u32 {
> + self.init_count.load(Ordering::SeqCst)
> + }
> +
> + fn dispatch_count(&self) -> u32 {
> + self.dispatch_count.load(Ordering::SeqCst)
> + }
> +
> + fn finalize_count(&self) -> u32 {
> + self.finalize_count.load(Ordering::SeqCst)
> + }
> +
> + fn timer_count(&self) -> u32 {
> + self.timer_count.load(Ordering::SeqCst)
> + }
> +
> + fn set_fail_init(&self, fail: bool) {
> + self.should_fail_init.store(fail, Ordering::SeqCst);
> + }
> +
> + fn set_fail_dispatch(&self, fail: bool) {
> + self.should_fail_dispatch.store(fail, Ordering::SeqCst);
> + }
> +
> + fn set_reinit(&self, reinit: bool) {
> + self.should_reinit.store(reinit, Ordering::SeqCst);
> + }
> +}
> +
> +// ===== Lifecycle Tests =====
> +
> +#[tokio::test]
> +async fn test_service_lifecycle_basic() {
> + let service = MockService::new("test_service");
> + let counters = service.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization and dispatching
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 1 && counters.dispatch_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should initialize and dispatch within 5 seconds"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +
> + // Service should be finalized
> + assert_eq!(
> + counters.finalize_count(),
> + 1,
> + "Service should be finalized exactly once"
> + );
> +}
> +
> +#[tokio::test]
> +async fn test_service_with_file_descriptor() {
> + // Don't use FD-based service in tests since we can't easily create a readable FD
> + // Just test that WithFileDescriptor variant works with manager
> + let service = MockService::new("no_fd_service"); // Changed to not use FD
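The test name and the comment above refer to WithFileDescriptor, but
the service is created without an FD, so the dispatch_with_fd code
path is untested. Let's add tests for that.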
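
One possible way to get a readable FD in a test, sketched with std
only: a UnixStream pair with one byte already written. The helper
names (readable_pair, fd_for_init, drain) are hypothetical. The mock
would have to own the reader, return its fd from initialize() and
drain it in dispatch(). This is untested against the manager:

```rust
use std::io::{Read, Write};
use std::os::fd::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;

/// Hypothetical test helper: a connected socket pair whose first end
/// already has one byte queued, so its fd is readable.
fn readable_pair() -> std::io::Result<(UnixStream, UnixStream)> {
    let (reader, mut writer) = UnixStream::pair()?;
    writer.write_all(b"x")?;
    // tokio's AsyncFd requires the fd to be in non-blocking mode
    reader.set_nonblocking(true)?;
    Ok((reader, writer))
}

/// The fd a mock service could return via InitResult::WithFileDescriptor
fn fd_for_init(reader: &UnixStream) -> RawFd {
    reader.as_raw_fd()
}

/// Drain pending bytes, as a dispatch() would do to clear readiness.
/// Returns the number of bytes consumed.
fn drain(reader: &mut UnixStream) -> usize {
    let mut buf = [0u8; 16];
    let mut total = 0;
    loop {
        match reader.read(&mut buf) {
            Ok(0) => break,      // peer closed the connection
            Ok(n) => total += n, // consumed n pending bytes
            Err(_) => break,     // typically WouldBlock: nothing left
        }
    }
    total
}
```

The test would keep the writer end alive and write another byte
whenever it wants to trigger one more dispatch().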
> + let counters = service.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization and some dispatches
> + assert!(
> + wait_for_condition(
> + || counters.init_count() == 1 && counters.dispatch_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should initialize once and dispatch within 5 seconds"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +
> + assert_eq!(counters.finalize_count(), 1, "Service should finalize once");
> +}
> +
> +#[tokio::test]
> +async fn test_service_initialization_failure() {
> + let service = MockService::new("failing_service");
> + let counters = service.counters();
> +
> + // Make initialization fail
> + counters.set_fail_init(true);
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for several retry attempts
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 3,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should retry initialization at least 3 times within 5 seconds"
> + );
> +
> + // Dispatch should not run if init fails
> + assert_eq!(
> + counters.dispatch_count(),
> + 0,
> + "Service should not dispatch if init fails"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +#[tokio::test]
> +async fn test_service_initialization_recovery() {
> + let service = MockService::new("recovering_service");
> + let counters = service.counters();
> +
> + // Start with failing initialization
> + counters.set_fail_init(true);
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for some failed attempts
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 2,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Should have at least 2 failed initialization attempts within 5 seconds"
> + );
> +
> + let failed_attempts = counters.init_count();
> +
> + // Allow initialization to succeed
> + counters.set_fail_init(false);
> +
> + // Wait for recovery
> + assert!(
> + wait_for_condition(
> + || counters.init_count() > failed_attempts && counters.dispatch_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should recover and start dispatching within 5 seconds"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +#[tokio::test]
> +async fn test_service_not_restartable() {
This only tests a non-restartable service whose init fails.
Please add a test for a non-restartable service that succeeds init
but then fails dispatch. That would cover the path where
reinitialize_service() sets the state to Uninitialized, but the retry
is then refused because !is_first_attempt && !is_restartable.
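
To illustrate the behaviour such a test should pin down, here is a
small std-only model of the retry gate. It is a simplification based
on the condition quoted above, not the actual manager.rs code. State,
may_initialize and init_attempts are made-up names:

```rust
/// Simplified stand-in for the manager's per-service state (assumption)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Uninitialized,
    Running,
}

/// Retry gate: the first attempt always runs, later attempts only run
/// for restartable services.
fn may_initialize(is_first_attempt: bool, is_restartable: bool) -> bool {
    is_first_attempt || is_restartable
}

/// Count initialize() calls over `ticks` manager iterations for a
/// service whose init always succeeds and whose dispatch always fails.
fn init_attempts(is_restartable: bool, ticks: u32) -> u32 {
    let mut state = State::Uninitialized;
    let mut attempts = 0;
    for _ in 0..ticks {
        match state {
            State::Uninitialized => {
                if may_initialize(attempts == 0, is_restartable) {
                    attempts += 1;
                    state = State::Running;
                }
            }
            // dispatch() fails: finalize and fall back to Uninitialized
            State::Running => state = State::Uninitialized,
        }
    }
    attempts
}
```

In this model a non-restartable service is initialized exactly once
and then stays Uninitialized, so the real test would presumably assert
init_count() == 1 and finalize_count() == 1 after the dispatch failure.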
> + let service = MockService::new("non_restartable").with_restartable(false);
> + let counters = service.counters();
> +
> + // Make initialization fail
> + counters.set_fail_init(true);
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization attempt
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should attempt initialization within 5 seconds"
> + );
> +
> + // Service should only try once (not restartable)
> + assert_eq!(
> + counters.init_count(),
> + 1,
> + "Non-restartable service should only try initialization once"
> + );
> +
> + // Wait another cycle to confirm it doesn't retry
> + sleep(Duration::from_millis(1500)).await;
> +
> + // Should still be 1
> + assert_eq!(
> + counters.init_count(),
> + 1,
> + "Non-restartable service should not retry, got {}",
> + counters.init_count()
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +// ===== Dispatch Tests =====
> +
> +#[tokio::test]
> +async fn test_service_dispatch_failure_triggers_reinit() {
> + let service = MockService::new("dispatch_fail_service");
> + let counters = service.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization and first dispatches
> + assert!(
> + wait_for_condition(
> + || counters.init_count() == 1 && counters.dispatch_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should initialize once and dispatch within 5 seconds"
> + );
> +
> + // Make dispatch fail
> + counters.set_fail_dispatch(true);
> +
> + // Wait for dispatch failure and reinitialization
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 2 && counters.finalize_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should reinitialize and finalize after dispatch failure within 5 seconds"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +#[tokio::test]
> +async fn test_service_dispatch_requests_reinit() {
> + let service = MockService::new("reinit_request_service");
> + let counters = service.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization
> + assert!(
> + wait_for_condition(
> + || counters.init_count() == 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should initialize once within 5 seconds"
> + );
> +
> + // Request reinitialization from dispatch
> + counters.set_reinit(true);
> +
> + // Wait for reinitialization
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 2 && counters.finalize_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should reinitialize and finalize when dispatch requests it within 5 seconds"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +// ===== Timer Callback Tests =====
> +
> +#[tokio::test]
> +async fn test_service_timer_callback() {
> + let service = MockService::new("timer_service").with_timer(Duration::from_millis(300));
> + let counters = service.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization plus several timer periods
> + assert!(
> + wait_for_condition(
> + || counters.timer_count() >= 3,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Timer should fire at least 3 times within 5 seconds"
> + );
> +
> + let timer_count = counters.timer_count();
> +
> + // Wait for more timer invocations
> + assert!(
> + wait_for_condition(
> + || counters.timer_count() > timer_count,
> + Duration::from_secs(2),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Timer should continue firing"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +#[tokio::test]
> +async fn test_service_timer_callback_not_invoked_when_failed() {
> + let service = MockService::new("failed_timer_service").with_timer(Duration::from_millis(100));
> + let counters = service.counters();
> +
> + // Make initialization fail
> + counters.set_fail_init(true);
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for several timer periods
> + sleep(Duration::from_millis(2000)).await;
> +
> + // Timer should NOT fire if service is not running
> + assert_eq!(
> + counters.timer_count(),
> + 0,
> + "Timer should not fire when service is not running"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +// ===== Service Manager Tests =====
> +
> +#[tokio::test]
> +async fn test_manager_multiple_services() {
> + let service1 = MockService::new("service1");
> + let service2 = MockService::new("service2");
> + let service3 = MockService::new("service3");
> +
> + let counters1 = service1.counters();
> + let counters2 = service2.counters();
> + let counters3 = service3.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service1));
> + manager.add_service(Box::new(service2));
> + manager.add_service(Box::new(service3));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization
> + assert!(
> + wait_for_condition(
> + || counters1.init_count() == 1
> + && counters2.init_count() == 1
> + && counters3.init_count() == 1
> + && counters1.dispatch_count() >= 1
> + && counters2.dispatch_count() >= 1
> + && counters3.dispatch_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "All services should initialize and dispatch within 5 seconds"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +
> + // All services should be finalized
> + assert_eq!(counters1.finalize_count(), 1, "Service1 should finalize");
> + assert_eq!(counters2.finalize_count(), 1, "Service2 should finalize");
> + assert_eq!(counters3.finalize_count(), 1, "Service3 should finalize");
> +}
> +
> +#[tokio::test]
> +#[should_panic(expected = "already registered")]
> +async fn test_manager_duplicate_service_name() {
> + let service1 = MockService::new("duplicate");
> + let service2 = MockService::new("duplicate");
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service1));
> + manager.add_service(Box::new(service2)); // Should panic
> +}
> +
> +#[tokio::test]
> +async fn test_manager_partial_service_failure() {
> + let service1 = MockService::new("working_service");
> + let service2 = MockService::new("failing_service");
> +
> + let counters1 = service1.counters();
> + let counters2 = service2.counters();
> +
> + // Make service2 fail
> + counters2.set_fail_init(true);
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service1));
> + manager.add_service(Box::new(service2));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization
> + assert!(
> + wait_for_condition(
> + || counters1.init_count() == 1
> + && counters1.dispatch_count() >= 1
> + && counters2.init_count() >= 2,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service1 should work normally and Service2 should retry within 5 seconds"
> + );
> +
> + // Service2 should not dispatch when failing
> + assert_eq!(
> + counters2.dispatch_count(),
> + 0,
> + "Service2 should not dispatch when failing"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +
> + // Only service1 should finalize (service2 never initialized)
> + assert_eq!(counters1.finalize_count(), 1, "Service1 should finalize");
> + assert_eq!(
> + counters2.finalize_count(),
> + 0,
> + "Service2 should not finalize if never initialized"
> + );
> +}
> +
> +// ===== Error Handling Tests =====
> +
> +#[tokio::test]
> +async fn test_service_error_count_tracking() {
> + let service = MockService::new("error_tracking_service");
> + let counters = service.counters();
> +
> + // Make initialization fail
> + counters.set_fail_init(true);
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for multiple failures
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 4,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Should accumulate at least 4 failures within 5 seconds"
> + );
> +
> + // Allow recovery
> + counters.set_fail_init(false);
> +
> + // Wait for recovery
> + assert!(
> + wait_for_condition(
> + || counters.dispatch_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should recover within 5 seconds"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +#[tokio::test]
> +async fn test_service_graceful_shutdown() {
> + let service = MockService::new("shutdown_test");
> + let counters = service.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for service to be running
> + assert!(
> + wait_for_condition(
> + || counters.dispatch_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should be running within 5 seconds"
> + );
> +
> + // Graceful shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +
> + // Service should be properly finalized
> + assert_eq!(
> + counters.finalize_count(),
> + 1,
> + "Service should finalize during shutdown"
> + );
> +}
> +
> +// ===== Concurrency Tests =====
> +
> +#[tokio::test]
> +async fn test_service_concurrent_operations() {
> + let service = MockService::new("concurrent_service").with_timer(Duration::from_millis(200));
> + let counters = service.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for service to run with both dispatch and timer
> + assert!(
> + wait_for_condition(
> + || counters.dispatch_count() >= 3 && counters.timer_count() >= 3,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should dispatch and timer should fire multiple times within 5 seconds"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}
> +
> +#[tokio::test]
> +async fn test_service_state_consistency_after_reinit() {
> + let service = MockService::new("consistency_service");
> + let counters = service.counters();
> +
> + let mut manager = ServiceManager::new();
> + manager.add_service(Box::new(service));
> +
> + let shutdown_token = manager.shutdown_token();
> + let handle = manager.spawn();
> +
> + // Wait for initialization
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 1,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should initialize within 5 seconds"
> + );
> +
> + // Trigger reinitialization
> + counters.set_reinit(true);
> +
> + // Wait for reinit
> + assert!(
> + wait_for_condition(
> + || counters.init_count() >= 2,
> + Duration::from_secs(5),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should reinitialize within 5 seconds"
> + );
> +
> + // Clear reinit flag
> + counters.set_reinit(false);
> +
> + // Wait for more dispatches
> + let dispatch_count = counters.dispatch_count();
> + assert!(
> + wait_for_condition(
> + || counters.dispatch_count() > dispatch_count,
> + Duration::from_secs(2),
> + Duration::from_millis(10),
> + )
> + .await,
> + "Service should continue dispatching after reinit"
> + );
> +
> + // Shutdown
> + shutdown_token.cancel();
> + let _ = handle.await;
> +}