From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH pve-cluster 09/15] pmxcfs-rs: add pmxcfs-ipc crate
Date: Tue, 6 Jan 2026 22:24:33 +0800
Message-ID: <20260106142440.2368585-10-k.chai@proxmox.com>
In-Reply-To: <20260106142440.2368585-1-k.chai@proxmox.com>
Add libqb-compatible IPC server implementation:
- QB_IPC_SHM protocol (shared memory ring buffers)
- Abstract Unix socket (@pve2) for handshake
- Lock-free SPSC ring buffers
- Authentication via SO_PEERCRED (uid/gid/pid)
- 12 IPC operations (GET_FS_VERSION, GET_CLUSTER_INFO, etc.)
This crate depends on no other pmxcfs-rs workspace crate at
runtime; its main external dependencies are tokio, nix, and
memmap2. It provides wire-compatible IPC with the C
implementation's libqb-based server, allowing existing clients
to work unchanged.
Includes wire protocol compatibility tests (require root to run).
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
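Note for reviewers (illustrative only, not part of the patch): the
queue-depth thresholds described in the handle_requests() comments in
connection.rs can be sketched as a small pure function. The names
FlowLevel and flow_level_for are hypothetical and do not exist in the
crate. The constants 8 and 6 and the values 0/1/2 are copied from the
patch's MAX_PENDING_REQUESTS, FC_WARNING_THRESHOLD and FlowControl
comments.

```rust
/// Hypothetical stand-in for the crate's flow-control values.
/// The numeric values follow the comments in connection.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum FlowLevel {
    Ok = 0,       // proceed with sending
    SlowDown = 1, // reduce send rate
    Stop = 2,     // do not send
}

/// Work queue capacity, as in the patch.
const MAX_PENDING_REQUESTS: usize = 8;
/// Depth at which clients are asked to slow down (75% of capacity).
const FC_WARNING_THRESHOLD: usize = 6;

/// Map the current work queue depth to a flow-control level.
/// This mirrors the if/else chain that runs after a successful try_send().
fn flow_level_for(queue_len: usize) -> FlowLevel {
    if queue_len >= MAX_PENDING_REQUESTS {
        FlowLevel::Stop
    } else if queue_len >= FC_WARNING_THRESHOLD {
        FlowLevel::SlowDown
    } else {
        FlowLevel::Ok
    }
}
```

With a capacity of 8, depths 0 to 5 map to Ok, 6 and 7 to SlowDown,
and 8 to Stop.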
src/pmxcfs-rs/Cargo.toml | 1 +
src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml | 44 +
src/pmxcfs-rs/pmxcfs-ipc/README.md | 182 +++
.../pmxcfs-ipc/examples/test_server.rs | 92 ++
src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs | 657 ++++++++++
src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs | 93 ++
src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs | 37 +
src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs | 332 +++++
src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs | 1158 +++++++++++++++++
src/pmxcfs-rs/pmxcfs-ipc/src/server.rs | 278 ++++
src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs | 84 ++
src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs | 450 +++++++
.../pmxcfs-ipc/tests/qb_wire_compat.rs | 413 ++++++
13 files changed, 3821 insertions(+)
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/server.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index b00ca68f..f4497d58 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -9,6 +9,7 @@ members = [
"pmxcfs-status", # Status monitoring and RRD data management
"pmxcfs-test-utils", # Test utilities and helpers (dev-only)
"pmxcfs-services", # Service framework for automatic retry and lifecycle management
+ "pmxcfs-ipc", # libqb-compatible IPC server
]
resolver = "2"
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml b/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml
new file mode 100644
index 00000000..dbee2e9a
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml
@@ -0,0 +1,44 @@
+[package]
+name = "pmxcfs-ipc"
+description = "libqb-compatible IPC server implementation in pure Rust"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+# System dependencies:
+# - libqb (runtime) - QB IPC library for client compatibility
+# - libqb-dev (build/test only) - Required to run wire protocol tests
+
+[dependencies]
+# Error handling
+anyhow.workspace = true
+
+# Async runtime
+tokio.workspace = true
+tokio-util.workspace = true
+
+# Concurrency primitives
+parking_lot.workspace = true
+
+# System integration
+libc.workspace = true
+nix.workspace = true
+memmap2 = "0.9"
+
+# Logging
+tracing.workspace = true
+
+# Async trait support
+async-trait.workspace = true
+
+[dev-dependencies]
+pmxcfs-test-utils = { path = "../pmxcfs-test-utils" }
+tempfile.workspace = true
+tokio = { workspace = true, features = ["rt", "macros"] }
+tracing-subscriber.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/README.md b/src/pmxcfs-rs/pmxcfs-ipc/README.md
new file mode 100644
index 00000000..5b5b98ae
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/README.md
@@ -0,0 +1,182 @@
+# pmxcfs-ipc: libqb-Compatible IPC Server
+
+**Rust implementation of libqb IPC server for pmxcfs using shared memory ring buffers**
+
+This crate provides a wire-compatible IPC server that works with libqb clients (C `qb_ipcc_*` API) without depending on the libqb C library.
+
+## Table of Contents
+
+- [Overview](#overview)
+- [Architecture](#architecture)
+- [Protocol Implementation](#protocol-implementation)
+- [Usage](#usage)
+- [Testing](#testing)
+- [References](#references)
+
+---
+
+## Overview
+
+pmxcfs uses libqb for IPC communication between the daemon and client tools (`pvecm`, `pvenode`, etc.). This crate implements a server using QB_IPC_SHM (shared memory ring buffers) that is wire-compatible with libqb clients, enabling the Rust pmxcfs implementation to communicate with existing C-based tools.
+
+**Key Features**:
+- Wire-compatible with libqb clients
+- QB_IPC_SHM transport (shared memory ring buffers)
+- Async I/O via tokio
+- Lock-free SPSC ring buffers
+- Supports authentication via uid/gid
+- Per-connection context (uid, gid, pid, read-only flag)
+- Connection statistics tracking
+- Abstract Unix sockets for setup handshake (Linux-specific)
+
+---
+
+## Architecture
+
+### Transport: QB_IPC_SHM (Shared Memory Ring Buffers)
+
+**Rust pmxcfs uses**: `QB_IPC_SHM` (shared memory ring buffers)
+
+We implemented shared memory transport using lock-free SPSC (single-producer single-consumer) ring buffers. This provides:
+
+- **Wire compatibility**: Same handshake protocol as libqb
+- **Async I/O**: Integration with tokio ecosystem
+
+**Ring Buffer Design**:
+- Each connection has 3 ring buffers:
+ 1. **Request ring**: Client writes, server reads
+ 2. **Response ring**: Server writes, client reads
+ 3. **Event ring**: Server writes, client reads (for async notifications)
+- Ring buffers stored in `/dev/shm` (Linux shared memory)
+- Chunk-based protocol matching libqb
+
+### Server Structure
+
+### Connection Statistics
+
+Tracks statistics for C compatibility (matching `qb_ipcs_stats`).
+
+---
+
+## Protocol Implementation
+
+### Connection Handshake
+
+The server creates an abstract Unix socket `@pve2` (the `@` prefix indicates the abstract namespace) for initial connection setup.
+
+### Request/Response Communication
+
+After handshake, communication happens via shared memory ring buffers using libqb-compatible chunk format.
+
+### Wire Format Structures
+
+All structures use `#[repr(C, align(8))]` to match C's alignment requirements.
+
+Error codes must be negative errno values (e.g., `-EPERM`, `-EINVAL`) to match libqb convention.
+
+---
+
+## Testing
+
+Integration tests require a running Corosync instance. See the `tests/` directory for the C client FFI compatibility tests.
+
+## Implementation Status
+
+### Implemented
+
+- Connection handshake (SOCK_STREAM setup socket)
+- Authentication via SO_PEERCRED (uid/gid/pid)
+- QB_IPC_SHM transport (shared memory ring buffers)
+- Lock-free SPSC ring buffers
+- Async I/O via tokio
+- Abstract Unix sockets for setup handshake
+- Message header parsing (request/response)
+- Error code propagation (negative errno)
+- Ring buffer file management (creation/cleanup)
+- Event channel ring buffers (created, not actively used)
+- Connection statistics tracking
+- Disconnect detection
+- Read-only flag based on gid
+
+### Not Implemented
+
+- Event channel message sending (pmxcfs doesn't use events yet)
+
+## Application-Level IPC Operations
+
+### Operation Summary
+
+The following IPC operations are supported (defined in pmxcfs):
+
+| Operation | Request Data | Response Data | Description |
+|-----------|-------------|---------------|-------------|
+| GET_FS_VERSION | Empty | uint32_t version | Get filesystem version number |
+| GET_CLUSTER_INFO | Empty | JSON string | Get cluster information |
+| GET_GUEST_LIST | Empty | JSON array | Get list of all VMs/containers |
+| SET_STATUS | name + data | Empty | Set status key-value pair |
+| GET_STATUS | name | Binary data | Get status value by name |
+| GET_CONFIG | name | File contents | Read configuration file |
+| LOG_CLUSTER_MSG | priority + msg | Empty | Add cluster log entry |
+| GET_CLUSTER_LOG | max_entries | JSON array | Get cluster log entries |
+| GET_RRD_DUMP | Empty | RRD dump text | Get all RRD data |
+| GET_GUEST_CONFIG_PROPERTY | vmid + key | String value | Get single VM config property |
+| GET_GUEST_CONFIG_PROPERTIES | vmid | JSON object | Get all VM config properties |
+| VERIFY_TOKEN | userid + token | Boolean | Verify API token validity |
+
+### Common Clients
+
+The following Proxmox components use the IPC interface:
+
+- **pvestatd**: Updates node/VM/storage metrics (SET_STATUS, GET_STATUS)
+- **pve-ha-crm**: HA cluster resource manager (GET_CLUSTER_INFO, GET_GUEST_LIST)
+- **pve-ha-lrm**: HA local resource manager (GET_CONFIG, LOG_CLUSTER_MSG)
+- **pvecm**: Cluster management CLI (GET_CLUSTER_INFO, GET_CLUSTER_LOG)
+- **pvedaemon**: PVE API daemon (All query operations)
+
+### Permission Model
+
+**Write Operations** (require root):
+- SET_STATUS
+- LOG_CLUSTER_MSG
+
+**Read Operations** (any authenticated user):
+- All GET_* operations
+- VERIFY_TOKEN
+
+---
+
+## References
+
+### libqb Source
+
+Reference implementation of QB IPC protocol (available at https://github.com/ClusterLabs/libqb):
+
+- `libqb/lib/ringbuffer.c` - Ring buffer implementation
+- `libqb/lib/ipc_shm.c` - Shared memory transport
+- `libqb/lib/ipc_setup.c` - Connection setup/handshake
+- `libqb/include/qb/qbipc_common.h` - Wire protocol structures
+
+### C pmxcfs (pve-cluster)
+
+- `src/pmxcfs/server.c` - C IPC server using libqb
+- `src/pmxcfs/cfs-ipc-ops.h` - pmxcfs IPC operation codes
+
+### Related Documentation
+
+- `../C_COMPATIBILITY.md` - General C compatibility notes (if it exists)
+
+---
+
+## Notes
+
+### Ring Buffer Naming Convention
+
+Ring buffer files are created in `/dev/shm` with names based on connection descriptor and ring type (request/response/event).
+
+### Error Handling
+
+Always use **negative errno values** for errors to maintain compatibility with libqb clients.
+
+### Alignment and Padding
+
+All wire format structures must use `#[repr(C, align(8))]` to ensure 8-byte alignment matching C's requirements.
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs b/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs
new file mode 100644
index 00000000..6b9695ce
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs
@@ -0,0 +1,92 @@
+//! Simple test server for debugging libqb connectivity
+
+use async_trait::async_trait;
+use pmxcfs_ipc::{Handler, Permissions, Request, Response, Server};
+
+/// Example handler implementation
+struct TestHandler;
+
+#[async_trait]
+impl Handler for TestHandler {
+ fn authenticate(&self, uid: u32, gid: u32) -> Option<Permissions> {
+ // Accept root with read-write access
+ if uid == 0 {
+ eprintln!("Authenticated uid={uid}, gid={gid} as root (read-write)");
+ return Some(Permissions::ReadWrite);
+ }
+
+ // Accept all other users with read-only access for testing
+ eprintln!("Authenticated uid={uid}, gid={gid} as regular user (read-only)");
+ Some(Permissions::ReadOnly)
+ }
+
+ async fn handle(&self, request: Request) -> Response {
+ eprintln!(
+ "Received request: id={}, data_len={}, conn={}, uid={}, gid={}, pid={}, read_only={}",
+ request.msg_id,
+ request.data.len(),
+ request.conn_id,
+ request.uid,
+ request.gid,
+ request.pid,
+ request.is_read_only
+ );
+
+ match request.msg_id {
+ 1 => {
+ // CFS_IPC_GET_FS_VERSION
+ let response_str = r#"{"version":1,"protocol":1}"#;
+ eprintln!("Responding with: {response_str}");
+ Response::ok(response_str.as_bytes().to_vec())
+ }
+ 2 => {
+ // CFS_IPC_GET_CLUSTER_INFO
+ let response_str = r#"{"nodes":["node1","node2"],"quorate":true}"#;
+ eprintln!("Responding with: {response_str}");
+ Response::ok(response_str.as_bytes().to_vec())
+ }
+ 3 => {
+ // CFS_IPC_GET_GUEST_LIST
+ let response_str = r#"{"data":[{"vmid":100}]}"#;
+ eprintln!("Responding with: {response_str}");
+ Response::ok(response_str.as_bytes().to_vec())
+ }
+ _ => {
+ eprintln!("Unknown message id: {}", request.msg_id);
+ Response::err(-libc::EINVAL)
+ }
+ }
+ }
+}
+
+#[tokio::main]
+async fn main() {
+ // Initialize tracing
+ tracing_subscriber::fmt()
+ .with_max_level(tracing::Level::DEBUG)
+ .with_target(true)
+ .init();
+
+ println!("Starting QB IPC test server on 'pve2'...");
+
+ // Create handler and server
+ let handler = TestHandler;
+ let mut server = Server::new("pve2", handler);
+
+ println!("Server created, starting...");
+
+ if let Err(e) = server.start() {
+ eprintln!("Failed to start server: {e}");
+ std::process::exit(1);
+ }
+
+ println!("Server started successfully!");
+ println!("Waiting for connections...");
+
+ // Keep server running
+ tokio::signal::ctrl_c()
+ .await
+ .expect("Failed to wait for Ctrl-C");
+
+ println!("Shutting down...");
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs
new file mode 100644
index 00000000..d6d77e6c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs
@@ -0,0 +1,657 @@
+//! Per-connection handling for libqb IPC with shared memory ring buffers
+//!
+//! This module contains all connection-specific logic including connection
+//! establishment, authentication, request handling, and shared memory ring buffer management.
+use anyhow::{Context, Result};
+use std::os::unix::io::AsRawFd;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::UnixStream;
+use tokio_util::sync::CancellationToken;
+
+use super::handler::{Handler, Permissions};
+use super::protocol::*;
+use super::ringbuffer::{FlowControl, RingBuffer};
+
+/// Per-connection state using shared memory ring buffers
+///
+/// Uses SHM transport (shared memory ring buffers).
+#[allow(dead_code)] // Fields are intentionally stored for lifecycle management
+pub(super) struct QbConnection {
+ /// Connection ID for logging and debugging
+ conn_id: u64,
+
+ /// Client process ID (from SO_PEERCRED)
+ pid: u32,
+
+ /// Client user ID (from SO_PEERCRED)
+ uid: u32,
+
+ /// Client group ID (from SO_PEERCRED)
+ gid: u32,
+
+ /// Whether this connection has read-only access (determined by Handler::authenticate)
+ pub(super) read_only: bool,
+
+ /// Setup socket (kept open for disconnect detection)
+ _setup_stream: UnixStream,
+
+ /// Ring buffers for shared memory IPC
+ /// Request ring: client writes, server reads
+ request_rb: Option<RingBuffer>,
+ /// Response ring: server writes, client reads
+ response_rb: Option<RingBuffer>,
+ /// Event ring: server writes, client reads (for async notifications)
+ /// NOTE: The existing PVE/IPCC.xs Perl client only uses qb_ipcc_sendv_recv()
+ /// and never calls qb_ipcc_event_recv(), so this ring buffer is created
+ /// for libqb compatibility but remains unused in practice.
+ _event_rb: Option<RingBuffer>,
+
+ /// Paths to ring buffer data files (for debugging/cleanup)
+ pub(super) ring_buffer_paths: Vec<PathBuf>,
+
+ /// Task handle for request handler (auto-aborted on drop)
+ pub(super) task_handle: Option<tokio::task::JoinHandle<()>>,
+}
+
+impl QbConnection {
+ /// Accept a new connection from the setup socket
+ ///
+ /// Performs authentication, creates ring buffers, spawns request handler task,
+ /// and returns the connection object.
+ pub(super) async fn accept(
+ mut stream: UnixStream,
+ conn_id: u64,
+ service_name: &str,
+ handler: Arc<dyn Handler>,
+ cancellation_token: CancellationToken,
+ ) -> Result<Self> {
+ // Read connection request
+ let fd = stream.as_raw_fd();
+ let mut req_bytes = vec![0u8; std::mem::size_of::<ConnectionRequest>()];
+ stream
+ .read_exact(&mut req_bytes)
+ .await
+ .context("Failed to read connection request")?;
+
+ tracing::debug!(
+ "Connection request raw bytes ({} bytes): {:02x?}",
+ req_bytes.len(),
+ req_bytes
+ );
+
+ let req =
+ unsafe { std::ptr::read_unaligned(req_bytes.as_ptr() as *const ConnectionRequest) };
+
+ tracing::debug!(
+ "Connection request: id={}, size={}, max_msg_size={}",
+ *req.hdr.id,
+ *req.hdr.size,
+ req.max_msg_size
+ );
+
+ // Get peer credentials (SO_PEERCRED on Linux)
+ let (uid, gid, pid) = get_peer_credentials(fd)?;
+
+ // Authenticate using Handler trait
+ let read_only = match handler.authenticate(uid, gid) {
+ Some(Permissions::ReadWrite) => {
+ tracing::info!(pid, uid, gid, "Connection accepted with read-write access");
+ false
+ }
+ Some(Permissions::ReadOnly) => {
+ tracing::info!(pid, uid, gid, "Connection accepted with read-only access");
+ true
+ }
+ None => {
+ tracing::warn!(
+ pid,
+ uid,
+ gid,
+ "Connection rejected by authentication policy"
+ );
+ send_connection_response(&mut stream, -libc::EPERM, conn_id, 0, "", "", "").await?;
+ anyhow::bail!("Connection authentication failed");
+ }
+ };
+
+ // Create connection descriptor for ring buffer naming
+ let conn_desc = format!("{}-{}-{}", std::process::id(), pid, conn_id);
+ let max_msg_size = req.max_msg_size.max(8192);
+
+ // Create ring buffers in /dev/shm
+ // Pass max_msg_size directly - RingBuffer::new() will add QB_RB_CHUNK_MARGIN and round up
+ // (just like qb_rb_open() does on the client side)
+ let ring_size = max_msg_size as usize;
+
+ tracing::debug!(
+ "Creating ring buffers for connection {}: size={} bytes",
+ conn_id,
+ ring_size
+ );
+
+ // Request ring: client writes, server reads
+ // Request ring needs sizeof(int32_t) for flow control (shared_user_data)
+ let request_rb_name = format!("{conn_desc}-{service_name}-request");
+ let request_rb = RingBuffer::new(
+ "/dev/shm",
+ &request_rb_name,
+ ring_size,
+ std::mem::size_of::<i32>(),
+ )
+ .context("Failed to create request ring buffer")?;
+
+ // Response ring: server writes, client reads
+ // Response ring doesn't need shared_user_data
+ let response_rb_name = format!("{conn_desc}-{service_name}-response");
+ let response_rb = RingBuffer::new("/dev/shm", &response_rb_name, ring_size, 0)
+ .context("Failed to create response ring buffer")?;
+
+ // Event ring: server writes, client reads (for async notifications)
+ // Event ring doesn't need shared_user_data
+ let event_rb_name = format!("{conn_desc}-{service_name}-event");
+ let event_rb = RingBuffer::new("/dev/shm", &event_rb_name, ring_size, 0)
+ .context("Failed to create event ring buffer")?;
+
+ // Collect full paths for cleanup tracking
+ let request_data_path = PathBuf::from(format!("/dev/shm/qb-{request_rb_name}-data"));
+ let response_data_path = PathBuf::from(format!("/dev/shm/qb-{response_rb_name}-data"));
+ let event_data_path = PathBuf::from(format!("/dev/shm/qb-{event_rb_name}-data"));
+
+ // Send connection response with ring buffer BASE NAMES (not full paths)
+ // libqb client expects base names (e.g., "123-456-1-pve2-request")
+ // It will internally prepend "/dev/shm/qb-" and append "-header" or "-data"
+ send_connection_response(
+ &mut stream,
+ 0,
+ conn_id,
+ max_msg_size,
+ &request_rb_name,
+ &response_rb_name,
+ &event_rb_name,
+ )
+ .await?;
+
+ // Spawn request handler task
+ let handler_for_task = handler.clone();
+ let cancellation_for_task = cancellation_token.child_token();
+
+ let task_handle = tokio::spawn(async move {
+ Self::handle_requests(
+ request_rb,
+ response_rb,
+ handler_for_task,
+ cancellation_for_task,
+ conn_id,
+ uid,
+ gid,
+ pid,
+ read_only,
+ )
+ .await;
+ });
+
+ tracing::info!("Connection {} established (SHM transport)", conn_id);
+
+ Ok(Self {
+ conn_id,
+ pid,
+ uid,
+ gid,
+ read_only,
+ _setup_stream: stream,
+ request_rb: None, // Moved to task
+ response_rb: None, // Moved to task
+ _event_rb: Some(event_rb),
+ ring_buffer_paths: vec![request_data_path, response_data_path, event_data_path],
+ task_handle: Some(task_handle),
+ })
+ }
+
+ /// Request handler loop - receives and processes messages via ring buffers
+ ///
+ /// Runs in a background async task, receiving requests and sending responses
+ /// through shared memory ring buffers.
+ ///
+ /// Uses tokio channels to implement a workqueue with flow control:
+ /// - FlowControl::OK: Proceed with sending
+ /// - FlowControl::SLOW_DOWN: Reduce send rate
+ /// - FlowControl::STOP: Do not send
+ ///
+ /// Architecture: Three concurrent tasks communicating via tokio channels:
+ /// 1. Request receiver: reads from request ring buffer, queues work
+ /// 2. Worker: processes requests from work queue, sends to response queue
+ /// 3. Response sender: writes responses from response queue to response ring buffer
+ #[allow(clippy::too_many_arguments)]
+ async fn handle_requests(
+ mut request_rb: RingBuffer,
+ mut response_rb: RingBuffer,
+ handler: Arc<dyn Handler>,
+ cancellation_token: CancellationToken,
+ conn_id: u64,
+ uid: u32,
+ gid: u32,
+ pid: u32,
+ read_only: bool,
+ ) {
+ tracing::debug!("Request handler started for connection {}", conn_id);
+
+ // Workqueue capacity and flow control thresholds
+ //
+ // NOTE: The C implementation (using libqb) processes requests synchronously
+ // in the event loop callback (server.c:159 s1_msg_process_fn), so there's
+ // no explicit queue. We add async queueing in Rust to allow non-blocking
+ // request handling with tokio.
+ //
+ // Queue capacity of 8 is chosen as a reasonable default for:
+ // - Typical PVE workloads: Most IPC operations are fast (file reads/writes)
+ // - Memory efficiency: Each queued item = ~1KB (request header + data)
+ // - Backpressure: Small queue encourages flow control to activate quickly
+ // - Testing: Flow control test (02-flow-control.sh) verifies 20 concurrent
+ // operations work correctly with capacity 8
+ //
+ // Flow control thresholds match libqb's rate limiting (ipcs.c:199-203):
+ // - FlowControl::OK (0): Proceed with sending (QB_IPCS_RATE_NORMAL)
+ // - FlowControl::SLOW_DOWN (1): Reduce send rate (QB_IPCS_RATE_OFF)
+ // - FlowControl::STOP (2): Do not send (QB_IPCS_RATE_OFF_2)
+ const MAX_PENDING_REQUESTS: usize = 8;
+
+ // Set SLOW_DOWN when queue reaches 75% capacity (6/8 items)
+ // This provides early warning before the queue fills completely,
+ // allowing clients to throttle before hitting STOP
+ const FC_WARNING_THRESHOLD: usize = 6;
+
+ // Work queue: (header, request) -> worker
+ let (work_tx, mut work_rx) =
+ tokio::sync::mpsc::channel::<(RequestHeader, Request)>(MAX_PENDING_REQUESTS);
+
+ // Response queue: worker -> response sender
+ // Unbounded because responses must not block the worker
+ let (response_tx, mut response_rx) =
+ tokio::sync::mpsc::unbounded_channel::<(RequestHeader, Response)>();
+
+ // Spawn worker task to process requests
+ let worker_handler = handler.clone();
+ let worker_response_tx = response_tx.clone();
+ let worker_task = tokio::spawn(async move {
+ while let Some((header, request)) = work_rx.recv().await {
+ let handler_response = worker_handler.handle(request).await;
+ // Send to response queue (unbounded, never blocks)
+ let _ = worker_response_tx.send((header, handler_response));
+ }
+ });
+
+ // Spawn response sender task
+ let response_task = tokio::spawn(async move {
+ while let Some((header, handler_response)) = response_rx.recv().await {
+ Self::send_response(&mut response_rb, header, handler_response).await;
+ }
+ });
+
+ // Main request receiver loop
+ loop {
+ // Wait for incoming request (async, yields to tokio scheduler)
+ let request_data = tokio::select! {
+ _ = cancellation_token.cancelled() => {
+ tracing::debug!("Request handler cancelled for connection {}", conn_id);
+ break;
+ }
+ result = request_rb.recv() => {
+ match result {
+ Ok(data) => data,
+ Err(e) => {
+ tracing::error!("Error receiving request on conn {}: {}", conn_id, e);
+ break;
+ }
+ }
+ }
+ };
+
+ // After receiving from ring buffer, flow control is already set to 0
+ // by RingBufferShared::read_chunk()
+
+ // Parse request header
+ if request_data.len() < std::mem::size_of::<RequestHeader>() {
+ tracing::warn!(
+ "Request too small: {} bytes (need {} for header)",
+ request_data.len(),
+ std::mem::size_of::<RequestHeader>()
+ );
+ continue;
+ }
+
+ let header =
+ unsafe { std::ptr::read_unaligned(request_data.as_ptr() as *const RequestHeader) };
+
+ tracing::debug!(
+ "Received request on conn {}: id={}, size={}",
+ conn_id,
+ *header.id,
+ *header.size
+ );
+
+ // Extract message data (after header)
+ let header_size = std::mem::size_of::<RequestHeader>();
+ let msg_data = &request_data[header_size..];
+
+ // Build request object with full context
+ let request = Request {
+ msg_id: *header.id,
+ data: msg_data.to_vec(),
+ is_read_only: read_only,
+ conn_id,
+ uid,
+ gid,
+ pid,
+ };
+
+ // Send to workqueue - implements backpressure via flow control
+ match work_tx.try_send((header, request)) {
+ Ok(()) => {
+ // Request queued successfully
+
+ // Update flow control based on queue depth
+ // This matches libqb's rate limiting behavior
+ let queue_len = MAX_PENDING_REQUESTS - work_tx.capacity();
+ let fc_value = if queue_len >= MAX_PENDING_REQUESTS {
+ FlowControl::STOP // Queue full - stop sending
+ } else if queue_len >= FC_WARNING_THRESHOLD {
+ FlowControl::SLOW_DOWN // Queue approaching full - slow down
+ } else {
+ FlowControl::OK // Queue has space - OK to send
+ };
+
+ if fc_value > FlowControl::OK {
+ tracing::debug!(
+ "Setting flow control to {} (queue: {}/{})",
+ fc_value,
+ queue_len,
+ MAX_PENDING_REQUESTS
+ );
+ }
+ request_rb.flow_control.set(fc_value);
+ }
+ Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
+ // Queue is full - set flow control to STOP and send EAGAIN
+ tracing::warn!("Work queue full on conn {}, sending EAGAIN", conn_id);
+ request_rb.flow_control.set(FlowControl::STOP);
+
+ let error_response = Response {
+ error_code: -libc::EAGAIN,
+ data: Vec::new(),
+ };
+ // Send error response directly (bypassing queue)
+ let _ = response_tx.send((header, error_response));
+ }
+ Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
+ tracing::error!("Work queue closed on conn {}", conn_id);
+ break;
+ }
+ }
+ }
+
+ // Cleanup: drop channels to signal tasks to exit
+ drop(work_tx);
+ drop(response_tx);
+ let _ = worker_task.await;
+ let _ = response_task.await;
+
+ tracing::debug!("Request handler finished for connection {}", conn_id);
+ }
+
+ /// Send a response to the client
+ async fn send_response(
+ response_rb: &mut RingBuffer,
+ header: RequestHeader,
+ handler_response: Response,
+ ) {
+ // Build and serialize response: [header][data]
+ let response_size = std::mem::size_of::<ResponseHeader>() + handler_response.data.len();
+ let mut response_bytes = Vec::with_capacity(response_size);
+
+ let response_header = ResponseHeader {
+ id: header.id,
+ size: (response_size as i32).into(),
+ error: handler_response.error_code.into(),
+ };
+
+ response_bytes.extend_from_slice(unsafe {
+ std::slice::from_raw_parts(
+ &response_header as *const _ as *const u8,
+ std::mem::size_of::<ResponseHeader>(),
+ )
+ });
+ response_bytes.extend_from_slice(&handler_response.data);
+
+ tracing::debug!("Response header bytes (24): {:02x?}", &response_bytes[..24]);
+
+ // Send response (async, yields if buffer full)
+ match response_rb.send(&response_bytes).await {
+ Ok(()) => {
+ // Response sent successfully
+ }
+ Err(e) => {
+ tracing::error!("Failed to send response: {}", e);
+ }
+ }
+ }
+}
+
+/// Get peer credentials from Unix socket
+fn get_peer_credentials(fd: i32) -> Result<(u32, u32, u32)> {
+ #[cfg(target_os = "linux")]
+ {
+ let mut ucred: libc::ucred = unsafe { std::mem::zeroed() };
+ let mut ucred_size = std::mem::size_of::<libc::ucred>() as libc::socklen_t;
+
+ let res = unsafe {
+ libc::getsockopt(
+ fd,
+ libc::SOL_SOCKET,
+ libc::SO_PEERCRED,
+ &mut ucred as *mut _ as *mut libc::c_void,
+ &mut ucred_size,
+ )
+ };
+
+ if res != 0 {
+ anyhow::bail!(
+ "getsockopt SO_PEERCRED failed: {}",
+ std::io::Error::last_os_error()
+ );
+ }
+
+ Ok((ucred.uid, ucred.gid, ucred.pid as u32))
+ }
+
+ #[cfg(not(target_os = "linux"))]
+ {
+ anyhow::bail!("Peer credentials not supported on this platform");
+ }
+}
+
+/// Send connection response to client
+async fn send_connection_response(
+ stream: &mut UnixStream,
+ error: i32,
+ conn_id: u64,
+ max_msg_size: u32,
+ request_path: &str,
+ response_path: &str,
+ event_path: &str,
+) -> Result<()> {
+ let mut response = ConnectionResponse {
+ hdr: ResponseHeader {
+ id: MSG_AUTHENTICATE.into(),
+ size: (std::mem::size_of::<ConnectionResponse>() as i32).into(),
+ error: error.into(),
+ },
+ connection_type: CONNECTION_TYPE_SHM, // Shared memory transport
+ max_msg_size,
+ connection: conn_id as usize,
+ request: [0u8; PATH_MAX],
+ response: [0u8; PATH_MAX],
+ event: [0u8; PATH_MAX],
+ };
+
+ // Helper to copy path strings into fixed-size buffers
+ let copy_path = |dest: &mut [u8; PATH_MAX], src: &str| {
+ if !src.is_empty() {
+ let len = src.len().min(PATH_MAX - 1);
+ dest[..len].copy_from_slice(&src.as_bytes()[..len]);
+ tracing::debug!("Connection response path: '{}'", src);
+ }
+ };
+
+ copy_path(&mut response.request, request_path);
+ copy_path(&mut response.response, response_path);
+ copy_path(&mut response.event, event_path);
+
+ // Serialize and send
+ let response_bytes = unsafe {
+ std::slice::from_raw_parts(
+ &response as *const _ as *const u8,
+ std::mem::size_of::<ConnectionResponse>(),
+ )
+ };
+
+ stream
+ .write_all(response_bytes)
+ .await
+ .context("Failed to send connection response")?;
+
+ tracing::debug!(
+ "Sent connection response: error={}, connection_type=SHM",
+ error
+ );
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_malformed_request_size_validation() {
+ // This test verifies the size validation logic for malformed requests
+ // The actual validation happens in handle_requests(), before the header is parsed
+
+ let header_size = std::mem::size_of::<RequestHeader>();
+ assert_eq!(header_size, 16, "RequestHeader should be 16 bytes");
+
+ // Test case 1: Request too small (would be rejected)
+ let too_small_data = [0x01, 0x02, 0x03]; // Only 3 bytes
+ assert!(
+ too_small_data.len() < header_size,
+ "Malformed request with {} bytes should be less than header size {}",
+ too_small_data.len(),
+ header_size
+ );
+
+ // Test case 2: More realistic too-small cases
+ let test_cases = vec![
+ (vec![0u8; 0], 0), // Empty request
+ (vec![0u8; 1], 1), // 1 byte
+ (vec![0u8; 8], 8), // 8 bytes (half header)
+ (vec![0u8; 15], 15), // 15 bytes (just short of header)
+ ];
+
+ for (data, expected_len) in test_cases {
+ assert_eq!(data.len(), expected_len);
+ assert!(
+ data.len() < header_size,
+ "Request with {} bytes should be rejected (need {})",
+ data.len(),
+ header_size
+ );
+ }
+
+ // Test case 3: Valid size requests (would pass size check)
+ let valid_cases = vec![
+ vec![0u8; 16], // Exact header size
+ vec![0u8; 32], // Header + data
+ vec![0u8; 1024], // Large request
+ ];
+
+ for data in valid_cases {
+ assert!(
+ data.len() >= header_size,
+ "Request with {} bytes should pass size check",
+ data.len()
+ );
+ }
+ }
+
+ #[test]
+ fn test_malformed_header_structure() {
+ // This test verifies that the header structure is correctly defined
+ // and that we can safely parse various header patterns
+
+ let header_size = std::mem::size_of::<RequestHeader>();
+
+ // Create a valid-sized buffer with various patterns
+ let patterns = vec![
+ vec![0x00; header_size], // All zeros
+ vec![0xFF; header_size], // All ones
+ vec![0xAA; header_size], // Alternating pattern
+ ];
+
+ for pattern in patterns {
+ assert_eq!(pattern.len(), header_size);
+
+ // Parse header (same read_unaligned pattern as in handle_requests())
+ let header =
+ unsafe { std::ptr::read_unaligned(pattern.as_ptr() as *const RequestHeader) };
+
+ // The parsing should not crash, regardless of values
+ // The actual values don't matter for this safety test
+ let _id = *header.id;
+ let _size = *header.size;
+ }
+ }
+
+ #[test]
+ fn test_request_header_alignment() {
+ // Verify that RequestHeader can be read with read_unaligned
+ // This is important because data from ring buffers may not be aligned
+
+ let header_size = std::mem::size_of::<RequestHeader>();
+
+ // Create misaligned buffer (offset by 1 byte to test unaligned access)
+ let mut buffer = vec![0u8; header_size + 1];
+ buffer[1..].fill(0x42);
+
+ // Read from misaligned offset (this is what read_unaligned is for)
+ let header =
+ unsafe { std::ptr::read_unaligned(&buffer[1] as *const u8 as *const RequestHeader) };
+
+ // Should successfully read without crashing
+ let _id = *header.id;
+ let _size = *header.size;
+ }
+
+ #[test]
+ fn test_connection_request_structure() {
+ // Verify ConnectionRequest structure for connection setup
+
+ let conn_req_size = std::mem::size_of::<ConnectionRequest>();
+
+ // ConnectionRequest should be properly sized
+ assert!(
+ conn_req_size > std::mem::size_of::<RequestHeader>(),
+ "ConnectionRequest should include header plus additional fields"
+ );
+
+ // Test that we can parse a zero-filled connection request
+ let data = vec![0u8; conn_req_size];
+ let conn_req =
+ unsafe { std::ptr::read_unaligned(data.as_ptr() as *const ConnectionRequest) };
+
+ // Should not crash when accessing fields
+ let _id = *conn_req.hdr.id;
+ let _size = *conn_req.hdr.size;
+ let _max_msg_size = conn_req.max_msg_size;
+ }
+}
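The size checks and the read_unaligned parsing exercised by the tests above
can also be written without unsafe. The following is a minimal sketch and
not part of the patch: parse_request_header and REQUEST_HEADER_SIZE are
hypothetical names, and the byte offsets assume the 16-byte layout asserted
above, where each i32 field is followed by 4 bytes of padding.

```rust
/// Assumed wire size of the request header: two i32 fields, each padded to 8 bytes.
const REQUEST_HEADER_SIZE: usize = 16;

/// Hypothetical helper: extract `(id, size)` from the start of a request buffer.
/// Returns `None` when the buffer is shorter than the header, which mirrors the
/// "too small" cases in the tests.
fn parse_request_header(buf: &[u8]) -> Option<(i32, i32)> {
    if buf.len() < REQUEST_HEADER_SIZE {
        return None;
    }
    // Bytes 0..4 hold `id`, bytes 8..12 hold `size`; the remaining bytes are padding.
    // Native byte order is used because client and server share one host.
    let id = i32::from_ne_bytes([buf[0], buf[1], buf[2], buf[3]]);
    let size = i32::from_ne_bytes([buf[8], buf[9], buf[10], buf[11]]);
    Some((id, size))
}
```

Copying the bytes out field by field works for any buffer alignment, so it
covers the same cases as the misaligned-buffer test, at the cost of spelling
out the offsets by hand.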
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs
new file mode 100644
index 00000000..12b40cd4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs
@@ -0,0 +1,93 @@
+//! Handler trait for processing IPC requests
+//!
+//! This module defines the core `Handler` trait that users implement to process
+//! IPC requests. The trait-based approach provides a more idiomatic and extensible
+//! API compared to raw function closures.
+
+use crate::protocol::{Request, Response};
+use async_trait::async_trait;
+
+/// Permissions for IPC connections
+///
+/// Determines the access level for authenticated connections.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Permissions {
+ /// Read-only access
+ ReadOnly,
+ /// Read-write access
+ ReadWrite,
+}
+
+/// Handler trait for processing IPC requests and authentication
+///
+/// Implement this trait to define custom request handling logic and authentication
+/// policy for your IPC server. The handler receives a `Request` containing the
+/// message ID, payload data, and connection context, and returns a `Response` with
+/// an error code and response data.
+///
+/// ## Authentication
+///
+/// The `authenticate` method is called during connection setup to determine whether
+/// a client with given credentials should be accepted. This allows the handler to
+/// implement application-specific authentication policies.
+///
+/// ## Async Support
+///
+/// The `handle` method is async, allowing you to perform I/O operations, database
+/// queries, or other async work within your handler.
+///
+/// ## Thread Safety
+///
+/// Handlers must be `Send + Sync` as they may be called from multiple tokio tasks
+/// concurrently. Use `Arc<Mutex<T>>` or other synchronization primitives if you need
+/// mutable shared state.
+///
+/// ## Error Handling
+///
+/// Return negative errno values in `Response::error_code` to indicate errors.
+/// Use 0 for success. See `libc::*` constants for standard errno values.
+#[async_trait]
+pub trait Handler: Send + Sync {
+ /// Authenticate a connecting client and determine access level
+ ///
+ /// Called during connection setup to determine whether to accept the connection
+ /// and what access level to grant.
+ ///
+ /// # Arguments
+ ///
+ /// * `uid` - Client user ID (from SO_PEERCRED)
+ /// * `gid` - Client group ID (from SO_PEERCRED)
+ ///
+ /// # Returns
+ ///
+ /// - `Some(Permissions::ReadWrite)` to accept with read-write access
+ /// - `Some(Permissions::ReadOnly)` to accept with read-only access
+ /// - `None` to reject the connection
+ fn authenticate(&self, uid: u32, gid: u32) -> Option<Permissions>;
+
+ /// Handle an IPC request
+ ///
+ /// # Arguments
+ ///
+ /// * `request` - The incoming request with message ID, data, and connection context
+ ///
+ /// # Returns
+ ///
+ /// A `Response` containing the error code (0 = success, negative = errno) and
+ /// optional response data to send back to the client.
+ async fn handle(&self, request: Request) -> Response;
+}
+
+/// Blanket implementation for `Arc<T>` where `T: Handler`
+///
+/// This allows passing `Arc<MyHandler>` directly to `Server::new()`.
+#[async_trait]
+impl<T: Handler> Handler for std::sync::Arc<T> {
+ fn authenticate(&self, uid: u32, gid: u32) -> Option<Permissions> {
+ (**self).authenticate(uid, gid)
+ }
+
+ async fn handle(&self, request: Request) -> Response {
+ (**self).handle(request).await
+ }
+}
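To illustrate the authenticate contract and the negative-errno convention
described in the Handler docs above, here is a small synchronous sketch. It
is not part of the patch. Permissions is mirrored locally so the snippet
stands alone, and the policy itself is an assumption made for illustration:
root gets read-write access, members of gid 33 (assumed to be www-data on
Debian) get read-only access, everyone else is rejected. The names
WWW_DATA_GID, EACCES and write_error_code are hypothetical.

```rust
/// Local mirror of the crate's `Permissions` enum, so the sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Permissions {
    ReadOnly,
    ReadWrite,
}

/// Assumption: gid of the web server group that may read but not write.
const WWW_DATA_GID: u32 = 33;

/// Linux value of EACCES, written out to avoid a dependency on the libc crate.
const EACCES: i32 = 13;

/// Example policy with the same shape as `Handler::authenticate`.
fn authenticate(uid: u32, gid: u32) -> Option<Permissions> {
    if uid == 0 {
        Some(Permissions::ReadWrite)
    } else if gid == WWW_DATA_GID {
        Some(Permissions::ReadOnly)
    } else {
        None // reject the connection
    }
}

/// Error code a handler could return for a write-type request:
/// 0 on success, negative errno otherwise.
fn write_error_code(perms: Permissions) -> i32 {
    match perms {
        Permissions::ReadWrite => 0,
        Permissions::ReadOnly => -EACCES,
    }
}
```

A real implementation would place the same decision inside
Handler::authenticate and wrap the error code in a Response.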
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs
new file mode 100644
index 00000000..923c359e
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs
@@ -0,0 +1,37 @@
+//! libqb-compatible IPC server implementation in pure Rust
+//!
+//! This crate implements a minimal libqb IPC server that is wire-compatible
+//! with libqb clients (qb_ipcc_*), without depending on the libqb C library.
+//!
+//! ## Protocol Overview
+//!
+//! 1. **Connection Handshake** (SOCK_STREAM):
+//! - Server listens on the abstract Unix socket `@{service_name}` (e.g. `@pve2`)
+//! - Client connects and sends `qb_ipc_connection_request`
+//! - Server authenticates (uid/gid), creates per-connection ring buffers
+//! - Server sends `qb_ipc_connection_response` with the ring buffer paths
+//!
+//! 2. **Request/Response** (QB_IPC_SHM):
+//! - Client writes requests into the shared memory request ring buffer
+//! - Server reads, processes, and writes responses to the response ring buffer
+//!
+//! ## Module Structure
+//! - `protocol` - Wire protocol structures and constants
+//! - `socket` - Abstract Unix socket utilities
+//! - `ringbuffer` - Shared memory ring buffers matching libqb's layout
+//! - `connection` - Per-connection handling and request processing
+//! - `handler` - `Handler` trait and `Permissions`
+//! - `server` - Main IPC server and connection acceptance
+//!
+//! Reference: libqb source (lib/ipc_setup.c, lib/ipc_shm.c, lib/ringbuffer.c)
+mod connection;
+mod handler;
+mod protocol;
+mod ringbuffer;
+mod server;
+mod socket;
+
+// Public API
+pub use handler::{Handler, Permissions};
+pub use protocol::{Request, Response};
+pub use server::Server;
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs
new file mode 100644
index 00000000..469099f2
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs
@@ -0,0 +1,332 @@
+//! libqb wire protocol structures and constants
+//!
+//! This module contains the low-level protocol definitions for libqb IPC communication.
+//! All structures must match the C counterparts exactly for binary compatibility.
+
+/// Message ID for authentication requests (matches libqb's QB_IPC_MSG_AUTHENTICATE)
+pub(super) const MSG_AUTHENTICATE: i32 = 1;
+
+/// Connection type for shared memory transport (matches libqb's QB_IPC_SHM)
+pub(super) const CONNECTION_TYPE_SHM: u32 = 1;
+
+/// Maximum path length - used in connection response
+pub(super) const PATH_MAX: usize = 4096;
+
+/// Wrapper for i32 that aligns to 8-byte boundary with explicit padding
+///
+/// Simulates C's `__attribute__ ((aligned(8)))` on individual i32 fields.
+/// This is used to match libqb's per-field alignment behavior.
+///
+/// Memory layout:
+/// - Bytes 0-3: i32 value
+/// - Bytes 4-7: zero padding
+/// - Total: 8 bytes
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct Align8 {
+ pub value: i32,
+ _pad: u32, // 4 bytes padding for i32 -> 8 bytes total
+}
+
+impl Align8 {
+ #[inline]
+ pub const fn new(value: i32) -> Self {
+ Align8 { value, _pad: 0 }
+ }
+}
+
+impl std::ops::Deref for Align8 {
+ type Target = i32;
+
+ #[inline]
+ fn deref(&self) -> &i32 {
+ &self.value
+ }
+}
+
+impl std::ops::DerefMut for Align8 {
+ #[inline]
+ fn deref_mut(&mut self) -> &mut i32 {
+ &mut self.value
+ }
+}
+
+impl From<i32> for Align8 {
+ #[inline]
+ fn from(value: i32) -> Self {
+ Align8::new(value)
+ }
+}
+
+impl Default for Align8 {
+ #[inline]
+ fn default() -> Self {
+ Align8::new(0)
+ }
+}
+
+/// Request header (matches libqb's qb_ipc_request_header)
+///
+/// Each field is 8-byte aligned to match C's __attribute__ ((aligned(8)))
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone)]
+pub struct RequestHeader {
+ pub id: Align8,
+ pub size: Align8,
+}
+
+/// Response header (matches libqb's qb_ipc_response_header)
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone)]
+pub struct ResponseHeader {
+ pub id: Align8,
+ pub size: Align8,
+ pub error: Align8,
+}
+
+/// Connection request sent by client during handshake (matches libqb's qb_ipc_connection_request)
+#[repr(C)]
+#[derive(Debug, Copy, Clone)]
+pub(super) struct ConnectionRequest {
+ pub hdr: RequestHeader,
+ pub max_msg_size: u32,
+}
+
+/// Connection response sent by server during handshake (matches libqb's qb_ipc_connection_response)
+#[repr(C, align(8))]
+#[derive(Debug)]
+pub(super) struct ConnectionResponse {
+ pub hdr: ResponseHeader,
+ pub connection_type: u32,
+ pub max_msg_size: u32,
+ pub connection: usize,
+ pub request: [u8; PATH_MAX],
+ pub response: [u8; PATH_MAX],
+ pub event: [u8; PATH_MAX],
+}
+
+/// Request passed to handlers
+///
+/// Contains all information about an IPC request including the message ID,
+/// payload data, and connection context (uid, gid, pid, permissions).
+#[derive(Debug, Clone)]
+pub struct Request {
+ /// Message ID identifying the operation (application-defined)
+ pub msg_id: i32,
+
+ /// Request payload data
+ pub data: Vec<u8>,
+
+ /// Whether this connection has read-only access
+ pub is_read_only: bool,
+
+ /// Connection ID (for logging/debugging)
+ pub conn_id: u64,
+
+ /// Client user ID (from SO_PEERCRED)
+ pub uid: u32,
+
+ /// Client group ID (from SO_PEERCRED)
+ pub gid: u32,
+
+ /// Client process ID (from SO_PEERCRED)
+ pub pid: u32,
+}
+
+/// Response from handlers
+///
+/// Contains the error code and response data to send back to the client.
+#[derive(Debug, Clone)]
+pub struct Response {
+ /// Error code (0 = success, negative = errno)
+ pub error_code: i32,
+
+ /// Response payload data
+ pub data: Vec<u8>,
+}
+
+impl Response {
+ /// Create a successful response with data
+ pub fn ok(data: Vec<u8>) -> Self {
+ Self {
+ error_code: 0,
+ data,
+ }
+ }
+
+ /// Create an error response with errno
+ pub fn err(error_code: i32) -> Self {
+ Self {
+ error_code,
+ data: Vec::new(),
+ }
+ }
+
+ /// Create an error response with errno and optional data
+ pub fn with_error(error_code: i32, data: Vec<u8>) -> Self {
+ Self { error_code, data }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_header_sizes() {
+ assert_eq!(std::mem::size_of::<RequestHeader>(), 16);
+ assert_eq!(std::mem::align_of::<RequestHeader>(), 8);
+ assert_eq!(std::mem::size_of::<ResponseHeader>(), 24);
+ assert_eq!(std::mem::align_of::<ResponseHeader>(), 8);
+ assert_eq!(std::mem::size_of::<ConnectionRequest>(), 24); // 16 (header) + 4 (max_msg_size) + 4 (padding)
+
+ println!(
+ "ConnectionResponse size: {}",
+ std::mem::size_of::<ConnectionResponse>()
+ );
+ println!(
+ "ConnectionResponse align: {}",
+ std::mem::align_of::<ConnectionResponse>()
+ );
+ println!("PATH_MAX: {PATH_MAX}");
+
+ // C expects: 24 (header) + 4 (connection_type) + 4 (max_msg_size) + 8 (connection pointer) + 3*4096 (paths) = 12328
+ assert_eq!(std::mem::size_of::<ConnectionResponse>(), 12328);
+ }
+
+ // ===== Align8 Tests =====
+
+ #[test]
+ fn test_align8_size_and_alignment() {
+ // Verify Align8 is exactly 8 bytes
+ assert_eq!(std::mem::size_of::<Align8>(), 8);
+ assert_eq!(std::mem::align_of::<Align8>(), 8);
+ }
+
+ #[test]
+ fn test_align8_creation_and_value_access() {
+ let a = Align8::new(42);
+ assert_eq!(a.value, 42);
+ assert_eq!(*a, 42); // Test Deref
+ }
+
+ #[test]
+ fn test_align8_from_i32() {
+ let a: Align8 = (-100).into();
+ assert_eq!(a.value, -100);
+ }
+
+ #[test]
+ fn test_align8_default() {
+ let a = Align8::default();
+ assert_eq!(a.value, 0);
+ }
+
+ #[test]
+ fn test_align8_deref_mut() {
+ let mut a = Align8::new(10);
+ *a = 20; // Test DerefMut
+ assert_eq!(a.value, 20);
+ }
+
+ #[test]
+ fn test_align8_padding_is_zero() {
+ let a = Align8::new(123);
+ // Padding should always be 0
+ assert_eq!(a._pad, 0);
+ }
+
+ // ===== Response Tests =====
+
+ #[test]
+ fn test_response_ok_creation() {
+ let data = b"test data".to_vec();
+ let resp = Response::ok(data.clone());
+
+ assert_eq!(resp.error_code, 0);
+ assert_eq!(resp.data, data);
+ }
+
+ #[test]
+ fn test_response_err_creation() {
+ let resp = Response::err(-5); // ERRNO like EIO
+
+ assert_eq!(resp.error_code, -5);
+ assert!(resp.data.is_empty());
+ }
+
+ #[test]
+ fn test_response_with_error_and_data() {
+ let data = b"error details".to_vec();
+ let resp = Response::with_error(-22, data.clone()); // EINVAL
+
+ assert_eq!(resp.error_code, -22);
+ assert_eq!(resp.data, data);
+ }
+
+ #[test]
+ fn test_response_error_codes() {
+ // Test various errno values
+ let test_cases = vec![
+ (0, "success"),
+ (-1, "EPERM"),
+ (-2, "ENOENT"),
+ (-13, "EACCES"),
+ (-22, "EINVAL"),
+ ];
+
+ for (code, _name) in test_cases {
+ let resp = Response::err(code);
+ assert_eq!(resp.error_code, code);
+ }
+ }
+
+ // ===== Request Tests =====
+
+ #[test]
+ fn test_request_creation() {
+ let req = Request {
+ msg_id: 100,
+ data: b"payload".to_vec(),
+ is_read_only: false,
+ conn_id: 12345,
+ uid: 0,
+ gid: 0,
+ pid: 999,
+ };
+
+ assert_eq!(req.msg_id, 100);
+ assert_eq!(req.data, b"payload");
+ assert!(!req.is_read_only);
+ assert_eq!(req.conn_id, 12345);
+ assert_eq!(req.uid, 0);
+ assert_eq!(req.gid, 0);
+ assert_eq!(req.pid, 999);
+ }
+
+ #[test]
+ fn test_request_read_only_flag() {
+ let req_ro = Request {
+ msg_id: 1,
+ data: vec![],
+ is_read_only: true,
+ conn_id: 1,
+ uid: 33,
+ gid: 33,
+ pid: 1000,
+ };
+
+ let req_rw = Request {
+ msg_id: 1,
+ data: vec![],
+ is_read_only: false,
+ conn_id: 2,
+ uid: 0,
+ gid: 0,
+ pid: 1001,
+ };
+
+ assert!(req_ro.is_read_only);
+ assert!(!req_rw.is_read_only);
+ }
+}
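The sizes asserted in test_header_sizes follow from the Align8 layout: every
i32 field occupies 8 bytes, with the value in the first 4 and zero padding in
the last 4. The sketch below is not part of the patch; it encodes a response
header into raw bytes to make that layout visible. encode_response_header and
RESPONSE_HEADER_SIZE are hypothetical names, and native byte order is assumed
because both peers run on the same host.

```rust
/// Assumed wire size of the response header: three i32 fields padded to 8 bytes each.
const RESPONSE_HEADER_SIZE: usize = 24;

/// Hypothetical helper: lay out `id`, `size` and `error` the way the
/// `#[repr(C)]` struct of three `Align8` fields would appear in memory.
fn encode_response_header(id: i32, size: i32, error: i32) -> [u8; RESPONSE_HEADER_SIZE] {
    let mut out = [0u8; RESPONSE_HEADER_SIZE];
    for (slot, value) in [id, size, error].iter().enumerate() {
        // Each field starts on an 8-byte boundary; bytes 4..8 of the slot stay zero.
        let start = slot * 8;
        out[start..start + 4].copy_from_slice(&value.to_ne_bytes());
    }
    out
}
```

For example, encode_response_header(1, 24, -22) places the native-endian
bytes of 1 at offset 0, of 24 at offset 8, and of -22 at offset 16, with
zeros in between.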
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs
new file mode 100644
index 00000000..96dd192b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs
@@ -0,0 +1,1158 @@
+//! Lock-free ring buffer implementation compatible with libqb's shared memory IPC
+//!
+//! This module implements an SPSC (single-producer single-consumer) ring buffer
+//! using shared memory, matching libqb's wire protocol and memory layout.
+//!
+//! ## Design
+//!
+//! - **Shared Memory**: Two mmap'd files (header + data) in /dev/shm
+//! - **Lock-Free**: Uses atomic operations for read_pt/write_pt synchronization
+//! - **Chunk-Based**: Messages stored as [size][magic][data] chunks
+//! - **Wire-Compatible**: Matches libqb's qb_ringbuffer_shared_s layout
+use anyhow::{Context, Result};
+use memmap2::MmapMut;
+use std::fs::OpenOptions;
+use std::os::fd::AsRawFd;
+use std::path::Path;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
+use tokio::sync::Notify;
+
+/// Circular mmap wrapper for ring buffer data
+///
+/// This struct manages a circular memory mapping where the same file is mapped
+/// twice in consecutive virtual addresses. This allows ring buffer operations
+/// to wrap around naturally without modulo arithmetic.
+///
+/// Matches libqb's qb_sys_circular_mmap() behavior.
+struct CircularMmap {
+ /// Starting address of the 2x circular mapping
+ addr: *mut libc::c_void,
+ /// Size of the file (virtual mapping is 2x this size)
+ size: usize,
+}
+
+impl CircularMmap {
+ /// Create a circular mmap from a file descriptor
+ ///
+ /// Maps the file TWICE in consecutive virtual addresses, allowing ring buffer
+ /// wraparound without modulo arithmetic. Matches libqb's qb_sys_circular_mmap().
+ ///
+ /// # Arguments
+ /// - `fd`: File descriptor of the data file (must be sized to `size` bytes)
+ /// - `size`: Size of the file in bytes (virtual mapping will be 2x this)
+ ///
+ /// # Safety
+ /// The file must be properly sized before calling this function.
+ unsafe fn new(fd: i32, size: usize) -> Result<Self> {
+ // SAFETY: All operations in this function are inherently unsafe as they
+ // manipulate raw memory mappings. The caller must ensure the fd is valid
+ // and the file is properly sized.
+ unsafe {
+ // Step 1: Reserve 2x space with anonymous mmap
+ let addr_orig = libc::mmap(
+ std::ptr::null_mut(),
+ size * 2,
+ libc::PROT_NONE,
+ libc::MAP_ANONYMOUS | libc::MAP_PRIVATE,
+ -1,
+ 0,
+ );
+
+ if addr_orig == libc::MAP_FAILED {
+ anyhow::bail!(
+ "Failed to reserve circular mmap space: {}",
+ std::io::Error::last_os_error()
+ );
+ }
+
+ // Step 2: Map the file at the start of reserved space
+ let addr1 = libc::mmap(
+ addr_orig,
+ size,
+ libc::PROT_READ | libc::PROT_WRITE,
+ libc::MAP_FIXED | libc::MAP_SHARED,
+ fd,
+ 0,
+ );
+
+ if addr1 != addr_orig {
+ libc::munmap(addr_orig, size * 2);
+ anyhow::bail!(
+ "Failed to map first half of circular buffer: {}",
+ std::io::Error::last_os_error()
+ );
+ }
+
+ // Step 3: Map the SAME file again right after
+ let addr_next = (addr_orig as *mut u8).add(size) as *mut libc::c_void;
+ let addr2 = libc::mmap(
+ addr_next,
+ size,
+ libc::PROT_READ | libc::PROT_WRITE,
+ libc::MAP_FIXED | libc::MAP_SHARED,
+ fd,
+ 0,
+ );
+
+ if addr2 != addr_next {
+ libc::munmap(addr_orig, size * 2);
+ anyhow::bail!(
+ "Failed to map second half of circular buffer: {}",
+ std::io::Error::last_os_error()
+ );
+ }
+
+ tracing::debug!(
+ "Created circular mmap: {:p}, {} bytes (2x {} bytes file)",
+ addr_orig,
+ size * 2,
+ size
+ );
+
+ Ok(Self {
+ addr: addr_orig,
+ size,
+ })
+ }
+ }
+
+ /// Get the base address as a mutable pointer to u32
+ ///
+ /// This is the most common use case for ring buffers which work with u32 words.
+ fn as_mut_ptr(&self) -> *mut u32 {
+ self.addr as *mut u32
+ }
+
+ /// Zero-initialize the circular mapping
+ ///
+ /// Only needs to write to the first half due to the circular nature.
+ ///
+ /// # Safety
+ /// The circular mmap must be properly initialized and the address valid.
+ unsafe fn zero_initialize(&mut self) {
+ // SAFETY: Caller ensures the circular mmap is valid and mapped
+ unsafe {
+ std::ptr::write_bytes(self.addr as *mut u8, 0, self.size);
+ }
+ }
+}
+
+impl Drop for CircularMmap {
+ fn drop(&mut self) {
+ // Munmap the 2x circular mapping
+ // Matches libqb's cleanup in qb_rb_close_helper
+ unsafe {
+ libc::munmap(self.addr, self.size * 2);
+ }
+ tracing::debug!(
+ "Unmapped circular buffer: {:p}, {} bytes (2x {} bytes file)",
+ self.addr,
+ self.size * 2,
+ self.size
+ );
+ }
+}
+
+/// Process-shared POSIX semaphore wrapper
+///
+/// This wraps the native Linux sem_t (32 bytes on x86_64) for inter-process
+/// synchronization in the ring buffer.
+///
+/// **libqb compatibility note**: This corresponds to libqb's `rpl_sem_t` type.
+/// On Linux with HAVE_SEM_TIMEDWAIT defined, rpl_sem_t is just an alias for
+/// the native sem_t. The "rpl" prefix stands for "replacement" - libqb provides
+/// a fallback implementation using mutexes/condvars on systems without proper
+/// POSIX semaphore support (like BSD). Since we only target Linux, we use the
+/// native sem_t directly.
+#[repr(C)]
+struct PosixSem {
+ /// Raw sem_t storage (32 bytes on Linux x86_64)
+ _sem: [u8; 32],
+}
+
+impl PosixSem {
+ /// Initialize a POSIX semaphore in-place in shared memory
+ ///
+ /// This initializes the semaphore at its current memory location, which is
+ /// critical for process-shared semaphores in mmap'd memory. The semaphore
+ /// must not be moved after initialization.
+ ///
+ /// The semaphore is always initialized as:
+ /// - **Process-shared** (pshared=1): Shared between processes via mmap
+ /// - **Initial value 0**: No data available initially
+ ///
+ /// Matches libqb's semaphore initialization in `qb_rb_create_from_file`.
+ ///
+ /// # Safety
+ /// The semaphore must remain at its current memory location and must not
+ /// be moved or copied after initialization.
+ unsafe fn init_in_place(&mut self) -> Result<()> {
+ let sem_ptr = self._sem.as_mut_ptr() as *mut libc::sem_t;
+
+ // pshared=1: Process-shared semaphore (for cross-process IPC)
+ // initial_value=0: No data available initially (producers will post)
+ const PSHARED: libc::c_int = 1;
+ const INITIAL_VALUE: libc::c_uint = 0;
+
+ // SAFETY: Caller ensures the semaphore memory is valid and will remain
+ // at this location for its lifetime
+ let ret = unsafe { libc::sem_init(sem_ptr, PSHARED, INITIAL_VALUE) };
+
+ if ret != 0 {
+ anyhow::bail!("sem_init failed: {}", std::io::Error::last_os_error());
+ }
+
+ Ok(())
+ }
+
+ /// Destroy the semaphore
+ ///
+ /// This should be called when the semaphore is no longer needed.
+ /// Matches libqb's rpl_sem_destroy (which is sem_destroy on Linux).
+ ///
+ /// # Safety
+ /// The semaphore must have been properly initialized and no threads should
+ /// be waiting on it.
+ unsafe fn destroy(&mut self) -> Result<()> {
+ let sem_ptr = self._sem.as_mut_ptr() as *mut libc::sem_t;
+
+ // SAFETY: Caller ensures the semaphore is initialized and not in use
+ let ret = unsafe { libc::sem_destroy(sem_ptr) };
+
+ if ret != 0 {
+ anyhow::bail!("sem_destroy failed: {}", std::io::Error::last_os_error());
+ }
+
+ Ok(())
+ }
+
+ /// Post to the semaphore (increment)
+ ///
+ /// Matches libqb's rpl_sem_post (which is sem_post on Linux).
+ unsafe fn post(&self) -> Result<()> {
+ let ret = unsafe { libc::sem_post(self._sem.as_ptr() as *mut libc::sem_t) };
+
+ if ret != 0 {
+ anyhow::bail!("sem_post failed: {}", std::io::Error::last_os_error());
+ }
+
+ Ok(())
+ }
+
+ /// Wait on the semaphore asynchronously (decrement, blocking)
+ ///
+ /// Uses `spawn_blocking` to wait on the semaphore without blocking the tokio
+ /// runtime. This provides true event-driven behavior while maintaining
+ /// compatibility with libqb's semaphore-based notification mechanism.
+ ///
+ /// Matches libqb's `my_posix_sem_timedwait` / `sem_wait` behavior.
+ ///
+ /// # Safety
+ /// The semaphore must be properly initialized and remain valid for the
+ /// duration of the wait operation.
+ async unsafe fn wait(&self) -> Result<()> {
+ // Get raw pointer to semaphore
+ let sem_ptr = self._sem.as_ptr() as *mut libc::sem_t;
+
+ // Convert to usize for safe transfer between threads
+ // This is safe because:
+ // 1. The semaphore is in process-shared memory (mmap'd file)
+ // 2. The memory remains valid for the lifetime of the containing structure
+ // 3. We're only using the pointer on the blocking thread pool
+ let sem_ptr_addr = sem_ptr as usize;
+
+ // Use spawn_blocking to wait on the semaphore without blocking tokio runtime
+ // This offloads the blocking sem_wait to tokio's dedicated blocking thread pool
+ tokio::task::spawn_blocking(move || {
+ // Reconstruct the pointer on the blocking thread
+ // SAFETY: The semaphore is in shared memory and remains valid.
+ // We're calling sem_wait on a process-shared semaphore from a thread
+ // in the same process, which is safe.
+ let sem_ptr = sem_ptr_addr as *mut libc::sem_t;
+ let ret = unsafe { libc::sem_wait(sem_ptr) };
+
+ if ret != 0 {
+ let err = std::io::Error::last_os_error();
+ // Handle EINTR by returning an error that causes retry
+ if err.raw_os_error() == Some(libc::EINTR) {
+ anyhow::bail!("sem_wait interrupted (EINTR), will retry");
+ }
+ anyhow::bail!("sem_wait failed: {err}");
+ }
+
+ Ok(())
+ })
+ .await
+ .context("spawn_blocking task failed")??;
+
+ Ok(())
+ }
+}
+
+/// Shared memory header matching libqb's qb_ringbuffer_shared_s layout
+///
+/// This structure is mmap'd and shared between processes.
+/// Field order and alignment must exactly match libqb for compatibility.
+///
+/// Note: libqb's struct has `char user_data[1]` which contributes 1 byte to sizeof(),
+/// then the struct is padded to 8-byte alignment (7 bytes padding).
+/// Additional shared_user_data_size bytes are allocated beyond sizeof().
+#[repr(C, align(8))]
+struct RingBufferShared {
+ /// Write pointer (word index, not byte offset)
+ write_pt: AtomicU32,
+ /// Read pointer (word index, not byte offset)
+ read_pt: AtomicU32,
+ /// Ring buffer size in words (u32 units)
+ word_size: u32,
+ /// Path to header file
+ hdr_path: [u8; libc::PATH_MAX as usize],
+ /// Path to data file
+ data_path: [u8; libc::PATH_MAX as usize],
+ /// Reference count (for cleanup)
+ ref_count: AtomicU32,
+ /// Process-shared semaphore for notification
+ posix_sem: PosixSem,
+ /// Flexible array member placeholder (matches C's char user_data[1])
+ /// Actual user_data starts here and continues beyond sizeof(RingBufferShared)
+ user_data: [u8; 1],
+ // 7 bytes of padding added by align(8) to reach 8248 bytes total
+}
+
+impl RingBufferShared {
+ /// Chunk header size in 32-bit words (matching libqb)
+ const CHUNK_HEADER_WORDS: usize = 2;
+
+ /// Chunk magic numbers (matching libqb qb_ringbuffer_int.h)
+ const CHUNK_MAGIC: u32 = 0xA1A1A1A1; // Valid allocated chunk
+ const CHUNK_MAGIC_DEAD: u32 = 0xD0D0D0D0; // Reclaimed/dead chunk
+ const CHUNK_MAGIC_ALLOC: u32 = 0xA110CED0; // Chunk being allocated
+
+ /// Calculate the next pointer position after a chunk of given size
+ ///
+ /// This implements libqb's qb_rb_chunk_step logic (ringbuffer.c:464-484):
+ /// 1. Skip chunk header (CHUNK_HEADER_WORDS)
+ /// 2. Skip user data (rounded up to word boundary)
+ /// 3. Wrap around if needed
+ ///
+ /// # Arguments
+ /// - `current_pt`: Current read or write pointer (in words)
+ /// - `data_size_bytes`: Size of the data payload in bytes
+ ///
+ /// # Returns
+ /// New pointer position (in words), wrapped to [0, word_size)
+ fn chunk_step(&self, current_pt: u32, data_size_bytes: usize) -> u32 {
+ let word_size = self.word_size as usize;
+
+ // Convert bytes to words, rounding up to word boundary
+ // This matches libqb's logic:
+ // pointer += (chunk_size / sizeof(uint32_t));
+ // if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) pointer++;
+ let data_words = data_size_bytes.div_ceil(std::mem::size_of::<u32>());
+
+ // Calculate new position: current + header + data (in words)
+ let new_pt = (current_pt as usize + Self::CHUNK_HEADER_WORDS + data_words) % word_size;
+
+ new_pt as u32
+ }
+
+ /// Initialize a RingBufferShared structure in-place in shared memory
+ ///
+ /// This initializes the ring buffer header at its current memory location, which is
+ /// critical for process-shared data structures in mmap'd memory. The structure
+ /// must not be moved after initialization.
+ ///
+ /// # Arguments
+ /// - `word_size`: Size of ring buffer in 32-bit words
+ /// - `hdr_path`: Path to the header file (will be copied into the structure)
+ /// - `data_path`: Path to the data file (will be copied into the structure)
+ ///
+ /// # Safety
+ /// The RingBufferShared must remain at its current memory location and must not
+ /// be moved or copied after initialization.
+ unsafe fn init_in_place(
+ &mut self,
+ word_size: u32,
+ hdr_path: &std::path::Path,
+ data_path: &std::path::Path,
+ ) -> Result<()> {
+ // SAFETY: Caller ensures this structure is in shared memory and will remain
+ // at this location for its lifetime
+ unsafe {
+ // Zero-initialize the entire structure first
+ std::ptr::write_bytes(self as *mut Self, 0, 1);
+
+ // Initialize atomic fields
+ self.write_pt = AtomicU32::new(0);
+ self.read_pt = AtomicU32::new(0);
+ self.word_size = word_size;
+ self.ref_count = AtomicU32::new(1);
+
+ // Initialize semaphore in-place in shared memory
+ // This is critical - the semaphore must be initialized at its final location
+ self.posix_sem
+ .init_in_place()
+ .context("Failed to initialize semaphore")?;
+
+ // Copy header path into structure
+ let hdr_path_str = hdr_path.to_string_lossy();
+ let hdr_path_bytes = hdr_path_str.as_bytes();
+ let len = hdr_path_bytes.len().min(libc::PATH_MAX as usize - 1);
+ self.hdr_path[..len].copy_from_slice(&hdr_path_bytes[..len]);
+
+ // Copy data path into structure
+ let data_path_str = data_path.to_string_lossy();
+ let data_path_bytes = data_path_str.as_bytes();
+ let len = data_path_bytes.len().min(libc::PATH_MAX as usize - 1);
+ self.data_path[..len].copy_from_slice(&data_path_bytes[..len]);
+ }
+
+ Ok(())
+ }
+
+ /// Calculate free space in the ring buffer (in words)
+ ///
+ /// Returns the number of free words (u32 units) available for allocation.
+ /// This uses atomic loads to read the pointers safely.
+ fn space_free_words(&self) -> usize {
+ let write_pt = self.write_pt.load(Ordering::Acquire);
+ let read_pt = self.read_pt.load(Ordering::Acquire);
+ let word_size = self.word_size as usize;
+
+ if write_pt >= read_pt {
+ if write_pt == read_pt {
+ word_size // Buffer is empty, all space available
+ } else {
+ (read_pt as usize + word_size - write_pt as usize) - 1
+ }
+ } else {
+ (read_pt as usize - write_pt as usize) - 1
+ }
+ }
+
+ /// Calculate free space in bytes
+ ///
+ /// Converts the word count to bytes by multiplying by sizeof(uint32_t).
+ /// Matches libqb's qb_rb_space_free (ringbuffer.c:373).
+ fn space_free_bytes(&self) -> usize {
+ self.space_free_words() * std::mem::size_of::<u32>()
+ }
+
+ /// Check if a chunk of given size (in bytes) can fit in the buffer
+ ///
+ /// `chunk_margin` is the caller-supplied allowance for chunk header overhead and alignment.
+ fn chunk_fits(&self, message_size: usize, chunk_margin: usize) -> bool {
+ let required_bytes = message_size + chunk_margin;
+ self.space_free_bytes() >= required_bytes
+ }
+
+ /// Write a chunk to the ring buffer
+ ///
+ /// This performs the complete chunk write operation:
+ /// 1. Allocate space in the ring buffer
+ /// 2. Write the message data (handling wraparound)
+ /// 3. Commit the chunk (update write_pt, set magic)
+ /// 4. Post to semaphore to wake readers
+ ///
+ /// # Safety
+ /// Caller must ensure:
+ /// - shared_data points to valid ring buffer data
+ /// - There is sufficient space (checked via chunk_fits)
+ /// - No other thread is writing concurrently
+ unsafe fn write_chunk(&self, shared_data: *mut u32, message: &[u8]) -> Result<()> {
+ let msg_len = message.len();
+ let word_size = self.word_size as usize;
+
+ // Get current write pointer
+ let write_pt = self.write_pt.load(Ordering::Acquire);
+
+ // Write chunk header: [size=0][magic=ALLOC]
+ // Matches libqb's qb_rb_chunk_alloc (ringbuffer.c:439-440)
+ unsafe {
+ *shared_data.add(write_pt as usize) = 0; // Size is 0 during allocation
+ *shared_data.add((write_pt as usize + 1) % word_size) = Self::CHUNK_MAGIC_ALLOC;
+ }
+
+ // Write message data
+ let data_offset = (write_pt as usize + Self::CHUNK_HEADER_WORDS) % word_size;
+ let data_ptr = unsafe { shared_data.add(data_offset) as *mut u8 };
+
+ // Handle wraparound - calculate remaining bytes in buffer before wraparound
+ let remaining = (word_size - data_offset) * std::mem::size_of::<u32>();
+ if msg_len <= remaining {
+ // No wraparound needed
+ unsafe {
+ std::ptr::copy_nonoverlapping(message.as_ptr(), data_ptr, msg_len);
+ }
+ } else {
+ // Need to wrap around
+ unsafe {
+ std::ptr::copy_nonoverlapping(message.as_ptr(), data_ptr, remaining);
+ std::ptr::copy_nonoverlapping(
+ message.as_ptr().add(remaining),
+ shared_data as *mut u8,
+ msg_len - remaining,
+ );
+ }
+ }
+
+ // Calculate new write pointer - matches libqb's qb_rb_chunk_step logic
+ let new_write_pt = self.chunk_step(write_pt, msg_len);
+
+ // Commit: write size, update write pointer, then set magic with atomic RELEASE
+ // This matches libqb's qb_rb_chunk_commit behavior (ringbuffer.c:497-504)
+ unsafe {
+ // 1. Write chunk size
+ *shared_data.add(write_pt as usize) = msg_len as u32;
+
+ // 2. Update write pointer
+ self.write_pt.store(new_write_pt, Ordering::Relaxed);
+
+ // 3. Set magic with RELEASE
+ // RELEASE ensures all previous writes (data, size, write_pt) are visible before magic
+ let magic_offset = (write_pt as usize + 1) % word_size;
+ let magic_ptr = shared_data.add(magic_offset) as *mut AtomicU32;
+ (*magic_ptr).store(Self::CHUNK_MAGIC, Ordering::Release);
+
+ // 4. Post to semaphore to wake up waiting readers
+ self.posix_sem
+ .post()
+ .context("Failed to post to semaphore")?;
+ }
+
+ tracing::debug!(
+ "Wrote chunk: {} bytes, write_pt {} -> {}",
+ msg_len,
+ write_pt,
+ new_write_pt
+ );
+
+ Ok(())
+ }
+
+ /// Read a chunk from the ring buffer
+ ///
+ /// This reads the chunk at the current read pointer, validates it,
+ /// copies the data, and reclaims the chunk.
+ ///
+ /// Returns None if the buffer is empty (read_pt == write_pt).
+ ///
+ /// # Safety
+ /// Caller must ensure:
+ /// - shared_data points to valid ring buffer data
+ /// - flow_control_ptr (if Some) points to valid i32
+ /// - No other thread is reading concurrently
+ unsafe fn read_chunk(
+ &self,
+ shared_data: *mut u32,
+ flow_control_ptr: Option<*mut i32>,
+ ) -> Result<Option<Vec<u8>>> {
+ let word_size = self.word_size as usize;
+
+ // Get current read pointer
+ let read_pt = self.read_pt.load(Ordering::Acquire);
+ let write_pt = self.write_pt.load(Ordering::Acquire);
+
+ // Check if buffer is empty
+ if read_pt == write_pt {
+ return Ok(None);
+ }
+
+ // Read chunk header with ACQUIRE to see all writes
+ let magic_offset = (read_pt as usize + 1) % word_size;
+ let magic_ptr = unsafe { shared_data.add(magic_offset) as *const AtomicU32 };
+ let chunk_magic = unsafe { (*magic_ptr).load(Ordering::Acquire) };
+
+ // Read chunk size
+ let chunk_size = unsafe { *shared_data.add(read_pt as usize) };
+
+ tracing::debug!(
+ "Reading chunk: read_pt={}, write_pt={}, size={}, magic=0x{:08x}",
+ read_pt,
+ write_pt,
+ chunk_size,
+ chunk_magic
+ );
+
+ // Verify magic
+ if chunk_magic != Self::CHUNK_MAGIC {
+ anyhow::bail!(
+ "Invalid chunk magic at read_pt={}: expected 0x{:08x}, got 0x{:08x}",
+ read_pt,
+ Self::CHUNK_MAGIC,
+ chunk_magic
+ );
+ }
+
+ // Read message data
+ let data_offset = (read_pt as usize + Self::CHUNK_HEADER_WORDS) % word_size;
+ let data_ptr = unsafe { shared_data.add(data_offset) as *const u8 };
+
+ let mut message = vec![0u8; chunk_size as usize];
+
+ // Handle wraparound - calculate remaining bytes in buffer before wraparound
+ let remaining = (word_size - data_offset) * std::mem::size_of::<u32>();
+ if chunk_size as usize <= remaining {
+ // No wraparound
+ unsafe {
+ std::ptr::copy_nonoverlapping(data_ptr, message.as_mut_ptr(), chunk_size as usize);
+ }
+ } else {
+ // Wraparound
+ unsafe {
+ std::ptr::copy_nonoverlapping(data_ptr, message.as_mut_ptr(), remaining);
+ std::ptr::copy_nonoverlapping(
+ shared_data as *const u8,
+ message.as_mut_ptr().add(remaining),
+ chunk_size as usize - remaining,
+ );
+ }
+ }
+
+ // Reclaim chunk: clear header and update read pointer
+ let new_read_pt = self.chunk_step(read_pt, chunk_size as usize);
+
+ unsafe {
+ // Clear chunk size
+ *shared_data.add(read_pt as usize) = 0;
+
+ // Set magic to DEAD with RELEASE
+ let magic_ptr = shared_data.add(magic_offset) as *mut AtomicU32;
+ (*magic_ptr).store(Self::CHUNK_MAGIC_DEAD, Ordering::Release);
+
+ // Update read_pt
+ self.read_pt.store(new_read_pt, Ordering::Relaxed);
+
+ // Signal flow control - server is ready for next request
+ if let Some(fc_ptr) = flow_control_ptr {
+ let refcount = self.ref_count.load(Ordering::Acquire);
+ if refcount == 2 {
+ let fc_atomic = fc_ptr as *mut AtomicI32;
+ (*fc_atomic).store(0, Ordering::Relaxed);
+ }
+ }
+ }
+
+ Ok(Some(message))
+ }
+}
+
+/// Flow control mechanism for ring buffer backpressure
+///
+/// Implements libqb's flow control protocol for IPC communication.
+/// The server writes flow control values to shared memory, and clients
+/// read these values to determine if they should back off.
+///
+/// Flow control values (matching libqb's rate limiting):
+/// - `OK`: Proceed with sending (QB_IPCS_RATE_NORMAL)
+/// - `SLOW_DOWN`: Approaching capacity, reduce send rate (QB_IPCS_RATE_OFF)
+/// - `STOP`: Queue full, do not send (QB_IPCS_RATE_OFF_2)
+///
+/// ## Disabled Flow Control
+///
+/// When constructed with a null fc_ptr, flow control is disabled and all
+/// operations become no-ops. This matches libqb's behavior for response/event
+/// rings which don't need backpressure signaling.
+///
+/// Matches libqb's qb_ipc_shm_fc_get/qb_ipc_shm_fc_set (ipc_shm.c:176-195)
+pub struct FlowControl {
+ /// Pointer to flow control field in shared memory (i32 atomic)
+ /// Located in shared_user_data area of RingBufferShared
+ /// If null, flow control is disabled (no-op mode)
+ fc_ptr: *mut i32,
+ /// Pointer to shared header for refcount checks
+ /// If null, flow control is disabled (no-op mode)
+ shared_hdr: *mut RingBufferShared,
+}
+
+impl FlowControl {
+ /// OK to send - queue has space (QB_IPCS_RATE_NORMAL)
+ pub const OK: i32 = 0;
+
+ /// Slow down - queue approaching full (QB_IPCS_RATE_OFF)
+ pub const SLOW_DOWN: i32 = 1;
+
+ /// Stop sending - queue full (QB_IPCS_RATE_OFF_2)
+ pub const STOP: i32 = 2;
+
+ /// Create a new FlowControl instance
+ ///
+ /// Pass null pointers to create a disabled (no-op) flow control instance.
+ /// This is used for response/event rings that don't need backpressure.
+ ///
+ /// # Safety
+ /// - If fc_ptr is non-null, it must point to valid shared memory for an i32
+ /// - If shared_hdr is non-null, it must point to valid RingBufferShared
+ /// - Both must remain valid for the lifetime of FlowControl (if non-null)
+ unsafe fn new(fc_ptr: *mut i32, shared_hdr: *mut RingBufferShared) -> Self {
+ // Initialize to 0 if enabled - server is ready for requests
+ // libqb clients check: if (fc > 0 && fc <= fc_enable_max) return EAGAIN
+ // So 0 means "ready to transmit", > 0 means "flow control active/blocked"
+ if !fc_ptr.is_null() {
+ let fc_atomic = fc_ptr as *mut AtomicI32;
+ unsafe {
+ (*fc_atomic).store(0, Ordering::Relaxed);
+ }
+ }
+
+ Self { fc_ptr, shared_hdr }
+ }
+
+ /// Check if flow control is enabled
+ #[inline]
+ fn is_enabled(&self) -> bool {
+ !self.fc_ptr.is_null()
+ }
+
+ /// Get the raw flow control pointer (for internal use)
+ #[inline]
+ fn fc_ptr(&self) -> *mut i32 {
+ self.fc_ptr
+ }
+
+ /// Get flow control value
+ ///
+ /// Matches libqb's qb_ipc_shm_fc_get (ipc_shm.c:185-195).
+ /// Returns:
+ /// - 0: Ready for requests (or flow control disabled)
+ /// - >0: Flow control active (client should retry)
+ /// - <0: Error (not connected)
+ ///
+ /// Note: This method is primarily for libqb clients, not used internally by server
+ #[allow(dead_code)]
+ pub fn get(&self) -> i32 {
+ if !self.is_enabled() {
+ return 0; // Disabled = always ready
+ }
+
+ // Check if both client and server are connected (refcount == 2)
+ let refcount = unsafe { (*self.shared_hdr).ref_count.load(Ordering::Acquire) };
+ if refcount != 2 {
+ return -libc::ENOTCONN;
+ }
+
+ // Read flow control value atomically
+ unsafe {
+ let fc_atomic = self.fc_ptr as *const AtomicI32;
+ (*fc_atomic).load(Ordering::Relaxed)
+ }
+ }
+
+ /// Set flow control value
+ ///
+ /// Matches libqb's qb_ipc_shm_fc_set (ipc_shm.c:176-182).
+ /// - fc_enable = 0: Ready for requests
+ /// - fc_enable > 0: Flow control active (backpressure)
+ ///
+ /// No-op if flow control is disabled.
+ pub fn set(&self, fc_enable: i32) {
+ if !self.is_enabled() {
+ return; // Disabled = no-op
+ }
+
+ tracing::trace!("Setting flow control to {}", fc_enable);
+ unsafe {
+ let fc_atomic = self.fc_ptr as *mut AtomicI32;
+ (*fc_atomic).store(fc_enable, Ordering::Relaxed);
+ }
+ }
+}
+
+// Safety: FlowControl uses atomic operations for synchronization
+unsafe impl Send for FlowControl {}
+unsafe impl Sync for FlowControl {}
+
+/// Ring buffer handle
+///
+/// Owns the mmap'd memory regions and provides async message-passing API.
+pub struct RingBuffer {
+ /// Mmap of shared header
+ _mmap_hdr: MmapMut,
+ /// Circular mmap of shared data (2x virtual mapping)
+ _mmap_data: CircularMmap,
+ /// Pointer to shared header (inside _mmap_hdr)
+ shared_hdr: *mut RingBufferShared,
+ /// Pointer to shared data array (inside _mmap_data)
+ shared_data: *mut u32,
+ /// Flow control mechanism
+ /// Always present, but may be disabled (no-op) for response/event rings
+ pub flow_control: FlowControl,
+ /// Notifier for when data becomes available (for consumers)
+ data_available: Arc<Notify>,
+ /// Notifier for when space becomes available (for producers)
+ space_available: Arc<Notify>,
+ /// Whether this instance created the ring buffer (and thus owns cleanup)
+ /// Matches libqb's QB_RB_FLAG_CREATE flag
+ is_creator: bool,
+}
+
+// Safety: RingBuffer uses atomic operations for synchronization
+unsafe impl Send for RingBuffer {}
+unsafe impl Sync for RingBuffer {}
+
+impl RingBuffer {
+ /// Chunk margin for space calculations (in bytes)
+ /// Matches libqb: sizeof(uint32_t) * (CHUNK_HEADER_WORDS + WORD_ALIGN + CACHE_LINE_WORDS)
+ /// We don't use cache line alignment, so CACHE_LINE_WORDS = 0
+ const CHUNK_MARGIN: usize = 4 * (RingBufferShared::CHUNK_HEADER_WORDS + 1);
+
+ /// Create a new ring buffer in shared memory
+ ///
+ /// Creates two files in `base_dir` (typically `/dev/shm`):
+ /// - `{base_dir}/qb-{name}-header`
+ /// - `{base_dir}/qb-{name}-data`
+ ///
+ /// # Arguments
+ /// - `base_dir`: Directory for shared memory files (typically "/dev/shm")
+ /// - `name`: Ring buffer name
+ /// - `size_bytes`: Size of ring buffer data in bytes
+ /// - `shared_user_data_size`: Extra bytes to allocate after RingBufferShared for flow control
+ ///
+ /// The header file size will be: sizeof(RingBufferShared) + shared_user_data_size
+ /// This matches libqb's behavior: sizeof(qb_ringbuffer_shared_s) + shared_user_data_size
+ pub fn new(
+ base_dir: impl AsRef<Path>,
+ name: &str,
+ size_bytes: usize,
+ shared_user_data_size: usize,
+ ) -> Result<Self> {
+ let base_dir = base_dir.as_ref();
+
+ // Match libqb's size calculation exactly:
+ // 1. Add CHUNK_MARGIN + 1 (13 bytes)
+ // CHUNK_MARGIN = sizeof(uint32_t) * (CHUNK_HEADER_WORDS + WORD_ALIGN + CACHE_LINE_WORDS)
+ // = 4 * (2 + 1 + 0) = 12 bytes (without cache line alignment)
+ let size = size_bytes + Self::CHUNK_MARGIN + 1;
+
+ // 2. Round up to page size (typically 4096)
+ let page_size = 4096; // Assumes 4 KiB pages; some Linux targets (e.g. arm64) may use larger pages
+ let real_size = size.div_ceil(page_size) * page_size;
+
+ // 3. Calculate word_size from rounded size
+ let word_size = real_size / 4;
+
+ tracing::info!(
+ "Creating ring buffer '{}': size_bytes={}, real_size={}, word_size={} ({} words = {} bytes)",
+ name,
+ size_bytes,
+ real_size,
+ word_size,
+ word_size,
+ real_size
+ );
+
+ // Create header file
+ let hdr_filename = format!("qb-{name}-header");
+ let hdr_path = base_dir.join(&hdr_filename);
+
+ let hdr_file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(&hdr_path)
+ .context("Failed to create header file")?;
+
+ // Resize to fit RingBufferShared structure + shared_user_data
+ // This matches libqb: sizeof(qb_ringbuffer_shared_s) + shared_user_data_size
+ let hdr_size = std::mem::size_of::<RingBufferShared>() + shared_user_data_size;
+ hdr_file
+ .set_len(hdr_size as u64)
+ .context("Failed to resize header file")?;
+
+ // Mmap header
+ let mut mmap_hdr =
+ unsafe { MmapMut::map_mut(&hdr_file) }.context("Failed to mmap header")?;
+
+ // Create data file path (needed for init_in_place)
+ let data_filename = format!("qb-{name}-data");
+ let data_path = base_dir.join(&data_filename);
+
+ // Initialize shared header
+ let shared_hdr = mmap_hdr.as_mut_ptr() as *mut RingBufferShared;
+
+ unsafe {
+ (*shared_hdr).init_in_place(word_size as u32, &hdr_path, &data_path)?;
+ }
+
+ // Create data file
+ let data_file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(&data_path)
+ .context("Failed to create data file")?;
+
+ // Create data file with real_size (NOT 2x real_size!)
+ // libqb creates the file with real_size, then uses circular mmap to map it TWICE
+ // in consecutive virtual address space. The file itself is only real_size bytes.
+ // During cleanup, libqb unmaps 2*real_size bytes (the circular mmap), but the
+ // file itself remains real_size bytes.
+ data_file
+ .set_len(real_size as u64)
+ .context("Failed to resize data file")?;
+
+ // Create circular mmap - maps the file TWICE in consecutive virtual memory
+ // This matches libqb's qb_sys_circular_mmap implementation
+ let data_fd = data_file.as_raw_fd();
+ let mut mmap_data = unsafe {
+ CircularMmap::new(data_fd, real_size).context("Failed to create circular mmap")?
+ };
+
+ // Zero-initialize the data (only need to zero first half due to circular mapping)
+ unsafe {
+ mmap_data.zero_initialize();
+ }
+
+ let shared_data = mmap_data.as_mut_ptr();
+
+ // Write sentinel value at end of buffer (matches libqb behavior)
+ // Index word_size is in bounds because the circular mmap provides 2x virtual space
+ unsafe {
+ *shared_data.add(word_size) = 5;
+ }
+
+ // Initialize flow control
+ // If shared_user_data_size >= sizeof(i32), flow control is enabled (for request ring)
+ // Otherwise, flow control is disabled (for response/event rings)
+ let flow_control = if shared_user_data_size >= std::mem::size_of::<i32>() {
+ unsafe {
+ // Get pointer to user_data field within the structure
+ // This matches libqb's: return rb->shared_hdr->user_data;
+ let fc_ptr = std::ptr::addr_of_mut!((*shared_hdr).user_data) as *mut i32;
+ FlowControl::new(fc_ptr, shared_hdr)
+ }
+ } else {
+ // Disabled flow control (null pointers = no-op mode)
+ unsafe { FlowControl::new(std::ptr::null_mut(), std::ptr::null_mut()) }
+ };
+
+ Ok(Self {
+ _mmap_hdr: mmap_hdr,
+ _mmap_data: mmap_data,
+ shared_hdr,
+ shared_data,
+ flow_control,
+ data_available: Arc::new(Notify::new()),
+ space_available: Arc::new(Notify::new()),
+ is_creator: true, // This instance created the ring buffer
+ })
+ }
+
+ /// Send a message into the ring buffer (async)
+ ///
+ /// Allocates a chunk, writes the message data, and commits the chunk.
+ /// Awaits if there's insufficient space.
+ pub async fn send(&mut self, message: &[u8]) -> Result<()> {
+ loop {
+ match self.try_send(message) {
+ Ok(()) => {
+ // Notify consumers that data is available
+ self.data_available.notify_one();
+ return Ok(());
+ }
+ Err(e) if e.to_string().contains("Insufficient space") => {
+ // Wait for space to become available
+ self.space_available.notified().await;
+ continue;
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ }
+
+ /// Try to send a message without blocking
+ ///
+ /// Returns an error if there's insufficient space.
+ pub fn try_send(&mut self, message: &[u8]) -> Result<()> {
+ // Check if we have enough space
+ if !unsafe { (*self.shared_hdr).chunk_fits(message.len(), Self::CHUNK_MARGIN) } {
+ let space_free = self.space_free();
+ let required = Self::CHUNK_MARGIN + message.len();
+ anyhow::bail!(
+ "Insufficient space: need {required} bytes, have {space_free} bytes free"
+ );
+ }
+
+ // Write the chunk using RingBufferShared
+ unsafe { (*self.shared_hdr).write_chunk(self.shared_data, message)? };
+
+ Ok(())
+ }
+
+ /// Receive a message from the ring buffer (async)
+ ///
+ /// Awaits if no message is available.
+ /// After processing, the chunk is automatically reclaimed.
+ ///
+ /// ## Implementation Note
+ ///
+ /// libqb uses semaphore-based blocking (sem_timedwait) to wait for data
+ /// (see qb_rb_chunk_peek in libqb/lib/ringbuffer.c).
+ ///
+ /// We use tokio's `spawn_blocking` to wait on the POSIX semaphore without
+ /// blocking the async runtime. This provides true event-driven behavior with
+ /// zero polling overhead, while maintaining compatibility with libqb clients.
+ pub async fn recv(&mut self) -> Result<Vec<u8>> {
+ loop {
+ // Wait on POSIX semaphore asynchronously
+ // This matches libqb's timedwait_fn behavior in qb_rb_chunk_peek
+ // SAFETY: The semaphore is properly initialized in new() and remains
+ // valid for the lifetime of RingBuffer
+ unsafe { (*self.shared_hdr).posix_sem.wait().await? };
+
+ // Semaphore was decremented, data should be available
+ // Read and reclaim the chunk
+ match self.recv_after_semwait()? {
+ Some(data) => {
+ // Notify producers that space is available
+ self.space_available.notify_one();
+ return Ok(data);
+ }
+ None => {
+ // Spurious wakeup or race condition - semaphore was decremented
+ // but no valid data found. This shouldn't happen in normal operation.
+ tracing::warn!("Spurious semaphore wakeup detected, retrying");
+ continue;
+ }
+ }
+ }
+ }
+
+ /// Receive a message after semaphore has been decremented
+ ///
+ /// This is called after `PosixSem::wait()` has successfully decremented
+ /// the semaphore. It reads the chunk data and reclaims the chunk.
+ ///
+ /// Returns `None` if the buffer is empty despite semaphore being decremented
+ /// (which indicates a bug or race condition).
+ fn recv_after_semwait(&mut self) -> Result<Option<Vec<u8>>> {
+ // Get fc_ptr if flow control is enabled, otherwise None
+ let fc_ptr = if self.flow_control.is_enabled() {
+ Some(self.flow_control.fc_ptr())
+ } else {
+ None
+ };
+ unsafe { (*self.shared_hdr).read_chunk(self.shared_data, fc_ptr) }
+ }
+
+ /// Calculate free space in the ring buffer (in bytes)
+ fn space_free(&self) -> usize {
+ unsafe { (*self.shared_hdr).space_free_bytes() }
+ }
+}
+
+impl Drop for RingBuffer {
+ fn drop(&mut self) {
+ // Decrement ref count
+ let ref_count = unsafe { (*self.shared_hdr).ref_count.fetch_sub(1, Ordering::AcqRel) };
+
+ tracing::debug!(
+ "Dropping ring buffer, ref_count: {} -> {}",
+ ref_count,
+ ref_count - 1
+ );
+
+ // If last reference AND we created it, clean up semaphore and files
+ // This matches libqb's behavior: only the creator (QB_RB_FLAG_CREATE) destroys the semaphore
+ if ref_count == 1 && self.is_creator {
+ unsafe {
+ // Destroy the semaphore before cleaning up the mmap
+ // Matches libqb's cleanup in qb_rb_close_helper
+ if let Err(e) = (*self.shared_hdr).posix_sem.destroy() {
+ tracing::warn!("Failed to destroy semaphore: {}", e);
+ }
+
+ let hdr_path =
+ std::ffi::CStr::from_ptr((*self.shared_hdr).hdr_path.as_ptr().cast());
+ let data_path =
+ std::ffi::CStr::from_ptr((*self.shared_hdr).data_path.as_ptr().cast());
+
+ if let Ok(hdr_path_str) = hdr_path.to_str()
+ && !hdr_path_str.is_empty()
+ {
+ let _ = std::fs::remove_file(hdr_path_str);
+ tracing::debug!("Removed header file: {}", hdr_path_str);
+ }
+
+ if let Ok(data_path_str) = data_path.to_str()
+ && !data_path_str.is_empty()
+ {
+ let _ = std::fs::remove_file(data_path_str);
+ tracing::debug!("Removed data file: {}", data_path_str);
+ }
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ async fn test_ringbuffer_basic() -> Result<()> {
+ let temp_dir = tempfile::tempdir()?;
+ let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?;
+
+ // Send a message
+ rb.send(b"hello world").await?;
+
+ // Receive the message
+ let msg = rb.recv().await?;
+ assert_eq!(msg, b"hello world");
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_ringbuffer_multiple_messages() -> Result<()> {
+ let temp_dir = tempfile::tempdir()?;
+ let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?;
+
+ // Send multiple messages
+ rb.send(b"message 1").await?;
+ rb.send(b"message 2").await?;
+ rb.send(b"message 3").await?;
+
+ // Receive in order
+ assert_eq!(rb.recv().await?, b"message 1");
+ assert_eq!(rb.recv().await?, b"message 2");
+ assert_eq!(rb.recv().await?, b"message 3");
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_ringbuffer_nonblocking_send() -> Result<()> {
+ let temp_dir = tempfile::tempdir()?;
+ let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?;
+
+ // Test try_send (non-blocking send) with async recv
+ rb.try_send(b"data")?;
+ let msg = rb.recv().await?;
+ assert_eq!(msg, b"data");
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_ringbuffer_wraparound() -> Result<()> {
+ let temp_dir = tempfile::tempdir()?;
+ let mut rb = RingBuffer::new(temp_dir.path(), "test", 256, 0)?;
+
+ // The buffer rounds up to one 4096-byte page (1024 words); cycle enough chunks to wrap
+ for _ in 0..1000 {
+ rb.send(b"data").await?;
+ rb.recv().await?;
+ }
+
+ // Should still work
+ rb.send(b"after wrap").await?;
+ assert_eq!(rb.recv().await?, b"after wrap");
+
+ Ok(())
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs
new file mode 100644
index 00000000..73d63de0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs
@@ -0,0 +1,278 @@
+//! Main libqb IPC server implementation
+//!
+//! This module contains the Server struct and its implementation,
+//! including connection acceptance and server lifecycle management.
+use anyhow::{Context, Result};
+use parking_lot::Mutex;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use tokio::net::UnixListener;
+use tokio_util::sync::CancellationToken;
+
+use super::connection::QbConnection;
+use super::handler::Handler;
+use super::socket::bind_abstract_socket;
+
+/// Server-level connection statistics (matches libqb qb_ipcs_stats)
+#[derive(Debug, Default)]
+pub struct ServerStats {
+ /// Number of currently active connections
+ pub active_connections: AtomicUsize,
+ /// Total number of closed connections since server start
+ pub closed_connections: AtomicUsize,
+}
+
+impl ServerStats {
+ fn new() -> Self {
+ Self {
+ active_connections: AtomicUsize::new(0),
+ closed_connections: AtomicUsize::new(0),
+ }
+ }
+
+ /// Increment active connections count (new connection established)
+ fn connection_created(&self) {
+ self.active_connections.fetch_add(1, Ordering::Relaxed);
+ tracing::debug!(
+ active = self.active_connections.load(Ordering::Relaxed),
+ closed = self.closed_connections.load(Ordering::Relaxed),
+ "Connection created"
+ );
+ }
+
+ /// Decrement active, increment closed (connection terminated)
+ fn connection_closed(&self) {
+ self.active_connections.fetch_sub(1, Ordering::Relaxed);
+ self.closed_connections.fetch_add(1, Ordering::Relaxed);
+ tracing::debug!(
+ active = self.active_connections.load(Ordering::Relaxed),
+ closed = self.closed_connections.load(Ordering::Relaxed),
+ "Connection closed"
+ );
+ }
+
+ /// Get current statistics (for monitoring/debugging)
+ pub fn get(&self) -> (usize, usize) {
+ (
+ self.active_connections.load(Ordering::Relaxed),
+ self.closed_connections.load(Ordering::Relaxed),
+ )
+ }
+}
+
+/// libqb-compatible IPC server
+pub struct Server {
+ service_name: String,
+
+ // Setup socket (SOCK_STREAM) - accepts new connections
+ setup_listener: Option<Arc<UnixListener>>,
+
+ // Per-connection state
+ connections: Arc<Mutex<HashMap<u64, QbConnection>>>,
+ next_conn_id: Arc<AtomicU64>,
+
+ // Connection statistics (matches libqb behavior)
+ stats: Arc<ServerStats>,
+
+ // Message handler (trait object, also handles authentication)
+ handler: Arc<dyn Handler>,
+
+ // Cancellation token for graceful shutdown
+ cancellation_token: CancellationToken,
+}
+
+impl Server {
+ /// Create a new libqb-compatible IPC server
+ ///
+ /// Uses Linux abstract Unix sockets for IPC (no filesystem paths needed).
+ ///
+ /// # Arguments
+ /// * `service_name` - Service name (e.g., "pve2"), used as abstract socket name
+ /// * `handler` - Handler implementing the Handler trait (handles both authentication and requests)
+ pub fn new(service_name: &str, handler: impl Handler + 'static) -> Self {
+ Self {
+ service_name: service_name.to_string(),
+ setup_listener: None,
+ connections: Arc::new(Mutex::new(HashMap::new())),
+ next_conn_id: Arc::new(AtomicU64::new(1)),
+ stats: Arc::new(ServerStats::new()),
+ handler: Arc::new(handler),
+ cancellation_token: CancellationToken::new(),
+ }
+ }
+
+ /// Start the IPC server
+ ///
+ /// Creates abstract Unix socket that libqb clients can connect to
+ pub fn start(&mut self) -> Result<()> {
+ tracing::info!(
+ "Starting libqb-compatible IPC server: {}",
+ self.service_name
+ );
+
+ // Create abstract Unix socket (no filesystem paths needed)
+ let std_listener =
+ bind_abstract_socket(&self.service_name).context("Failed to bind abstract socket")?;
+
+ // Convert to tokio listener
+ std_listener.set_nonblocking(true)?;
+ let listener = UnixListener::from_std(std_listener)?;
+
+ tracing::info!("Bound abstract Unix socket: @{}", self.service_name);
+
+ let listener_arc = Arc::new(listener);
+ self.setup_listener = Some(listener_arc.clone());
+
+ // Start connection acceptor task
+ let context = AcceptorContext {
+ listener: listener_arc,
+ service_name: self.service_name.clone(),
+ connections: self.connections.clone(),
+ next_conn_id: self.next_conn_id.clone(),
+ stats: self.stats.clone(),
+ handler: self.handler.clone(),
+ cancellation_token: self.cancellation_token.child_token(),
+ };
+
+ tokio::spawn(async move {
+ context.run().await;
+ });
+
+ tracing::info!("libqb IPC server started: {}", self.service_name);
+ Ok(())
+ }
+
+ /// Stop the IPC server
+ pub fn stop(&mut self) {
+ tracing::info!("Stopping libqb IPC server: {}", self.service_name);
+
+ // Signal all tasks to stop
+ self.cancellation_token.cancel();
+
+ // Close all connections
+ let connections = std::mem::take(&mut *self.connections.lock());
+ let num_connections = connections.len();
+
+ for (_id, conn) in connections {
+ // Clean up ring buffer files
+ for rb_path in &conn.ring_buffer_paths {
+ if let Err(e) = std::fs::remove_file(rb_path) {
+ tracing::debug!(
+ "Failed to remove ring buffer file {} (may already be cleaned up): {}",
+ rb_path.display(),
+ e
+ );
+ }
+ }
+
+ // Update statistics
+ self.stats.connection_closed();
+
+ // Connection tasks observe the cancelled token; `conn` is dropped here
+ }
+
+ // Final stats
+ if num_connections > 0 {
+ let (active, closed) = self.stats.get();
+ tracing::info!(
+ "Closed {} connections (final stats: active={}, closed={})",
+ num_connections,
+ active,
+ closed
+ );
+ }
+
+ self.setup_listener = None;
+
+ tracing::info!("libqb IPC server stopped");
+ }
+}
+
+impl Drop for Server {
+ fn drop(&mut self) {
+ self.stop();
+ }
+}
+
+/// Context for the connection acceptor task
+///
+/// Bundles all the state needed by the acceptor loop to avoid passing many parameters.
+struct AcceptorContext {
+ listener: Arc<UnixListener>,
+ service_name: String,
+ connections: Arc<Mutex<HashMap<u64, QbConnection>>>,
+ next_conn_id: Arc<AtomicU64>,
+ stats: Arc<ServerStats>,
+ handler: Arc<dyn Handler>,
+ cancellation_token: CancellationToken,
+}
+
+impl AcceptorContext {
+ /// Run the connection acceptor loop
+ ///
+ /// Accepts new connections and spawns handler tasks for each.
+ async fn run(self) {
+ tracing::debug!("libqb IPC connection acceptor started");
+
+ loop {
+ // Accept new connection with cancellation support
+ let accept_result = tokio::select! {
+ _ = self.cancellation_token.cancelled() => {
+ tracing::debug!("Connection acceptor cancelled");
+ break;
+ }
+ result = self.listener.accept() => result,
+ };
+
+ let (stream, _addr) = match accept_result {
+ Ok((stream, addr)) => (stream, addr),
+ Err(e) => {
+ if !self.cancellation_token.is_cancelled() {
+ tracing::error!("Error accepting connection: {}", e);
+ }
+ break;
+ }
+ };
+
+ tracing::debug!("Accepted new setup connection");
+
+ // Handle connection
+ let conn_id = self.next_conn_id.fetch_add(1, Ordering::SeqCst);
+ match QbConnection::accept(
+ stream,
+ conn_id,
+ &self.service_name,
+ self.handler.clone(),
+ self.cancellation_token.child_token(),
+ )
+ .await
+ {
+ Ok(conn) => {
+ self.connections.lock().insert(conn_id, conn);
+ // Update statistics
+ self.stats.connection_created();
+ }
+ Err(e) => {
+ tracing::error!("Failed to accept connection {}: {}", conn_id, e);
+ }
+ }
+ }
+
+ tracing::debug!("libqb IPC connection acceptor finished");
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::protocol::*;
+
+ #[test]
+ fn test_header_sizes() {
+ // Verify C struct compatibility
+ assert_eq!(std::mem::size_of::<RequestHeader>(), 16);
+ assert_eq!(std::mem::align_of::<RequestHeader>(), 8);
+ assert_eq!(std::mem::size_of::<ResponseHeader>(), 24);
+ assert_eq!(std::mem::align_of::<ResponseHeader>(), 8);
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs
new file mode 100644
index 00000000..5831b329
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs
@@ -0,0 +1,84 @@
+//! Abstract Unix socket utilities
+//!
+//! This module provides functions for working with Linux abstract Unix sockets,
+//! which are used by libqb for IPC communication.
+use anyhow::Result;
+use std::os::unix::io::FromRawFd;
+use std::os::unix::net::UnixListener;
+
+/// Bind to an abstract Unix socket (Linux-specific)
+///
+/// Abstract sockets are identified by a name in the kernel's socket namespace,
+/// not a filesystem path. They are automatically removed when all references are closed.
+///
+/// libqb clients connect using the FULL 108-byte sun_path (null-padded).
+/// Linux abstract sockets are length-sensitive, so we must match exactly.
+pub(super) fn bind_abstract_socket(name: &str) -> Result<UnixListener> {
+ // Create a Unix socket using libc directly
+ let sock_fd = unsafe { libc::socket(libc::AF_UNIX, libc::SOCK_STREAM, 0) };
+ if sock_fd < 0 {
+ anyhow::bail!(
+ "Failed to create Unix socket: {}",
+ std::io::Error::last_os_error()
+ );
+ }
+
+ // RAII guard to ensure socket is closed on error
+ struct SocketGuard(i32);
+ impl Drop for SocketGuard {
+ fn drop(&mut self) {
+ unsafe { libc::close(self.0) };
+ }
+ }
+ let guard = SocketGuard(sock_fd);
+
+ // Create sockaddr_un with full 108-byte abstract address (matching libqb)
+ // libqb format: sun_path[0] = '\0', sun_path[1..] = "name\0\0..." (null-padded)
+ let mut addr: libc::sockaddr_un = unsafe { std::mem::zeroed() };
+ addr.sun_family = libc::AF_UNIX as libc::sa_family_t;
+
+ // sun_path[0] is already 0 (abstract socket marker)
+ // Copy name starting at sun_path[1]
+ let name_bytes = name.as_bytes();
+ let copy_len = name_bytes.len().min(107); // Leave room for initial \0
+ unsafe {
+ std::ptr::copy_nonoverlapping(
+ name_bytes.as_ptr(),
+ addr.sun_path.as_mut_ptr().offset(1) as *mut u8,
+ copy_len,
+ );
+ }
+
+ // Use FULL sockaddr_un length for libqb compatibility!
+ // libqb clients use the full 110-byte structure (2 + 108) when connecting,
+ // so we MUST bind with the same length. Verified via strace.
+ let addr_len = std::mem::size_of::<libc::sockaddr_un>() as libc::socklen_t;
+ let bind_res = unsafe {
+ libc::bind(
+ sock_fd,
+ &addr as *const _ as *const libc::sockaddr,
+ addr_len,
+ )
+ };
+ if bind_res < 0 {
+ anyhow::bail!(
+ "Failed to bind abstract socket: {}",
+ std::io::Error::last_os_error()
+ );
+ }
+
+ // Set socket to listen mode (backlog = 128)
+ let listen_res = unsafe { libc::listen(sock_fd, 128) };
+ if listen_res < 0 {
+ anyhow::bail!(
+ "Failed to listen on socket: {}",
+ std::io::Error::last_os_error()
+ );
+ }
+
+ // Convert raw fd to UnixListener (takes ownership, forget guard)
+ std::mem::forget(guard);
+ let listener = unsafe { UnixListener::from_raw_fd(sock_fd) };
+
+ Ok(listener)
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs b/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs
new file mode 100644
index 00000000..f8e541b0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs
@@ -0,0 +1,450 @@
+//! Authentication tests for pmxcfs-ipc
+//!
+//! These tests verify that the Handler::authenticate() mechanism works correctly
+//! for different authentication policies.
+//!
+//! Note: These tests use real Unix sockets, so they test authentication behavior
+//! from the server's perspective. The UID/GID will be the test process's credentials,
+//! so we test the Handler logic rather than OS-level credential checking.
+use async_trait::async_trait;
+use pmxcfs_ipc::{Handler, Permissions, Request, Response, Server};
+use pmxcfs_test_utils::wait_for_condition_blocking;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::thread;
+use std::time::Duration;
+
+/// Helper to create a unique service name for each test
+fn unique_service_name() -> String {
+ static COUNTER: AtomicU32 = AtomicU32::new(0);
+ format!("auth-test-{}", COUNTER.fetch_add(1, Ordering::SeqCst))
+}
+
+/// Helper that connects through libqb's C client API via local FFI bindings
+/// Returns true if the connection succeeded, false if it failed (e.g. was rejected)
+fn try_connect(service_name: &str) -> bool {
+ use std::ffi::CString;
+
+ #[repr(C)]
+ struct QbIpccConnection {
+ _private: [u8; 0],
+ }
+
+ #[link(name = "qb")]
+ unsafe extern "C" {
+ fn qb_ipcc_connect(name: *const libc::c_char, max_msg_size: usize)
+ -> *mut QbIpccConnection;
+ fn qb_ipcc_disconnect(conn: *mut QbIpccConnection);
+ }
+
+ let name = CString::new(service_name).expect("Invalid service name");
+ let conn = unsafe { qb_ipcc_connect(name.as_ptr(), 8192) };
+
+ let success = !conn.is_null();
+
+ if success {
+ unsafe { qb_ipcc_disconnect(conn) };
+ }
+
+ success
+}
+
+// ============================================================================
+// Test Handlers with Different Authentication Policies
+// ============================================================================
+
+/// Handler that accepts all connections with read-write access
+struct AcceptAllHandler;
+
+#[async_trait]
+impl Handler for AcceptAllHandler {
+ fn authenticate(&self, _uid: u32, _gid: u32) -> Option<Permissions> {
+ Some(Permissions::ReadWrite)
+ }
+
+ async fn handle(&self, _request: Request) -> Response {
+ Response::ok(b"test".to_vec())
+ }
+}
+
+/// Handler that rejects all connections
+struct RejectAllHandler;
+
+#[async_trait]
+impl Handler for RejectAllHandler {
+ fn authenticate(&self, _uid: u32, _gid: u32) -> Option<Permissions> {
+ None
+ }
+
+ async fn handle(&self, _request: Request) -> Response {
+ Response::ok(b"test".to_vec())
+ }
+}
+
+/// Handler that only accepts root (uid=0)
+struct RootOnlyHandler;
+
+#[async_trait]
+impl Handler for RootOnlyHandler {
+ fn authenticate(&self, uid: u32, _gid: u32) -> Option<Permissions> {
+ if uid == 0 {
+ Some(Permissions::ReadWrite)
+ } else {
+ None
+ }
+ }
+
+ async fn handle(&self, _request: Request) -> Response {
+ Response::ok(b"test".to_vec())
+ }
+}
+
+/// Handler that tracks authentication calls
+struct TrackingHandler {
+ call_count: Arc<AtomicU32>,
+ last_uid: Arc<AtomicU32>,
+ last_gid: Arc<AtomicU32>,
+}
+
+impl TrackingHandler {
+ fn new() -> (Self, Arc<AtomicU32>, Arc<AtomicU32>, Arc<AtomicU32>) {
+ let call_count = Arc::new(AtomicU32::new(0));
+ let last_uid = Arc::new(AtomicU32::new(0));
+ let last_gid = Arc::new(AtomicU32::new(0));
+
+ (
+ Self {
+ call_count: call_count.clone(),
+ last_uid: last_uid.clone(),
+ last_gid: last_gid.clone(),
+ },
+ call_count,
+ last_uid,
+ last_gid,
+ )
+ }
+}
+
+#[async_trait]
+impl Handler for TrackingHandler {
+ fn authenticate(&self, uid: u32, gid: u32) -> Option<Permissions> {
+ self.call_count.fetch_add(1, Ordering::SeqCst);
+ self.last_uid.store(uid, Ordering::SeqCst);
+ self.last_gid.store(gid, Ordering::SeqCst);
+ Some(Permissions::ReadWrite)
+ }
+
+ async fn handle(&self, _request: Request) -> Response {
+ Response::ok(b"test".to_vec())
+ }
+}
+
+/// Handler that grants read-only access to non-root
+struct ReadOnlyForNonRootHandler;
+
+#[async_trait]
+impl Handler for ReadOnlyForNonRootHandler {
+ fn authenticate(&self, uid: u32, _gid: u32) -> Option<Permissions> {
+ if uid == 0 {
+ Some(Permissions::ReadWrite)
+ } else {
+ Some(Permissions::ReadOnly)
+ }
+ }
+
+ async fn handle(&self, request: Request) -> Response {
+ // read_only field is visible to the handler via the connection
+ // For testing purposes, just accept requests
+ Response::ok(format!("handled msg_id {}", request.msg_id).into_bytes())
+ }
+}
+
+// ============================================================================
+// Helper to start server in background thread
+// ============================================================================
+
+fn start_server<H: Handler + 'static>(service_name: String, handler: H) -> thread::JoinHandle<()> {
+ thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
+ rt.block_on(async {
+ let mut server = Server::new(&service_name, handler);
+ server.start().expect("Server startup failed");
+ std::future::pending::<()>().await;
+ });
+ })
+}
+
+/// Wait for the server to be ready by polling /dev/shm for its shared-memory files
+fn wait_for_server_ready(service_name: &str) {
+ // The server's files are expected to appear as /dev/shm/qb-{service_name}-*
+ // (the listening socket is abstract and has no filesystem entry)
+ assert!(
+ wait_for_condition_blocking(
+ || {
+ // Do not probe with a real connection here: that would invoke
+ // authenticate(), which rejecting handlers refuse and which
+ // would skew the call counts checked by the tracking tests
+
+ let socket_pattern = format!("qb-{service_name}-");
+ // file_name() is the bare entry name, so match without the directory prefix
+ if let Ok(entries) = std::fs::read_dir("/dev/shm") {
+ for entry in entries.flatten() {
+ if let Ok(name) = entry.file_name().into_string()
+ && name.starts_with(&socket_pattern)
+ {
+ return true;
+ }
+ }
+ }
+ false
+ },
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ ),
+ "Server should be ready within 5 seconds"
+ );
+}
+
+// ============================================================================
+// Tests
+// ============================================================================
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_accept_all_handler() {
+ let service_name = unique_service_name();
+ let _server = start_server(service_name.clone(), AcceptAllHandler);
+
+ wait_for_server_ready(&service_name);
+
+ assert!(
+ try_connect(&service_name),
+ "AcceptAllHandler should accept connection"
+ );
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_reject_all_handler() {
+ let service_name = unique_service_name();
+ let _server = start_server(service_name.clone(), RejectAllHandler);
+
+ wait_for_server_ready(&service_name);
+
+ assert!(
+ !try_connect(&service_name),
+ "RejectAllHandler should reject connection"
+ );
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_root_only_handler() {
+ let service_name = unique_service_name();
+ let _server = start_server(service_name.clone(), RootOnlyHandler);
+
+ wait_for_server_ready(&service_name);
+
+ let connected = try_connect(&service_name);
+
+ // Get current uid
+ let current_uid = unsafe { libc::getuid() };
+
+ if current_uid == 0 {
+ assert!(
+ connected,
+ "RootOnlyHandler should accept connection when running as root"
+ );
+ } else {
+ assert!(
+ !connected,
+ "RootOnlyHandler should reject connection when not running as root (uid={current_uid})"
+ );
+ }
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_authentication_called_with_credentials() {
+ let service_name = unique_service_name();
+ let (handler, call_count, last_uid, last_gid) = TrackingHandler::new();
+ let _server = start_server(service_name.clone(), handler);
+
+ wait_for_server_ready(&service_name);
+
+ let current_uid = unsafe { libc::getuid() };
+ let current_gid = unsafe { libc::getgid() };
+
+ assert_eq!(
+ call_count.load(Ordering::SeqCst),
+ 0,
+ "Should not be called yet"
+ );
+
+ let connected = try_connect(&service_name);
+
+ assert!(connected, "TrackingHandler should accept connection");
+ assert_eq!(
+ call_count.load(Ordering::SeqCst),
+ 1,
+ "authenticate() should be called once"
+ );
+ assert_eq!(
+ last_uid.load(Ordering::SeqCst),
+ current_uid,
+ "authenticate() should receive correct uid"
+ );
+ assert_eq!(
+ last_gid.load(Ordering::SeqCst),
+ current_gid,
+ "authenticate() should receive correct gid"
+ );
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_multiple_connections_call_authenticate_each_time() {
+ let service_name = unique_service_name();
+ let (handler, call_count, _, _) = TrackingHandler::new();
+ let _server = start_server(service_name.clone(), handler);
+
+ wait_for_server_ready(&service_name);
+
+ // First connection
+ assert!(try_connect(&service_name));
+ assert_eq!(call_count.load(Ordering::SeqCst), 1);
+
+ // Second connection
+ assert!(try_connect(&service_name));
+ assert_eq!(call_count.load(Ordering::SeqCst), 2);
+
+ // Third connection
+ assert!(try_connect(&service_name));
+ assert_eq!(call_count.load(Ordering::SeqCst), 3);
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_read_only_permissions_accepted() {
+ let service_name = unique_service_name();
+ let _server = start_server(service_name.clone(), ReadOnlyForNonRootHandler);
+
+ wait_for_server_ready(&service_name);
+
+ // Connection should succeed regardless of whether we get ReadOnly or ReadWrite
+ // (both are accepted, just with different permissions)
+ assert!(
+ try_connect(&service_name),
+ "ReadOnlyForNonRootHandler should accept connections with appropriate permissions"
+ );
+}
+
+/// Test that demonstrates the authentication policy is enforced at connection time
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_authentication_enforced_at_connection_time() {
+ // This test verifies that authentication happens during connection setup,
+ // not during request handling
+ let service_name = unique_service_name();
+ let _server = start_server(service_name.clone(), RejectAllHandler);
+
+ wait_for_server_ready(&service_name);
+
+ // Connection should fail immediately, before any request is sent
+ let start = std::time::Instant::now();
+ let connected = try_connect(&service_name);
+ let duration = start.elapsed();
+
+ assert!(!connected, "Connection should be rejected");
+ assert!(
+ duration < Duration::from_millis(100),
+ "Rejection should happen quickly during handshake, not during request processing"
+ );
+}
+
+#[cfg(test)]
+mod policy_examples {
+ use super::*;
+
+ /// Example: Handler that mimics Proxmox VE authentication policy
+ /// - Root (uid=0) gets read-write
+ /// - www-data (uid=33) gets read-only (for web UI)
+ /// - Others are rejected
+ struct ProxmoxStyleHandler;
+
+ #[async_trait]
+ impl Handler for ProxmoxStyleHandler {
+ fn authenticate(&self, uid: u32, _gid: u32) -> Option<Permissions> {
+ match uid {
+ 0 => Some(Permissions::ReadWrite), // root
+ 33 => Some(Permissions::ReadOnly), // www-data
+ _ => None, // reject others
+ }
+ }
+
+ async fn handle(&self, request: Request) -> Response {
+ // In real implementation, would check request.read_only
+ // to enforce read-only restrictions
+ Response::ok(format!("msg_id {}", request.msg_id).into_bytes())
+ }
+ }
+
+ #[test]
+ #[ignore] // Requires libqb-dev
+ fn test_proxmox_style_policy() {
+ let service_name = unique_service_name();
+ let _server = start_server(service_name.clone(), ProxmoxStyleHandler);
+
+ wait_for_server_ready(&service_name);
+
+ let current_uid = unsafe { libc::getuid() };
+ let connected = try_connect(&service_name);
+
+ match current_uid {
+ 0 => assert!(connected, "Root should be accepted"),
+ 33 => assert!(connected, "www-data should be accepted"),
+ _ => assert!(!connected, "Other users should be rejected"),
+ }
+ }
+
+ /// Example: Handler that uses group-based authentication
+ struct GroupBasedHandler {
+ allowed_gid: u32,
+ }
+
+ impl GroupBasedHandler {
+ fn new(allowed_gid: u32) -> Self {
+ Self { allowed_gid }
+ }
+ }
+
+ #[async_trait]
+ impl Handler for GroupBasedHandler {
+ fn authenticate(&self, _uid: u32, gid: u32) -> Option<Permissions> {
+ if gid == self.allowed_gid {
+ Some(Permissions::ReadWrite)
+ } else {
+ None
+ }
+ }
+
+ async fn handle(&self, _request: Request) -> Response {
+ Response::ok(b"ok".to_vec())
+ }
+ }
+
+ #[test]
+ #[ignore] // Requires libqb-dev
+ fn test_group_based_authentication() {
+ let service_name = unique_service_name();
+ let current_gid = unsafe { libc::getgid() };
+ let _server = start_server(service_name.clone(), GroupBasedHandler::new(current_gid));
+
+ wait_for_server_ready(&service_name);
+
+ assert!(
+ try_connect(&service_name),
+ "Should accept connection from same group"
+ );
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs b/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs
new file mode 100644
index 00000000..8c0db962
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs
@@ -0,0 +1,413 @@
+//! Wire protocol compatibility test with libqb C clients
+//!
+//! This integration test verifies that our Rust Server is fully compatible
+//! with real libqb C clients by using libqb's client API via FFI.
+//!
+//! Run with: cargo test --package pmxcfs-ipc --test qb_wire_compat -- --ignored --nocapture
+//!
+//! Requires: libqb-dev installed
+
+use pmxcfs_test_utils::wait_for_condition_blocking;
+use std::ffi::CString;
+use std::thread;
+use std::time::Duration;
+
+// ============================================================================
+// Minimal libqb FFI bindings (client-side only)
+// ============================================================================
+
+/// libqb request header matching C's __attribute__ ((aligned(8)))
+/// Each field is i32 with 8-byte alignment, achieved via explicit padding
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone)]
+struct QbIpcRequestHeader {
+ id: i32, // 4 bytes
+ _pad1: u32, // 4 bytes padding
+ size: i32, // 4 bytes
+ _pad2: u32, // 4 bytes padding
+}
+
+/// libqb response header matching C's __attribute__ ((aligned(8)))
+/// Each field is i32 with 8-byte alignment, achieved via explicit padding
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone)]
+struct QbIpcResponseHeader {
+ id: i32, // 4 bytes
+ _pad1: u32, // 4 bytes padding
+ size: i32, // 4 bytes
+ _pad2: u32, // 4 bytes padding
+ error: i32, // 4 bytes
+ _pad3: u32, // 4 bytes padding
+}
+
+// Opaque type for connection handle
+#[repr(C)]
+struct QbIpccConnection {
+ _private: [u8; 0],
+}
+
+#[link(name = "qb")]
+unsafe extern "C" {
+ /// Connect to a QB IPC service
+ /// Returns NULL on failure
+ fn qb_ipcc_connect(name: *const libc::c_char, max_msg_size: usize) -> *mut QbIpccConnection;
+
+ /// Send request and receive response (with iovec)
+ /// Returns number of bytes received, or negative errno on error
+ fn qb_ipcc_sendv_recv(
+ conn: *mut QbIpccConnection,
+ iov: *const libc::iovec,
+ iov_len: u32,
+ res_buf: *mut libc::c_void,
+ res_buf_size: usize,
+ timeout_ms: i32,
+ ) -> libc::ssize_t;
+
+ /// Disconnect from service
+ fn qb_ipcc_disconnect(conn: *mut QbIpccConnection);
+
+ /// Initialize libqb logging
+ fn qb_log_init(name: *const libc::c_char, facility: i32, priority: i32);
+
+ /// Control log targets
+ fn qb_log_ctl(target: i32, conf: i32, arg: i32) -> i32;
+
+ /// Filter control
+ fn qb_log_filter_ctl(
+ target: i32,
+ op: i32,
+ type_: i32,
+ text: *const libc::c_char,
+ priority: i32,
+ ) -> i32;
+}
+
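+// Log targets (enum qb_log_target_slot in qblog.h: SYSLOG = 0, STDERR = 1)
+const QB_LOG_STDERR: i32 = 1;
+
+// Log control operations (enum qb_log_conf in qblog.h)
+const QB_LOG_CONF_ENABLED: i32 = 0;
+
+// Log filter operations (enum qb_log_filter_conf / qb_log_filter_type in qblog.h)
+const QB_LOG_FILTER_ADD: i32 = 0;
+const QB_LOG_FILTER_FILE: i32 = 0;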
+
+// Log levels (syslog.h priorities; LOG_TRACE is libqb's extension from qblog.h)
+const LOG_TRACE: i32 = 8; // LOG_DEBUG + 1
+
+// ============================================================================
+// Safe Rust wrapper around libqb client
+// ============================================================================
+
+struct QbIpcClient {
+ conn: *mut QbIpccConnection,
+}
+
+impl QbIpcClient {
+ fn connect(service_name: &str, max_msg_size: usize) -> Result<Self, String> {
+ let name = CString::new(service_name).map_err(|e| format!("Invalid service name: {e}"))?;
+
+ let conn = unsafe { qb_ipcc_connect(name.as_ptr(), max_msg_size) };
+
+ if conn.is_null() {
+ let errno = unsafe { *libc::__errno_location() };
+ let error_str = unsafe {
+ let err_ptr = libc::strerror(errno);
+ std::ffi::CStr::from_ptr(err_ptr)
+ .to_string_lossy()
+ .to_string()
+ };
+ Err(format!(
+ "qb_ipcc_connect returned NULL (errno={errno}: {error_str})"
+ ))
+ } else {
+ Ok(Self { conn })
+ }
+ }
+
+ fn send_recv(
+ &self,
+ request_id: i32,
+ request_data: &[u8],
+ timeout_ms: i32,
+ ) -> Result<(i32, Vec<u8>), String> {
+ // Build request
+ let req_header = QbIpcRequestHeader {
+ id: request_id,
+ _pad1: 0,
+ size: (std::mem::size_of::<QbIpcRequestHeader>() + request_data.len()) as i32,
+ _pad2: 0,
+ };
+
+ // Setup iovec
+ let mut iov = vec![libc::iovec {
+ iov_base: &req_header as *const _ as *mut libc::c_void,
+ iov_len: std::mem::size_of::<QbIpcRequestHeader>(),
+ }];
+
+ if !request_data.is_empty() {
+ iov.push(libc::iovec {
+ iov_base: request_data.as_ptr() as *mut libc::c_void,
+ iov_len: request_data.len(),
+ });
+ }
+
+ // Response buffer
+ const MAX_RESPONSE: usize = 8192 * 128;
+ let mut resp_buf = vec![0u8; MAX_RESPONSE];
+
+ // Send and receive
+ let result = unsafe {
+ qb_ipcc_sendv_recv(
+ self.conn,
+ iov.as_ptr(),
+ iov.len() as u32,
+ resp_buf.as_mut_ptr() as *mut libc::c_void,
+ resp_buf.len(),
+ timeout_ms,
+ )
+ };
+
+ if result < 0 {
+ return Err(format!("qb_ipcc_sendv_recv failed: {}", -result));
+ }
+
+ let bytes_received = result as usize;
+
+ // Parse response header
+ if bytes_received < std::mem::size_of::<QbIpcResponseHeader>() {
+ return Err("Response too short".to_string());
+ }
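+ // resp_buf is a Vec<u8> with no 8-byte alignment guarantee, so copy the header out unaligned
+ let resp_header = unsafe { resp_buf.as_ptr().cast::<QbIpcResponseHeader>().read_unaligned() };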
+
+ // Verify response ID matches request
+ if resp_header.id != request_id {
+ return Err(format!(
+ "Response ID mismatch: expected {}, got {}",
+ request_id, resp_header.id
+ ));
+ }
+
+ // Extract data
+ let data_start = std::mem::size_of::<QbIpcResponseHeader>();
+ let data = resp_buf[data_start..bytes_received].to_vec();
+
+ Ok((resp_header.error, data))
+ }
+}
+
+impl Drop for QbIpcClient {
+ fn drop(&mut self) {
+ unsafe {
+ qb_ipcc_disconnect(self.conn);
+ }
+ }
+}
+
+// ============================================================================
+// Integration Test
+// ============================================================================
+
+#[test]
+#[ignore] // Run with: cargo test -- --ignored
+fn test_libqb_wire_protocol_compatibility() {
+ eprintln!("🧪 Starting wire protocol compatibility test");
+
+ // Check if libqb is available
+ eprintln!("🔍 Checking if libqb is available...");
+ if !check_libqb_available() {
+ eprintln!("⏭️ SKIP: libqb not installed");
+ eprintln!(" Install with: sudo apt-get install libqb-dev");
+ return;
+ }
+ eprintln!("✓ libqb is available");
+
+ // Start test server
+ eprintln!("🚀 Starting test server...");
+ let server_handle = start_test_server();
+ eprintln!("✓ Server thread spawned");
+
+ // Wait for server to be ready
+ eprintln!("⏳ Waiting for server initialization...");
+ wait_for_server_ready("pve2");
+ eprintln!("✓ Server is ready");
+
+ // Run tests
+ eprintln!("🧪 Running client tests...");
+ let test_result = run_client_tests();
+
+ // Dropping the JoinHandle only detaches the server thread; it ends with the test process
+ drop(server_handle);
+
+ // Assert results
+ assert!(
+ test_result.is_ok(),
+ "Client tests failed: {:?}",
+ test_result.err()
+ );
+}
+
+fn check_libqb_available() -> bool {
+ std::process::Command::new("pkg-config")
+ .args(["--exists", "libqb"])
+ .status()
+ .map(|s| s.success())
+ .unwrap_or(false)
+}
+
+fn start_test_server() -> thread::JoinHandle<()> {
+ use async_trait::async_trait;
+ use pmxcfs_ipc::{Handler, Request, Response, Server};
+
+ // Create test handler
+ struct TestHandler;
+
+ #[async_trait]
+ impl Handler for TestHandler {
+ fn authenticate(&self, _uid: u32, _gid: u32) -> Option<pmxcfs_ipc::Permissions> {
+ // Accept all connections with read-write access for testing
+ Some(pmxcfs_ipc::Permissions::ReadWrite)
+ }
+
+ async fn handle(&self, request: Request) -> Response {
+ match request.msg_id {
+ 1 => {
+ // CFS_IPC_GET_FS_VERSION
+ let response_str = r#"{"version":1,"protocol":1}"#;
+ Response::ok(response_str.as_bytes().to_vec())
+ }
+ 2 => {
+ // CFS_IPC_GET_CLUSTER_INFO
+ let response_str = r#"{"nodes":[],"quorate":false}"#;
+ Response::ok(response_str.as_bytes().to_vec())
+ }
+ 3 => {
+ // CFS_IPC_GET_GUEST_LIST
+ let response_str = r#"{"data":[]}"#;
+ Response::ok(response_str.as_bytes().to_vec())
+ }
+ _ => Response::err(-libc::EINVAL),
+ }
+ }
+ }
+
+ // Spawn server thread with tokio runtime
+ thread::spawn(move || {
+ // Initialize tracing for server (WARN level - silent on success)
+ tracing_subscriber::fmt()
+ .with_max_level(tracing::Level::WARN)
+ .with_target(false)
+ .init();
+
+ // Create tokio runtime for async server
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
+
+ rt.block_on(async {
+ let mut server = Server::new("pve2", TestHandler);
+
+ // Server uses abstract Unix socket (Linux-specific)
+ if let Err(e) = server.start() {
+ eprintln!("Server startup failed: {e}");
+ eprintln!("Error details: {e:?}");
+ panic!("Server startup failed");
+ }
+
+ // Give tokio a chance to start the acceptor task
+ tokio::task::yield_now().await;
+
+ // Block forever to keep server alive
+ std::future::pending::<()>().await;
+ });
+ })
+}
+
+/// Wait for the server to be ready by polling /dev/shm for its shared-memory files
+fn wait_for_server_ready(service_name: &str) {
+ assert!(
+ wait_for_condition_blocking(
+ || {
+ // file_name() is the bare entry name, so match without the directory prefix
+ let socket_pattern = format!("qb-{service_name}-");
+ if let Ok(entries) = std::fs::read_dir("/dev/shm") {
+ for entry in entries.flatten() {
+ if let Ok(name) = entry.file_name().into_string()
+ && name.starts_with(&socket_pattern)
+ {
+ return true;
+ }
+ }
+ }
+ false
+ },
+ Duration::from_secs(5),
+ Duration::from_millis(10),
+ ),
+ "Server should be ready within 5 seconds"
+ );
+}
+
+fn run_client_tests() -> Result<(), String> {
+ // Enable libqb debug logging to see what's happening
+ eprintln!("🔧 Enabling libqb debug logging...");
+ unsafe {
+ let name = CString::new("qb_test").unwrap();
+ qb_log_init(name.as_ptr(), libc::LOG_USER, LOG_TRACE);
+ qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, 1);
+ // Enable all log messages from all files at TRACE level
+ let all_files = CString::new("*").unwrap();
+ qb_log_filter_ctl(
+ QB_LOG_STDERR,
+ QB_LOG_FILTER_ADD,
+ QB_LOG_FILTER_FILE,
+ all_files.as_ptr(),
+ LOG_TRACE,
+ );
+ }
+ eprintln!("✓ libqb logging enabled (TRACE level)");
+
+ eprintln!("📡 Connecting to server...");
+ // Connect to abstract socket "pve2"
+ // Use a very large buffer size to rule out space issues
+ let client = QbIpcClient::connect("pve2", 8192 * 1024)?; // 8MB instead of 1MB
+ eprintln!("✓ Connected successfully");
+
+ eprintln!("🧪 Test 1: GET_FS_VERSION");
+ // Test 1: GET_FS_VERSION
+ let (error, data) = client.send_recv(1, &[], 5000)?;
+ eprintln!("✓ Got response: error={}, data_len={}", error, data.len());
+ if error == 0 {
+ let response = String::from_utf8_lossy(&data);
+ eprintln!(" Response: {response}");
+ assert!(
+ response.contains("version"),
+ "Response should contain version field"
+ );
+ }
+
+ eprintln!("🧪 Test 2: GET_CLUSTER_INFO");
+ // Test 2: GET_CLUSTER_INFO
+ let (error, data) = client.send_recv(2, &[], 5000)?;
+ eprintln!("✓ Got response: error={}, data_len={}", error, data.len());
+ if error == 0 {
+ let response = String::from_utf8_lossy(&data);
+ eprintln!(" Response: {response}");
+ assert!(
+ response.contains("nodes"),
+ "Response should contain nodes field"
+ );
+ }
+
+ eprintln!("🧪 Test 3: Request with data payload");
+ // Test 3: Request with data payload
+ let test_payload = b"test_payload_data";
+ let (_error, _data) = client.send_recv(1, test_payload, 5000)?;
+ eprintln!("✓ Request with payload succeeded");
+
+ eprintln!("🧪 Test 4: GET_GUEST_LIST");
+ // Test 4: GET_GUEST_LIST
+ let (_error, _data) = client.send_recv(3, &[], 5000)?;
+ eprintln!("✓ GET_GUEST_LIST succeeded");
+
+ Ok(())
+}
--
2.47.3
_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel
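
Note on the response parsing in tests/qb_wire_compat.rs: the client receives the reply into a plain byte buffer and then interprets the first bytes as a response header whose three i32 fields are each padded to 8 bytes. The sketch below is not part of the patch. It only illustrates, under stated assumptions, how such a header could be decoded from a byte slice without relying on the slice being 8-byte aligned. The names `ResponseHeader`, `parse_response_header` and `encode_response_header` are hypothetical; the layout mirrors `QbIpcResponseHeader` from the test, and native byte order is assumed because both peers run on the same host.

```rust
use std::mem::size_of;

/// Mirror of the test's response header: three i32 fields,
/// each followed by 4 bytes of explicit padding (24 bytes total).
#[repr(C, align(8))]
#[derive(Debug, Copy, Clone, PartialEq)]
struct ResponseHeader {
    id: i32,
    _pad1: u32,
    size: i32,
    _pad2: u32,
    error: i32,
    _pad3: u32,
}

/// Hypothetical helper: decode a header from the start of `buf`.
/// Returns None if the slice is shorter than one header.
fn parse_response_header(buf: &[u8]) -> Option<ResponseHeader> {
    if buf.len() < size_of::<ResponseHeader>() {
        return None;
    }
    // SAFETY: the length check guarantees enough readable bytes, and every
    // bit pattern is valid for a struct consisting only of i32/u32 fields.
    // read_unaligned copies the bytes, so the slice's alignment is irrelevant.
    Some(unsafe { buf.as_ptr().cast::<ResponseHeader>().read_unaligned() })
}

/// Hypothetical helper: produce the 24 header bytes in native byte order
/// with zeroed padding, as a stand-in for what a server would send.
fn encode_response_header(id: i32, size: i32, error: i32) -> [u8; 24] {
    let mut out = [0u8; 24];
    out[0..4].copy_from_slice(&id.to_ne_bytes());
    out[8..12].copy_from_slice(&size.to_ne_bytes());
    out[16..20].copy_from_slice(&error.to_ne_bytes());
    out
}
```

Calling `parse_response_header(&buf[1..])` on a buffer whose header starts at an odd offset still yields the encoded fields, which is the case a direct pointer dereference would not be allowed to handle.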
Thread overview: 15+ messages
2026-01-06 14:24 [pve-devel] [PATCH pve-cluster 00/15 v1] Rewrite pmxcfs with Rust Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 01/15] pmxcfs-rs: add workspace and pmxcfs-api-types crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 02/15] pmxcfs-rs: add pmxcfs-config crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 03/15] pmxcfs-rs: add pmxcfs-logger crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 04/15] pmxcfs-rs: add pmxcfs-rrd crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 05/15] pmxcfs-rs: add pmxcfs-memdb crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 06/15] pmxcfs-rs: add pmxcfs-status crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 07/15] pmxcfs-rs: add pmxcfs-test-utils infrastructure crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 08/15] pmxcfs-rs: add pmxcfs-services crate Kefu Chai
2026-01-06 14:24 ` Kefu Chai [this message]
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 10/15] pmxcfs-rs: add pmxcfs-dfsm crate Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 11/15] pmxcfs-rs: vendor patched rust-corosync for CPG compatibility Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 13/15] pmxcfs-rs: add integration and workspace tests Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 14/15] pmxcfs-rs: add Makefile for build automation Kefu Chai
2026-01-06 14:24 ` [pve-devel] [PATCH pve-cluster 15/15] pmxcfs-rs: add project documentation Kefu Chai