From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 6D4C91FF140 for ; Fri, 13 Mar 2026 15:10:32 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C90DB41B5; Fri, 13 Mar 2026 15:10:34 +0100 (CET) Message-ID: <5b4be846-d09b-4306-8e5c-f52cbfbd4797@proxmox.com> Date: Fri, 13 Mar 2026 15:09:52 +0100 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird From: Samuel Rufinatscha Subject: Re: [PATCH pve-cluster 05/14 v2] pmxcfs-rs: add pmxcfs-rrd crate To: Kefu Chai , pve-devel@lists.proxmox.com References: <20260213094119.2379288-1-k.chai@proxmox.com> <20260213094119.2379288-6-k.chai@proxmox.com> Content-Language: en-US In-Reply-To: <20260213094119.2379288-6-k.chai@proxmox.com> Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.832 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_MSPIKE_H2 0.001 Average reputation (+2) RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.408 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.819 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.903 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. 
SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: ZYT3YPGNJX6GPNFXGZU2N4Q2CU76WL4J X-Message-ID-Hash: ZYT3YPGNJX6GPNFXGZU2N4Q2CU76WL4J X-MailFrom: s.rufinatscha@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox VE development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Thanks also for this patch and for working through / including most of my previous v1 suggestions. Please see my comments inline. On 2/13/26 10:47 AM, Kefu Chai wrote: > Add RRD (Round-Robin Database) file persistence system: > - RrdWriter: Main API for RRD operations > - Schema definitions for CPU, memory, network metrics > - Format migration support (v1/v2/v3) > - rrdcached integration for batched writes > - Data transformation for legacy formats > > This is an independent crate with no internal dependencies, > only requiring external RRD libraries (rrd, rrdcached-client) > and tokio for async operations. It handles time-series data > storage compatible with the C implementation. > > Includes comprehensive unit tests for data transformation, > schema generation, and multi-source data processing. > > Signed-off-by: Kefu Chai > --- > src/pmxcfs-rs/Cargo.toml | 12 + > src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml | 23 + > src/pmxcfs-rs/pmxcfs-rrd/README.md | 119 ++++ > src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs | 62 ++ > .../pmxcfs-rrd/src/backend/backend_daemon.rs | 184 ++++++ > .../pmxcfs-rrd/src/backend/backend_direct.rs | 586 ++++++++++++++++++ > .../src/backend/backend_fallback.rs | 212 +++++++ > src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs | 140 +++++ This file doesn't seem to be included (there is no mod definition). 
Please remove if not needed. > src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs | 408 ++++++++++++ > src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs | 23 + > src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs | 124 ++++ > .../pmxcfs-rrd/src/rrdcached/LICENSE | 21 + > .../pmxcfs-rrd/src/rrdcached/client.rs | 208 +++++++ > .../src/rrdcached/consolidation_function.rs | 30 + > .../pmxcfs-rrd/src/rrdcached/create.rs | 410 ++++++++++++ > .../pmxcfs-rrd/src/rrdcached/errors.rs | 29 + > src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs | 45 ++ > src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs | 18 + > .../pmxcfs-rrd/src/rrdcached/parsers.rs | 65 ++ > .../pmxcfs-rrd/src/rrdcached/sanitisation.rs | 100 +++ > src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs | 577 +++++++++++++++++ > src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs | 582 +++++++++++++++++ > 22 files changed, 3978 insertions(+) > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/README.md > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs > create mode 100644 
src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs > create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs > > diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml > index d26fac04c..2457fe368 100644 > --- a/src/pmxcfs-rs/Cargo.toml > +++ b/src/pmxcfs-rs/Cargo.toml > @@ -4,6 +4,7 @@ members = [ > "pmxcfs-api-types", # Shared types and error definitions > "pmxcfs-config", # Configuration management > "pmxcfs-logger", # Cluster log with ring buffer and deduplication > + "pmxcfs-rrd", # RRD (Round-Robin Database) persistence > ] > resolver = "2" > > @@ -20,16 +21,27 @@ rust-version = "1.85" > pmxcfs-api-types = { path = "pmxcfs-api-types" } > pmxcfs-config = { path = "pmxcfs-config" } > pmxcfs-logger = { path = "pmxcfs-logger" } > +pmxcfs-rrd = { path = "pmxcfs-rrd" } > + > +# Core async runtime > +tokio = { version = "1.35", features = ["full"] } > > # Error handling > +anyhow = "1.0" > thiserror = "1.0" > > +# Logging and tracing > +tracing = "0.1" > + > # Concurrency primitives > parking_lot = "0.12" > > # System integration > libc = "0.2" > > +# Development dependencies > +tempfile = "3.8" > + > [workspace.lints.clippy] > uninlined_format_args = "warn" > > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml > new file mode 100644 > index 000000000..33c87ec91 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml > @@ -0,0 +1,23 @@ > +[package] > +name = "pmxcfs-rrd" > +version.workspace = true > +edition.workspace = true > +authors.workspace = true > +license.workspace = true > + > +[features] > +default = ["rrdcached"] > +rrdcached = [] > + > +[dependencies] > +anyhow.workspace = true > +async-trait = "0.1" > +chrono = { version = "0.4", default-features = false, features = ["clock"] } > +nom = "8.0" Do we actually need this extra dependency? 
It seems like we only use it for basic string operations. > +rrd = "0.2" > +thiserror = "2.0" In the workspace there is already thiserror = "1.0". Please align accordingly. > +tokio.workspace = true > +tracing.workspace = true > + > +[dev-dependencies] > +tempfile.workspace = true > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/README.md b/src/pmxcfs-rs/pmxcfs-rrd/README.md > new file mode 100644 > index 000000000..d6f6ad9b1 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-rrd/README.md > @@ -0,0 +1,119 @@ > +# pmxcfs-rrd > + > +RRD (Round-Robin Database) persistence for pmxcfs performance metrics. > + > +## Overview > + > +This crate provides RRD file management for storing time-series performance data from Proxmox nodes and VMs. It handles file creation, updates, and integration with rrdcached daemon for efficient writes. > + > +### Key Features > + > +- RRD file creation with schema-based initialization > +- RRD updates (write metrics to disk) > +- rrdcached integration for batched writes > +- Support for both legacy and current schema versions (v1/v2/v3) > +- Type-safe key parsing and validation > +- Compatible with existing C-created RRD files > + > +## Usage Flow > + > +The typical data flow through this crate: > + > +1. **Metrics Collection**: pmxcfs-status collects performance metrics (CPU, memory, network, etc.) > +2. **Key Generation**: Metrics are organized by key type (node, VM, storage) > +3. **Schema Selection**: Appropriate RRD schema is selected based on key type and version > +4. **Data Transformation**: Legacy data (v1/v2) is transformed to current format (v3) if needed > +5. **Backend Selection**: > + - **Daemon backend**: Preferred for performance, batches writes via rrdcached > + - **Direct backend**: Fallback using librrd directly when daemon unavailable > + - **Fallback backend**: Tries daemon first, falls back to direct on failure > +6. 
**File Operations**: Create RRD files if needed, update with new data points > + > +### Data Transformation > + > +The crate handles migration between schema versions: > +- **v1 → v2**: Adds additional data sources for extended metrics > +- **v2 → v3**: Consolidates and optimizes data sources > +- **Transform logic**: `schema.rs:transform_data()` handles conversion, skipping incompatible entries > + > +### Backend Differences > + > +- **Daemon Backend** (`backend_daemon.rs`): > + - Uses vendored rrdcached client for async communication > + - Batches multiple updates for efficiency > + - Requires rrdcached daemon running > + - Best for high-frequency updates And: The C code tries rrdc_update() on every call and only falls back to rrd_update_r() for that individual call if it fails; it doesn't permanently disable the daemon path. So this is a difference too and should be documented, or fixed. > + > +- **Direct Backend** (`backend_direct.rs`): > + - Uses rrd crate (librrd FFI bindings) directly > + - Synchronous file operations > + - No external daemon required > + - Reliable fallback option > + > +- **Fallback Backend** (`backend_fallback.rs`): > + - Composite pattern: tries daemon, falls back to direct > + - Matches C implementation behavior > + - Provides best of both worlds > + > +## Module Structure > + > +| Module | Purpose | > +|--------|---------| > +| `writer.rs` | Main RrdWriter API - high-level interface for RRD operations | > +| `schema.rs` | RRD schema definitions (DS, RRA) and data transformation logic | > +| `key_type.rs` | RRD key parsing, validation, and path sanitization | > +| `daemon.rs` | rrdcached daemon client wrapper | > +| `backend.rs` | Backend trait and implementations (daemon/direct/fallback) | > +| `rrdcached/` | Vendored rrdcached client implementation (adapted from rrdcached-client v0.1.5) | > + > +## Usage Example > + > +```rust > +use pmxcfs_rrd::{RrdWriter, RrdFallbackBackend}; RrdFallbackBackend is not exported from lib.rs. 
Also the signature below doesn't match the current code. Please verify. > + > +// Create writer with fallback backend > +let backend = RrdFallbackBackend::new("/var/run/rrdcached.sock").await?; > +let writer = RrdWriter::new(backend); > + > +// Update node CPU metrics > +writer.update( > + "pve/nodes/node1/cpu", > + &[0.45, 0.52, 0.38, 0.61], // CPU usage values > + None, // Use current timestamp > +).await?; > + > +// Create new RRD file for VM > +writer.create( > + "pve/qemu/100/cpu", > + 1704067200, // Start timestamp > +).await?; > +``` > + > +## External Dependencies > + > +- **rrd crate**: Provides Rust bindings to librrd (RRDtool C library) > +- **rrdcached client**: Vendored and adapted from rrdcached-client v0.1.5 (Apache-2.0 license) > + - Original source: https://github.com/SINTEF/rrdcached-client > + - Vendored to gain full control and adapt to our specific needs > + - Can be disabled via the `rrdcached` feature flag > + > +## Testing > + > +Unit tests verify: > +- Schema generation and validation > +- Key parsing for different RRD types (node, VM, storage) > +- RRD file creation and update operations > +- rrdcached client connection and fallback behavior > + > +Run tests with: > +```bash > +cargo test -p pmxcfs-rrd > +``` > + > +## References > + > +- **C Implementation**: `src/pmxcfs/status.c` (RRD code embedded) > +- **Related Crates**: > + - `pmxcfs-status` - Uses RrdWriter for metrics persistence > + - `pmxcfs` - FUSE `.rrd` plugin reads RRD files > +- **RRDtool Documentation**: https://oss.oetiker.ch/rrdtool/ > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs > new file mode 100644 > index 000000000..2fa4fa39d > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs > @@ -0,0 +1,62 @@ > +/// RRD Backend Trait and Implementations > +/// > +/// This module provides an abstraction over different RRD writing mechanisms: > +/// - Daemon-based (via rrdcached) for performance and batching > +/// - 
Direct file writing for reliability and fallback scenarios > +/// - Fallback composite that tries daemon first, then falls back to direct > +/// > +/// This design matches the C implementation's behavior in status.c where > +/// it attempts daemon update first, then falls back to direct file writes. > +use super::schema::RrdSchema; > +use anyhow::Result; > +use async_trait::async_trait; > +use std::path::Path; > + > +/// Constants for RRD configuration > +pub const DEFAULT_SOCKET_PATH: &str = "/var/run/rrdcached.sock"; > +pub const RRD_STEP_SECONDS: u64 = 60; > + > +/// Trait for RRD backend implementations > +/// > +/// Provides abstraction over different RRD writing mechanisms. > +/// All methods are async to support both async (daemon) and sync (direct file) operations. > +#[async_trait] > +pub trait RrdBackend: Send + Sync { > + /// Update RRD file with new data > + /// > + /// # Arguments > + /// * `file_path` - Full path to the RRD file > + /// * `data` - Update data in format "timestamp:value1:value2:..." > + async fn update(&mut self, file_path: &Path, data: &str) -> Result<()>; > + > + /// Create new RRD file with schema > + /// > + /// # Arguments > + /// * `file_path` - Full path where RRD file should be created > + /// * `schema` - RRD schema defining data sources and archives > + /// * `start_timestamp` - Start time for the RRD file (Unix timestamp) > + async fn create( > + &mut self, > + file_path: &Path, > + schema: &RrdSchema, > + start_timestamp: i64, > + ) -> Result<()>; > + > + /// Flush pending updates to disk > + /// > + /// For daemon backends, this sends a FLUSH command. > + /// For direct backends, this is a no-op (writes are immediate). > + async fn flush(&mut self) -> Result<()>; > + > + /// Get a human-readable name for this backend > + fn name(&self) -> &str; > +} > + > +// Backend implementations > +mod backend_daemon; The rrdcached module is conditional, but the daemon backend is always included. Please feature gate this too. 
> +mod backend_direct; > +mod backend_fallback; > + > +pub use backend_daemon::RrdCachedBackend; Also this should be gated, no? And similarly please gate the daemon usage in backend_fallback.rs and writer.rs where the fallback backend tries to connect to the daemon. > +pub use backend_direct::RrdDirectBackend; > +pub use backend_fallback::RrdFallbackBackend; > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs [..] > +} > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs > new file mode 100644 > index 000000000..fabe7e669 > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs > @@ -0,0 +1,408 @@ > +/// RRD Key Type Parsing and Path Resolution > +/// > +/// This module handles parsing RRD status update keys and mapping them > +/// to the appropriate file paths and schemas. > +use super::schema::{RrdFormat, RrdSchema}; > +use anyhow::{Context, Result}; > +use std::path::{Path, PathBuf}; > + > +/// Metric type for determining column skipping rules > +#[derive(Debug, Clone, Copy, PartialEq, Eq)] > +pub enum MetricType { > + Node, > + Vm, > + Storage, > +} > + > +impl MetricType { > + /// Number of non-archivable columns to skip from the start of the data string > + /// > + /// The data from pvestatd has non-archivable fields at the beginning: > + /// - Node: skip 2 (uptime, sublevel) - then ctime:loadavg:maxcpu:... > + /// - VM: skip 4 (uptime, name, status, template) - then ctime:maxcpu:cpu:... 
> + /// - Storage: skip 0 - data starts with ctime:total:used > + /// > + /// C implementation: status.c:1300 (node skip=2), status.c:1335 (VM skip=4) > + pub fn skip_columns(self) -> usize { > + match self { > + MetricType::Node => 2, > + MetricType::Vm => 4, > + MetricType::Storage => 0, > + } > + } > + > + /// Get column count for a specific RRD format > + #[allow(dead_code)] > + pub fn column_count(self, format: RrdFormat) -> usize { > + match (format, self) { > + (RrdFormat::Pve2, MetricType::Node) => 12, > + (RrdFormat::Pve9_0, MetricType::Node) => 19, > + (RrdFormat::Pve2, MetricType::Vm) => 10, > + (RrdFormat::Pve9_0, MetricType::Vm) => 17, > + (_, MetricType::Storage) => 2, // Same for both formats > + } > + } > +} > + > +/// RRD key types for routing to correct schema and path > +/// > +/// This enum represents the different types of RRD metrics that pmxcfs tracks: > +/// - Node metrics (CPU, memory, network for a node) > +/// - VM metrics (CPU, memory, disk, network for a VM/CT) > +/// - Storage metrics (total/used space for a storage) > +#[derive(Debug, Clone, PartialEq, Eq)] > +pub(crate) enum RrdKeyType { > + /// Node metrics: pve2-node/{nodename} or pve-node-9.0/{nodename} > + Node { nodename: String, format: RrdFormat }, > + /// VM metrics: pve2.3-vm/{vmid} or pve-vm-9.0/{vmid} > + Vm { vmid: String, format: RrdFormat }, > + /// Storage metrics: pve2-storage/{node}/{storage} or pve-storage-9.0/{node}/{storage} > + Storage { > + nodename: String, > + storage: String, > + format: RrdFormat, > + }, > +} > + > +impl RrdKeyType { > + /// Parse RRD key from status update key > + /// > + /// Supported formats: > + /// - "pve2-node/node1" → Node { nodename: "node1", format: Pve2 } > + /// - "pve-node-9.0/node1" → Node { nodename: "node1", format: Pve9_0 } > + /// - "pve2.3-vm/100" → Vm { vmid: "100", format: Pve2 } > + /// - "pve-storage-9.0/node1/local" → Storage { nodename: "node1", storage: "local", format: Pve9_0 } > + /// > + /// # Security > + /// > + 
/// Path components are validated to prevent directory traversal attacks: > + /// - Rejects paths containing ".." > + /// - Rejects absolute paths > + /// - Rejects paths with special characters that could be exploited > + pub(crate) fn parse(key: &str) -> Result { > + let parts: Vec<&str> = key.split('/').collect(); > + > + if parts.is_empty() { > + anyhow::bail!("Empty RRD key"); > + } > + > + // Validate all path components for security > + for part in &parts[1..] { > + Self::validate_path_component(part)?; > + } > + > + match parts[0] { > + "pve2-node" => { > + let nodename = parts.get(1).context("Missing nodename")?.to_string(); > + Ok(RrdKeyType::Node { > + nodename, > + format: RrdFormat::Pve2, > + }) > + } > + prefix if prefix.starts_with("pve-node-") => { > + let nodename = parts.get(1).context("Missing nodename")?.to_string(); > + Ok(RrdKeyType::Node { > + nodename, > + format: RrdFormat::Pve9_0, "pve-node-9.0" matches, but so do "pve-node-9.1" and "pve-node-10.0" - all of them are treated as Pve9_0. I think we should maybe parse the suffix and match exactly? 
> + }) > + } > + "pve2.3-vm" => { > + let vmid = parts.get(1).context("Missing vmid")?.to_string(); > + Ok(RrdKeyType::Vm { > + vmid, > + format: RrdFormat::Pve2, > + }) > + } > + prefix if prefix.starts_with("pve-vm-") => { > + let vmid = parts.get(1).context("Missing vmid")?.to_string(); > + Ok(RrdKeyType::Vm { > + vmid, > + format: RrdFormat::Pve9_0, > + }) > + } > + "pve2-storage" => { > + let nodename = parts.get(1).context("Missing nodename")?.to_string(); > + let storage = parts.get(2).context("Missing storage")?.to_string(); > + Ok(RrdKeyType::Storage { > + nodename, > + storage, > + format: RrdFormat::Pve2, > + }) > + } > + prefix if prefix.starts_with("pve-storage-") => { > + let nodename = parts.get(1).context("Missing nodename")?.to_string(); > + let storage = parts.get(2).context("Missing storage")?.to_string(); > + Ok(RrdKeyType::Storage { > + nodename, > + storage, > + format: RrdFormat::Pve9_0, > + }) > + } > + _ => anyhow::bail!("Unknown RRD key format: {key}"), > + } > + } > + > + /// Validate a path component for security > + /// > + /// Prevents directory traversal attacks by rejecting: > + /// - ".." (parent directory) > + /// - Absolute paths (starting with "/") > + /// - Empty components > + /// - Components with null bytes or other dangerous characters > + fn validate_path_component(component: &str) -> Result<()> { > + if component.is_empty() { > + anyhow::bail!("Empty path component"); > + } > + > + if component == ".." { > + anyhow::bail!("Path traversal attempt: '..' 
not allowed"); > + } > + > + if component.starts_with('/') { > + anyhow::bail!("Absolute paths not allowed"); > + } > + > + if component.contains('\0') { > + anyhow::bail!("Null byte in path component"); > + } > + > + // Reject other potentially dangerous characters > + if component.contains(['\\', '\n', '\r']) { > + anyhow::bail!("Invalid characters in path component"); > + } > + > + Ok(()) > + } > + > + /// Get the RRD file path for this key type > + /// > + /// Always returns paths using the current format (9.0), regardless of the input format. > + /// This enables transparent format migration: old PVE8 nodes can send `pve2-node/` keys, > + /// and they'll be written to `pve-node-9.0/` files automatically. > + /// > + /// # Format Migration Strategy > + /// > + /// Returns the file path for this RRD key (without .rrd extension) > + /// > + /// The C implementation always creates files in the current format directory > + /// (see status.c:1287). This Rust implementation follows the same approach: > + /// - Input: `pve2-node/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1` > + /// - Input: `pve-node-9.0/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1` > + /// > + /// This allows rolling upgrades where old and new nodes coexist in the same cluster. > + /// > + /// Note: The path does NOT include .rrd extension, matching C implementation. > + /// The librrd functions (rrd_create_r, rrdc_update) add .rrd internally. > + pub(crate) fn file_path(&self, base_dir: &Path) -> PathBuf { > + match self { > + RrdKeyType::Node { nodename, .. } => { > + // Always use current format path > + base_dir.join("pve-node-9.0").join(nodename) > + } > + RrdKeyType::Vm { vmid, .. } => { > + // Always use current format path > + base_dir.join("pve-vm-9.0").join(vmid) > + } > + RrdKeyType::Storage { > + nodename, storage, .. 
> + } => { > + // Always use current format path > + base_dir > + .join("pve-storage-9.0") > + .join(nodename) > + .join(storage) > + } > + } > + } > + > + /// Get the source format from the input key > + /// > + /// This is used for data transformation (padding/truncation). > + pub(crate) fn source_format(&self) -> RrdFormat { > + match self { > + RrdKeyType::Node { format, .. } > + | RrdKeyType::Vm { format, .. } > + | RrdKeyType::Storage { format, .. } => *format, > + } > + } > + > + /// Get the target RRD schema (always current format) > + /// > + /// Files are always created using the current format (Pve9_0), > + /// regardless of the source format in the key. > + pub(crate) fn schema(&self) -> RrdSchema { > + match self { > + RrdKeyType::Node { .. } => RrdSchema::node(RrdFormat::Pve9_0), > + RrdKeyType::Vm { .. } => RrdSchema::vm(RrdFormat::Pve9_0), > + RrdKeyType::Storage { .. } => RrdSchema::storage(RrdFormat::Pve9_0), > + } > + } > + > + /// Get the metric type for this key > + pub(crate) fn metric_type(&self) -> MetricType { > + match self { > + RrdKeyType::Node { .. } => MetricType::Node, > + RrdKeyType::Vm { .. } => MetricType::Vm, > + RrdKeyType::Storage { .. } => MetricType::Storage, > + } > + } > +} > + > +#[cfg(test)] > +mod tests { [..] > +} > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs [..] 
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs > new file mode 100644 > index 000000000..8da6b633d > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs > @@ -0,0 +1,100 @@ > +use super::errors::RRDCachedClientError; > + > +pub fn check_data_source_name(name: &str) -> Result<(), RRDCachedClientError> { > + if name.is_empty() || name.len() > 64 { > + return Err(RRDCachedClientError::InvalidDataSourceName( > + "name must be between 1 and 64 characters".to_string(), > + )); > + } > + if !name > + .chars() > + .all(|c| c.is_alphanumeric() || c == '_' || c == '-') > + { > + return Err(RRDCachedClientError::InvalidDataSourceName( > + "name must only contain alphanumeric characters and underscores".to_string(), > + )); > + } > + Ok(()) > +} > + > +pub fn check_rrd_path(name: &str) -> Result<(), RRDCachedClientError> { > + if name.is_empty() || name.len() > 64 { > + return Err(RRDCachedClientError::InvalidCreateDataSerie( > + "name must be between 1 and 64 characters".to_string(), > + )); > + } > + if !name > + .chars() > + .all(|c| c.is_alphanumeric() || c == '_' || c == '-') This rejects "/" and ".", but we pass full system paths to it. Also please check the path length limitation above. > + { > + return Err(RRDCachedClientError::InvalidCreateDataSerie( > + "name must only contain alphanumeric characters and underscores".to_string(), > + )); > + } > + Ok(()) > +} [..] 
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs [..] > diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs > new file mode 100644 > index 000000000..6c48940be > --- /dev/null > +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs > @@ -0,0 +1,582 @@ > +/// RRD File Writer > +/// > +/// Handles creating and updating RRD files via pluggable backends. > +/// Supports daemon-based (rrdcached) and direct file writing modes. > +use super::backend::{DEFAULT_SOCKET_PATH, RrdFallbackBackend}; > +use super::key_type::{MetricType, RrdKeyType}; > +use super::schema::{RrdFormat, RrdSchema}; > +use anyhow::{Context, Result}; > +use chrono::Local; > +use std::fs; > +use std::path::{Path, PathBuf}; > + > + > +/// RRD writer for persistent metric storage > +/// > +/// Uses pluggable backends (daemon, direct, or fallback) for RRD operations. > +pub struct RrdWriter { > + /// Base directory for RRD files (default: /var/lib/rrdcached/db) > + base_dir: PathBuf, > + /// Backend for RRD operations (daemon, direct, or fallback) > + backend: Box, > +} > + > +impl RrdWriter { > + /// Create new RRD writer with default fallback backend > + /// > + /// Uses the fallback backend that tries daemon first, then falls back to direct file writes. > + /// This matches the C implementation's behavior. 
> + /// > + /// # Arguments > + /// * `base_dir` - Base directory for RRD files > + pub async fn new>(base_dir: P) -> Result { > + let backend = Self::default_backend().await?; > + Self::with_backend(base_dir, backend).await > + } > + > + /// Create new RRD writer with specific backend > + /// > + /// # Arguments > + /// * `base_dir` - Base directory for RRD files > + /// * `backend` - RRD backend to use (daemon, direct, or fallback) > + pub(crate) async fn with_backend>( > + base_dir: P, > + backend: Box, > + ) -> Result { > + let base_dir = base_dir.as_ref().to_path_buf(); > + > + // Create base directory if it doesn't exist > + fs::create_dir_all(&base_dir) > + .with_context(|| format!("Failed to create RRD base directory: {base_dir:?}"))?; > + > + tracing::info!("RRD writer using backend: {}", backend.name()); > + > + Ok(Self { base_dir, backend }) > + } > + > + /// Create default backend (fallback: daemon + direct) > + /// > + /// This matches the C implementation's behavior: > + /// - Tries rrdcached daemon first for performance > + /// - Falls back to direct file writes if daemon fails > + async fn default_backend() -> Result> { > + let backend = RrdFallbackBackend::new(DEFAULT_SOCKET_PATH).await; > + Ok(Box::new(backend)) > + } > + > + /// Update RRD file with metric data > + /// > + /// This will: > + /// 1. Transform data from source format to target format (padding/truncation/column skipping) > + /// 2. Create the RRD file if it doesn't exist > + /// 3. 
Update via rrdcached daemon > + /// > + /// # Arguments > + /// * `key` - RRD key (e.g., "pve2-node/node1", "pve-vm-9.0/100") > + /// * `data` - Raw metric data string from pvestatd (format: "skipped_fields...:ctime:val1:val2:...") > + pub async fn update(&mut self, key: &str, data: &str) -> Result<()> { > + // Parse the key to determine file path and schema > + let key_type = RrdKeyType::parse(key).with_context(|| format!("Invalid RRD key: {key}"))?; > + > + // Get source format and target schema > + let source_format = key_type.source_format(); > + let target_schema = key_type.schema(); > + let metric_type = key_type.metric_type(); > + > + // Transform data from source to target format > + let transformed_data = > + Self::transform_data(data, source_format, &target_schema, metric_type) > + .with_context(|| format!("Failed to transform RRD data for key: {key}"))?; > + > + // Get the file path (always uses current format) > + let file_path = key_type.file_path(&self.base_dir); > + > + // Ensure the RRD file exists > + // Always check file existence directly - handles file deletion/rotation > + if !file_path.exists() { > + self.create_rrd_file(&key_type, &file_path).await?; The on-disk naming convention for .rrd is inconsistent across the crate and I think this can break the logic here. file_path() in key_type.rs is documented as returning paths without .rrd, and that's what this existence check runs against. But the vendored rrdcached client in rrdcached/client.rs and rrdcached/create.rs appends .rrd when building the update and create commands, so the daemon backend creates files at path.rrd. Meanwhile the direct backend tests in backend_direct.rs also construct paths with .rrd explicitly. Can we please pin down whether the rrd crate's create() / update_all() auto-append .rrd or not, and then make one consistent decision and align file_path(), the existence check, the vendored client and the direct backend tests to the same convention? 
> + } > + > + // Update the RRD file via backend > + self.backend.update(&file_path, &transformed_data).await?; > + > + Ok(()) > + } > + > + /// Create RRD file with appropriate schema via backend > + async fn create_rrd_file(&mut self, key_type: &RrdKeyType, file_path: &Path) -> Result<()> { > + // Ensure parent directory exists > + if let Some(parent) = file_path.parent() { > + fs::create_dir_all(parent) > + .with_context(|| format!("Failed to create directory: {parent:?}"))?; > + } > + > + // Get schema for this RRD type > + let schema = key_type.schema(); > + > + // Calculate start time (at day boundary, matching C implementation) > + // C uses localtime() (status.c:1206-1219), not UTC > + let now = Local::now(); > + let start = now > + .date_naive() > + .and_hms_opt(0, 0, 0) > + .expect("00:00:00 is always a valid time") > + .and_local_timezone(Local) > + .single() This might return None and would panic in that case. Maybe earliest() would help here? > + .expect("Local midnight should have single timezone mapping"); > + let start_timestamp = start.timestamp(); > + > + tracing::debug!( > + "Creating RRD file: {:?} with {} data sources via {}", > + file_path, > + schema.column_count(), > + self.backend.name() > + ); > + > + // Delegate to backend for creation > + self.backend > + .create(file_path, &schema, start_timestamp) > + .await?; > + > + tracing::info!("Created RRD file: {:?} ({})", file_path, schema); > + > + Ok(()) > + } > ++#[cfg(test)] > +mod tests { > + use super::super::schema::{RrdFormat, RrdSchema}; > + use super::*; > + + #[test] + fn test_rrd_file_path_generation() { + let temp_dir = std::path::PathBuf::from("/tmp/test"); + + let key_node = RrdKeyType::Node { + nodename: "testnode".to_string(), + format: RrdFormat::Pve9_0, + }; + let path = key_node.file_path(&temp_dir); + assert_eq!(path, temp_dir.join("pve-node-9.0").join("testnode")); + } + + // ===== Format Adaptation Tests ===== + + #[test] + fn test_transform_data_node_pve2_to_pve9() { + 
// Test padding old format (12 archivable cols) to new format (19 archivable cols) + // pvestatd data format for node: "uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout" + // = 2 non-archivable + 1 timestamp + 12 archivable = 15 fields + let data = "1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap(); + + // After skip(2): "1234567890:1.5:4:2.0:0.5:...:500000" = 13 fields + // Pad to 20 total (timestamp + 19 values): 13 + 7 "U" = 20 + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts[0], "1234567890", "Timestamp should be preserved"); + assert_eq!(parts.len(), 20, "Should have timestamp + 19 values"); + assert_eq!(parts[1], "1.5", "First value after skip should be loadavg"); + assert_eq!(parts[2], "4", "Second value should be maxcpu"); + assert_eq!(parts[12], "500000", "Last data value should be netout"); + + // Check padding (7 columns: 19 - 12 = 7) + for (i, item) in parts.iter().enumerate().take(20).skip(13) { + assert_eq!(item, &"U", "Column {} should be padded with U", i); + } + } + + #[test] + fn test_transform_data_vm_pve2_to_pve9() { + // Test VM transformation with 4 columns skipped + // pvestatd data format for VM: "uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite" + // = 4 non-archivable + 1 timestamp + 10 archivable = 15 fields + let data = "1000:myvm:1:0:1234567890:4:2:4096:2048:100000:50000:1000:500:100:50"; + + let schema = RrdSchema::vm(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap(); + + // After skip(4): "1234567890:4:2:4096:...:50" = 11 fields + // Pad to 18 total (timestamp + 17 values): 11 + 7 "U" = 18 + let parts: Vec<&str> = result.split(':').collect(); + 
assert_eq!(parts[0], "1234567890"); + assert_eq!(parts.len(), 18, "Should have timestamp + 17 values"); + assert_eq!(parts[1], "4", "First value after skip should be maxcpu"); + assert_eq!(parts[10], "50", "Last data value should be diskwrite"); + + // Check padding (7 columns: 17 - 10 = 7) + for (i, item) in parts.iter().enumerate().take(18).skip(11) { + assert_eq!(item, &"U", "Column {} should be padded", i); + } + } + + #[test] + fn test_transform_data_no_padding_needed() { + // Test when source and target have same column count (Pve9_0 node: 19 archivable cols) + // pvestatd format: "uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout:memavail:arcsize:cpu_some:io_some:io_full:mem_some:mem_full" + // = 2 non-archivable + 1 timestamp + 19 archivable = 22 fields + let data = "1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000:7000000000:0:0.12:0.05:0.02:0.08:0.03"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap(); + + // After skip(2): 20 fields = timestamp + 19 values (exact match, no padding) + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts.len(), 20, "Should have timestamp + 19 values"); + assert_eq!(parts[0], "1234567890", "Timestamp should be ctime"); + assert_eq!(parts[1], "1.5", "First value after skip should be loadavg"); + assert_eq!(parts[19], "0.03", "Last value should be mem_full (no padding)"); + } + + #[test] + fn test_transform_data_future_format_truncation() { + // Test truncation when a future format sends more columns than current pve9.0 + // Simulating: uptime:sublevel:ctime:1:2:3:...:25 (2 skipped + timestamp + 25 archivable = 28 fields) + let data = + "999:0:1234567890:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20:21:22:23:24:25"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, 
RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap(); + + // After skip(2): "1234567890:1:2:...:25" = 26 fields + // take(20): truncate to timestamp + 19 values + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts.len(), 20, "Should truncate to timestamp + 19 values"); + assert_eq!(parts[0], "1234567890", "Timestamp should be ctime"); + assert_eq!(parts[1], "1", "First archivable value"); + assert_eq!(parts[19], "19", "Last value should be column 19 (truncated)"); + } + + #[test] + fn test_transform_data_storage_no_change() { + // Storage format is same for Pve2 and Pve9_0 (2 columns, no skipping) + let data = "1234567890:1000000000000:500000000000"; + + let schema = RrdSchema::storage(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage).unwrap(); + + assert_eq!(result, data, "Storage data should not be transformed"); + } + + #[test] + fn test_metric_type_methods() { + assert_eq!(MetricType::Node.skip_columns(), 2); + assert_eq!(MetricType::Vm.skip_columns(), 4); + assert_eq!(MetricType::Storage.skip_columns(), 0); + } + + #[test] + fn test_format_column_counts() { + assert_eq!(MetricType::Node.column_count(RrdFormat::Pve2), 12); + assert_eq!(MetricType::Node.column_count(RrdFormat::Pve9_0), 19); + assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve2), 10); + assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve9_0), 17); + assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve2), 2); + assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve9_0), 2); + } + + // ===== Real Payload Fixtures from Production Systems ===== + // + // These tests use actual RRD data captured from running PVE systems + // to validate transform_data() correctness against real-world payloads. 
+ + #[test] + fn test_real_payload_node_pve2() { + // Real pve2-node payload captured from PVE 6.x system + // Format: uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout + let data = "432156:0:1709123456:0.15:8:3.2:0.8:33554432000:12884901888:8589934592:0:107374182400:53687091200:1234567890:987654321"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap(); + + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts[0], "1709123456", "Timestamp preserved"); + assert_eq!(parts.len(), 20, "Should have timestamp + 19 values"); + + // Verify key metrics are preserved + assert_eq!(parts[1], "0.15", "Load average preserved"); + assert_eq!(parts[2], "8", "Max CPU preserved"); + assert_eq!(parts[3], "3.2", "CPU usage preserved"); + assert_eq!(parts[4], "0.8", "IO wait preserved"); + + // Verify padding for new columns (7 new columns in Pve9_0) + for i in 13..20 { + assert_eq!(parts[i], "U", "New column {} should be padded", i); + } + } + + #[test] + fn test_real_payload_vm_pve2() { + // Real pve2.3-vm payload captured from PVE 6.x system + // Format: uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite + let data = "86400:vm-100-disk-0:running:0:1709123456:4:45.3:8589934592:4294967296:107374182400:32212254720:123456789:98765432:1048576:2097152"; + + let schema = RrdSchema::vm(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap(); + + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts[0], "1709123456", "Timestamp preserved"); + assert_eq!(parts.len(), 18, "Should have timestamp + 17 values"); + + // Verify key metrics are preserved + assert_eq!(parts[1], "4", "Max CPU preserved"); + assert_eq!(parts[2], "45.3", "CPU usage preserved"); + assert_eq!(parts[3], 
"8589934592", "Max memory preserved"); + assert_eq!(parts[4], "4294967296", "Memory usage preserved"); + + // Verify padding for new columns (7 new columns in Pve9_0) + for i in 11..18 { + assert_eq!(parts[i], "U", "New column {} should be padded", i); + } + } + + #[test] + fn test_real_payload_storage_pve2() { + // Real pve2-storage payload captured from PVE 6.x system + // Format: ctime:total:used + let data = "1709123456:1099511627776:549755813888"; + + let schema = RrdSchema::storage(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage) + .unwrap(); + + // Storage format unchanged between Pve2 and Pve9_0 + assert_eq!(result, data, "Storage data should not be transformed"); + + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts[0], "1709123456", "Timestamp preserved"); + assert_eq!(parts[1], "1099511627776", "Total storage preserved"); + assert_eq!(parts[2], "549755813888", "Used storage preserved"); + } + + #[test] + fn test_real_payload_node_pve9_0() { + // Real pve-node-9.0 payload from PVE 8.x system (already in target format) Can we please add real binary fixtures instead? We would catch more issues using that. 
+ // Input has 19 fields, after skip(2) = 17 fields (timestamp + 16 archivable columns) + // Schema expects 19 archivable columns, so 3 "U" padding added + let data = "864321:0:1709123456:0.25:16:8.5:1.2:67108864000:25769803776:17179869184:0:214748364800:107374182400:2345678901:1876543210:x86_64:6.5.11:0.3:250"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node) + .unwrap(); + + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts[0], "1709123456", "Timestamp preserved"); + assert_eq!(parts.len(), 20, "Should have timestamp + 19 values"); + + // Verify all columns preserved + assert_eq!(parts[1], "0.25", "Load average preserved"); + assert_eq!(parts[13], "x86_64", "CPU info preserved"); + assert_eq!(parts[14], "6.5.11", "Kernel version preserved"); + assert_eq!(parts[15], "0.3", "Wait time preserved"); + assert_eq!(parts[16], "250", "Process count preserved"); + + // Last 3 columns are padding (input had 16 archivable, schema expects 19) + assert_eq!(parts[17], "U", "Padding column 1"); + assert_eq!(parts[18], "U", "Padding column 2"); + assert_eq!(parts[19], "U", "Padding column 3"); + } + + #[test] + fn test_real_payload_with_missing_values() { + // Real payload with some missing values (represented as "U") + // This can happen when metrics are temporarily unavailable + let data = "432156:0:1709123456:0.15:8:U:0.8:33554432000:12884901888:U:0:107374182400:53687091200:1234567890:987654321"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap(); + + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts[0], "1709123456", "Timestamp preserved"); + + // Verify "U" values are preserved (after skip(2), positions shift) + assert_eq!(parts[3], "U", "Missing CPU value preserved as U"); + assert_eq!(parts[7], "U", "Missing swap total preserved as U"); + } [..]