Message-ID: <1f1a9473-e84e-462d-aa8b-1d1431084c99@proxmox.com>
Date: Thu, 12 Feb 2026 16:21:53 +0100
From: Samuel Rufinatscha
Subject: Re: [pve-devel] [PATCH pve-cluster 09/15] pmxcfs-rs: add pmxcfs-ipc crate
To: Proxmox VE development discussion, Kefu Chai
References: <20260106142440.2368585-1-k.chai@proxmox.com> <20260106142440.2368585-10-k.chai@proxmox.com>
In-Reply-To: <20260106142440.2368585-10-k.chai@proxmox.com>

Thanks for this IPC implementation, Kefu :) Quite a comprehensive patch. Already a good step in the right direction. Comments inlined.

On 1/7/26 10:16 AM, Kefu Chai wrote:
> Add libqb-compatible IPC server implementation:
> - QB_IPC_SHM protocol (shared memory ring buffers)
> - Abstract Unix socket (@pve2) for handshake
> - Lock-free SPSC ring buffers
> - Authentication via SO_PASSCRED (uid/gid/pid)
> - 13 IPC operations (GET_FS_VERSION, GET_CLUSTER_INFO, etc.)
>
> This is an independent crate with no internal dependencies,
> only requiring tokio, nix, and memmap2. It provides wire-
> compatible IPC with the C implementation's libqb-based server,
> allowing existing clients to work unchanged.
>
> Includes wire protocol compatibility tests (require root to run).

IMO the commit message is a bit hard to read (mostly because of the bullet points). Could you please reword it so that it is easier to follow? This also applies to the other patches. Please also revisit the READMEs, as noted in the last patch.
> > Signed-off-by: Kefu Chai > --- > src/pmxcfs-rs/Cargo.toml | 1 + > src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml | 44 + > src/pmxcfs-rs/pmxcfs-ipc/README.md | 182 +++ > .../pmxcfs-ipc/examples/test_server.rs | 92 ++ > src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs | 657 ++++++++++ > src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs | 93 ++ > src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs | 37 + > src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs | 332 +++++ > src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs | 1158 +++++++++++++++++ > src/pmxcfs-rs/pmxcfs-ipc/src/server.rs | 278 ++++ > src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs | 84 ++ > src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs | 450 +++++++ > .../pmxcfs-ipc/tests/qb_wire_compat.rs | 413 ++++++ > 13 files changed, 3821 insertions(+) > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/README.md > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/server.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs > > diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml > index b00ca68f..f4497d58 100644 > --- a/src/pmxcfs-rs/Cargo.toml > +++ b/src/pmxcfs-rs/Cargo.toml > @@ -9,6 +9,7 @@ members = [ > "pmxcfs-status", # Status monitoring and RRD data management > "pmxcfs-test-utils", # Test utilities and helpers (dev-only) > "pmxcfs-services", # Service framework for automatic retry and lifecycle management > + "pmxcfs-ipc", # libqb-compatible IPC server > ] > resolver = "2" > > diff --git 
a/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml b/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml > new file mode 100644 > index 00000000..dbee2e9a > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml > @@ -0,0 +1,44 @@ > +[package] > +name = "pmxcfs-ipc" > +description = "libqb-compatible IPC server implementation in pure Rust" > + > +version.workspace = true > +edition.workspace = true > +authors.workspace = true > +license.workspace = true > +repository.workspace = true > + > +[lints] > +workspace = true > + > +# System dependencies: > +# - libqb (runtime) - QB IPC library for client compatibility > +# - libqb-dev (build/test only) - Required to run wire protocol tests > + > +[dependencies] > +# Error handling > +anyhow.workspace = true > + > +# Async runtime > +tokio.workspace = true > +tokio-util.workspace = true > + > +# Concurrency primitives > +parking_lot.workspace = true > + > +# System integration > +libc.workspace = true > +nix.workspace = true > +memmap2 = "0.9" > + > +# Logging > +tracing.workspace = true > + > +# Async trait support > +async-trait.workspace = true > + > +[dev-dependencies] > +pmxcfs-test-utils = { path = "../pmxcfs-test-utils" } > +tempfile.workspace = true > +tokio = { workspace = true, features = ["rt", "macros"] } > +tracing-subscriber.workspace = true > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/README.md b/src/pmxcfs-rs/pmxcfs-ipc/README.md > new file mode 100644 > index 00000000..5b5b98ae > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/README.md > @@ -0,0 +1,182 @@ > +# pmxcfs-ipc: libqb-Compatible IPC Server > + > +**Rust implementation of libqb IPC server for pmxcfs using shared memory ring buffers** > + > +This crate provides a wire-compatible IPC server that works with libqb clients (C `qb_ipcc_*` API) without depending on the libqb C library. 
> + > +## Table of Contents > + > +- [Overview](#overview) > +- [Architecture](#architecture) > +- [Protocol Implementation](#protocol-implementation) > +- [Usage](#usage) > +- [Testing](#testing) > +- [References](#references) nit: not all READMEs have a table of contents. For consistency I think we should either have it everywhere (if it helps), or simply drop it. > + > +--- > + > +## Overview > + > +pmxcfs uses libqb for IPC communication between the daemon and client tools (`pvecm`, `pvenode`, etc.). This crate implements a server using QB_IPC_SHM (shared memory ring buffers) that is wire-compatible with libqb clients, enabling the Rust pmxcfs implementation to communicate with existing C-based tools. > + > +**Key Features**: > +- Wire-compatible with libqb clients > +- QB_IPC_SHM transport (shared memory ring buffers) > +- Async I/O via tokio > +- Lock-free SPSC ring buffers > +- Supports authentication via uid/gid > +- Per-connection context (uid, gid, pid, read-only flag) > +- Connection statistics tracking > +- Abstract Unix sockets for setup handshake (Linux-specific) > + > +--- > + > +## Architecture > + > +### Transport: QB_IPC_SHM (Shared Memory Ring Buffers) > + > +**Rust pmxcfs uses**: `QB_IPC_SHM` (shared memory ring buffers) > + > +We implemented shared memory transport using lock-free SPSC (single-producer single-consumer) ring buffers. This provides: > + > +- **Wire compatibility**: Same handshake protocol as libqb > +- **Async I/O**: Integration with tokio ecosystem > + > +**Ring Buffer Design**: > +- Each connection has 3 ring buffers: > + 1. **Request ring**: Client writes, server reads > + 2. **Response ring**: Server writes, client reads > + 3. 
**Event ring**: Server writes, client reads (for async notifications) > +- Ring buffers stored in `/dev/shm` (Linux shared memory) > +- Chunk-based protocol matching libqb > + > +### Server Structure > + > +### Connection Statistics > + > +Tracks statistics for C compatibility (matching `qb_ipcs_stats`). > + > +--- > + > +## Protocol Implementation > + > +### Connection Handshake > + > +Server creates an abstract Unix socket `@pve2` (@ prefix indicates abstract namespace) for initial connection setup. > + > +### Request/Response Communication > + > +After handshake, communication happens via shared memory ring buffers using libqb-compatible chunk format. > + > +### Wire Format Structures > + > +All structures use `#[repr(C, align(8))]` to match C's alignment requirements. > + > +Error codes must be negative errno values (e.g., `-EPERM`, `-EINVAL`) to match libqb convention. > + > +--- > + > +## Testing > + > +Requires Corosync running for integration tests. See `tests/` directory for C client FFI compatibility tests. 
> + > +## Implementation Status > + > +### Implemented > + > +- Connection handshake (SOCK_STREAM setup socket) > +- Authentication via SO_PASSCRED (uid/gid/pid) > +- QB_IPC_SHM transport (shared memory ring buffers) > +- Lock-free SPSC ring buffers > +- Async I/O via tokio > +- Abstract Unix sockets for setup handshake > +- Message header parsing (request/response) > +- Error code propagation (negative errno) > +- Ring buffer file management (creation/cleanup) > +- Event channel ring buffers (created, not actively used) > +- Connection statistics tracking > +- Disconnect detection > +- Read-only flag based on gid > + > +### Not Implemented > + > +- Event channel message sending (pmxcfs doesn't use events yet) > + > +## Application-Level IPC Operations > + > +### Operation Summary > + > +The following IPC operations are supported (defined in pmxcfs): > + > +| Operation | Request Data | Response Data | Description | > +|-----------|-------------|---------------|-------------| > +| GET_FS_VERSION | Empty | uint32_t version | Get filesystem version number | > +| GET_CLUSTER_INFO | Empty | JSON string | Get cluster information | > +| GET_GUEST_LIST | Empty | JSON array | Get list of all VMs/containers | > +| SET_STATUS | name + data | Empty | Set status key-value pair | > +| GET_STATUS | name | Binary data | Get status value by name | > +| GET_CONFIG | name | File contents | Read configuration file | > +| LOG_CLUSTER_MSG | priority + msg | Empty | Add cluster log entry | > +| GET_CLUSTER_LOG | max_entries | JSON array | Get cluster log entries | > +| GET_RRD_DUMP | Empty | RRD dump text | Get all RRD data | > +| GET_GUEST_CONFIG_PROPERTY | vmid + key | String value | Get single VM config property | > +| GET_GUEST_CONFIG_PROPERTIES | vmid | JSON object | Get all VM config properties | > +| VERIFY_TOKEN | userid + token | Boolean | Verify API token validity | > + > +### Common Clients > + > +The following Proxmox components use the IPC interface: > + > +- **pvestatd**: 
Updates node/VM/storage metrics (SET_STATUS, GET_STATUS) > +- **pve-ha-crm**: HA cluster resource manager (GET_CLUSTER_INFO, GET_GUEST_LIST) > +- **pve-ha-lrm**: HA local resource manager (GET_CONFIG, LOG_CLUSTER_MSG) > +- **pvecm**: Cluster management CLI (GET_CLUSTER_INFO, GET_CLUSTER_LOG) > +- **pvedaemon**: PVE API daemon (All query operations) > + > +### Permission Model > + > +**Write Operations** (require root): > +- SET_STATUS > +- LOG_CLUSTER_MSG > + > +**Read Operations** (any authenticated user): > +- All GET_* operations > +- VERIFY_TOKEN > + > +--- > + > +## References > + > +### libqb Source > + > +Reference implementation of QB IPC protocol (available at https://github.com/ClusterLabs/libqb): > + > +- `libqb/lib/ringbuffer.c` - Ring buffer implementation > +- `libqb/lib/ipc_shm.c` - Shared memory transport > +- `libqb/lib/ipc_setup.c` - Connection setup/handshake > +- `libqb/include/qb/qbipc_common.h` - Wire protocol structures > + > +### C pmxcfs (pve-cluster) > + > +- `src/pmxcfs/server.c` - C IPC server using libqb > +- `src/pmxcfs/cfs-ipc-ops.h` - pmxcfs IPC operation codes > + > +### Related Documentation > + > +- `../C_COMPATIBILITY.md` - General C compatibility notes (if exists) > + > +--- > + > +## Notes > + > +### Ring Buffer Naming Convention > + > +Ring buffer files are created in `/dev/shm` with names based on connection descriptor and ring type (request/response/event). > + > +### Error Handling > + > +Always use **negative errno values** for errors to maintain compatibility with libqb clients. > + > +### Alignment and Padding > + > +All wire format structures must use `#[repr(C, align(8))]` to ensure 8-byte alignment matching C's requirements. > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs b/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs > new file mode 100644 > index 00000000..6b9695ce > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs > @@ -0,0 +1,92 @@ > +//! 
Simple test server for debugging libqb connectivity > + > +use async_trait::async_trait; > +use pmxcfs_ipc::{Handler, Permissions, Request, Response, Server}; > + > +/// Example handler implementation > +struct TestHandler; > + > +#[async_trait] > +impl Handler for TestHandler { > + fn authenticate(&self, uid: u32, gid: u32) -> Option { > + // Accept root with read-write access > + if uid == 0 { > + eprintln!("Authenticated uid={uid}, gid={gid} as root (read-write)"); > + return Some(Permissions::ReadWrite); > + } > + > + // Accept all other users with read-only access for testing > + eprintln!("Authenticated uid={uid}, gid={gid} as regular user (read-only)"); > + Some(Permissions::ReadOnly) > + } > + > + async fn handle(&self, request: Request) -> Response { > + eprintln!( > + "Received request: id={}, data_len={}, conn={}, uid={}, gid={}, pid={}, read_only={}", > + request.msg_id, > + request.data.len(), > + request.conn_id, > + request.uid, > + request.gid, > + request.pid, > + request.is_read_only > + ); > + > + match request.msg_id { > + 1 => { > + // CFS_IPC_GET_FS_VERSION > + let response_str = r#"{"version":1,"protocol":1}"#; > + eprintln!("Responding with: {response_str}"); > + Response::ok(response_str.as_bytes().to_vec()) > + } > + 2 => { > + // CFS_IPC_GET_CLUSTER_INFO > + let response_str = r#"{"nodes":["node1","node2"],"quorate":true}"#; > + eprintln!("Responding with: {response_str}"); > + Response::ok(response_str.as_bytes().to_vec()) > + } > + 3 => { > + // CFS_IPC_GET_GUEST_LIST > + let response_str = r#"{"data":[{"vmid":100}]}"#; > + eprintln!("Responding with: {response_str}"); > + Response::ok(response_str.as_bytes().to_vec()) > + } > + _ => { > + eprintln!("Unknown message id: {}", request.msg_id); > + Response::err(-libc::EINVAL) > + } > + } > + } > +} > + > +#[tokio::main] > +async fn main() { > + // Initialize tracing > + tracing_subscriber::fmt() > + .with_max_level(tracing::Level::DEBUG) > + .with_target(true) > + .init(); > + > + 
println!("Starting QB IPC test server on 'pve2'..."); > + > + // Create handler and server > + let handler = TestHandler; > + let mut server = Server::new("pve2", handler); > + > + println!("Server created, starting..."); > + > + if let Err(e) = server.start() { > + eprintln!("Failed to start server: {e}"); > + std::process::exit(1); > + } > + > + println!("Server started successfully!"); > + println!("Waiting for connections..."); > + > + // Keep server running > + tokio::signal::ctrl_c() > + .await > + .expect("Failed to wait for Ctrl-C"); > + > + println!("Shutting down..."); > +} > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs > new file mode 100644 > index 00000000..d6d77e6c > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs > @@ -0,0 +1,657 @@ > +/// Per-connection handling for libqb IPC with shared memory ring buffers > +/// > +/// This module contains all connection-specific logic including connection > +/// establishment, authentication, request handling, and shared memory ring buffer management. > +use anyhow::{Context, Result}; > +use std::os::unix::io::AsRawFd; > +use std::path::PathBuf; > +use std::sync::Arc; > +use tokio::io::{AsyncReadExt, AsyncWriteExt}; > +use tokio::net::UnixStream; > +use tokio_util::sync::CancellationToken; > + > +use super::handler::{Handler, Permissions}; > +use super::protocol::*; > +use super::ringbuffer::{FlowControl, RingBuffer}; > + > +/// Per-connection state using shared memory ring buffers > +/// > +/// Uses SHM transport (shared memory ring buffers). 
> +#[allow(dead_code)] // Fields are intentionally stored for lifecycle management > +pub(super) struct QbConnection { > + /// Connection ID for logging and debugging > + conn_id: u64, > + > + /// Client process ID (from SO_PEERCRED) > + pid: u32, > + > + /// Client user ID (from SO_PEERCRED) > + uid: u32, > + > + /// Client group ID (from SO_PEERCRED) > + gid: u32, > + > + /// Whether this connection has read-only access (determined by Handler::authenticate) > + pub(super) read_only: bool, > + > + /// Setup socket (kept open for disconnect detection) > + _setup_stream: UnixStream, > + > + /// Ring buffers for shared memory IPC > + /// Request ring: client writes, server reads > + request_rb: Option, > + /// Response ring: server writes, client reads > + response_rb: Option, > + /// Event ring: server writes, client reads (for async notifications) > + /// NOTE: The existing PVE/IPCC.xs Perl client only uses qb_ipcc_sendv_recv() > + /// and never calls qb_ipcc_event_recv(), so this ring buffer is created > + /// for libqb compatibility but remains unused in practice. > + _event_rb: Option, > + > + /// Paths to ring buffer data files (for debugging/cleanup) > + pub(super) ring_buffer_paths: Vec, > + > + /// Task handle for request handler (auto-aborted on drop) > + pub(super) task_handle: Option>,

The comment says "auto-aborted on drop", which is not correct: dropping a Tokio JoinHandle only detaches the task, which keeps running in the background. Please abort it explicitly. The "task handles will be aborted when drop" remark in the server's stop() is wrong for the same reason, please revisit it as well.

> +} > + > +impl QbConnection { > + /// Accept a new connection from the setup socket > + /// > + /// Performs authentication, creates ring buffers, spawns request handler task, > + /// and returns the connection object. 
> + pub(super) async fn accept( > + mut stream: UnixStream, > + conn_id: u64, > + service_name: &str, > + handler: Arc, > + cancellation_token: CancellationToken, > + ) -> Result { > + // Read connection request > + let fd = stream.as_raw_fd(); > + let mut req_bytes = vec![0u8; std::mem::size_of::()]; > + stream > + .read_exact(&mut req_bytes) > + .await > + .context("Failed to read connection request")?; > + > + tracing::debug!( > + "Connection request raw bytes ({} bytes): {:02x?}", > + req_bytes.len(), > + req_bytes > + ); > + > + let req = > + unsafe { std::ptr::read_unaligned(req_bytes.as_ptr() as *const ConnectionRequest) };

Can we please validate the handshake, i.e. the data the client sent, before using it?

> + > + tracing::debug!( > + "Connection request: id={}, size={}, max_msg_size={}", > + *req.hdr.id, > + *req.hdr.size, > + req.max_msg_size > + ); > + > + // Get peer credentials (SO_PEERCRED on Linux) > + let (uid, gid, pid) = get_peer_credentials(fd)?; > + > + // Authenticate using Handler trait > + let read_only = match handler.authenticate(uid, gid) { > + Some(Permissions::ReadWrite) => { > + tracing::info!(pid, uid, gid, "Connection accepted with read-write access"); > + false > + } > + Some(Permissions::ReadOnly) => { > + tracing::info!(pid, uid, gid, "Connection accepted with read-only access"); > + true > + } > + None => { > + tracing::warn!( > + pid, > + uid, > + gid, > + "Connection rejected by authentication policy" > + ); > + send_connection_response(&mut stream, -libc::EPERM, conn_id, 0, "", "", "").await?; > + anyhow::bail!("Connection authentication failed"); > + } > + }; > + > + // Create connection descriptor for ring buffer naming > + let conn_desc = format!("{}-{}-{}", std::process::id(), pid, conn_id); > + let max_msg_size = req.max_msg_size.max(8192);

Please clamp this to a reasonable server-side maximum.
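Something along these lines, as a rough sketch only. The names MIN_MSG_SIZE, MAX_MSG_SIZE and clamp_max_msg_size are made up for illustration, and the 8 MiB cap is a placeholder I picked, not a value taken from libqb or the C server. It assumes max_msg_size is a u32, as in send_connection_response():

```rust
/// Lower bound, mirroring the `.max(8192)` already in the patch.
const MIN_MSG_SIZE: u32 = 8192;

/// Hypothetical server-side cap (assumption: 8 MiB, pick whatever
/// matches the largest message pmxcfs actually needs to handle).
const MAX_MSG_SIZE: u32 = 8 * 1024 * 1024;

/// Clamp the client-requested size into [MIN_MSG_SIZE, MAX_MSG_SIZE],
/// so a client cannot make the server allocate arbitrarily large
/// ring buffers in /dev/shm.
fn clamp_max_msg_size(requested: u32) -> u32 {
    requested.clamp(MIN_MSG_SIZE, MAX_MSG_SIZE)
}
```

The call site would then read `let max_msg_size = clamp_max_msg_size(req.max_msg_size);`. Rejecting oversized requests with an error instead of silently clamping would also be an option; I have no strong preference.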
> + > + // Create ring buffers in /dev/shm > + // Pass max_msg_size directly - RingBuffer::new() will add QB_RB_CHUNK_MARGIN and round up > + // (just like qb_rb_open() does on the client side) > + let ring_size = max_msg_size as usize; > + > + tracing::debug!( > + "Creating ring buffers for connection {}: size={} bytes", > + conn_id, > + ring_size > + ); > + > + // Request ring: client writes, server reads > + // Request ring needs sizeof(int32_t) for flow control (shared_user_data) > + let request_rb_name = format!("{conn_desc}-{service_name}-request"); > + let request_rb = RingBuffer::new( > + "/dev/shm", > + &request_rb_name, > + ring_size, > + std::mem::size_of::(), > + ) > + .context("Failed to create request ring buffer")?; > + > + // Response ring: server writes, client reads > + // Response ring doesn't need shared_user_data > + let response_rb_name = format!("{conn_desc}-{service_name}-response"); > + let response_rb = RingBuffer::new("/dev/shm", &response_rb_name, ring_size, 0) > + .context("Failed to create response ring buffer")?; > + > + // Event ring: server writes, client reads (for async notifications) > + // Event ring doesn't need shared_user_data > + let event_rb_name = format!("{conn_desc}-{service_name}-event"); > + let event_rb = RingBuffer::new("/dev/shm", &event_rb_name, ring_size, 0) > + .context("Failed to create event ring buffer")?; > + > + // Collect full paths for cleanup tracking > + let request_data_path = PathBuf::from(format!("/dev/shm/qb-{request_rb_name}-data")); > + let response_data_path = PathBuf::from(format!("/dev/shm/qb-{response_rb_name}-data")); > + let event_data_path = PathBuf::from(format!("/dev/shm/qb-{event_rb_name}-data"));

The request, response and event header files (the "-header" counterparts of these "-data" files) should be tracked too.

> + > + // Send connection response with ring buffer BASE NAMES (not full paths) > + // libqb client expects base names (e.g., "123-456-1-pve2-request") > + // It will internally prepend "/dev/shm/qb-" and append "-header" or "-data" > + 
send_connection_response( > + &mut stream, > + 0, > + conn_id, > + max_msg_size, > + &request_rb_name, > + &response_rb_name, > + &event_rb_name, > + ) > + .await?; > + > + // Spawn request handler task > + let handler_for_task = handler.clone(); > + let cancellation_for_task = cancellation_token.child_token(); > + > + let task_handle = tokio::spawn(async move { > + Self::handle_requests( > + request_rb, > + response_rb, > + handler_for_task, > + cancellation_for_task, > + conn_id, > + uid, > + gid, > + pid, > + read_only, > + ) > + .await; > + }); > + > + tracing::info!("Connection {} established (SHM transport)", conn_id); > + > + Ok(Self { > + conn_id, > + pid, > + uid, > + gid, > + read_only, > + _setup_stream: stream, > + request_rb: None, // Moved to task > + response_rb: None, // Moved to task > + _event_rb: Some(event_rb), > + ring_buffer_paths: vec![request_data_path, response_data_path, event_data_path], > + task_handle: Some(task_handle), > + }) > + } > + > + /// Request handler loop - receives and processes messages via ring buffers > + /// > + /// Runs in a background async task, receiving requests and sending responses > + /// through shared memory ring buffers. > + /// > + /// Uses tokio channels to implement a workqueue with flow control: > + /// - FlowControl::OK: Proceed with sending > + /// - FlowControl::SLOW_DOWN: Reduce send rate > + /// - FlowControl::STOP: Do not send > + /// > + /// Architecture: Three concurrent tasks communicating via tokio channels: > + /// 1. Request receiver: reads from request ring buffer, queues work > + /// 2. Worker: processes requests from work queue, sends to response queue > + /// 3. 
Response sender: writes responses from response queue to response ring buffer > + #[allow(clippy::too_many_arguments)] > + async fn handle_requests( > + mut request_rb: RingBuffer, > + mut response_rb: RingBuffer, > + handler: Arc, > + cancellation_token: CancellationToken, > + conn_id: u64, > + uid: u32, > + gid: u32, > + pid: u32, > + read_only: bool, > + ) { > + tracing::debug!("Request handler started for connection {}", conn_id); > + > + // Workqueue capacity and flow control thresholds > + // > + // NOTE: The C implementation (using libqb) processes requests synchronously > + // in the event loop callback (server.c:159 s1_msg_process_fn), so there's > + // no explicit queue. We add async queueing in Rust to allow non-blocking > + // request handling with tokio. > + // > + // Queue capacity of 8 is chosen as a reasonable default for: > + // - Typical PVE workloads: Most IPC operations are fast (file reads/writes) > + // - Memory efficiency: Each queued item = ~1KB (request header + data) > + // - Backpressure: Small queue encourages flow control to activate quickly > + // - Testing: Flow control test (02-flow-control.sh) verifies 20 concurrent > + // operations work correctly with capacity 8 > + // > + // Flow control thresholds match libqb's rate limiting (ipcs.c:199-203): > + // - FlowControl::OK (0): Proceed with sending (QB_IPCS_RATE_NORMAL) > + // - FlowControl::SLOW_DOWN (1): Reduce send rate (QB_IPCS_RATE_OFF) > + // - FlowControl::STOP (2): Do not send (QB_IPCS_RATE_OFF_2) > + const MAX_PENDING_REQUESTS: usize = 8; > + > + // Set SLOW_DOWN when queue reaches 75% capacity (6/8 items) > + // This provides early warning before the queue fills completely, > + // allowing clients to throttle before hitting STOP > + const FC_WARNING_THRESHOLD: usize = 6; > + > + // Work queue: (header, request) -> worker > + let (work_tx, mut work_rx) = > + tokio::sync::mpsc::channel::<(RequestHeader, Request)>(MAX_PENDING_REQUESTS); > + > + // Response queue: worker -> 
response sender > + // Unbounded because responses must not block the worker > + let (response_tx, mut response_rx) = > + tokio::sync::mpsc::unbounded_channel::<(RequestHeader, Response)>();

If the response ring buffer fills up (slow or stuck client), responses queue up in memory without limit and can OOM the daemon. This queue should be bounded, like the work queue already is.

> + > + // Spawn worker task to process requests > + let worker_handler = handler.clone(); > + let worker_response_tx = response_tx.clone(); > + let worker_task = tokio::spawn(async move { > + while let Some((header, request)) = work_rx.recv().await { > + let handler_response = worker_handler.handle(request).await; > + // Send to response queue (unbounded, never blocks) > + let _ = worker_response_tx.send((header, handler_response)); > + } > + }); > + > + // Spawn response sender task > + let response_task = tokio::spawn(async move { > + while let Some((header, handler_response)) = response_rx.recv().await { > + Self::send_response(&mut response_rb, header, handler_response).await; > + } > + }); > + > + // Main request receiver loop > + loop { > + // Wait for incoming request (async, yields to tokio scheduler) > + let request_data = tokio::select! 
{ > + _ = cancellation_token.cancelled() => { > + tracing::debug!("Request handler cancelled for connection {}", conn_id); > + break; > + } > + result = request_rb.recv() => { > + match result { > + Ok(data) => data, > + Err(e) => { > + tracing::error!("Error receiving request on conn {}: {}", conn_id, e); > + break; > + } > + } > + } > + }; > + > + // After receiving from ring buffer, flow control is already set to 0 > + // by RingBufferShared::read_chunk() > + > + // Parse request header > + if request_data.len() < std::mem::size_of::() { > + tracing::warn!( > + "Request too small: {} bytes (need {} for header)", > + request_data.len(), > + std::mem::size_of::() > + ); > + continue; > + } > + > + let header = > + unsafe { std::ptr::read_unaligned(request_data.as_ptr() as *const RequestHeader) }; > + > + tracing::debug!( > + "Received request on conn {}: id={}, size={}", > + conn_id, > + *header.id, > + *header.size > + ); > + > + // Extract message data (after header) > + let header_size = std::mem::size_of::(); > + let msg_data = &request_data[header_size..]; > + > + // Build request object with full context > + let request = Request { > + msg_id: *header.id, > + data: msg_data.to_vec(), > + is_read_only: read_only, > + conn_id, > + uid, > + gid, > + pid, > + }; > + > + // Send to workqueue - implements backpressure via flow control > + match work_tx.try_send((header, request)) { > + Ok(()) => { > + // Request queued successfully > + > + // Update flow control based on queue depth > + // This matches libqb's rate limiting behavior > + let queue_len = MAX_PENDING_REQUESTS - work_tx.capacity(); > + let fc_value = if queue_len >= MAX_PENDING_REQUESTS { > + FlowControl::STOP // Queue full - stop sending > + } else if queue_len >= FC_WARNING_THRESHOLD { > + FlowControl::SLOW_DOWN // Queue approaching full - slow down > + } else { > + FlowControl::OK // Queue has space - OK to send > + }; > + > + if fc_value > FlowControl::OK { > + tracing::debug!( > + "Setting flow 
control to {} (queue: {}/{})", > + fc_value, > + queue_len, > + MAX_PENDING_REQUESTS > + ); > + } > + request_rb.flow_control.set(fc_value); > + } > + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { > + // Queue is full - set flow control to STOP and send EAGAIN > + tracing::warn!("Work queue full on conn {}, sending EAGAIN", conn_id); > + request_rb.flow_control.set(FlowControl::STOP); > + > + let error_response = Response { > + error_code: -libc::EAGAIN, > + data: Vec::new(), > + }; > + // Send error response directly (bypassing queue) > + let _ = response_tx.send((header, error_response)); > + } > + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { > + tracing::error!("Work queue closed on conn {}", conn_id); > + break; > + } > + } > + } > + > + // Cleanup: drop channels to signal tasks to exit > + drop(work_tx); > + drop(response_tx); > + let _ = worker_task.await; > + let _ = response_task.await; > + > + tracing::debug!("Request handler finished for connection {}", conn_id); > + } > + > + /// Send a response to the client > + async fn send_response( > + response_rb: &mut RingBuffer, > + header: RequestHeader, > + handler_response: Response, > + ) { > + // Build and serialize response: [header][data] > + let response_size = std::mem::size_of::() + handler_response.data.len(); > + let mut response_bytes = Vec::with_capacity(response_size); > + > + let response_header = ResponseHeader { > + id: header.id, > + size: (response_size as i32).into(), > + error: handler_response.error_code.into(), > + }; > + > + response_bytes.extend_from_slice(unsafe { > + std::slice::from_raw_parts( > + &response_header as *const _ as *const u8, > + std::mem::size_of::(), > + ) > + }); > + response_bytes.extend_from_slice(&handler_response.data); > + > + tracing::debug!("Response header bytes (24): {:02x?}", &response_bytes[..24]); > + > + // Send response (async, yields if buffer full) > + match response_rb.send(&response_bytes).await { > + Ok(()) => { > + // 
Response sent successfully > + } > + Err(e) => { > + tracing::error!("Failed to send response: {}", e); > + } > + } > + } > +} > + > +/// Get peer credentials from Unix socket > +fn get_peer_credentials(fd: i32) -> Result<(u32, u32, u32)> { > + #[cfg(target_os = "linux")] > + { > + let mut ucred: libc::ucred = unsafe { std::mem::zeroed() }; > + let mut ucred_size = std::mem::size_of::() as libc::socklen_t; > + > + let res = unsafe { > + libc::getsockopt( > + fd, > + libc::SOL_SOCKET, > + libc::SO_PEERCRED, > + &mut ucred as *mut _ as *mut libc::c_void, > + &mut ucred_size, > + ) > + }; > + > + if res != 0 { > + anyhow::bail!( > + "getsockopt SO_PEERCRED failed: {}", > + std::io::Error::last_os_error() > + ); > + } > + > + Ok((ucred.uid, ucred.gid, ucred.pid as u32)) > + } > + > + #[cfg(not(target_os = "linux"))] > + { > + anyhow::bail!("Peer credentials not supported on this platform"); > + } > +} > + > +/// Send connection response to client > +async fn send_connection_response( > + stream: &mut UnixStream, > + error: i32, > + conn_id: u64, > + max_msg_size: u32, > + request_path: &str, > + response_path: &str, > + event_path: &str, > +) -> Result<()> { > + let mut response = ConnectionResponse { > + hdr: ResponseHeader { > + id: MSG_AUTHENTICATE.into(), > + size: (std::mem::size_of::() as i32).into(), > + error: error.into(), > + }, > + connection_type: CONNECTION_TYPE_SHM, // Shared memory transport > + max_msg_size, > + connection: conn_id as usize, > + request: [0u8; PATH_MAX], > + response: [0u8; PATH_MAX], > + event: [0u8; PATH_MAX], > + }; > + > + // Helper to copy path strings into fixed-size buffers > + let copy_path = |dest: &mut [u8; PATH_MAX], src: &str| { > + if !src.is_empty() { > + let len = src.len().min(PATH_MAX - 1); > + dest[..len].copy_from_slice(&src.as_bytes()[..len]); > + tracing::debug!("Connection response path: '{}'", src); > + } > + }; > + > + copy_path(&mut response.request, request_path); > + copy_path(&mut response.response, 
response_path); > + copy_path(&mut response.event, event_path); > + > + // Serialize and send > + let response_bytes = unsafe { > + std::slice::from_raw_parts( > + &response as *const _ as *const u8, > + std::mem::size_of::(), > + ) > + }; > + > + stream > + .write_all(response_bytes) > + .await > + .context("Failed to send connection response")?; > + > + tracing::debug!( > + "Sent connection response: error={}, connection_type=SHM", > + error > + ); > + > + Ok(()) > +} > + > +#[cfg(test)] > +mod tests { > + use super::*; > + > + #[test] > + fn test_malformed_request_size_validation() { > + // This test verifies the size validation logic for malformed requests > + // The actual validation happens in handle_requests() at line 247-254 > + > + let header_size = std::mem::size_of::(); > + assert_eq!(header_size, 16, "RequestHeader should be 16 bytes"); > + > + // Test case 1: Request too small (would be rejected) > + let too_small_data = [0x01, 0x02, 0x03]; // Only 3 bytes > + assert!( > + too_small_data.len() < header_size, > + "Malformed request with {} bytes should be less than header size {}", > + too_small_data.len(), > + header_size > + ); > + > + // Test case 2: More realistic too-small cases > + let test_cases = vec![ > + (vec![0u8; 0], 0), // Empty request > + (vec![0u8; 1], 1), // 1 byte > + (vec![0u8; 8], 8), // 8 bytes (half header) > + (vec![0u8; 15], 15), // 15 bytes (just short of header) > + ]; > + > + for (data, expected_len) in test_cases { > + assert_eq!(data.len(), expected_len); > + assert!( > + data.len() < header_size, > + "Request with {} bytes should be rejected (need {})", > + data.len(), > + header_size > + ); > + } > + > + // Test case 3: Valid size requests (would pass size check) > + let valid_cases = vec![ > + vec![0u8; 16], // Exact header size > + vec![0u8; 32], // Header + data > + vec![0u8; 1024], // Large request > + ]; > + > + for data in valid_cases { > + assert!( > + data.len() >= header_size, > + "Request with {} bytes should 
pass size check", > + data.len() > + ); > + } > + } > + > + #[test] > + fn test_malformed_header_structure() { > + // This test verifies that the header structure is correctly defined > + // and that we can safely parse various header patterns > + > + let header_size = std::mem::size_of::(); > + > + // Create a valid-sized buffer with various patterns > + let patterns = vec![ > + vec![0x00; header_size], // All zeros > + vec![0xFF; header_size], // All ones > + vec![0xAA; header_size], // Alternating pattern > + ]; > + > + for pattern in patterns { > + assert_eq!(pattern.len(), header_size); > + > + // Parse header (same unsafe code as in handle_requests:256-258) > + let header = > + unsafe { std::ptr::read_unaligned(pattern.as_ptr() as *const RequestHeader) }; > + > + // The parsing should not crash, regardless of values > + // The actual values don't matter for this safety test > + let _id = *header.id; > + let _size = *header.size; > + } > + } > + > + #[test] > + fn test_request_header_alignment() { > + // Verify that RequestHeader can be read with read_unaligned > + // This is important because data from ring buffers may not be aligned > + > + let header_size = std::mem::size_of::(); > + > + // Create misaligned buffer (offset by 1 byte to test unaligned access) > + let mut buffer = vec![0u8; header_size + 1]; > + buffer[1..].fill(0x42); > + > + // Read from misaligned offset (this is what read_unaligned is for) > + let header = > + unsafe { std::ptr::read_unaligned(&buffer[1] as *const u8 as *const RequestHeader) }; > + > + // Should successfully read without crashing > + let _id = *header.id; > + let _size = *header.size; > + } > + > + #[test] > + fn test_connection_request_structure() { > + // Verify ConnectionRequest structure for connection setup > + > + let conn_req_size = std::mem::size_of::(); > + > + // ConnectionRequest should be properly sized > + assert!( > + conn_req_size > std::mem::size_of::(), > + "ConnectionRequest should include header plus 
additional fields" > + ); > + > + // Test that we can parse a zero-filled connection request > + let data = vec![0u8; conn_req_size]; > + let conn_req = > + unsafe { std::ptr::read_unaligned(data.as_ptr() as *const ConnectionRequest) }; > + > + // Should not crash when accessing fields > + let _id = *conn_req.hdr.id; > + let _size = *conn_req.hdr.size; > + let _max_msg_size = conn_req.max_msg_size; > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs > new file mode 100644 > index 00000000..12b40cd4 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs > @@ -0,0 +1,93 @@ > +//! Handler trait for processing IPC requests > +//! > +//! This module defines the core `Handler` trait that users implement to process > +//! IPC requests. The trait-based approach provides a more idiomatic and extensible > +//! API compared to raw function closures. > + > +use crate::protocol::{Request, Response}; > +use async_trait::async_trait; > + > +/// Permissions for IPC connections > +/// > +/// Determines the access level for authenticated connections. > +#[derive(Debug, Clone, Copy, PartialEq, Eq)] > +pub enum Permissions { > + /// Read-only access > + ReadOnly, > + /// Read-write access > + ReadWrite, > +} > + > +/// Handler trait for processing IPC requests and authentication > +/// > +/// Implement this trait to define custom request handling logic and authentication > +/// policy for your IPC server. The handler receives a `Request` containing the > +/// message ID, payload data, and connection context, and returns a `Response` with > +/// an error code and response data. > +/// > +/// ## Authentication > +/// > +/// The `authenticate` method is called during connection setup to determine whether > +/// a client with given credentials should be accepted. This allows the handler to > +/// implement application-specific authentication policies. 
> +/// > +/// ## Async Support > +/// > +/// The `handle` method is async, allowing you to perform I/O operations, database > +/// queries, or other async work within your handler. > +/// > +/// ## Thread Safety > +/// > +/// Handlers must be `Send + Sync` as they may be called from multiple tokio tasks > +/// concurrently. Use `Arc>` or other synchronization primitives if you need > +/// mutable shared state. > +/// > +/// ## Error Handling > +/// > +/// Return negative errno values in `Response::error_code` to indicate errors. > +/// Use 0 for success. See `libc::*` constants for standard errno values. > +#[async_trait] > +pub trait Handler: Send + Sync { > + /// Authenticate a connecting client and determine access level > + /// > + /// Called during connection setup to determine whether to accept the connection > + /// and what access level to grant. > + /// > + /// # Arguments > + /// > + /// * `uid` - Client user ID (from SO_PEERCRED) > + /// * `gid` - Client group ID (from SO_PEERCRED) > + /// > + /// # Returns > + /// > + /// - `Some(Permissions::ReadWrite)` to accept with read-write access > + /// - `Some(Permissions::ReadOnly)` to accept with read-only access > + /// - `None` to reject the connection > + fn authenticate(&self, uid: u32, gid: u32) -> Option; > + > + /// Handle an IPC request > + /// > + /// # Arguments > + /// > + /// * `request` - The incoming request with message ID, data, and connection context > + /// > + /// # Returns > + /// > + /// A `Response` containing the error code (0 = success, negative = errno) and > + /// optional response data to send back to the client. > + async fn handle(&self, request: Request) -> Response; > +} > + > +/// Blanket implementation for Arc where T: Handler > +/// > +/// This allows passing `Arc` directly to `Server::new()`. 
> +#[async_trait]
> +impl Handler for std::sync::Arc {
> +    fn authenticate(&self, uid: u32, gid: u32) -> Option {
> +        (**self).authenticate(uid, gid)
> +    }
> +
> +    async fn handle(&self, request: Request) -> Response {
> +        (**self).handle(request).await
> +    }
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs
> new file mode 100644
> index 00000000..923c359e
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs
> @@ -0,0 +1,37 @@
> +/// libqb-compatible IPC server implementation in pure Rust
> +///
> +/// This crate implements a minimal libqb IPC server that is wire-compatible
> +/// with libqb clients (qb_ipcc_*), without depending on the libqb C library.
> +///
> +/// ## Protocol Overview
> +///
> +/// 1. **Connection Handshake** (SOCK_STREAM):
> +///    - Server listens on `/var/run/{service_name}`
> +///    - Client connects and sends `qb_ipc_connection_request`
> +///    - Server authenticates (uid/gid), creates per-connection datagram sockets
> +///    - Server sends `qb_ipc_connection_response` with socket paths
> +///
> +/// 2. **Request/Response** (SOCK_DGRAM):
> +///    - Client sends requests on datagram socket

The actual implementation uses SHM ring buffers, not datagram sockets. Please revisit the documentation.
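
For the reworded overview, step 2 could describe how a request travels as a chunk through the request ring buffer. My understanding of the layout, as a rough in-process sketch: a `Vec<u32>` stands in for the mmap'd data file, and `ToyRing`, `push` and `pop` are illustrative names only, not the patch's API (no atomics, no semaphore, single thread). The two constants are the ones from ringbuffer.rs:

```rust
const CHUNK_HEADER_WORDS: usize = 2; // [size][magic]
const CHUNK_MAGIC: u32 = 0xA1A1_A1A1;

/// Toy stand-in for the shared ring buffer; pointers are word indices.
struct ToyRing {
    words: Vec<u32>,
    write_pt: usize,
    read_pt: usize,
}

impl ToyRing {
    fn new(word_size: usize) -> Self {
        Self { words: vec![0; word_size], write_pt: 0, read_pt: 0 }
    }

    /// Free words; one word stays reserved unless the buffer is empty.
    fn free_words(&self) -> usize {
        let n = self.words.len();
        if self.write_pt == self.read_pt {
            n
        } else {
            (self.read_pt + n - self.write_pt) % n - 1
        }
    }

    /// Advance a pointer past one chunk: header + payload rounded up to words.
    fn step(&self, pt: usize, len_bytes: usize) -> usize {
        (pt + CHUNK_HEADER_WORDS + len_bytes.div_ceil(4)) % self.words.len()
    }

    /// Write one [size][magic][data] chunk; returns false if it does not fit.
    fn push(&mut self, data: &[u8]) -> bool {
        let n = self.words.len();
        let needed = CHUNK_HEADER_WORDS + data.len().div_ceil(4);
        if needed >= self.free_words() {
            return false;
        }
        self.words[self.write_pt] = data.len() as u32;
        self.words[(self.write_pt + 1) % n] = CHUNK_MAGIC;
        for (i, part) in data.chunks(4).enumerate() {
            let mut word = [0u8; 4];
            word[..part.len()].copy_from_slice(part);
            self.words[(self.write_pt + CHUNK_HEADER_WORDS + i) % n] = u32::from_ne_bytes(word);
        }
        self.write_pt = self.step(self.write_pt, data.len());
        true
    }

    /// Read the next chunk, if any, and advance the read pointer.
    fn pop(&mut self) -> Option<Vec<u8>> {
        let n = self.words.len();
        if self.read_pt == self.write_pt || self.words[(self.read_pt + 1) % n] != CHUNK_MAGIC {
            return None;
        }
        let size = self.words[self.read_pt] as usize;
        let mut out = Vec::with_capacity(size);
        for i in 0..size.div_ceil(4) {
            let word = self.words[(self.read_pt + CHUNK_HEADER_WORDS + i) % n];
            out.extend_from_slice(&word.to_ne_bytes());
        }
        out.truncate(size);
        self.read_pt = self.step(self.read_pt, size);
        Some(out)
    }
}
```

Something along these lines in the crate docs (client writes a chunk into the request ring buffer, server reads it and answers through the response ring buffer) would match the code much better than the SOCK_DGRAM wording.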
> +/// - Server receives, processes, and sends responses > +/// > +/// ## Module Structure > +/// > +/// - `protocol` - Wire protocol structures and constants > +/// - `socket` - Abstract Unix socket utilities > +/// - `connection` - Per-connection handling and request processing > +/// - `server` - Main IPC server and connection acceptance > +/// > +/// References: > +/// - libqb source: ~/dev/libqb/lib/ipc_socket.c, ipc_setup.c > +mod connection; > +mod handler; > +mod protocol; > +mod ringbuffer; > +mod server; > +mod socket; > + > +// Public API > +pub use handler::{Handler, Permissions}; > +pub use protocol::{Request, Response}; > +pub use server::Server; > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs > new file mode 100644 > index 00000000..469099f2 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs > @@ -0,0 +1,332 @@ > +//! libqb wire protocol structures and constants > +//! > +//! This module contains the low-level protocol definitions for libqb IPC communication. > +//! All structures must match the C counterparts exactly for binary compatibility. > + > +/// Message ID for authentication requests (matches libqb's QB_IPC_MSG_AUTHENTICATE) > +pub(super) const MSG_AUTHENTICATE: i32 = 1; > + > +/// Connection type for shared memory transport (matches libqb's QB_IPC_SHM) > +pub(super) const CONNECTION_TYPE_SHM: u32 = 1; > + > +/// Maximum path length - used in connection response > +pub(super) const PATH_MAX: usize = 4096; > + > +/// Wrapper for i32 that aligns to 8-byte boundary with explicit padding > +/// > +/// Simulates C's `__attribute__ ((aligned(8)))` on individual i32 fields. > +/// This is used to match libqb's per-field alignment behavior. 
> +/// > +/// Memory layout: > +/// - Bytes 0-3: i32 value > +/// - Bytes 4-7: zero padding > +/// - Total: 8 bytes > +#[repr(C, align(8))] > +#[derive(Debug, Copy, Clone, PartialEq, Eq)] > +pub struct Align8 { > + pub value: i32, > + _pad: u32, // 4 bytes padding for i32 -> 8 bytes total > +} > + > +impl Align8 { > + #[inline] > + pub const fn new(value: i32) -> Self { > + Align8 { value, _pad: 0 } > + } > +} > + > +impl std::ops::Deref for Align8 { > + type Target = i32; > + > + #[inline] > + fn deref(&self) -> &i32 { > + &self.value > + } > +} > + > +impl std::ops::DerefMut for Align8 { > + #[inline] > + fn deref_mut(&mut self) -> &mut i32 { > + &mut self.value > + } > +} > + > +impl From for Align8 { > + #[inline] > + fn from(value: i32) -> Self { > + Align8::new(value) > + } > +} > + > +impl Default for Align8 { > + #[inline] > + fn default() -> Self { > + Align8::new(0) > + } > +} > + > +/// Request header (matches libqb's qb_ipc_request_header) > +/// > +/// Each field is 8-byte aligned to match C's __attribute__ ((aligned(8))) > +#[repr(C, align(8))] > +#[derive(Debug, Copy, Clone)] > +pub struct RequestHeader { > + pub id: Align8, > + pub size: Align8, > +} > + > +/// Response header (matches libqb's qb_ipc_response_header) > +#[repr(C, align(8))] > +#[derive(Debug, Copy, Clone)] > +pub struct ResponseHeader { > + pub id: Align8, > + pub size: Align8, > + pub error: Align8, > +} > + > +/// Connection request sent by client during handshake (matches libqb's qb_ipc_connection_request) > +#[repr(C)] > +#[derive(Debug, Copy, Clone)] > +pub(super) struct ConnectionRequest { > + pub hdr: RequestHeader, > + pub max_msg_size: u32, > +} > + > +/// Connection response sent by server during handshake (matches libqb's qb_ipc_connection_response) > +#[repr(C, align(8))] > +#[derive(Debug)] > +pub(super) struct ConnectionResponse { > + pub hdr: ResponseHeader, > + pub connection_type: u32, > + pub max_msg_size: u32, > + pub connection: usize, > + pub request: [u8; 
PATH_MAX], > + pub response: [u8; PATH_MAX], > + pub event: [u8; PATH_MAX], > +} > + > +/// Request passed to handlers > +/// > +/// Contains all information about an IPC request including the message ID, > +/// payload data, and connection context (uid, gid, pid, permissions). > +#[derive(Debug, Clone)] > +pub struct Request { > + /// Message ID identifying the operation (application-defined) > + pub msg_id: i32, > + > + /// Request payload data > + pub data: Vec, > + > + /// Whether this connection has read-only access > + pub is_read_only: bool, > + > + /// Connection ID (for logging/debugging) > + pub conn_id: u64, > + > + /// Client user ID (from SO_PEERCRED) > + pub uid: u32, > + > + /// Client group ID (from SO_PEERCRED) > + pub gid: u32, > + > + /// Client process ID (from SO_PEERCRED) > + pub pid: u32, > +} > + > +/// Response from handlers > +/// > +/// Contains the error code and response data to send back to the client. > +#[derive(Debug, Clone)] > +pub struct Response { > + /// Error code (0 = success, negative = errno) > + pub error_code: i32, > + > + /// Response payload data > + pub data: Vec, > +} > + > +impl Response { > + /// Create a successful response with data > + pub fn ok(data: Vec) -> Self { > + Self { > + error_code: 0, > + data, > + } > + } > + > + /// Create an error response with errno > + pub fn err(error_code: i32) -> Self { > + Self { > + error_code, > + data: Vec::new(), > + } > + } > + > + /// Create an error response with errno and optional data > + pub fn with_error(error_code: i32, data: Vec) -> Self { > + Self { error_code, data } > + } > +} > + > +#[cfg(test)] > +mod tests { > + use super::*; > + > + #[test] > + fn test_header_sizes() { > + assert_eq!(std::mem::size_of::(), 16); > + assert_eq!(std::mem::align_of::(), 8); > + assert_eq!(std::mem::size_of::(), 24); > + assert_eq!(std::mem::align_of::(), 8); > + assert_eq!(std::mem::size_of::(), 24); // 16 (header) + 4 (max_msg_size) + 4 (padding) > + > + println!( > + 
"ConnectionResponse size: {}", > + std::mem::size_of::() > + ); > + println!( > + "ConnectionResponse align: {}", > + std::mem::align_of::() > + ); > + println!("PATH_MAX: {PATH_MAX}"); > + > + // C expects: 24 (header) + 4 (connection_type) + 4 (max_msg_size) + 8 (connection pointer) + 3*4096 (paths) = 12328 > + assert_eq!(std::mem::size_of::(), 12328); > + } > + > + // ===== Align8 Tests ===== > + > + #[test] > + fn test_align8_size_and_alignment() { > + // Verify Align8 is exactly 8 bytes > + assert_eq!(std::mem::size_of::(), 8); > + assert_eq!(std::mem::align_of::(), 8); > + } > + > + #[test] > + fn test_align8_creation_and_value_access() { > + let a = Align8::new(42); > + assert_eq!(a.value, 42); > + assert_eq!(*a, 42); // Test Deref > + } > + > + #[test] > + fn test_align8_from_i32() { > + let a: Align8 = (-100).into(); > + assert_eq!(a.value, -100); > + } > + > + #[test] > + fn test_align8_default() { > + let a = Align8::default(); > + assert_eq!(a.value, 0); > + } > + > + #[test] > + fn test_align8_deref_mut() { > + let mut a = Align8::new(10); > + *a = 20; // Test DerefMut > + assert_eq!(a.value, 20); > + } > + > + #[test] > + fn test_align8_padding_is_zero() { > + let a = Align8::new(123); > + // Padding should always be 0 > + assert_eq!(a._pad, 0); > + } > + > + // ===== Response Tests ===== > + > + #[test] > + fn test_response_ok_creation() { > + let data = b"test data".to_vec(); > + let resp = Response::ok(data.clone()); > + > + assert_eq!(resp.error_code, 0); > + assert_eq!(resp.data, data); > + } > + > + #[test] > + fn test_response_err_creation() { > + let resp = Response::err(-5); // ERRNO like EIO > + > + assert_eq!(resp.error_code, -5); > + assert!(resp.data.is_empty()); > + } > + > + #[test] > + fn test_response_with_error_and_data() { > + let data = b"error details".to_vec(); > + let resp = Response::with_error(-22, data.clone()); // EINVAL > + > + assert_eq!(resp.error_code, -22); > + assert_eq!(resp.data, data); > + } > + > + #[test] > + fn 
test_response_error_codes() { > + // Test various errno values > + let test_cases = vec![ > + (0, "success"), > + (-1, "EPERM"), > + (-2, "ENOENT"), > + (-13, "EACCES"), > + (-22, "EINVAL"), > + ]; > + > + for (code, _name) in test_cases { > + let resp = Response::err(code); > + assert_eq!(resp.error_code, code); > + } > + } > + > + // ===== Request Tests ===== > + > + #[test] > + fn test_request_creation() { > + let req = Request { > + msg_id: 100, > + data: b"payload".to_vec(), > + is_read_only: false, > + conn_id: 12345, > + uid: 0, > + gid: 0, > + pid: 999, > + }; > + > + assert_eq!(req.msg_id, 100); > + assert_eq!(req.data, b"payload"); > + assert!(!req.is_read_only); > + assert_eq!(req.conn_id, 12345); > + assert_eq!(req.uid, 0); > + assert_eq!(req.gid, 0); > + assert_eq!(req.pid, 999); > + } > + > + #[test] > + fn test_request_read_only_flag() { > + let req_ro = Request { > + msg_id: 1, > + data: vec![], > + is_read_only: true, > + conn_id: 1, > + uid: 33, > + gid: 33, > + pid: 1000, > + }; > + > + let req_rw = Request { > + msg_id: 1, > + data: vec![], > + is_read_only: false, > + conn_id: 2, > + uid: 0, > + gid: 0, > + pid: 1001, > + }; > + > + assert!(req_ro.is_read_only); > + assert!(!req_rw.is_read_only); > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs > new file mode 100644 > index 00000000..96dd192b > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs > @@ -0,0 +1,1158 @@ > +/// Lock-free ring buffer implementation compatible with libqb's shared memory IPC > +/// > +/// This module implements a SPSC (single-producer single-consumer) ring buffer > +/// using shared memory, matching libqb's wire protocol and memory layout. 
> +/// > +/// ## Design > +/// > +/// - **Shared Memory**: Two mmap'd files (header + data) in /dev/shm > +/// - **Lock-Free**: Uses atomic operations for read_pt/write_pt synchronization > +/// - **Chunk-Based**: Messages stored as [size][magic][data] chunks > +/// - **Wire-Compatible**: Matches libqb's qb_ringbuffer_shared_s layout > +use anyhow::{Context, Result}; > +use memmap2::MmapMut; > +use std::fs::OpenOptions; > +use std::os::fd::AsRawFd; > +use std::path::Path; > +use std::sync::Arc; > +use std::sync::atomic::{AtomicI32, AtomicU32, Ordering}; > +use tokio::sync::Notify; > + > +/// Circular mmap wrapper for ring buffer data > +/// > +/// This struct manages a circular memory mapping where the same file is mapped > +/// twice in consecutive virtual addresses. This allows ring buffer operations > +/// to wrap around naturally without modulo arithmetic. > +/// > +/// Matches libqb's qb_sys_circular_mmap() behavior. > +struct CircularMmap { > + /// Starting address of the 2x circular mapping > + addr: *mut libc::c_void, > + /// Size of the file (virtual mapping is 2x this size) > + size: usize, > +} > + > +impl CircularMmap { > + /// Create a circular mmap from a file descriptor > + /// > + /// Maps the file TWICE in consecutive virtual addresses, allowing ring buffer > + /// wraparound without modulo arithmetic. Matches libqb's qb_sys_circular_mmap(). > + /// > + /// # Arguments > + /// - `fd`: File descriptor of the data file (must be sized to `size` bytes) > + /// - `size`: Size of the file in bytes (virtual mapping will be 2x this) > + /// > + /// # Safety > + /// The file must be properly sized before calling this function. > + unsafe fn new(fd: i32, size: usize) -> Result { > + // SAFETY: All operations in this function are inherently unsafe as they > + // manipulate raw memory mappings. The caller must ensure the fd is valid > + // and the file is properly sized. 
> + unsafe { > + // Step 1: Reserve 2x space with anonymous mmap > + let addr_orig = libc::mmap( > + std::ptr::null_mut(), > + size * 2, > + libc::PROT_NONE, > + libc::MAP_ANONYMOUS | libc::MAP_PRIVATE, > + -1, > + 0, > + ); > + > + if addr_orig == libc::MAP_FAILED { > + anyhow::bail!( > + "Failed to reserve circular mmap space: {}", > + std::io::Error::last_os_error() > + ); > + } > + > + // Step 2: Map the file at the start of reserved space > + let addr1 = libc::mmap( > + addr_orig, > + size, > + libc::PROT_READ | libc::PROT_WRITE, > + libc::MAP_FIXED | libc::MAP_SHARED, > + fd, > + 0, > + ); > + > + if addr1 != addr_orig { > + libc::munmap(addr_orig, size * 2); > + anyhow::bail!( > + "Failed to map first half of circular buffer: {}", > + std::io::Error::last_os_error() > + ); > + } > + > + // Step 3: Map the SAME file again right after > + let addr_next = (addr_orig as *mut u8).add(size) as *mut libc::c_void; > + let addr2 = libc::mmap( > + addr_next, > + size, > + libc::PROT_READ | libc::PROT_WRITE, > + libc::MAP_FIXED | libc::MAP_SHARED, > + fd, > + 0, > + ); > + > + if addr2 != addr_next { > + libc::munmap(addr_orig, size * 2); > + anyhow::bail!( > + "Failed to map second half of circular buffer: {}", > + std::io::Error::last_os_error() > + ); > + } > + > + tracing::debug!( > + "Created circular mmap: {:p}, {} bytes (2x {} bytes file)", > + addr_orig, > + size * 2, > + size > + ); > + > + Ok(Self { > + addr: addr_orig, > + size, > + }) > + } > + } > + > + /// Get the base address as a mutable pointer to u32 > + /// > + /// This is the most common use case for ring buffers which work with u32 words. > + fn as_mut_ptr(&self) -> *mut u32 { > + self.addr as *mut u32 > + } > + > + /// Zero-initialize the circular mapping > + /// > + /// Only needs to write to the first half due to the circular nature. > + /// > + /// # Safety > + /// The circular mmap must be properly initialized and the address valid. 
> + unsafe fn zero_initialize(&mut self) { > + // SAFETY: Caller ensures the circular mmap is valid and mapped > + unsafe { > + std::ptr::write_bytes(self.addr as *mut u8, 0, self.size); > + } > + } > +} > + > +impl Drop for CircularMmap { > + fn drop(&mut self) { > + // Munmap the 2x circular mapping > + // Matches libqb's cleanup in qb_rb_close_helper > + unsafe { > + libc::munmap(self.addr, self.size * 2); > + } > + tracing::debug!( > + "Unmapped circular buffer: {:p}, {} bytes (2x {} bytes file)", > + self.addr, > + self.size * 2, > + self.size > + ); > + } > +} > + > +/// Process-shared POSIX semaphore wrapper > +/// > +/// This wraps the native Linux sem_t (32 bytes on x86_64) for inter-process > +/// synchronization in the ring buffer. > +/// > +/// **libqb compatibility note**: This corresponds to libqb's `rpl_sem_t` type. > +/// On Linux with HAVE_SEM_TIMEDWAIT defined, rpl_sem_t is just an alias for > +/// the native sem_t. The "rpl" prefix stands for "replacement" - libqb provides > +/// a fallback implementation using mutexes/condvars on systems without proper > +/// POSIX semaphore support (like BSD). Since we only target Linux, we use the > +/// native sem_t directly. > +#[repr(C)] > +struct PosixSem { > + /// Raw sem_t storage (32 bytes on Linux x86_64) > + _sem: [u8; 32], > +} > + > +impl PosixSem { > + /// Initialize a POSIX semaphore in-place in shared memory > + /// > + /// This initializes the semaphore at its current memory location, which is > + /// critical for process-shared semaphores in mmap'd memory. The semaphore > + /// must not be moved after initialization. > + /// > + /// The semaphore is always initialized as: > + /// - **Process-shared** (pshared=1): Shared between processes via mmap > + /// - **Initial value 0**: No data available initially > + /// > + /// Matches libqb's semaphore initialization in `qb_rb_create_from_file`. 
> + /// > + /// # Safety > + /// The semaphore must remain at its current memory location and must not > + /// be moved or copied after initialization. > + unsafe fn init_in_place(&mut self) -> Result<()> { > + let sem_ptr = self._sem.as_mut_ptr() as *mut libc::sem_t; > + > + // pshared=1: Process-shared semaphore (for cross-process IPC) > + // initial_value=0: No data available initially (producers will post) > + const PSHARED: libc::c_int = 1; > + const INITIAL_VALUE: libc::c_uint = 0; > + > + // SAFETY: Caller ensures the semaphore memory is valid and will remain > + // at this location for its lifetime > + let ret = unsafe { libc::sem_init(sem_ptr, PSHARED, INITIAL_VALUE) }; > + > + if ret != 0 { > + anyhow::bail!("sem_init failed: {}", std::io::Error::last_os_error()); > + } > + > + Ok(()) > + } > + > + /// Destroy the semaphore > + /// > + /// This should be called when the semaphore is no longer needed. > + /// Matches libqb's rpl_sem_destroy (which is sem_destroy on Linux). > + /// > + /// # Safety > + /// The semaphore must have been properly initialized and no threads should > + /// be waiting on it. > + unsafe fn destroy(&mut self) -> Result<()> { > + let sem_ptr = self._sem.as_mut_ptr() as *mut libc::sem_t; > + > + // SAFETY: Caller ensures the semaphore is initialized and not in use > + let ret = unsafe { libc::sem_destroy(sem_ptr) }; > + > + if ret != 0 { > + anyhow::bail!("sem_destroy failed: {}", std::io::Error::last_os_error()); > + } > + > + Ok(()) > + } > + > + /// Post to the semaphore (increment) > + /// > + /// Matches libqb's rpl_sem_post (which is sem_post on Linux). 
> +    unsafe fn post(&self) -> Result<()> {
> +        let ret = unsafe { libc::sem_post(self._sem.as_ptr() as *mut libc::sem_t) };
> +
> +        if ret != 0 {
> +            anyhow::bail!("sem_post failed: {}", std::io::Error::last_os_error());
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Wait on the semaphore asynchronously (decrement, blocking)
> +    ///
> +    /// Uses `spawn_blocking` to wait on the semaphore without blocking the tokio
> +    /// runtime. This provides true event-driven behavior while maintaining
> +    /// compatibility with libqb's semaphore-based notification mechanism.
> +    ///
> +    /// Matches libqb's `my_posix_sem_timedwait` / `sem_wait` behavior.
> +    ///
> +    /// # Safety
> +    /// The semaphore must be properly initialized and remain valid for the
> +    /// duration of the wait operation.
> +    async unsafe fn wait(&self) -> Result<()> {
> +        // Get raw pointer to semaphore
> +        let sem_ptr = self._sem.as_ptr() as *mut libc::sem_t;
> +
> +        // Convert to usize for safe transfer between threads
> +        // This is safe because:
> +        // 1. The semaphore is in process-shared memory (mmap'd file)
> +        // 2. The memory remains valid for the lifetime of the containing structure
> +        // 3. We're only using the pointer on the blocking thread pool
> +        let sem_ptr_addr = sem_ptr as usize;
> +
> +        // Use spawn_blocking to wait on the semaphore without blocking tokio runtime
> +        // This offloads the blocking sem_wait to tokio's dedicated blocking thread pool
> +        tokio::task::spawn_blocking(move || {
> +            // Reconstruct the pointer on the blocking thread
> +            // SAFETY: The semaphore is in shared memory and remains valid.
> +            // We're calling sem_wait on a process-shared semaphore from a thread
> +            // in the same process, which is safe.
> +            let sem_ptr = sem_ptr_addr as *mut libc::sem_t;
> +            let ret = unsafe { libc::sem_wait(sem_ptr) };

On shutdown, the async task can be dropped while the blocking sem_wait keeps running, but RingBuffer may then sem_destroy/unmap the memory underneath it.
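
A rough sketch of the kind of loop I have in mind, assuming the blocking side switches to a bounded wait (e.g. `sem_timedwait`) and checks a shared shutdown flag between attempts. `WaitOutcome`, `wait_until_signaled` and the `timed_wait` closure are made-up names for illustration, not part of the patch; the closure stands in for the actual libc call so the sketch stays std-only:

```rust
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq)]
enum WaitOutcome {
    /// The semaphore was posted, data should be available.
    Signaled,
    /// Shutdown was requested, the caller must stop touching the semaphore.
    Shutdown,
}

/// Wait until `timed_wait` succeeds or `shutdown` is set.
///
/// `timed_wait` is a hypothetical wrapper around one bounded wait attempt:
/// `Ok(())` when the semaphore was decremented, `ErrorKind::Interrupted`
/// for EINTR, `ErrorKind::TimedOut` when the timeout expired.
fn wait_until_signaled<F>(mut timed_wait: F, shutdown: &AtomicBool) -> io::Result<WaitOutcome>
where
    F: FnMut() -> io::Result<()>,
{
    loop {
        // Check the flag before every attempt so a shutdown is noticed
        // at the latest after one timeout interval.
        if shutdown.load(Ordering::Acquire) {
            return Ok(WaitOutcome::Shutdown);
        }
        match timed_wait() {
            Ok(()) => return Ok(WaitOutcome::Signaled),
            // EINTR: really retry instead of propagating an error.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            // Timeout: loop around and re-check the shutdown flag.
            Err(e) if e.kind() == io::ErrorKind::TimedOut => continue,
            Err(e) => return Err(e),
        }
    }
}
```

With something like this, the teardown path could set the flag, post the semaphore once to wake the waiter, wait for the blocking task to finish, and only then destroy/unmap.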
Please make the wait shutdown-aware and wake it before destroying.

> +
> +            if ret != 0 {
> +                let err = std::io::Error::last_os_error();
> +                // Handle EINTR by returning an error that causes retry
> +                if err.raw_os_error() == Some(libc::EINTR) {
> +                    anyhow::bail!("sem_wait interrupted (EINTR), will retry");

This says "will retry", but the `bail!` returns an error instead of retrying. Doesn't that actually crash?

> +                }
> +                anyhow::bail!("sem_wait failed: {err}");
> +            }
> +
> +            Ok(())
> +        })
> +        .await
> +        .context("spawn_blocking task failed")??;
> +
> +        Ok(())
> +    }
> +}
> +
> +/// Shared memory header matching libqb's qb_ringbuffer_shared_s layout
> +///
> +/// This structure is mmap'd and shared between processes.
> +/// Field order and alignment must exactly match libqb for compatibility.
> +///
> +/// Note: libqb's struct has `char user_data[1]` which contributes 1 byte to sizeof(),
> +/// then the struct is padded to 8-byte alignment (7 bytes padding).
> +/// Additional shared_user_data_size bytes are allocated beyond sizeof().
> +#[repr(C, align(8))]
> +struct RingBufferShared {
> +    /// Write pointer (word index, not byte offset)
> +    write_pt: AtomicU32,
> +    /// Read pointer (word index, not byte offset)
> +    read_pt: AtomicU32,
> +    /// Ring buffer size in words (u32 units)
> +    word_size: u32,
> +    /// Path to header file
> +    hdr_path: [u8; libc::PATH_MAX as usize],
> +    /// Path to data file
> +    data_path: [u8; libc::PATH_MAX as usize],
> +    /// Reference count (for cleanup)
> +    ref_count: AtomicU32,
> +    /// Process-shared semaphore for notification
> +    posix_sem: PosixSem,
> +    /// Flexible array member placeholder (matches C's char user_data[1])
> +    /// Actual user_data starts here and continues beyond sizeof(RingBufferShared)
> +    user_data: [u8; 1],
> +    // 7 bytes of padding added by align(8) to reach 8248 bytes total
> +}
> +
> +impl RingBufferShared {
> +    /// Chunk header size in 32-bit words (matching libqb)
> +    const CHUNK_HEADER_WORDS: usize = 2;
> +
> +    /// Chunk magic numbers (matching libqb qb_ringbuffer_int.h)
> +
const CHUNK_MAGIC: u32 = 0xA1A1A1A1; // Valid allocated chunk > + const CHUNK_MAGIC_DEAD: u32 = 0xD0D0D0D0; // Reclaimed/dead chunk > + const CHUNK_MAGIC_ALLOC: u32 = 0xA110CED0; // Chunk being allocated > + > + /// Calculate the next pointer position after a chunk of given size > + /// > + /// This implements libqb's qb_rb_chunk_step logic (ringbuffer.c:464-484): > + /// 1. Skip chunk header (CHUNK_HEADER_WORDS) > + /// 2. Skip user data (rounded up to word boundary) > + /// 3. Wrap around if needed > + /// > + /// # Arguments > + /// - `current_pt`: Current read or write pointer (in words) > + /// - `data_size_bytes`: Size of the data payload in bytes > + /// > + /// # Returns > + /// New pointer position (in words), wrapped to [0, word_size) > + fn chunk_step(&self, current_pt: u32, data_size_bytes: usize) -> u32 { > + let word_size = self.word_size as usize; > + > + // Convert bytes to words, rounding up to word boundary > + // This matches libqb's logic: > + // pointer += (chunk_size / sizeof(uint32_t)); > + // if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) pointer++; > + let data_words = data_size_bytes.div_ceil(std::mem::size_of::()); > + > + // Calculate new position: current + header + data (in words) > + let new_pt = (current_pt as usize + Self::CHUNK_HEADER_WORDS + data_words) % word_size; > + > + new_pt as u32 > + } > + > + /// Initialize a RingBufferShared structure in-place in shared memory > + /// > + /// This initializes the ring buffer header at its current memory location, which is > + /// critical for process-shared data structures in mmap'd memory. The structure > + /// must not be moved after initialization. 
> + /// > + /// # Arguments > + /// - `word_size`: Size of ring buffer in 32-bit words > + /// - `hdr_path`: Path to the header file (will be copied into the structure) > + /// - `data_path`: Path to the data file (will be copied into the structure) > + /// > + /// # Safety > + /// The RingBufferShared must remain at its current memory location and must not > + /// be moved or copied after initialization. > + unsafe fn init_in_place( > + &mut self, > + word_size: u32, > + hdr_path: &std::path::Path, > + data_path: &std::path::Path, > + ) -> Result<()> { > + // SAFETY: Caller ensures this structure is in shared memory and will remain > + // at this location for its lifetime > + unsafe { > + // Zero-initialize the entire structure first > + std::ptr::write_bytes(self as *mut Self, 0, 1); > + > + // Initialize atomic fields > + self.write_pt = AtomicU32::new(0); > + self.read_pt = AtomicU32::new(0); > + self.word_size = word_size; > + self.ref_count = AtomicU32::new(1); > + > + // Initialize semaphore in-place in shared memory > + // This is critical - the semaphore must be initialized at its final location > + self.posix_sem > + .init_in_place() > + .context("Failed to initialize semaphore")?; > + > + // Copy header path into structure > + let hdr_path_str = hdr_path.to_string_lossy(); > + let hdr_path_bytes = hdr_path_str.as_bytes(); > + let len = hdr_path_bytes.len().min(libc::PATH_MAX as usize - 1); > + self.hdr_path[..len].copy_from_slice(&hdr_path_bytes[..len]); > + > + // Copy data path into structure > + let data_path_str = data_path.to_string_lossy(); > + let data_path_bytes = data_path_str.as_bytes(); > + let len = data_path_bytes.len().min(libc::PATH_MAX as usize - 1); > + self.data_path[..len].copy_from_slice(&data_path_bytes[..len]); > + } > + > + Ok(()) > + } > + > + /// Calculate free space in the ring buffer (in words) > + /// > + /// Returns the number of free words (u32 units) available for allocation. 
> + /// This uses atomic loads to read the pointers safely. > + fn space_free_words(&self) -> usize { > + let write_pt = self.write_pt.load(Ordering::Acquire); > + let read_pt = self.read_pt.load(Ordering::Acquire); > + let word_size = self.word_size as usize; > + > + if write_pt >= read_pt { > + if write_pt == read_pt { > + word_size // Buffer is empty, all space available > + } else { > + (read_pt as usize + word_size - write_pt as usize) - 1 > + } > + } else { > + (read_pt as usize - write_pt as usize) - 1 > + } > + } > + > + /// Calculate free space in bytes > + /// > + /// Converts the word count to bytes by multiplying by sizeof(uint32_t). > + /// Matches libqb's qb_rb_space_free (ringbuffer.c:373). > + fn space_free_bytes(&self) -> usize { > + self.space_free_words() * std::mem::size_of::() > + } > + > + /// Check if a chunk of given size (in bytes) can fit in the buffer > + /// > + /// Includes chunk header overhead and alignment requirements. > + fn chunk_fits(&self, message_size: usize, chunk_margin: usize) -> bool { > + let required_bytes = message_size + chunk_margin; > + self.space_free_bytes() >= required_bytes > + } > + > + /// Write a chunk to the ring buffer > + /// > + /// This performs the complete chunk write operation: > + /// 1. Allocate space in the ring buffer > + /// 2. Write the message data (handling wraparound) > + /// 3. Commit the chunk (update write_pt, set magic) > + /// 4. 
Post to semaphore to wake readers > + /// > + /// # Safety > + /// Caller must ensure: > + /// - shared_data points to valid ring buffer data > + /// - There is sufficient space (checked via chunk_fits) > + /// - No other thread is writing concurrently > + unsafe fn write_chunk(&self, shared_data: *mut u32, message: &[u8]) -> Result<()> { > + let msg_len = message.len(); > + let word_size = self.word_size as usize; > + > + // Get current write pointer > + let write_pt = self.write_pt.load(Ordering::Acquire); > + > + // Write chunk header: [size=0][magic=ALLOC] > + // Matches libqb's qb_rb_chunk_alloc (ringbuffer.c:439-440) > + unsafe { > + *shared_data.add(write_pt as usize) = 0; // Size is 0 during allocation > + *shared_data.add((write_pt as usize + 1) % word_size) = Self::CHUNK_MAGIC_ALLOC; > + } > + > + // Write message data > + let data_offset = (write_pt as usize + Self::CHUNK_HEADER_WORDS) % word_size; > + let data_ptr = unsafe { shared_data.add(data_offset) as *mut u8 }; > + > + // Handle wraparound - calculate remaining bytes in buffer before wraparound > + let remaining = (word_size - data_offset) * std::mem::size_of::(); > + if msg_len <= remaining { > + // No wraparound needed > + unsafe { > + std::ptr::copy_nonoverlapping(message.as_ptr(), data_ptr, msg_len); > + } > + } else { > + // Need to wrap around > + unsafe { > + std::ptr::copy_nonoverlapping(message.as_ptr(), data_ptr, remaining); > + std::ptr::copy_nonoverlapping( > + message.as_ptr().add(remaining), > + shared_data as *mut u8, > + msg_len - remaining, > + ); > + } > + } > + > + // Calculate new write pointer - matches libqb's qb_rb_chunk_step logic > + let new_write_pt = self.chunk_step(write_pt, msg_len); > + > + // Commit: write size, update write pointer, then set magic with atomic RELEASE > + // This matches libqb's qb_rb_chunk_commit behavior (ringbuffer.c:497-504) > + unsafe { > + // 1. Write chunk size > + *shared_data.add(write_pt as usize) = msg_len as u32; > + > + // 2. 
Update write pointer > + self.write_pt.store(new_write_pt, Ordering::Relaxed); > + > + // 3. Set magic with RELEASE > + // RELEASE ensures all previous writes (data, size, write_pt) are visible before magic > + let magic_offset = (write_pt as usize + 1) % word_size; > + let magic_ptr = shared_data.add(magic_offset) as *mut AtomicU32; > + (*magic_ptr).store(Self::CHUNK_MAGIC, Ordering::Release);

Could the new write_pt be observed before the chunk is committed? We should publish the chunk by advancing write_pt with Release after writing data/size/magic, so readers can't observe the new write_pt before the chunk is committed.

> + > + // 4. Post to semaphore to wake up waiting readers > + self.posix_sem > + .post() > + .context("Failed to post to semaphore")?; > + } > + > + tracing::debug!( > + "Wrote chunk: {} bytes, write_pt {} -> {}", > + msg_len, > + write_pt, > + new_write_pt > + ); > + > + Ok(()) > + } > + > + /// Read a chunk from the ring buffer > + /// > + /// This reads the chunk at the current read pointer, validates it, > + /// copies the data, and reclaims the chunk. > + /// > + /// Returns None if the buffer is empty (read_pt == write_pt).
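On the write_pt ordering in write_chunk above: a minimal sketch of the "publish last" order, assuming a single producer and a reader that decides emptiness from write_pt alone. `SketchRing` is a hypothetical in-process stand-in built on std atomics, not the patch's mmap'd structure, and whether libqb clients depend on the magic-based order referenced in the patch comment would still need checking against libqb itself.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

const CHUNK_HEADER_WORDS: usize = 2;
const CHUNK_MAGIC: u32 = 0xA1A1_A1A1;
const WORD_BYTES: usize = std::mem::size_of::<u32>();

/// Hypothetical in-process stand-in for the mmap'd ring; not the patch's type.
struct SketchRing {
    write_pt: AtomicU32,
    words: Vec<AtomicU32>,
}

impl SketchRing {
    fn new(word_size: usize) -> Self {
        Self {
            write_pt: AtomicU32::new(0),
            words: (0..word_size).map(|_| AtomicU32::new(0)).collect(),
        }
    }

    /// Single producer: store payload, size and magic first, publish write_pt last.
    /// Assumes the caller already checked that the chunk fits.
    fn publish(&self, payload: &[u32]) -> u32 {
        let n = self.words.len();
        // Only the producer stores write_pt, so a relaxed load is enough here.
        let start = self.write_pt.load(Ordering::Relaxed) as usize;
        for (i, word) in payload.iter().enumerate() {
            self.words[(start + CHUNK_HEADER_WORDS + i) % n].store(*word, Ordering::Relaxed);
        }
        self.words[start].store((payload.len() * WORD_BYTES) as u32, Ordering::Relaxed);
        self.words[(start + 1) % n].store(CHUNK_MAGIC, Ordering::Relaxed);
        let next = ((start + CHUNK_HEADER_WORDS + payload.len()) % n) as u32;
        // Release: all stores above happen-before an Acquire load that sees `next`.
        self.write_pt.store(next, Ordering::Release);
        next
    }

    /// Consumer: (size in bytes, magic) of the chunk at read_pt, or None if empty.
    fn peek(&self, read_pt: u32) -> Option<(u32, u32)> {
        if self.write_pt.load(Ordering::Acquire) == read_pt {
            return None;
        }
        let n = self.words.len();
        let r = read_pt as usize;
        Some((
            self.words[r].load(Ordering::Relaxed),
            self.words[(r + 1) % n].load(Ordering::Relaxed),
        ))
    }
}
```

With this order a reader that sees the advanced write_pt is guaranteed to also see the size and magic of the chunk in front of it, so the magic check becomes a sanity check rather than the synchronization point.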
> + /// > + /// # Safety > + /// Caller must ensure: > + /// - shared_data points to valid ring buffer data > + /// - flow_control_ptr (if Some) points to valid i32 > + /// - No other thread is reading concurrently > + unsafe fn read_chunk( > + &self, > + shared_data: *mut u32, > + flow_control_ptr: Option<*mut i32>, > + ) -> Result<Option<Vec<u8>>> { > + let word_size = self.word_size as usize; > + > + // Get current read pointer > + let read_pt = self.read_pt.load(Ordering::Acquire); > + let write_pt = self.write_pt.load(Ordering::Acquire); > + > + // Check if buffer is empty > + if read_pt == write_pt { > + return Ok(None); > + } > + > + // Read chunk header with ACQUIRE to see all writes > + let magic_offset = (read_pt as usize + 1) % word_size; > + let magic_ptr = unsafe { shared_data.add(magic_offset) as *const AtomicU32 }; > + let chunk_magic = unsafe { (*magic_ptr).load(Ordering::Acquire) }; > + > + // Read chunk size > + let chunk_size = unsafe { *shared_data.add(read_pt as usize) };

This should be validated too: chunk_size comes straight from shared memory and is then used for the allocation and the copies below.
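A minimal sketch of such a check, assuming the same two-word chunk header as in the patch. `validate_chunk_size` and `ChunkSizeError` are hypothetical names for illustration only; the idea is to reject any size that cannot fit in the ring at all, or that claims more words than currently lie between read_pt and write_pt.

```rust
const CHUNK_HEADER_WORDS: usize = 2;
const WORD_BYTES: usize = std::mem::size_of::<u32>();

/// Hypothetical error type for this sketch; not part of the patch.
#[derive(Debug, PartialEq, Eq)]
enum ChunkSizeError {
    /// word_size is too small or a pointer lies outside [0, word_size).
    BadPointers,
    /// The payload could never fit into the ring, whatever its state.
    TooLarge,
    /// Header plus payload would reach past the current write pointer.
    BeyondWritePt,
}

/// Validate a chunk size read from shared memory before using it for an
/// allocation or a copy. All pointers and sizes are in 32-bit words,
/// except `chunk_size`, which is in bytes.
fn validate_chunk_size(
    chunk_size: u32,
    read_pt: u32,
    write_pt: u32,
    word_size: usize,
) -> Result<usize, ChunkSizeError> {
    let (read, write) = (read_pt as usize, write_pt as usize);
    if word_size <= CHUNK_HEADER_WORDS || read >= word_size || write >= word_size {
        return Err(ChunkSizeError::BadPointers);
    }
    let size = chunk_size as usize;
    if size > (word_size - CHUNK_HEADER_WORDS) * WORD_BYTES {
        return Err(ChunkSizeError::TooLarge);
    }
    // Words currently occupied between the reader and the writer (with wraparound).
    let used_words = (write + word_size - read) % word_size;
    let needed_words = CHUNK_HEADER_WORDS + size.div_ceil(WORD_BYTES);
    if needed_words > used_words {
        return Err(ChunkSizeError::BeyondWritePt);
    }
    Ok(size)
}
```

The returned `usize` can then be used for the allocation and both copies, so an unchecked `chunk_size as usize` no longer appears after the validation.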
> + > + tracing::debug!( > + "Reading chunk: read_pt={}, write_pt={}, size={}, magic=0x{:08x}", > + read_pt, > + write_pt, > + chunk_size, > + chunk_magic > + ); > + > + // Verify magic > + if chunk_magic != Self::CHUNK_MAGIC { > + anyhow::bail!( > + "Invalid chunk magic at read_pt={}: expected 0x{:08x}, got 0x{:08x}", > + read_pt, > + Self::CHUNK_MAGIC, > + chunk_magic > + ); > + } > + > + // Read message data > + let data_offset = (read_pt as usize + Self::CHUNK_HEADER_WORDS) % word_size; > + let data_ptr = unsafe { shared_data.add(data_offset) as *const u8 }; > + > + let mut message = vec![0u8; chunk_size as usize]; > + > + // Handle wraparound - calculate remaining bytes in buffer before wraparound > + let remaining = (word_size - data_offset) * std::mem::size_of::(); > + if chunk_size as usize <= remaining { > + // No wraparound > + unsafe { > + std::ptr::copy_nonoverlapping(data_ptr, message.as_mut_ptr(), chunk_size as usize); > + } > + } else { > + // Wraparound > + unsafe { > + std::ptr::copy_nonoverlapping(data_ptr, message.as_mut_ptr(), remaining); > + std::ptr::copy_nonoverlapping( > + shared_data as *const u8, > + message.as_mut_ptr().add(remaining), > + chunk_size as usize - remaining, > + ); > + } > + } > + > + // Reclaim chunk: clear header and update read pointer > + let new_read_pt = self.chunk_step(read_pt, chunk_size as usize); > + > + unsafe { > + // Clear chunk size > + *shared_data.add(read_pt as usize) = 0; > + > + // Set magic to DEAD with RELEASE > + let magic_ptr = shared_data.add(magic_offset) as *mut AtomicU32; > + (*magic_ptr).store(Self::CHUNK_MAGIC_DEAD, Ordering::Release); > + > + // Update read_pt > + self.read_pt.store(new_read_pt, Ordering::Relaxed); > + > + // Signal flow control - server is ready for next request > + if let Some(fc_ptr) = flow_control_ptr { > + let refcount = self.ref_count.load(Ordering::Acquire); > + if refcount == 2 { > + let fc_atomic = fc_ptr as *mut AtomicI32; > + (*fc_atomic).store(0, Ordering::Relaxed); 
> + } > + } > + } > + > + Ok(Some(message)) > + } > +} > + > +/// Flow control mechanism for ring buffer backpressure > +/// > +/// Implements libqb's flow control protocol for IPC communication. > +/// The server writes flow control values to shared memory, and clients > +/// read these values to determine if they should back off. > +/// > +/// Flow control values (matching libqb's rate limiting): > +/// - `OK`: Proceed with sending (QB_IPCS_RATE_NORMAL) > +/// - `SLOW_DOWN`: Approaching capacity, reduce send rate (QB_IPCS_RATE_OFF) > +/// - `STOP`: Queue full, do not send (QB_IPCS_RATE_OFF_2) > +/// > +/// ## Disabled Flow Control > +/// > +/// When constructed with a null fc_ptr, flow control is disabled and all > +/// operations become no-ops. This matches libqb's behavior for response/event > +/// rings which don't need backpressure signaling. > +/// > +/// Matches libqb's qb_ipc_shm_fc_get/qb_ipc_shm_fc_set (ipc_shm.c:176-195) > +pub struct FlowControl { > + /// Pointer to flow control field in shared memory (i32 atomic) > + /// Located in shared_user_data area of RingBufferShared > + /// If null, flow control is disabled (no-op mode) > + fc_ptr: *mut i32, > + /// Pointer to shared header for refcount checks > + /// If null, flow control is disabled (no-op mode) > + shared_hdr: *mut RingBufferShared, > +} > + > +impl FlowControl { > + /// OK to send - queue has space (QB_IPCS_RATE_NORMAL) > + pub const OK: i32 = 0; > + > + /// Slow down - queue approaching full (QB_IPCS_RATE_OFF) > + pub const SLOW_DOWN: i32 = 1; > + > + /// Stop sending - queue full (QB_IPCS_RATE_OFF_2) > + pub const STOP: i32 = 2; > + > + /// Create a new FlowControl instance > + /// > + /// Pass null pointers to create a disabled (no-op) flow control instance. > + /// This is used for response/event rings that don't need backpressure. 
> + /// > + /// # Safety > + /// - If fc_ptr is non-null, it must point to valid shared memory for an i32 > + /// - If shared_hdr is non-null, it must point to valid RingBufferShared > + /// - Both must remain valid for the lifetime of FlowControl (if non-null) > + unsafe fn new(fc_ptr: *mut i32, shared_hdr: *mut RingBufferShared) -> Self { > + // Initialize to 0 if enabled - server is ready for requests > + // libqb clients check: if (fc > 0 && fc <= fc_enable_max) return EAGAIN > + // So 0 means "ready to transmit", > 0 means "flow control active/blocked" > + if !fc_ptr.is_null() { > + let fc_atomic = fc_ptr as *mut AtomicI32; > + unsafe { > + (*fc_atomic).store(0, Ordering::Relaxed); > + } > + } > + > + Self { fc_ptr, shared_hdr } > + } > + > + /// Check if flow control is enabled > + #[inline] > + fn is_enabled(&self) -> bool { > + !self.fc_ptr.is_null() > + } > + > + /// Get the raw flow control pointer (for internal use) > + #[inline] > + fn fc_ptr(&self) -> *mut i32 { > + self.fc_ptr > + } > + > + /// Get flow control value > + /// > + /// Matches libqb's qb_ipc_shm_fc_get (ipc_shm.c:185-195). > + /// Returns: > + /// - 0: Ready for requests (or flow control disabled) > + /// - >0: Flow control active (client should retry) > + /// - <0: Error (not connected) > + /// > + /// Note: This method is primarily for libqb clients, not used internally by server > + #[allow(dead_code)] > + pub fn get(&self) -> i32 { > + if !self.is_enabled() { > + return 0; // Disabled = always ready > + } > + > + // Check if both client and server are connected (refcount == 2) > + let refcount = unsafe { (*self.shared_hdr).ref_count.load(Ordering::Acquire) }; > + if refcount != 2 { > + return -libc::ENOTCONN; > + } > + > + // Read flow control value atomically > + unsafe { > + let fc_atomic = self.fc_ptr as *const AtomicI32; > + (*fc_atomic).load(Ordering::Relaxed) > + } > + } > + > + /// Set flow control value > + /// > + /// Matches libqb's qb_ipc_shm_fc_set (ipc_shm.c:176-182). 
> + /// - fc_enable = 0: Ready for requests > + /// - fc_enable > 0: Flow control active (backpressure) > + /// > + /// No-op if flow control is disabled. > + pub fn set(&self, fc_enable: i32) { > + if !self.is_enabled() { > + return; // Disabled = no-op > + } > + > + tracing::trace!("Setting flow control to {}", fc_enable); > + unsafe { > + let fc_atomic = self.fc_ptr as *mut AtomicI32; > + (*fc_atomic).store(fc_enable, Ordering::Relaxed); > + } > + } > +} > + > +// Safety: FlowControl uses atomic operations for synchronization > +unsafe impl Send for FlowControl {} > +unsafe impl Sync for FlowControl {} > + > +/// Ring buffer handle > +/// > +/// Owns the mmap'd memory regions and provides async message-passing API. > +pub struct RingBuffer { > + /// Mmap of shared header > + _mmap_hdr: MmapMut, > + /// Circular mmap of shared data (2x virtual mapping) > + _mmap_data: CircularMmap, > + /// Pointer to shared header (inside _mmap_hdr) > + shared_hdr: *mut RingBufferShared, > + /// Pointer to shared data array (inside _mmap_data) > + shared_data: *mut u32, > + /// Flow control mechanism > + /// Always present, but may be disabled (no-op) for response/event rings > + pub flow_control: FlowControl, > + /// Notifier for when data becomes available (for consumers) > + data_available: Arc, > + /// Notifier for when space becomes available (for producers) > + space_available: Arc, > + /// Whether this instance created the ring buffer (and thus owns cleanup) > + /// Matches libqb's QB_RB_FLAG_CREATE flag > + is_creator: bool, > +} > + > +// Safety: RingBuffer uses atomic operations for synchronization > +unsafe impl Send for RingBuffer {} > +unsafe impl Sync for RingBuffer {} > + > +impl RingBuffer { > + /// Chunk margin for space calculations (in bytes) > + /// Matches libqb: sizeof(uint32_t) * (CHUNK_HEADER_WORDS + WORD_ALIGN + CACHE_LINE_WORDS) > + /// We don't use cache line alignment, so CACHE_LINE_WORDS = 0 > + const CHUNK_MARGIN: usize = 4 * 
(RingBufferShared::CHUNK_HEADER_WORDS + 1); > + > + /// Create a new ring buffer in shared memory > + /// > + /// Creates two files in `/dev/shm`: > + /// - `{base_dir}/qb-{name}-header` > + /// - `{base_dir}/qb-{name}-data` > + /// > + /// # Arguments > + /// - `base_dir`: Directory for shared memory files (typically "/dev/shm") > + /// - `name`: Ring buffer name > + /// - `size_bytes`: Size of ring buffer data in bytes > + /// - `shared_user_data_size`: Extra bytes to allocate after RingBufferShared for flow control > + /// > + /// The header file size will be: sizeof(RingBufferShared) + shared_user_data_size > + /// This matches libqb's behavior: sizeof(qb_ringbuffer_shared_s) + shared_user_data_size > + pub fn new( > + base_dir: impl AsRef, > + name: &str, > + size_bytes: usize, > + shared_user_data_size: usize, > + ) -> Result { > + let base_dir = base_dir.as_ref(); > + > + // Match libqb's size calculation exactly: > + // 1. Add CHUNK_MARGIN + 1 (13 bytes) > + // CHUNK_MARGIN = sizeof(uint32_t) * (CHUNK_HEADER_WORDS + WORD_ALIGN + CACHE_LINE_WORDS) > + // = 4 * (2 + 1 + 0) = 12 bytes (without cache line alignment) > + let size = size_bytes + Self::CHUNK_MARGIN + 1; > + > + // 2. Round up to page size (typically 4096) > + let page_size = 4096; // Standard page size on Linux > + let real_size = size.div_ceil(page_size) * page_size; > + > + // 3. 
Calculate word_size from rounded size > + let word_size = real_size / 4; > + > + tracing::info!( > + "Creating ring buffer '{}': size_bytes={}, real_size={}, word_size={} ({}words = {} bytes)", > + name, > + size_bytes, > + real_size, > + word_size, > + word_size, > + real_size > + ); > + > + // Create header file > + let hdr_filename = format!("qb-{name}-header"); > + let hdr_path = base_dir.join(&hdr_filename); > + > + let hdr_file = OpenOptions::new() > + .read(true) > + .write(true) > + .create(true) > + .truncate(true) > + .open(&hdr_path) > + .context("Failed to create header file")?;

Should we explicitly set .mode(0o600) here?

> + > + // Resize to fit RingBufferShared structure + shared_user_data > + // This matches libqb: sizeof(qb_ringbuffer_shared_s) + shared_user_data_size > + let hdr_size = std::mem::size_of::<RingBufferShared>() + shared_user_data_size; > + hdr_file > + .set_len(hdr_size as u64) > + .context("Failed to resize header file")?; > + > + // Mmap header > + let mut mmap_hdr = > + unsafe { MmapMut::map_mut(&hdr_file) }.context("Failed to mmap header")?; > + > + // Create data file path (needed for init_in_place) > + let data_filename = format!("qb-{name}-data"); > + let data_path = base_dir.join(&data_filename); > + > + // Initialize shared header > + let shared_hdr = mmap_hdr.as_mut_ptr() as *mut RingBufferShared; > + > + unsafe { > + (*shared_hdr).init_in_place(word_size as u32, &hdr_path, &data_path)?; > + } > + > + // Create data file > + let data_file = OpenOptions::new() > + .read(true) > + .write(true) > + .create(true) > + .truncate(true) > + .open(&data_path) > + .context("Failed to create data file")?; > + > + // Create data file with real_size (NOT 2x real_size!) > + // libqb creates the file with real_size, then uses circular mmap to map it TWICE > + // in consecutive virtual address space. The file itself is only real_size bytes.
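On the file mode for the header and data files created above: a minimal sketch of requesting 0o600 explicitly through std's Unix extension trait. `create_private_shm_file` is a hypothetical helper name, not something from the patch, and whether 0o600 is the right mode depends on which clients have to open these files.

```rust
use std::fs::{File, OpenOptions};
use std::io;
use std::os::unix::fs::OpenOptionsExt;
use std::path::Path;

/// Hypothetical helper: open (or create) a ring buffer backing file that is
/// readable and writable by the owner only.
fn create_private_shm_file(path: &Path) -> io::Result<File> {
    OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        // Only applied when the file is actually created, and still subject
        // to the process umask. A pre-existing file keeps its permissions.
        .mode(0o600)
        .open(path)
}
```

Since the mode is ignored for a file that already exists, a stale file left behind with wider permissions would keep them; removing it first or using `create_new(true)` would be one way to close that gap.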
> + // During cleanup, libqb unmaps 2*real_size bytes (the circular mmap), but the > + // file itself remains real_size bytes. > + data_file > + .set_len(real_size as u64) > + .context("Failed to resize data file")?; > + > + // Create circular mmap - maps the file TWICE in consecutive virtual memory > + // This matches libqb's qb_sys_circular_mmap implementation > + let data_fd = data_file.as_raw_fd(); > + let mut mmap_data = unsafe { > + CircularMmap::new(data_fd, real_size).context("Failed to create circular mmap")? > + }; > + > + // Zero-initialize the data (only need to zero first half due to circular mapping) > + unsafe { > + mmap_data.zero_initialize(); > + } > + > + let shared_data = mmap_data.as_mut_ptr(); > + > + // Write sentinel value at end of buffer (matches libqb behavior) > + // This works now because we have circular mmap with 2x virtual space! > + unsafe { > + *shared_data.add(word_size) = 5; > + } > + > + // Initialize flow control > + // If shared_user_data_size >= sizeof(i32), flow control is enabled (for request ring) > + // Otherwise, flow control is disabled (for response/event rings) > + let flow_control = if shared_user_data_size >= std::mem::size_of::() { > + unsafe { > + // Get pointer to user_data field within the structure > + // This matches libqb's: return rb->shared_hdr->user_data; > + let fc_ptr = std::ptr::addr_of_mut!((*shared_hdr).user_data) as *mut i32; > + FlowControl::new(fc_ptr, shared_hdr) > + } > + } else { > + // Disabled flow control (null pointers = no-op mode) > + unsafe { FlowControl::new(std::ptr::null_mut(), std::ptr::null_mut()) } > + }; > + > + Ok(Self { > + _mmap_hdr: mmap_hdr, > + _mmap_data: mmap_data, > + shared_hdr, > + shared_data, > + flow_control, > + data_available: Arc::new(Notify::new()), > + space_available: Arc::new(Notify::new()), > + is_creator: true, // This instance created the ring buffer > + }) > + } > + > + /// Send a message into the ring buffer (async) > + /// > + /// Allocates a chunk, writes 
the message data, and commits the chunk. > + /// Awaits if there's insufficient space. > + pub async fn send(&mut self, message: &[u8]) -> Result<()> { > + loop { > + match self.try_send(message) { > + Ok(()) => { > + // Notify consumers that data is available > + self.data_available.notify_one();

Nobody waits on this?

> + return Ok(()); > + } > + Err(e) if e.to_string().contains("Insufficient space") => {

Can we please replace this string matching by introducing a dedicated error enum variant?

> + // Wait for space to become available > + self.space_available.notified().await;

Tokio's Notify only works within a single process, but here the space is freed up by another process. So this would likely hang forever?

> + continue; > + } > + Err(e) => return Err(e), > + } > + } > + } > + > + /// Try to send a message without blocking > + /// > + /// Returns an error if there's insufficient space. > + pub fn try_send(&mut self, message: &[u8]) -> Result<()> { > + // Check if we have enough space > + if !unsafe { (*self.shared_hdr).chunk_fits(message.len(), Self::CHUNK_MARGIN) } { > + let space_free = self.space_free(); > + let required = Self::CHUNK_MARGIN + message.len(); > + anyhow::bail!( > + "Insufficient space: need {required} bytes, have {space_free} bytes free" > + ); > + } > + > + // Write the chunk using RingBufferShared > + unsafe { (*self.shared_hdr).write_chunk(self.shared_data, message)? }; > + > + Ok(()) > + } > + > + /// Receive a message from the ring buffer (async) > + /// > + /// Awaits if no message is available. > + /// After processing, the chunk is automatically reclaimed. > + /// > + /// ## Implementation Note > + /// > + /// libqb uses semaphore-based blocking (sem_timedwait) to wait for data > + /// (see qb_rb_chunk_peek in libqb/lib/ringbuffer.c). > + /// > + /// We use tokio's `spawn_blocking` to wait on the POSIX semaphore without > + /// blocking the async runtime.
This provides true event-driven behavior with > + /// zero polling overhead, while maintaining compatibility with libqb clients. > + pub async fn recv(&mut self) -> Result> { > + loop { > + // Wait on POSIX semaphore asynchronously > + // This matches libqb's timedwait_fn behavior in qb_rb_chunk_peek > + // SAFETY: The semaphore is properly initialized in new() and remains > + // valid for the lifetime of RingBuffer > + unsafe { (*self.shared_hdr).posix_sem.wait().await? }; > + > + // Semaphore was decremented, data should be available > + // Read and reclaim the chunk > + match self.recv_after_semwait()? { > + Some(data) => { > + // Notify producers that space is available > + self.space_available.notify_one(); > + return Ok(data); > + } > + None => { > + // Spurious wakeup or race condition - semaphore was decremented > + // but no valid data found. This shouldn't happen in normal operation. > + tracing::warn!("Spurious semaphore wakeup detected, retrying"); > + continue; > + } > + } > + } > + } > + > + /// Receive a message after semaphore has been decremented > + /// > + /// This is called after `PosixSem::wait()` has successfully decremented > + /// the semaphore. It reads the chunk data and reclaims the chunk. > + /// > + /// Returns `None` if the buffer is empty despite semaphore being decremented > + /// (which indicates a bug or race condition). 
> + fn recv_after_semwait(&mut self) -> Result>> { > + // Get fc_ptr if flow control is enabled, otherwise null > + let fc_ptr = if self.flow_control.is_enabled() { > + Some(self.flow_control.fc_ptr()) > + } else { > + None > + }; > + unsafe { (*self.shared_hdr).read_chunk(self.shared_data, fc_ptr) } > + } > + > + /// Calculate free space in the ring buffer (in bytes) > + fn space_free(&self) -> usize { > + unsafe { (*self.shared_hdr).space_free_bytes() } > + } > +} > + > +impl Drop for RingBuffer { > + fn drop(&mut self) { > + // Decrement ref count > + let ref_count = unsafe { (*self.shared_hdr).ref_count.fetch_sub(1, Ordering::AcqRel) }; > + > + tracing::debug!( > + "Dropping ring buffer, ref_count: {} -> {}", > + ref_count, > + ref_count - 1 > + ); > + > + // If last reference AND we created it, clean up semaphore and files > + // This matches libqb's behavior: only the creator (QB_RB_FLAG_CREATE) destroys the semaphore > + if ref_count == 1 && self.is_creator { > + unsafe { > + // Destroy the semaphore before cleaning up the mmap > + // Matches libqb's cleanup in qb_rb_close_helper > + if let Err(e) = (*self.shared_hdr).posix_sem.destroy() { > + tracing::warn!("Failed to destroy semaphore: {}", e); > + } > + > + let hdr_path = > + std::ffi::CStr::from_ptr((*self.shared_hdr).hdr_path.as_ptr() as *const i8); > + let data_path = > + std::ffi::CStr::from_ptr((*self.shared_hdr).data_path.as_ptr() as *const i8); > + > + if let Ok(hdr_path_str) = hdr_path.to_str() > + && !hdr_path_str.is_empty() > + { > + let _ = std::fs::remove_file(hdr_path_str); > + tracing::debug!("Removed header file: {}", hdr_path_str); > + } > + > + if let Ok(data_path_str) = data_path.to_str() > + && !data_path_str.is_empty() > + { > + let _ = std::fs::remove_file(data_path_str); > + tracing::debug!("Removed data file: {}", data_path_str); > + } > + } > + } > + } > +} > + > +#[cfg(test)] > +mod tests { > + use super::*; > + > + #[tokio::test] > + async fn test_ringbuffer_basic() -> Result<()> 
{ > + let temp_dir = tempfile::tempdir()?; > + let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?; > + > + // Send a message > + rb.send(b"hello world").await?; > + > + // Receive the message > + let msg = rb.recv().await?; > + assert_eq!(msg, b"hello world"); > + > + Ok(()) > + } > + > + #[tokio::test] > + async fn test_ringbuffer_multiple_messages() -> Result<()> { > + let temp_dir = tempfile::tempdir()?; > + let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?; > + > + // Send multiple messages > + rb.send(b"message 1").await?; > + rb.send(b"message 2").await?; > + rb.send(b"message 3").await?; > + > + // Receive in order > + assert_eq!(rb.recv().await?, b"message 1"); > + assert_eq!(rb.recv().await?, b"message 2"); > + assert_eq!(rb.recv().await?, b"message 3"); > + > + Ok(()) > + } > + > + #[tokio::test] > + async fn test_ringbuffer_nonblocking_send() -> Result<()> { > + let temp_dir = tempfile::tempdir()?; > + let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?; > + > + // Test try_send (non-blocking send) with async recv > + rb.try_send(b"data")?; > + let msg = rb.recv().await?; > + assert_eq!(msg, b"data"); > + > + Ok(()) > + } > + > + #[tokio::test] > + async fn test_ringbuffer_wraparound() -> Result<()> { > + let temp_dir = tempfile::tempdir()?; > + let mut rb = RingBuffer::new(temp_dir.path(), "test", 256, 0)?; > + > + // Fill and drain to force wraparound > + for _ in 0..10 { > + rb.send(b"data").await?; > + rb.recv().await?; > + } > + > + // Should still work > + rb.send(b"after wrap").await?; > + assert_eq!(rb.recv().await?, b"after wrap"); > + > + Ok(()) > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs > new file mode 100644 > index 00000000..73d63de0 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs > @@ -0,0 +1,278 @@ > +/// Main libqb IPC server implementation > +/// > +/// This module contains the Server struct and its implementation, > +/// 
including connection acceptance and server lifecycle management. > +use anyhow::{Context, Result}; > +use parking_lot::Mutex; > +use std::collections::HashMap; > +use std::sync::Arc; > +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; > +use tokio::net::UnixListener; > +use tokio_util::sync::CancellationToken; > + > +use super::connection::QbConnection; > +use super::handler::Handler; > +use super::socket::bind_abstract_socket; > + > +/// Server-level connection statistics (matches libqb qb_ipcs_stats) > +#[derive(Debug, Default)] > +pub struct ServerStats { > + /// Number of currently active connections > + pub active_connections: AtomicUsize, > + /// Total number of closed connections since server start > + pub closed_connections: AtomicUsize, > +} > + > +impl ServerStats { > + fn new() -> Self { > + Self { > + active_connections: AtomicUsize::new(0), > + closed_connections: AtomicUsize::new(0), > + } > + } > + > + /// Increment active connections count (new connection established) > + fn connection_created(&self) { > + self.active_connections.fetch_add(1, Ordering::Relaxed); > + tracing::debug!( > + active = self.active_connections.load(Ordering::Relaxed), > + closed = self.closed_connections.load(Ordering::Relaxed), > + "Connection created" > + ); > + } > + > + /// Decrement active, increment closed (connection terminated) > + fn connection_closed(&self) { > + self.active_connections.fetch_sub(1, Ordering::Relaxed); > + self.closed_connections.fetch_add(1, Ordering::Relaxed); > + tracing::debug!( > + active = self.active_connections.load(Ordering::Relaxed), > + closed = self.closed_connections.load(Ordering::Relaxed), > + "Connection closed" > + ); > + } > + > + /// Get current statistics (for monitoring/debugging) > + pub fn get(&self) -> (usize, usize) { > + ( > + self.active_connections.load(Ordering::Relaxed), > + self.closed_connections.load(Ordering::Relaxed), > + ) > + } > +} > + > +/// libqb-compatible IPC server > +pub struct Server { > + 
service_name: String, > + > + // Setup socket (SOCK_STREAM) - accepts new connections > + setup_listener: Option>, > + > + // Per-connection state > + connections: Arc>>, > + next_conn_id: Arc, > + > + // Connection statistics (matches libqb behavior) > + stats: Arc, > + > + // Message handler (trait object, also handles authentication) > + handler: Arc, > + > + // Cancellation token for graceful shutdown > + cancellation_token: CancellationToken, > +} > + > +impl Server { > + /// Create a new libqb-compatible IPC server > + /// > + /// Uses Linux abstract Unix sockets for IPC (no filesystem paths needed). > + /// > + /// # Arguments > + /// * `service_name` - Service name (e.g., "pve2"), used as abstract socket name > + /// * `handler` - Handler implementing the Handler trait (handles both authentication and requests) > + pub fn new(service_name: &str, handler: impl Handler + 'static) -> Self { > + Self { > + service_name: service_name.to_string(), > + setup_listener: None, > + connections: Arc::new(Mutex::new(HashMap::new())), > + next_conn_id: Arc::new(AtomicU64::new(1)), > + stats: Arc::new(ServerStats::new()), > + handler: Arc::new(handler), > + cancellation_token: CancellationToken::new(), > + } > + } > + > + /// Start the IPC server > + /// > + /// Creates abstract Unix socket that libqb clients can connect to > + pub fn start(&mut self) -> Result<()> { > + tracing::info!( > + "Starting libqb-compatible IPC server: {}", > + self.service_name > + ); > + > + // Create abstract Unix socket (no filesystem paths needed) > + let std_listener = > + bind_abstract_socket(&self.service_name).context("Failed to bind abstract socket")?; > + > + // Convert to tokio listener > + std_listener.set_nonblocking(true)?; > + let listener = UnixListener::from_std(std_listener)?; > + > + tracing::info!("Bound abstract Unix socket: @{}", self.service_name); > + > + let listener_arc = Arc::new(listener); > + self.setup_listener = Some(listener_arc.clone()); > + > + // Start 
connection acceptor task > + let context = AcceptorContext { > + listener: listener_arc, > + service_name: self.service_name.clone(), > + connections: self.connections.clone(), > + next_conn_id: self.next_conn_id.clone(), > + stats: self.stats.clone(), > + handler: self.handler.clone(), > + cancellation_token: self.cancellation_token.child_token(), > + }; > + > + tokio::spawn(async move { > + context.run().await; > + }); > + > + tracing::info!("libqb IPC server started: {}", self.service_name); > + Ok(()) > + } > + > + /// Stop the IPC server > + pub fn stop(&mut self) { > + tracing::info!("Stopping libqb IPC server: {}", self.service_name); > + > + // Signal all tasks to stop > + self.cancellation_token.cancel(); > + > + // Close all connections > + let connections = std::mem::take(&mut *self.connections.lock()); > + let num_connections = connections.len(); > + > + for (_id, conn) in connections { > + // Clean up ring buffer files > + for rb_path in &conn.ring_buffer_paths { > + if let Err(e) = std::fs::remove_file(rb_path) { > + tracing::debug!( > + "Failed to remove ring buffer file {} (may already be cleaned up): {}", > + rb_path.display(), > + e > + ); > + } > + } > + > + // Update statistics > + self.stats.connection_closed(); > + > + // Task handles will be aborted when dropped > + } > + > + // Final stats > + if num_connections > 0 { > + let (active, closed) = self.stats.get(); > + tracing::info!( > + "Closed {} connections (final stats: active={}, closed={})", > + num_connections, > + active, > + closed > + ); > + } > + > + self.setup_listener = None; > + > + tracing::info!("libqb IPC server stopped"); > + } > +} > + > +impl Drop for Server { > + fn drop(&mut self) { > + self.stop(); > + } > +} > + > +/// Context for the connection acceptor task > +/// > +/// Bundles all the state needed by the acceptor loop to avoid passing many parameters. 
> +struct AcceptorContext { > + listener: Arc, > + service_name: String, > + connections: Arc>>, > + next_conn_id: Arc, > + stats: Arc, > + handler: Arc, > + cancellation_token: CancellationToken, > +} > + > +impl AcceptorContext { > + /// Run the connection acceptor loop > + /// > + /// Accepts new connections and spawns handler tasks for each. > + async fn run(self) { > + tracing::debug!("libqb IPC connection acceptor started"); > + > + loop { > + // Accept new connection with cancellation support > + let accept_result = tokio::select! { > + _ = self.cancellation_token.cancelled() => { > + tracing::debug!("Connection acceptor cancelled"); > + break; > + } > + result = self.listener.accept() => result, > + }; > + > + let (stream, _addr) = match accept_result { > + Ok((stream, addr)) => (stream, addr), > + Err(e) => { > + if !self.cancellation_token.is_cancelled() { > + tracing::error!("Error accepting connection: {}", e); > + } > + break; > + } > + }; > + > + tracing::debug!("Accepted new setup connection"); > + > + // Handle connection > + let conn_id = self.next_conn_id.fetch_add(1, Ordering::SeqCst); > + match QbConnection::accept( > + stream, > + conn_id, > + &self.service_name, > + self.handler.clone(), > + self.cancellation_token.child_token(), > + ) > + .await > + { > + Ok(conn) => { > + self.connections.lock().insert(conn_id, conn);

Connections are never removed from this map, which could result in a memory leak.
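One possible shape for the cleanup, as a rough sketch only: a guard owned by the per-connection task removes the map entry when the task ends. `Conn`, `Registry`, `ConnGuard` and `serve` are hypothetical names that do not exist in the patch, and std threads plus `std::sync::Mutex` stand in here for the tokio tasks and the lock type the patch uses.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the per-connection state kept by the server.
struct Conn;

// Hypothetical alias for the shared connection map.
type Registry = Arc<Mutex<HashMap<u64, Conn>>>;

// Guard owned by the per-connection task. Dropping it removes the map entry,
// so the entry goes away on normal return, early return, or panic unwinding.
struct ConnGuard {
    id: u64,
    registry: Registry,
}

impl Drop for ConnGuard {
    fn drop(&mut self) {
        // Tolerate a poisoned lock: cleanup should still happen.
        let mut map = self.registry.lock().unwrap_or_else(|e| e.into_inner());
        map.remove(&self.id);
    }
}

// Register a connection and spawn its handler; the guard travels with the task.
fn serve(id: u64, registry: Registry) -> thread::JoinHandle<()> {
    registry.lock().unwrap().insert(id, Conn);
    let guard = ConnGuard { id, registry };
    thread::spawn(move || {
        let _guard = guard; // dropped when the handler returns
        // ... handle requests until the client disconnects ...
    })
}
```

The same guard would be a natural place to call `stats.connection_closed()` and to remove the ring buffer files, so that `stop()` only has to deal with connections that are still alive.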
> + // Update statistics > + self.stats.connection_created(); > + } > + Err(e) => { > + tracing::error!("Failed to accept connection {}: {}", conn_id, e); > + } > + } > + } > + > + tracing::debug!("libqb IPC connection acceptor finished"); > + } > +} > + > +#[cfg(test)] > +mod tests { > + use crate::protocol::*; > + > + #[test] > + fn test_header_sizes() { > + // Verify C struct compatibility > + assert_eq!(std::mem::size_of::(), 16); > + assert_eq!(std::mem::align_of::(), 8); > + assert_eq!(std::mem::size_of::(), 24); > + assert_eq!(std::mem::align_of::(), 8); > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs > new file mode 100644 > index 00000000..5831b329 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs > @@ -0,0 +1,84 @@ > +/// Abstract Unix socket utilities > +/// > +/// This module provides functions for working with Linux abstract Unix sockets, > +/// which are used by libqb for IPC communication. > +use anyhow::Result; > +use std::os::unix::io::FromRawFd; > +use std::os::unix::net::UnixListener; > + > +/// Bind to an abstract Unix socket (Linux-specific) > +/// > +/// Abstract sockets are identified by a name in the kernel's socket namespace, > +/// not a filesystem path. They are automatically removed when all references are closed. > +/// > +/// libqb clients create abstract sockets with FULL 108-byte sun_path (null-padded). > +/// Linux abstract sockets are length-sensitive, so we must match exactly. 
> +pub(super) fn bind_abstract_socket(name: &str) -> Result { > + // Create a Unix socket using libc directly > + let sock_fd = unsafe { libc::socket(libc::AF_UNIX, libc::SOCK_STREAM, 0) }; > + if sock_fd < 0 { > + anyhow::bail!( > + "Failed to create Unix socket: {}", > + std::io::Error::last_os_error() > + ); > + } > + > + // RAII guard to ensure socket is closed on error > + struct SocketGuard(i32); > + impl Drop for SocketGuard { > + fn drop(&mut self) { > + unsafe { libc::close(self.0) }; > + } > + } > + let guard = SocketGuard(sock_fd); > + > + // Create sockaddr_un with full 108-byte abstract address (matching libqb) > + // libqb format: sun_path[0] = '\0', sun_path[1..] = "name\0\0..." (null-padded) > + let mut addr: libc::sockaddr_un = unsafe { std::mem::zeroed() }; > + addr.sun_family = libc::AF_UNIX as libc::sa_family_t; > + > + // sun_path[0] is already 0 (abstract socket marker) > + // Copy name starting at sun_path[1] > + let name_bytes = name.as_bytes(); > + let copy_len = name_bytes.len().min(107); // Leave room for initial \0 > + unsafe { > + std::ptr::copy_nonoverlapping( > + name_bytes.as_ptr(), > + addr.sun_path.as_mut_ptr().offset(1) as *mut u8, > + copy_len, > + ); > + } > + > + // Use FULL sockaddr_un length for libqb compatibility! > + // libqb clients use the full 110-byte structure (2 + 108) when connecting, > + // so we MUST bind with the same length. Verified via strace. 
> + let addr_len = std::mem::size_of::() as libc::socklen_t; > + let bind_res = unsafe { > + libc::bind( > + sock_fd, > + &addr as *const _ as *const libc::sockaddr, > + addr_len, > + ) > + }; > + if bind_res < 0 { > + anyhow::bail!( > + "Failed to bind abstract socket: {}", > + std::io::Error::last_os_error() > + ); > + } > + > + // Set socket to listen mode (backlog = 128) > + let listen_res = unsafe { libc::listen(sock_fd, 128) }; > + if listen_res < 0 { > + anyhow::bail!( > + "Failed to listen on socket: {}", > + std::io::Error::last_os_error() > + ); > + } > + > + // Convert raw fd to UnixListener (takes ownership, forget guard) > + std::mem::forget(guard); > + let listener = unsafe { UnixListener::from_raw_fd(sock_fd) }; > + > + Ok(listener) > +} > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs b/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs > new file mode 100644 > index 00000000..f8e541b0 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs > @@ -0,0 +1,450 @@ > +//! Authentication tests for pmxcfs-ipc > +//! > +//! These tests verify that the Handler::authenticate() mechanism works correctly > +//! for different authentication policies. > +//! > +//! Note: These tests use real Unix sockets, so they test authentication behavior > +//! from the server's perspective. The UID/GID will be the test process's credentials, > +//! so we test the Handler logic rather than OS-level credential checking. 
> +use async_trait::async_trait; > +use pmxcfs_ipc::{Handler, Permissions, Request, Response, Server}; > +use pmxcfs_test_utils::wait_for_condition_blocking; > +use std::sync::Arc; > +use std::sync::atomic::{AtomicU32, Ordering}; > +use std::thread; > +use std::time::Duration; > + > +/// Helper to create a unique service name for each test > +fn unique_service_name() -> String { > + static COUNTER: AtomicU32 = AtomicU32::new(0); > + format!("auth-test-{}", COUNTER.fetch_add(1, Ordering::SeqCst)) > +} > + > +/// Helper to connect using the qb_wire_compat FFI client > +/// Returns true if connection succeeded, false if rejected > +fn try_connect(service_name: &str) -> bool { > + use std::ffi::CString; > + > + #[repr(C)] > + struct QbIpccConnection { > + _private: [u8; 0], > + } > + > + #[link(name = "qb")] > + unsafe extern "C" { > + fn qb_ipcc_connect(name: *const libc::c_char, max_msg_size: usize) > + -> *mut QbIpccConnection; > + fn qb_ipcc_disconnect(conn: *mut QbIpccConnection); > + } > + > + let name = CString::new(service_name).expect("Invalid service name"); > + let conn = unsafe { qb_ipcc_connect(name.as_ptr(), 8192) }; > + > + let success = !conn.is_null(); > + > + if success { > + unsafe { qb_ipcc_disconnect(conn) }; > + } > + > + success > +} > + > +// ============================================================================ > +// Test Handlers with Different Authentication Policies > +// ============================================================================ > + > +/// Handler that accepts all connections with read-write access > +struct AcceptAllHandler; > + > +#[async_trait] > +impl Handler for AcceptAllHandler { > + fn authenticate(&self, _uid: u32, _gid: u32) -> Option { > + Some(Permissions::ReadWrite) > + } > + > + async fn handle(&self, _request: Request) -> Response { > + Response::ok(b"test".to_vec()) > + } > +} > + > +/// Handler that rejects all connections > +struct RejectAllHandler; > + > +#[async_trait] > +impl Handler for 
RejectAllHandler { > + fn authenticate(&self, _uid: u32, _gid: u32) -> Option { > + None > + } > + > + async fn handle(&self, _request: Request) -> Response { > + Response::ok(b"test".to_vec()) > + } > +} > + > +/// Handler that only accepts root (uid=0) > +struct RootOnlyHandler; > + > +#[async_trait] > +impl Handler for RootOnlyHandler { > + fn authenticate(&self, uid: u32, _gid: u32) -> Option { > + if uid == 0 { > + Some(Permissions::ReadWrite) > + } else { > + None > + } > + } > + > + async fn handle(&self, _request: Request) -> Response { > + Response::ok(b"test".to_vec()) > + } > +} > + > +/// Handler that tracks authentication calls > +struct TrackingHandler { > + call_count: Arc, > + last_uid: Arc, > + last_gid: Arc, > +} > + > +impl TrackingHandler { > + fn new() -> (Self, Arc, Arc, Arc) { > + let call_count = Arc::new(AtomicU32::new(0)); > + let last_uid = Arc::new(AtomicU32::new(0)); > + let last_gid = Arc::new(AtomicU32::new(0)); > + > + ( > + Self { > + call_count: call_count.clone(), > + last_uid: last_uid.clone(), > + last_gid: last_gid.clone(), > + }, > + call_count, > + last_uid, > + last_gid, > + ) > + } > +} > + > +#[async_trait] > +impl Handler for TrackingHandler { > + fn authenticate(&self, uid: u32, gid: u32) -> Option { > + self.call_count.fetch_add(1, Ordering::SeqCst); > + self.last_uid.store(uid, Ordering::SeqCst); > + self.last_gid.store(gid, Ordering::SeqCst); > + Some(Permissions::ReadWrite) > + } > + > + async fn handle(&self, _request: Request) -> Response { > + Response::ok(b"test".to_vec()) > + } > +} > + > +/// Handler that grants read-only access to non-root > +struct ReadOnlyForNonRootHandler; > + > +#[async_trait] > +impl Handler for ReadOnlyForNonRootHandler { > + fn authenticate(&self, uid: u32, _gid: u32) -> Option { > + if uid == 0 { > + Some(Permissions::ReadWrite) > + } else { > + Some(Permissions::ReadOnly) > + } > + } > + > + async fn handle(&self, request: Request) -> Response { > + // read_only field is visible to 
the handler via the connection > + // For testing purposes, just accept requests > + Response::ok(format!("handled msg_id {}", request.msg_id).into_bytes()) > + } > +} > + > +// ============================================================================ > +// Helper to start server in background thread > +// ============================================================================ > + > +fn start_server(service_name: String, handler: H) -> thread::JoinHandle<()> { > + thread::spawn(move || { > + let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"); > + rt.block_on(async { > + let mut server = Server::new(&service_name, handler); > + server.start().expect("Server startup failed"); > + std::future::pending::<()>().await; > + }); > + }) > +} > + > +/// Wait for server to be ready by checking if socket file exists > +fn wait_for_server_ready(service_name: &str) {

There is already another wait_for_server_ready in qb_wire_compat.rs; please check. Can we move this into shared test utils?

> + // The socket is created in /dev/shm/qb-{service_name}-* > + // We'll just try to connect repeatedly until successful or timeout > + assert!( > + wait_for_condition_blocking( > + || { > + // Try a quick connection attempt > + // For servers that accept connections, this will succeed > + // For servers that reject, the socket will at least exist > + > + let socket_pattern = format!("/dev/shm/qb-{service_name}-"); > + // Check if any socket file matching the pattern exists > + if let Ok(entries) = std::fs::read_dir("/dev/shm") { > + for entry in entries.flatten() { > + if let Ok(name) = entry.file_name().into_string() > + && name.starts_with(&socket_pattern) > + { > + return true; > + } > + } > + } > + false > + }, > + Duration::from_secs(5), > + Duration::from_millis(10), > + ), > + "Server should be ready within 5 seconds"

The helper can time out even though the server is already listening: in the "reject all" case it never creates ring buffer files.
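A readiness check that does not depend on ring buffer files could probe the abstract socket directly. The following is a minimal sketch, assuming the server binds with the full `sockaddr_un` length as in the quoted `bind_abstract_socket()`, so the client has to pad the name with NUL bytes to 107 bytes. `padded_abstract_addr` and `wait_for_listener` are hypothetical helper names. Each probe opens and drops a setup connection without a handshake; how the server reacts to that (logging, `authenticate()` call counts) is not checked here.

```rust
use std::io;
use std::os::linux::net::SocketAddrExt;
use std::os::unix::net::{SocketAddr, UnixStream};
use std::thread;
use std::time::{Duration, Instant};

/// Build an abstract address whose name is NUL-padded to 107 bytes, so the
/// resulting address covers the whole sun_path (assumption: this matches the
/// full-length bind done by the server).
fn padded_abstract_addr(name: &str) -> io::Result<SocketAddr> {
    let mut padded = [0u8; 107];
    let bytes = name.as_bytes();
    if bytes.len() > padded.len() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "service name too long for sun_path",
        ));
    }
    padded[..bytes.len()].copy_from_slice(bytes);
    SocketAddr::from_abstract_name(padded)
}

/// Poll until a connect() to the abstract socket succeeds or the timeout expires.
fn wait_for_listener(name: &str, timeout: Duration) -> bool {
    let Ok(addr) = padded_abstract_addr(name) else {
        return false;
    };
    let deadline = Instant::now() + timeout;
    loop {
        // A successful connect only proves that something is listening.
        if UnixStream::connect_addr(&addr).is_ok() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(Duration::from_millis(10));
    }
}
```

A helper like this could live in the shared test utils and serve both auth_test.rs and qb_wire_compat.rs, including the tests that use RejectAllHandler.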
> + ); > +} > + > +// ============================================================================ > +// Tests > +// ============================================================================ > + > +#[test] > +#[ignore] // Requires libqb-dev > +fn test_accept_all_handler() { > + let service_name = unique_service_name(); > + let _server = start_server(service_name.clone(), AcceptAllHandler); > + > + wait_for_server_ready(&service_name); > + > + assert!( > + try_connect(&service_name), > + "AcceptAllHandler should accept connection" > + ); > +} > + > +#[test] > +#[ignore] // Requires libqb-dev > +fn test_reject_all_handler() { > + let service_name = unique_service_name(); > + let _server = start_server(service_name.clone(), RejectAllHandler); > + > + wait_for_server_ready(&service_name); > + > + assert!( > + !try_connect(&service_name), > + "RejectAllHandler should reject connection" > + ); > +} > + > +#[test] > +#[ignore] // Requires libqb-dev > +fn test_root_only_handler() { > + let service_name = unique_service_name(); > + let _server = start_server(service_name.clone(), RootOnlyHandler); > + > + wait_for_server_ready(&service_name); > + > + let connected = try_connect(&service_name); > + > + // Get current uid > + let current_uid = unsafe { libc::getuid() }; > + > + if current_uid == 0 { > + assert!( > + connected, > + "RootOnlyHandler should accept connection when running as root" > + ); > + } else { > + assert!( > + !connected, > + "RootOnlyHandler should reject connection when not running as root (uid={current_uid})" > + ); > + } > +} > + > +#[test] > +#[ignore] // Requires libqb-dev > +fn test_authentication_called_with_credentials() { > + let service_name = unique_service_name(); > + let (handler, call_count, last_uid, last_gid) = TrackingHandler::new(); > + let _server = start_server(service_name.clone(), handler); > + > + wait_for_server_ready(&service_name); > + > + let current_uid = unsafe { libc::getuid() }; > + let current_gid = unsafe { 
libc::getgid() }; > + > + assert_eq!( > + call_count.load(Ordering::SeqCst), > + 0, > + "Should not be called yet" > + ); > + > + let connected = try_connect(&service_name); > + > + assert!(connected, "TrackingHandler should accept connection"); > + assert_eq!( > + call_count.load(Ordering::SeqCst), > + 1, > + "authenticate() should be called once" > + ); > + assert_eq!( > + last_uid.load(Ordering::SeqCst), > + current_uid, > + "authenticate() should receive correct uid" > + ); > + assert_eq!( > + last_gid.load(Ordering::SeqCst), > + current_gid, > + "authenticate() should receive correct gid" > + ); > +} > + > +#[test] > +#[ignore] // Requires libqb-dev > +fn test_multiple_connections_call_authenticate_each_time() { > + let service_name = unique_service_name(); > + let (handler, call_count, _, _) = TrackingHandler::new(); > + let _server = start_server(service_name.clone(), handler); > + > + wait_for_server_ready(&service_name); > + > + // First connection > + assert!(try_connect(&service_name)); > + assert_eq!(call_count.load(Ordering::SeqCst), 1); > + > + // Second connection > + assert!(try_connect(&service_name)); > + assert_eq!(call_count.load(Ordering::SeqCst), 2); > + > + // Third connection > + assert!(try_connect(&service_name)); > + assert_eq!(call_count.load(Ordering::SeqCst), 3); > +} > + > +#[test] > +#[ignore] // Requires libqb-dev > +fn test_read_only_permissions_accepted() { > + let service_name = unique_service_name(); > + let _server = start_server(service_name.clone(), ReadOnlyForNonRootHandler); > + > + wait_for_server_ready(&service_name); > + > + // Connection should succeed regardless of whether we get ReadOnly or ReadWrite > + // (both are accepted, just with different permissions) > + assert!( > + try_connect(&service_name), > + "ReadOnlyForNonRootHandler should accept connections with appropriate permissions" > + ); > +} > + > +/// Test that demonstrates the authentication policy is enforced at connection time > +#[test] > +#[ignore] // 
Requires libqb-dev > +fn test_authentication_enforced_at_connection_time() { > + // This test verifies that authentication happens during connection setup, > + // not during request handling > + let service_name = unique_service_name(); > + let _server = start_server(service_name.clone(), RejectAllHandler); > + > + wait_for_server_ready(&service_name); > + > + // Connection should fail immediately, before any request is sent > + let start = std::time::Instant::now(); > + let connected = try_connect(&service_name); > + let duration = start.elapsed(); > + > + assert!(!connected, "Connection should be rejected"); > + assert!( > + duration < Duration::from_millis(100), > + "Rejection should happen quickly during handshake, not during request processing" > + ); > +} > + > +#[cfg(test)] > +mod policy_examples { > + use super::*; > + > + /// Example: Handler that mimics Proxmox VE authentication policy > + /// - Root (uid=0) gets read-write > + /// - www-data (uid=33) gets read-only (for web UI) > + /// - Others are rejected > + struct ProxmoxStyleHandler; > + > + #[async_trait] > + impl Handler for ProxmoxStyleHandler { > + fn authenticate(&self, uid: u32, _gid: u32) -> Option { > + match uid { > + 0 => Some(Permissions::ReadWrite), // root > + 33 => Some(Permissions::ReadOnly), // www-data > + _ => None, // reject others > + } > + } > + > + async fn handle(&self, request: Request) -> Response { > + // In real implementation, would check request.read_only > + // to enforce read-only restrictions > + Response::ok(format!("msg_id {}", request.msg_id).into_bytes()) > + } > + } > + > + #[test] > + #[ignore] // Requires libqb-dev > + fn test_proxmox_style_policy() { > + let service_name = unique_service_name(); > + let _server = start_server(service_name.clone(), ProxmoxStyleHandler); > + > + wait_for_server_ready(&service_name); > + > + let current_uid = unsafe { libc::getuid() }; > + let connected = try_connect(&service_name); > + > + match current_uid { > + 0 => 
assert!(connected, "Root should be accepted"), > + 33 => assert!(connected, "www-data should be accepted"), > + _ => assert!(!connected, "Other users should be rejected"), > + } > + } > + > + /// Example: Handler that uses group-based authentication > + struct GroupBasedHandler { > + allowed_gid: u32, > + } > + > + impl GroupBasedHandler { > + fn new(allowed_gid: u32) -> Self { > + Self { allowed_gid } > + } > + } > + > + #[async_trait] > + impl Handler for GroupBasedHandler { > + fn authenticate(&self, _uid: u32, gid: u32) -> Option { > + if gid == self.allowed_gid { > + Some(Permissions::ReadWrite) > + } else { > + None > + } > + } > + > + async fn handle(&self, _request: Request) -> Response { > + Response::ok(b"ok".to_vec()) > + } > + } > + > + #[test] > + #[ignore] // Requires libqb-dev > + fn test_group_based_authentication() { > + let service_name = unique_service_name(); > + let current_gid = unsafe { libc::getgid() }; > + let _server = start_server(service_name.clone(), GroupBasedHandler::new(current_gid)); > + > + wait_for_server_ready(&service_name); > + > + assert!( > + try_connect(&service_name), > + "Should accept connection from same group" > + ); > + } > +} > diff --git a/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs b/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs > new file mode 100644 > index 00000000..8c0db962 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs > @@ -0,0 +1,413 @@ > +//! Wire protocol compatibility test with libqb C clients > +//! > +//! This integration test verifies that our Rust Server is fully compatible > +//! with real libqb C clients by using libqb's client API via FFI. > +//! > +//! Run with: cargo test --package pmxcfs-ipc --test qb_wire_compat -- --ignored --nocapture > +//! > +//! 
Requires: libqb-dev installed > + > +use pmxcfs_test_utils::wait_for_condition_blocking; > +use std::ffi::CString; > +use std::thread; > +use std::time::Duration; > + > +// ============================================================================ > +// Minimal libqb FFI bindings (client-side only) > +// ============================================================================ > + > +/// libqb request header matching C's __attribute__ ((aligned(8))) > +/// Each field is i32 with 8-byte alignment, achieved via explicit padding > +#[repr(C, align(8))] > +#[derive(Debug, Copy, Clone)] > +struct QbIpcRequestHeader { > + id: i32, // 4 bytes > + _pad1: u32, // 4 bytes padding > + size: i32, // 4 bytes > + _pad2: u32, // 4 bytes padding > +} > + > +/// libqb response header matching C's __attribute__ ((aligned(8))) > +/// Each field is i32 with 8-byte alignment, achieved via explicit padding > +#[repr(C, align(8))] > +#[derive(Debug, Copy, Clone)] > +struct QbIpcResponseHeader { > + id: i32, // 4 bytes > + _pad1: u32, // 4 bytes padding > + size: i32, // 4 bytes > + _pad2: u32, // 4 bytes padding > + error: i32, // 4 bytes > + _pad3: u32, // 4 bytes padding > +} > + > +// Opaque type for connection handle > +#[repr(C)] > +struct QbIpccConnection { > + _private: [u8; 0], > +} > + > +#[link(name = "qb")] > +unsafe extern "C" { > + /// Connect to a QB IPC service > + /// Returns NULL on failure > + fn qb_ipcc_connect(name: *const libc::c_char, max_msg_size: usize) -> *mut QbIpccConnection; > + > + /// Send request and receive response (with iovec) > + /// Returns number of bytes received, or negative errno on error > + fn qb_ipcc_sendv_recv( > + conn: *mut QbIpccConnection, > + iov: *const libc::iovec, > + iov_len: u32, > + res_buf: *mut libc::c_void, > + res_buf_size: usize, > + timeout_ms: i32, > + ) -> libc::ssize_t; > + > + /// Disconnect from service > + fn qb_ipcc_disconnect(conn: *mut QbIpccConnection); > + > + /// Initialize libqb logging > + fn 
qb_log_init(name: *const libc::c_char, facility: i32, priority: i32); > + > + /// Control log targets > + fn qb_log_ctl(target: i32, conf: i32, arg: i32) -> i32; > + > + /// Filter control > + fn qb_log_filter_ctl( > + target: i32, > + op: i32, > + type_: i32, > + text: *const libc::c_char, > + priority: i32, > + ) -> i32; > +} > + > +// Log targets > +const QB_LOG_STDERR: i32 = 2; > + > +// Log control operations > +const QB_LOG_CONF_ENABLED: i32 = 1; > + > +// Log filter operations > +const QB_LOG_FILTER_ADD: i32 = 0; > +const QB_LOG_FILTER_FILE: i32 = 1; > + > +// Log levels (from syslog.h) > +const LOG_TRACE: i32 = 8; // LOG_DEBUG + 1 > + > +// ============================================================================ > +// Safe Rust wrapper around libqb client > +// ============================================================================ > + > +struct QbIpcClient { > + conn: *mut QbIpccConnection, > +} > + > +impl QbIpcClient { > + fn connect(service_name: &str, max_msg_size: usize) -> Result { > + let name = CString::new(service_name).map_err(|e| format!("Invalid service name: {e}"))?; > + > + let conn = unsafe { qb_ipcc_connect(name.as_ptr(), max_msg_size) }; > + > + if conn.is_null() { > + let errno = unsafe { *libc::__errno_location() }; > + let error_str = unsafe { > + let err_ptr = libc::strerror(errno); > + std::ffi::CStr::from_ptr(err_ptr) > + .to_string_lossy() > + .to_string() > + }; > + Err(format!( > + "qb_ipcc_connect returned NULL (errno={errno}: {error_str})" > + )) > + } else { > + Ok(Self { conn }) > + } > + } > + > + fn send_recv( > + &self, > + request_id: i32, > + request_data: &[u8], > + timeout_ms: i32, > + ) -> Result<(i32, Vec), String> { > + // Build request > + let req_header = QbIpcRequestHeader { > + id: request_id, > + _pad1: 0, > + size: (std::mem::size_of::() + request_data.len()) as i32, > + _pad2: 0, > + }; > + > + // Setup iovec > + let mut iov = vec![libc::iovec { > + iov_base: &req_header as *const _ as *mut 
libc::c_void, > + iov_len: std::mem::size_of::(), > + }]; > + > + if !request_data.is_empty() { > + iov.push(libc::iovec { > + iov_base: request_data.as_ptr() as *mut libc::c_void, > + iov_len: request_data.len(), > + }); > + } > + > + // Response buffer > + const MAX_RESPONSE: usize = 8192 * 128; > + let mut resp_buf = vec![0u8; MAX_RESPONSE]; > + > + // Send and receive > + let result = unsafe { > + qb_ipcc_sendv_recv( > + self.conn, > + iov.as_ptr(), > + iov.len() as u32, > + resp_buf.as_mut_ptr() as *mut libc::c_void, > + resp_buf.len(), > + timeout_ms, > + ) > + }; > + > + if result < 0 { > + return Err(format!("qb_ipcc_sendv_recv failed: {}", -result)); > + } > + > + let bytes_received = result as usize; > + > + // Parse response header > + if bytes_received < std::mem::size_of::() { > + return Err("Response too short".to_string()); > + } > + > + let resp_header = unsafe { *(resp_buf.as_ptr() as *const QbIpcResponseHeader) }; > + > + // Verify response ID matches request > + if resp_header.id != request_id { > + return Err(format!( > + "Response ID mismatch: expected {}, got {}", > + request_id, resp_header.id > + )); > + } > + > + // Extract data > + let data_start = std::mem::size_of::(); > + let data = resp_buf[data_start..bytes_received].to_vec(); > + > + Ok((resp_header.error, data)) > + } > +} > + > +impl Drop for QbIpcClient { > + fn drop(&mut self) { > + unsafe { > + qb_ipcc_disconnect(self.conn); > + } > + } > +} > + > +// ============================================================================ > +// Integration Test > +// ============================================================================ > + > +#[test] > +#[ignore] // Run with: cargo test -- --ignored > +fn test_libqb_wire_protocol_compatibility() { > + eprintln!("🧪 Starting wire protocol compatibility test"); > + > + // Check if libqb is available > + eprintln!("🔍 Checking if libqb is available..."); > + if !check_libqb_available() { > + eprintln!("⏭️ SKIP: libqb not
installed"); > + eprintln!(" Install with: sudo apt-get install libqb-dev"); > + return; > + } > + eprintln!("✓ libqb is available"); > + > + // Start test server > + eprintln!("🚀 Starting test server..."); > + let server_handle = start_test_server(); > + eprintln!("✓ Server thread spawned"); > + > + // Wait for server to be ready > + eprintln!("⏳ Waiting for server initialization..."); > + wait_for_server_ready("pve2"); > + eprintln!("✓ Server is ready"); > + > + // Run tests > + eprintln!("🧪 Running client tests..."); > + let test_result = run_client_tests(); > + > + // Cleanup > + drop(server_handle); > + > + // Assert results > + assert!( > + test_result.is_ok(), > + "Client tests failed: {:?}", > + test_result.err() > + ); > +} > + > +fn check_libqb_available() -> bool { > + std::process::Command::new("pkg-config") > + .args(["--exists", "libqb"]) > + .status() > + .map(|s| s.success()) > + .unwrap_or(false) > +} > + > +fn start_test_server() -> thread::JoinHandle<()> { > + use async_trait::async_trait; > + use pmxcfs_ipc::{Handler, Request, Response, Server}; > + > + // Create test handler > + struct TestHandler; > + > + #[async_trait] > + impl Handler for TestHandler { > + fn authenticate(&self, _uid: u32, _gid: u32) -> Option { > + // Accept all connections with read-write access for testing > + Some(pmxcfs_ipc::Permissions::ReadWrite) > + } > + > + async fn handle(&self, request: Request) -> Response { > + match request.msg_id { > + 1 => { > + // CFS_IPC_GET_FS_VERSION > + let response_str = r#"{"version":1,"protocol":1}"#; > + Response::ok(response_str.as_bytes().to_vec()) > + } > + 2 => { > + // CFS_IPC_GET_CLUSTER_INFO > + let response_str = r#"{"nodes":[],"quorate":false}"#; > + Response::ok(response_str.as_bytes().to_vec()) > + } > + 3 => { > + // CFS_IPC_GET_GUEST_LIST > + let response_str = r#"{"data":[]}"#; > + Response::ok(response_str.as_bytes().to_vec()) > + } > + _ => Response::err(-libc::EINVAL), > + } > + } > + } > + > + //
Spawn server thread with tokio runtime > + thread::spawn(move || { > + // Initialize tracing for server (WARN level - silent on success) > + tracing_subscriber::fmt() > + .with_max_level(tracing::Level::WARN) > + .with_target(false) > + .init(); > + > + // Create tokio runtime for async server > + let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"); > + > + rt.block_on(async { > + let mut server = Server::new("pve2", TestHandler); > + > + // Server uses abstract Unix socket (Linux-specific) > + if let Err(e) = server.start() { > + eprintln!("Server startup failed: {e}"); > + eprintln!("Error details: {e:?}"); > + panic!("Server startup failed"); > + } > + > + // Give tokio a chance to start the acceptor task > + tokio::task::yield_now().await; > + > + // Block forever to keep server alive > + std::future::pending::<()>().await; > + }); > + }) > +} > + > +/// Wait for server to be ready by checking if socket file exists > +fn wait_for_server_ready(service_name: &str) { > + assert!( > + wait_for_condition_blocking( > + || { > + // Check if socket file exists in /dev/shm > + let socket_pattern = format!("/dev/shm/qb-{service_name}-"); > + if let Ok(entries) = std::fs::read_dir("/dev/shm") { > + for entry in entries.flatten() { > + if let Ok(name) = entry.file_name().into_string() > + && name.starts_with(&socket_pattern) > + { > + return true; > + } > + } > + } > + false > + }, > + Duration::from_secs(5), > + Duration::from_millis(10), > + ), > + "Server should be ready within 5 seconds"

The /dev/shm/qb-* ring buffer files are created per accepted connection. For RejectAllHandler no connection is accepted, so no files appear, and this can time out even though the server is already ready.
> + ); > +} > + > +fn run_client_tests() -> Result<(), String> { > + // Enable libqb debug logging to see what's happening > + eprintln!("🔧 Enabling libqb debug logging..."); > + unsafe { > + let name = CString::new("qb_test").unwrap(); > + qb_log_init(name.as_ptr(), libc::LOG_USER, LOG_TRACE); > + qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, 1); > + // Enable all log messages from all files at TRACE level > + let all_files = CString::new("*").unwrap(); > + qb_log_filter_ctl( > + QB_LOG_STDERR, > + QB_LOG_FILTER_ADD, > + QB_LOG_FILTER_FILE, > + all_files.as_ptr(), > + LOG_TRACE, > + ); > + } > + eprintln!("✓ libqb logging enabled (TRACE level)"); > + > + eprintln!("📡 Connecting to server...");

nit: for consistency with other repos, I suggest removing the emojis :) This also applies to the other patches.

> + // Connect to abstract socket "pve2" > + // Use a very large buffer size to rule out space issues > + let client = QbIpcClient::connect("pve2", 8192 * 1024)?; // 8MB instead of 1MB > + eprintln!("✓ Connected successfully"); > + > + eprintln!("🧪 Test 1: GET_FS_VERSION"); > + // Test 1: GET_FS_VERSION > + let (error, data) = client.send_recv(1, &[], 5000)?; > + eprintln!("✓ Got response: error={}, data_len={}", error, data.len()); > + if error == 0 { > + let response = String::from_utf8_lossy(&data); > + eprintln!(" Response: {response}"); > + assert!( > + response.contains("version"), > + "Response should contain version field" > + ); > + } > + > + eprintln!("🧪 Test 2: GET_CLUSTER_INFO"); > + // Test 2: GET_CLUSTER_INFO > + let (error, data) = client.send_recv(2, &[], 5000)?; > + eprintln!("✓ Got response: error={}, data_len={}", error, data.len()); > + if error == 0 { > + let response = String::from_utf8_lossy(&data); > + eprintln!(" Response: {response}"); > + assert!( > + response.contains("nodes"), > + "Response should contain nodes field" > + ); > + } > + > + eprintln!("🧪 Test 3: Request with data payload"); > + // Test 3: Request with
data payload > + let test_payload = b"test_payload_data"; > + let (_error, _data) = client.send_recv(1, test_payload, 5000)?; > + eprintln!("✓ Request with payload succeeded"); > + > + eprintln!("🧪 Test 4: GET_GUEST_LIST"); > + // Test 4: GET_GUEST_LIST > + let (_error, _data) = client.send_recv(3, &[], 5000)?; > + eprintln!("✓ GET_GUEST_LIST succeeded"); > + > + Ok(()) > +}

Can we please also add tests for the following:
- the expected behaviour when the ring buffer is full
- whether the ring buffer files are deleted on connection disconnect
- adversarial inputs
- graceful shutdown
- concurrent connections
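For the disconnect case, a small helper that lists leftover ring buffer files might be enough to build the assertion on. This is only a sketch: `ring_buffer_files` is a hypothetical name, and the `qb-{service_name}-` prefix is taken from the helpers quoted above. It matches on the bare file name, because `DirEntry::file_name()` does not include the directory, and it takes the directory as a parameter so it can be pointed at something other than /dev/shm.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// List the files in `dir` whose name starts with `qb-{service_name}-`.
/// The prefix is assumed from the quoted test helpers, not verified against libqb.
fn ring_buffer_files(dir: &Path, service_name: &str) -> io::Result<Vec<PathBuf>> {
    let prefix = format!("qb-{service_name}-");
    let mut found = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        // file_name() is the last path component only, so compare it against
        // the bare prefix rather than a full path.
        if entry.file_name().to_string_lossy().starts_with(&prefix) {
            found.push(entry.path());
        }
    }
    found.sort();
    Ok(found)
}
```

A test could then connect, disconnect, and poll until `ring_buffer_files(Path::new("/dev/shm"), &service_name)` returns an empty list, failing after a timeout.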