From: Samuel Rufinatscha <s.rufinatscha@proxmox.com>
To: Kefu Chai <k.chai@proxmox.com>, pve-devel@lists.proxmox.com
Subject: Re: [PATCH pve-cluster 05/14 v2] pmxcfs-rs: add pmxcfs-rrd crate
Date: Fri, 13 Mar 2026 15:09:52 +0100
Message-ID: <5b4be846-d09b-4306-8e5c-f52cbfbd4797@proxmox.com>
In-Reply-To: <20260213094119.2379288-6-k.chai@proxmox.com>

Thanks for this patch as well, and for incorporating most of my
v1 suggestions.

Please see my comments inline.

On 2/13/26 10:47 AM, Kefu Chai wrote:
> Add RRD (Round-Robin Database) file persistence system:
> - RrdWriter: Main API for RRD operations
> - Schema definitions for CPU, memory, network metrics
> - Format migration support (v1/v2/v3)
> - rrdcached integration for batched writes
> - Data transformation for legacy formats
> 
> This is an independent crate with no internal dependencies,
> only requiring external RRD libraries (rrd, rrdcached-client)
> and tokio for async operations. It handles time-series data
> storage compatible with the C implementation.
> 
> Includes comprehensive unit tests for data transformation,
> schema generation, and multi-source data processing.
> 
> Signed-off-by: Kefu Chai <k.chai@proxmox.com>
> ---
>   src/pmxcfs-rs/Cargo.toml                      |  12 +
>   src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml           |  23 +
>   src/pmxcfs-rs/pmxcfs-rrd/README.md            | 119 ++++
>   src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs       |  62 ++
>   .../pmxcfs-rrd/src/backend/backend_daemon.rs  | 184 ++++++
>   .../pmxcfs-rrd/src/backend/backend_direct.rs  | 586 ++++++++++++++++++
>   .../src/backend/backend_fallback.rs           | 212 +++++++
>   src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs        | 140 +++++

daemon.rs doesn't seem to be compiled (there is no mod declaration for
it). Please remove it if it is not needed.

>   src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs      | 408 ++++++++++++
>   src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs           |  23 +
>   src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs         | 124 ++++
>   .../pmxcfs-rrd/src/rrdcached/LICENSE          |  21 +
>   .../pmxcfs-rrd/src/rrdcached/client.rs        | 208 +++++++
>   .../src/rrdcached/consolidation_function.rs   |  30 +
>   .../pmxcfs-rrd/src/rrdcached/create.rs        | 410 ++++++++++++
>   .../pmxcfs-rrd/src/rrdcached/errors.rs        |  29 +
>   src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs |  45 ++
>   src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs |  18 +
>   .../pmxcfs-rrd/src/rrdcached/parsers.rs       |  65 ++
>   .../pmxcfs-rrd/src/rrdcached/sanitisation.rs  | 100 +++
>   src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs        | 577 +++++++++++++++++
>   src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs        | 582 +++++++++++++++++
>   22 files changed, 3978 insertions(+)
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/README.md
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
>   create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
> 
> diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
> index d26fac04c..2457fe368 100644
> --- a/src/pmxcfs-rs/Cargo.toml
> +++ b/src/pmxcfs-rs/Cargo.toml
> @@ -4,6 +4,7 @@ members = [
>       "pmxcfs-api-types",  # Shared types and error definitions
>       "pmxcfs-config",     # Configuration management
>       "pmxcfs-logger",     # Cluster log with ring buffer and deduplication
> +    "pmxcfs-rrd",        # RRD (Round-Robin Database) persistence
>   ]
>   resolver = "2"
>   
> @@ -20,16 +21,27 @@ rust-version = "1.85"
>   pmxcfs-api-types = { path = "pmxcfs-api-types" }
>   pmxcfs-config = { path = "pmxcfs-config" }
>   pmxcfs-logger = { path = "pmxcfs-logger" }
> +pmxcfs-rrd = { path = "pmxcfs-rrd" }
> +
> +# Core async runtime
> +tokio = { version = "1.35", features = ["full"] }
>   
>   # Error handling
> +anyhow = "1.0"
>   thiserror = "1.0"
>   
> +# Logging and tracing
> +tracing = "0.1"
> +
>   # Concurrency primitives
>   parking_lot = "0.12"
>   
>   # System integration
>   libc = "0.2"
>   
> +# Development dependencies
> +tempfile = "3.8"
> +
>   [workspace.lints.clippy]
>   uninlined_format_args = "warn"
>   
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
> new file mode 100644
> index 000000000..33c87ec91
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
> @@ -0,0 +1,23 @@
> +[package]
> +name = "pmxcfs-rrd"
> +version.workspace = true
> +edition.workspace = true
> +authors.workspace = true
> +license.workspace = true
> +
> +[features]
> +default = ["rrdcached"]
> +rrdcached = []
> +
> +[dependencies]
> +anyhow.workspace = true
> +async-trait = "0.1"
> +chrono = { version = "0.4", default-features = false, features = ["clock"] }
> +nom = "8.0"

Do we actually need nom as an extra dependency?
It seems we only use it for basic string operations, which std could
handle as well.
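
To illustrate what I mean, here is a rough sketch using only std. It
assumes the parsers mostly split rrdcached status lines of the form
"<code> <message>"; parse_status_line is a made-up name, not a function
from the patch, so please adapt it to what parsers.rs really needs.

```rust
/// Split an rrdcached status line such as "0 Success" into its numeric
/// status code and the remaining message text.
///
/// Hypothetical helper for illustration only; not taken from the patch.
fn parse_status_line(line: &str) -> Option<(i64, &str)> {
    // Drop the trailing line terminator, if any.
    let line = line.trim_end_matches(['\r', '\n']);

    // The code is everything up to the first space; the message may be empty.
    let (code, message) = match line.split_once(' ') {
        Some((code, message)) => (code, message),
        None => (line, ""),
    };

    // A non-numeric code means the line is not a status line.
    let code = code.parse::<i64>().ok()?;
    Some((code, message))
}
```

If the real parsers are not much more involved than this, dropping nom
would save us a dependency.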

> +rrd = "0.2"
> +thiserror = "2.0"

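The workspace already declares thiserror = "1.0".
Please align the two, e.g. by using thiserror.workspace = true here
(and bumping the workspace version if 2.0 is really needed).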

> +tokio.workspace = true
> +tracing.workspace = true
> +
> +[dev-dependencies]
> +tempfile.workspace = true
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/README.md b/src/pmxcfs-rs/pmxcfs-rrd/README.md
> new file mode 100644
> index 000000000..d6f6ad9b1
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/README.md
> @@ -0,0 +1,119 @@
> +# pmxcfs-rrd
> +
> +RRD (Round-Robin Database) persistence for pmxcfs performance metrics.
> +
> +## Overview
> +
> +This crate provides RRD file management for storing time-series performance data from Proxmox nodes and VMs. It handles file creation, updates, and integration with rrdcached daemon for efficient writes.
> +
> +### Key Features
> +
> +- RRD file creation with schema-based initialization
> +- RRD updates (write metrics to disk)
> +- rrdcached integration for batched writes
> +- Support for both legacy and current schema versions (v1/v2/v3)
> +- Type-safe key parsing and validation
> +- Compatible with existing C-created RRD files
> +
> +## Usage Flow
> +
> +The typical data flow through this crate:
> +
> +1. **Metrics Collection**: pmxcfs-status collects performance metrics (CPU, memory, network, etc.)
> +2. **Key Generation**: Metrics are organized by key type (node, VM, storage)
> +3. **Schema Selection**: Appropriate RRD schema is selected based on key type and version
> +4. **Data Transformation**: Legacy data (v1/v2) is transformed to current format (v3) if needed
> +5. **Backend Selection**:
> +   - **Daemon backend**: Preferred for performance, batches writes via rrdcached
> +   - **Direct backend**: Fallback using librrd directly when daemon unavailable
> +   - **Fallback backend**: Tries daemon first, falls back to direct on failure
> +6. **File Operations**: Create RRD files if needed, update with new data points
> +
> +### Data Transformation
> +
> +The crate handles migration between schema versions:
> +- **v1 → v2**: Adds additional data sources for extended metrics
> +- **v2 → v3**: Consolidates and optimizes data sources
> +- **Transform logic**: `schema.rs:transform_data()` handles conversion, skipping incompatible entries
> +
> +### Backend Differences
> +
> +- **Daemon Backend** (`backend_daemon.rs`):
> +  - Uses vendored rrdcached client for async communication
> +  - Batches multiple updates for efficiency
> +  - Requires rrdcached daemon running
> +  - Best for high-frequency updates

The C code tries rrdc_update() on every call and only falls back to
rrd_update_r() for that individual call if it fails. It doesn't
permanently disable the daemon path. So this is a difference too and
should either be documented or fixed.
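
For reference, per-call fallback could look roughly like the sketch
below. It is synchronous and uses a simplified stand-in trait (Backend,
PerCallFallback are made-up names, not the types from this patch), so
take it as an illustration of the control flow only.

```rust
/// Simplified, synchronous stand-in for the RrdBackend trait
/// (hypothetical; the trait in the patch is async and has more methods).
trait Backend {
    fn update(&mut self, path: &str, data: &str) -> Result<(), String>;
}

/// Tries the daemon on every call and falls back to the direct backend
/// for that single call only.
struct PerCallFallback<D: Backend, F: Backend> {
    daemon: D,
    direct: F,
}

impl<D: Backend, F: Backend> PerCallFallback<D, F> {
    fn update(&mut self, path: &str, data: &str) -> Result<(), String> {
        match self.daemon.update(path, data) {
            Ok(()) => Ok(()),
            // A daemon failure is not sticky: no state is changed here,
            // so the next call tries the daemon again.
            Err(_daemon_err) => self.direct.update(path, data),
        }
    }
}
```

The point is that no "daemon is gone" flag is kept, so the daemon path
recovers on its own once rrdcached is reachable again.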

> +
> +- **Direct Backend** (`backend_direct.rs`):
> +  - Uses rrd crate (librrd FFI bindings) directly
> +  - Synchronous file operations
> +  - No external daemon required
> +  - Reliable fallback option
> +
> +- **Fallback Backend** (`backend_fallback.rs`):
> +  - Composite pattern: tries daemon, falls back to direct
> +  - Matches C implementation behavior
> +  - Provides best of both worlds
> +
> +## Module Structure
> +
> +| Module | Purpose |
> +|--------|---------|
> +| `writer.rs` | Main RrdWriter API - high-level interface for RRD operations |
> +| `schema.rs` | RRD schema definitions (DS, RRA) and data transformation logic |
> +| `key_type.rs` | RRD key parsing, validation, and path sanitization |
> +| `daemon.rs` | rrdcached daemon client wrapper |
> +| `backend.rs` | Backend trait and implementations (daemon/direct/fallback) |
> +| `rrdcached/` | Vendored rrdcached client implementation (adapted from rrdcached-client v0.1.5) |
> +
> +## Usage Example
> +
> +```rust
> +use pmxcfs_rrd::{RrdWriter, RrdFallbackBackend};

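RrdFallbackBackend is not exported from lib.rs.
Also, the signatures used below don't match the current code:
RrdWriter::new() takes a base directory, update() takes a key and a
data string, and RrdFallbackBackend::new() does not return a Result.
Please verify.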

> +
> +// Create writer with fallback backend
> +let backend = RrdFallbackBackend::new("/var/run/rrdcached.sock").await?;
> +let writer = RrdWriter::new(backend);
> +
> +// Update node CPU metrics
> +writer.update(
> +    "pve/nodes/node1/cpu",
> +    &[0.45, 0.52, 0.38, 0.61], // CPU usage values
> +    None, // Use current timestamp
> +).await?;
> +
> +// Create new RRD file for VM
> +writer.create(
> +    "pve/qemu/100/cpu",
> +    1704067200, // Start timestamp
> +).await?;
> +```
> +
> +## External Dependencies
> +
> +- **rrd crate**: Provides Rust bindings to librrd (RRDtool C library)
> +- **rrdcached client**: Vendored and adapted from rrdcached-client v0.1.5 (Apache-2.0 license)
> +  - Original source: https://github.com/SINTEF/rrdcached-client
> +  - Vendored to gain full control and adapt to our specific needs
> +  - Can be disabled via the `rrdcached` feature flag
> +
> +## Testing
> +
> +Unit tests verify:
> +- Schema generation and validation
> +- Key parsing for different RRD types (node, VM, storage)
> +- RRD file creation and update operations
> +- rrdcached client connection and fallback behavior
> +
> +Run tests with:
> +```bash
> +cargo test -p pmxcfs-rrd
> +```
> +
> +## References
> +
> +- **C Implementation**: `src/pmxcfs/status.c` (RRD code embedded)
> +- **Related Crates**:
> +  - `pmxcfs-status` - Uses RrdWriter for metrics persistence
> +  - `pmxcfs` - FUSE `.rrd` plugin reads RRD files
> +- **RRDtool Documentation**: https://oss.oetiker.ch/rrdtool/
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
> new file mode 100644
> index 000000000..2fa4fa39d
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
> @@ -0,0 +1,62 @@
> +/// RRD Backend Trait and Implementations
> +///
> +/// This module provides an abstraction over different RRD writing mechanisms:
> +/// - Daemon-based (via rrdcached) for performance and batching
> +/// - Direct file writing for reliability and fallback scenarios
> +/// - Fallback composite that tries daemon first, then falls back to direct
> +///
> +/// This design matches the C implementation's behavior in status.c where
> +/// it attempts daemon update first, then falls back to direct file writes.
> +use super::schema::RrdSchema;
> +use anyhow::Result;
> +use async_trait::async_trait;
> +use std::path::Path;
> +
> +/// Constants for RRD configuration
> +pub const DEFAULT_SOCKET_PATH: &str = "/var/run/rrdcached.sock";
> +pub const RRD_STEP_SECONDS: u64 = 60;
> +
> +/// Trait for RRD backend implementations
> +///
> +/// Provides abstraction over different RRD writing mechanisms.
> +/// All methods are async to support both async (daemon) and sync (direct file) operations.
> +#[async_trait]
> +pub trait RrdBackend: Send + Sync {
> +    /// Update RRD file with new data
> +    ///
> +    /// # Arguments
> +    /// * `file_path` - Full path to the RRD file
> +    /// * `data` - Update data in format "timestamp:value1:value2:..."
> +    async fn update(&mut self, file_path: &Path, data: &str) -> Result<()>;
> +
> +    /// Create new RRD file with schema
> +    ///
> +    /// # Arguments
> +    /// * `file_path` - Full path where RRD file should be created
> +    /// * `schema` - RRD schema defining data sources and archives
> +    /// * `start_timestamp` - Start time for the RRD file (Unix timestamp)
> +    async fn create(
> +        &mut self,
> +        file_path: &Path,
> +        schema: &RrdSchema,
> +        start_timestamp: i64,
> +    ) -> Result<()>;
> +
> +    /// Flush pending updates to disk
> +    ///
> +    /// For daemon backends, this sends a FLUSH command.
> +    /// For direct backends, this is a no-op (writes are immediate).
> +    async fn flush(&mut self) -> Result<()>;
> +
> +    /// Get a human-readable name for this backend
> +    fn name(&self) -> &str;
> +}
> +
> +// Backend implementations
> +mod backend_daemon;

The rrdcached module is conditional, but the daemon backend is always 
included. Please feature gate this too.

> +mod backend_direct;
> +mod backend_fallback;
> +
> +pub use backend_daemon::RrdCachedBackend;

Also this should be gated, no?

And similarly please gate the daemon usage in backend_fallback.rs and 
writer.rs where the fallback backend tries to connect to the daemon.
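
Roughly what I have in mind, as a self-contained sketch. The module
bodies are placeholders and default_backend_name() is a made-up helper
that only shows how the default could differ per feature set:

```rust
// Daemon backend only exists when the "rrdcached" feature is enabled.
#[cfg(feature = "rrdcached")]
mod backend_daemon {
    pub struct RrdCachedBackend; // placeholder body
}

// Direct backend is always available.
mod backend_direct {
    pub struct RrdDirectBackend; // placeholder body
}

#[cfg(feature = "rrdcached")]
pub use self::backend_daemon::RrdCachedBackend;
pub use self::backend_direct::RrdDirectBackend;

/// With the feature: daemon first, direct as fallback.
#[cfg(feature = "rrdcached")]
pub fn default_backend_name() -> &'static str {
    "fallback"
}

/// Without the feature: direct writes only, no daemon code is compiled.
#[cfg(not(feature = "rrdcached"))]
pub fn default_backend_name() -> &'static str {
    "direct"
}
```

This way building with --no-default-features still compiles and simply
uses the direct backend.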

> +pub use backend_direct::RrdDirectBackend;
> +pub use backend_fallback::RrdFallbackBackend;
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs

[..]

> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
> new file mode 100644
> index 000000000..fabe7e669
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
> @@ -0,0 +1,408 @@
> +/// RRD Key Type Parsing and Path Resolution
> +///
> +/// This module handles parsing RRD status update keys and mapping them
> +/// to the appropriate file paths and schemas.
> +use super::schema::{RrdFormat, RrdSchema};
> +use anyhow::{Context, Result};
> +use std::path::{Path, PathBuf};
> +
> +/// Metric type for determining column skipping rules
> +#[derive(Debug, Clone, Copy, PartialEq, Eq)]
> +pub enum MetricType {
> +    Node,
> +    Vm,
> +    Storage,
> +}
> +
> +impl MetricType {
> +    /// Number of non-archivable columns to skip from the start of the data string
> +    ///
> +    /// The data from pvestatd has non-archivable fields at the beginning:
> +    /// - Node: skip 2 (uptime, sublevel) - then ctime:loadavg:maxcpu:...
> +    /// - VM: skip 4 (uptime, name, status, template) - then ctime:maxcpu:cpu:...
> +    /// - Storage: skip 0 - data starts with ctime:total:used
> +    ///
> +    /// C implementation: status.c:1300 (node skip=2), status.c:1335 (VM skip=4)
> +    pub fn skip_columns(self) -> usize {
> +        match self {
> +            MetricType::Node => 2,
> +            MetricType::Vm => 4,
> +            MetricType::Storage => 0,
> +        }
> +    }
> +
> +    /// Get column count for a specific RRD format
> +    #[allow(dead_code)]
> +    pub fn column_count(self, format: RrdFormat) -> usize {
> +        match (format, self) {
> +            (RrdFormat::Pve2, MetricType::Node) => 12,
> +            (RrdFormat::Pve9_0, MetricType::Node) => 19,
> +            (RrdFormat::Pve2, MetricType::Vm) => 10,
> +            (RrdFormat::Pve9_0, MetricType::Vm) => 17,
> +            (_, MetricType::Storage) => 2, // Same for both formats
> +        }
> +    }
> +}
> +
> +/// RRD key types for routing to correct schema and path
> +///
> +/// This enum represents the different types of RRD metrics that pmxcfs tracks:
> +/// - Node metrics (CPU, memory, network for a node)
> +/// - VM metrics (CPU, memory, disk, network for a VM/CT)
> +/// - Storage metrics (total/used space for a storage)
> +#[derive(Debug, Clone, PartialEq, Eq)]
> +pub(crate) enum RrdKeyType {
> +    /// Node metrics: pve2-node/{nodename} or pve-node-9.0/{nodename}
> +    Node { nodename: String, format: RrdFormat },
> +    /// VM metrics: pve2.3-vm/{vmid} or pve-vm-9.0/{vmid}
> +    Vm { vmid: String, format: RrdFormat },
> +    /// Storage metrics: pve2-storage/{node}/{storage} or pve-storage-9.0/{node}/{storage}
> +    Storage {
> +        nodename: String,
> +        storage: String,
> +        format: RrdFormat,
> +    },
> +}
> +
> +impl RrdKeyType {
> +    /// Parse RRD key from status update key
> +    ///
> +    /// Supported formats:
> +    /// - "pve2-node/node1" → Node { nodename: "node1", format: Pve2 }
> +    /// - "pve-node-9.0/node1" → Node { nodename: "node1", format: Pve9_0 }
> +    /// - "pve2.3-vm/100" → Vm { vmid: "100", format: Pve2 }
> +    /// - "pve-storage-9.0/node1/local" → Storage { nodename: "node1", storage: "local", format: Pve9_0 }
> +    ///
> +    /// # Security
> +    ///
> +    /// Path components are validated to prevent directory traversal attacks:
> +    /// - Rejects paths containing ".."
> +    /// - Rejects absolute paths
> +    /// - Rejects paths with special characters that could be exploited
> +    pub(crate) fn parse(key: &str) -> Result<Self> {
> +        let parts: Vec<&str> = key.split('/').collect();
> +
> +        if parts.is_empty() {
> +            anyhow::bail!("Empty RRD key");
> +        }
> +
> +        // Validate all path components for security
> +        for part in &parts[1..] {
> +            Self::validate_path_component(part)?;
> +        }
> +
> +        match parts[0] {
> +            "pve2-node" => {
> +                let nodename = parts.get(1).context("Missing nodename")?.to_string();
> +                Ok(RrdKeyType::Node {
> +                    nodename,
> +                    format: RrdFormat::Pve2,
> +                })
> +            }
> +            prefix if prefix.starts_with("pve-node-") => {
> +                let nodename = parts.get(1).context("Missing nodename")?.to_string();
> +                Ok(RrdKeyType::Node {
> +                    nodename,
> +                    format: RrdFormat::Pve9_0,

"pve-node-9.0" matches, but so do "pve-node-9.1" and "pve-node-10.0";
all of them are treated as Pve9_0. The same applies to the "pve-vm-" and
"pve-storage-" arms below.

Should we parse the version suffix and match it exactly instead?
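
Something along these lines, for example. The enums are reduced copies
for the sake of a self-contained sketch and classify_prefix is a made-up
name. Whether unknown versions should be rejected or mapped to the
newest known format is a design decision I'd leave to you.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RrdFormat {
    Pve2,
    Pve9_0,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MetricType {
    Node,
    Vm,
    Storage,
}

/// Map the first key component to metric type and format.
/// Only exact, known prefixes are accepted; anything else is None.
fn classify_prefix(prefix: &str) -> Option<(MetricType, RrdFormat)> {
    match prefix {
        "pve2-node" => Some((MetricType::Node, RrdFormat::Pve2)),
        "pve2.3-vm" => Some((MetricType::Vm, RrdFormat::Pve2)),
        "pve2-storage" => Some((MetricType::Storage, RrdFormat::Pve2)),
        "pve-node-9.0" => Some((MetricType::Node, RrdFormat::Pve9_0)),
        "pve-vm-9.0" => Some((MetricType::Vm, RrdFormat::Pve9_0)),
        "pve-storage-9.0" => Some((MetricType::Storage, RrdFormat::Pve9_0)),
        // e.g. "pve-node-9.1" or "pve-node-10.0": unknown, reject.
        _ => None,
    }
}
```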

> +                })
> +            }
> +            "pve2.3-vm" => {
> +                let vmid = parts.get(1).context("Missing vmid")?.to_string();
> +                Ok(RrdKeyType::Vm {
> +                    vmid,
> +                    format: RrdFormat::Pve2,
> +                })
> +            }
> +            prefix if prefix.starts_with("pve-vm-") => {
> +                let vmid = parts.get(1).context("Missing vmid")?.to_string();
> +                Ok(RrdKeyType::Vm {
> +                    vmid,
> +                    format: RrdFormat::Pve9_0,
> +                })
> +            }
> +            "pve2-storage" => {
> +                let nodename = parts.get(1).context("Missing nodename")?.to_string();
> +                let storage = parts.get(2).context("Missing storage")?.to_string();
> +                Ok(RrdKeyType::Storage {
> +                    nodename,
> +                    storage,
> +                    format: RrdFormat::Pve2,
> +                })
> +            }
> +            prefix if prefix.starts_with("pve-storage-") => {
> +                let nodename = parts.get(1).context("Missing nodename")?.to_string();
> +                let storage = parts.get(2).context("Missing storage")?.to_string();
> +                Ok(RrdKeyType::Storage {
> +                    nodename,
> +                    storage,
> +                    format: RrdFormat::Pve9_0,
> +                })
> +            }
> +            _ => anyhow::bail!("Unknown RRD key format: {key}"),
> +        }
> +    }
> +
> +    /// Validate a path component for security
> +    ///
> +    /// Prevents directory traversal attacks by rejecting:
> +    /// - ".." (parent directory)
> +    /// - Absolute paths (starting with "/")
> +    /// - Empty components
> +    /// - Components with null bytes or other dangerous characters
> +    fn validate_path_component(component: &str) -> Result<()> {
> +        if component.is_empty() {
> +            anyhow::bail!("Empty path component");
> +        }
> +
> +        if component == ".." {
> +            anyhow::bail!("Path traversal attempt: '..' not allowed");
> +        }
> +
> +        if component.starts_with('/') {
> +            anyhow::bail!("Absolute paths not allowed");
> +        }
> +
> +        if component.contains('\0') {
> +            anyhow::bail!("Null byte in path component");
> +        }
> +
> +        // Reject other potentially dangerous characters
> +        if component.contains(['\\', '\n', '\r']) {
> +            anyhow::bail!("Invalid characters in path component");
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Get the RRD file path for this key type
> +    ///
> +    /// Always returns paths using the current format (9.0), regardless of the input format.
> +    /// This enables transparent format migration: old PVE8 nodes can send `pve2-node/` keys,
> +    /// and they'll be written to `pve-node-9.0/` files automatically.
> +    ///
> +    /// # Format Migration Strategy
> +    ///
> +    /// Returns the file path for this RRD key (without .rrd extension)
> +    ///
> +    /// The C implementation always creates files in the current format directory
> +    /// (see status.c:1287). This Rust implementation follows the same approach:
> +    /// - Input: `pve2-node/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1`
> +    /// - Input: `pve-node-9.0/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1`
> +    ///
> +    /// This allows rolling upgrades where old and new nodes coexist in the same cluster.
> +    ///
> +    /// Note: The path does NOT include .rrd extension, matching C implementation.
> +    /// The librrd functions (rrd_create_r, rrdc_update) add .rrd internally.
> +    pub(crate) fn file_path(&self, base_dir: &Path) -> PathBuf {
> +        match self {
> +            RrdKeyType::Node { nodename, .. } => {
> +                // Always use current format path
> +                base_dir.join("pve-node-9.0").join(nodename)
> +            }
> +            RrdKeyType::Vm { vmid, .. } => {
> +                // Always use current format path
> +                base_dir.join("pve-vm-9.0").join(vmid)
> +            }
> +            RrdKeyType::Storage {
> +                nodename, storage, ..
> +            } => {
> +                // Always use current format path
> +                base_dir
> +                    .join("pve-storage-9.0")
> +                    .join(nodename)
> +                    .join(storage)
> +            }
> +        }
> +    }
> +
> +    /// Get the source format from the input key
> +    ///
> +    /// This is used for data transformation (padding/truncation).
> +    pub(crate) fn source_format(&self) -> RrdFormat {
> +        match self {
> +            RrdKeyType::Node { format, .. }
> +            | RrdKeyType::Vm { format, .. }
> +            | RrdKeyType::Storage { format, .. } => *format,
> +        }
> +    }
> +
> +    /// Get the target RRD schema (always current format)
> +    ///
> +    /// Files are always created using the current format (Pve9_0),
> +    /// regardless of the source format in the key.
> +    pub(crate) fn schema(&self) -> RrdSchema {
> +        match self {
> +            RrdKeyType::Node { .. } => RrdSchema::node(RrdFormat::Pve9_0),
> +            RrdKeyType::Vm { .. } => RrdSchema::vm(RrdFormat::Pve9_0),
> +            RrdKeyType::Storage { .. } => RrdSchema::storage(RrdFormat::Pve9_0),
> +        }
> +    }
> +
> +    /// Get the metric type for this key
> +    pub(crate) fn metric_type(&self) -> MetricType {
> +        match self {
> +            RrdKeyType::Node { .. } => MetricType::Node,
> +            RrdKeyType::Vm { .. } => MetricType::Vm,
> +            RrdKeyType::Storage { .. } => MetricType::Storage,
> +        }
> +    }
> +}
> +
> +#[cfg(test)]
> +mod tests {

[..]

> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
> new file mode 100644
> index 000000000..8da6b633d
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
> @@ -0,0 +1,100 @@
> +use super::errors::RRDCachedClientError;
> +
> +pub fn check_data_source_name(name: &str) -> Result<(), RRDCachedClientError> {
> +    if name.is_empty() || name.len() > 64 {
> +        return Err(RRDCachedClientError::InvalidDataSourceName(
> +            "name must be between 1 and 64 characters".to_string(),
> +        ));
> +    }
> +    if !name
> +        .chars()
> +        .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
> +    {
> +        return Err(RRDCachedClientError::InvalidDataSourceName(
> +            "name must only contain alphanumeric characters and underscores".to_string(),
> +        ));
> +    }
> +    Ok(())
> +}
> +
> +pub fn check_rrd_path(name: &str) -> Result<(), RRDCachedClientError> {
> +    if name.is_empty() || name.len() > 64 {
> +        return Err(RRDCachedClientError::InvalidCreateDataSerie(
> +            "name must be between 1 and 64 characters".to_string(),
> +        ));
> +    }
> +    if !name
> +        .chars()
> +        .all(|c| c.is_alphanumeric() || c == '_' || c == '-')

This rejects "/" and ".", but we pass full system paths to it, so valid
paths would be rejected. Please also revisit the 64 character length
limit above, which looks too short for a full path.
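
A possible direction, as an untested sketch: the length limit, the
String error type and the decision to reject whitespace are my
assumptions for illustration, not something the vendored client or
rrdcached prescribes.

```rust
/// Hypothetical variant of check_rrd_path() that accepts full paths.
fn check_rrd_path(path: &str) -> Result<(), String> {
    // Illustrative limit: PATH_MAX on Linux is 4096 including the NUL byte.
    const MAX_PATH_LEN: usize = 4095;

    if path.is_empty() || path.len() > MAX_PATH_LEN {
        return Err(format!("path must be between 1 and {MAX_PATH_LEN} bytes"));
    }

    // The command is sent as a single text line, so keep it unambiguous
    // by refusing whitespace and control characters (assumption).
    if path.chars().any(|c| c.is_whitespace() || c.is_control()) {
        return Err("path must not contain whitespace or control characters".to_string());
    }

    // "/" and "." are fine, but a ".." component is not.
    if path.split('/').any(|component| component == "..") {
        return Err("path must not contain '..' components".to_string());
    }

    Ok(())
}
```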

> +    {
> +        return Err(RRDCachedClientError::InvalidCreateDataSerie(
> +            "name must only contain alphanumeric characters and underscores".to_string(),
> +        ));
> +    }
> +    Ok(())
> +}

[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
[..]

> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
> new file mode 100644
> index 000000000..6c48940be
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
> @@ -0,0 +1,582 @@
> +/// RRD File Writer
> +///
> +/// Handles creating and updating RRD files via pluggable backends.
> +/// Supports daemon-based (rrdcached) and direct file writing modes.
> +use super::backend::{DEFAULT_SOCKET_PATH, RrdFallbackBackend};
> +use super::key_type::{MetricType, RrdKeyType};
> +use super::schema::{RrdFormat, RrdSchema};
> +use anyhow::{Context, Result};
> +use chrono::Local;
> +use std::fs;
> +use std::path::{Path, PathBuf};
> +
> +
> +/// RRD writer for persistent metric storage
> +///
> +/// Uses pluggable backends (daemon, direct, or fallback) for RRD operations.
> +pub struct RrdWriter {
> +    /// Base directory for RRD files (default: /var/lib/rrdcached/db)
> +    base_dir: PathBuf,
> +    /// Backend for RRD operations (daemon, direct, or fallback)
> +    backend: Box<dyn super::backend::RrdBackend>,
> +}
> +
> +impl RrdWriter {
> +    /// Create new RRD writer with default fallback backend
> +    ///
> +    /// Uses the fallback backend that tries daemon first, then falls back to direct file writes.
> +    /// This matches the C implementation's behavior.
> +    ///
> +    /// # Arguments
> +    /// * `base_dir` - Base directory for RRD files
> +    pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
> +        let backend = Self::default_backend().await?;
> +        Self::with_backend(base_dir, backend).await
> +    }
> +
> +    /// Create new RRD writer with specific backend
> +    ///
> +    /// # Arguments
> +    /// * `base_dir` - Base directory for RRD files
> +    /// * `backend` - RRD backend to use (daemon, direct, or fallback)
> +    pub(crate) async fn with_backend<P: AsRef<Path>>(
> +        base_dir: P,
> +        backend: Box<dyn super::backend::RrdBackend>,
> +    ) -> Result<Self> {
> +        let base_dir = base_dir.as_ref().to_path_buf();
> +
> +        // Create base directory if it doesn't exist
> +        fs::create_dir_all(&base_dir)
> +            .with_context(|| format!("Failed to create RRD base directory: {base_dir:?}"))?;
> +
> +        tracing::info!("RRD writer using backend: {}", backend.name());
> +
> +        Ok(Self { base_dir, backend })
> +    }
> +
> +    /// Create default backend (fallback: daemon + direct)
> +    ///
> +    /// This matches the C implementation's behavior:
> +    /// - Tries rrdcached daemon first for performance
> +    /// - Falls back to direct file writes if daemon fails
> +    async fn default_backend() -> Result<Box<dyn super::backend::RrdBackend>> {
> +        let backend = RrdFallbackBackend::new(DEFAULT_SOCKET_PATH).await;
> +        Ok(Box::new(backend))
> +    }
> +
> +    /// Update RRD file with metric data
> +    ///
> +    /// This will:
> +    /// 1. Transform data from source format to target format (padding/truncation/column skipping)
> +    /// 2. Create the RRD file if it doesn't exist
> +    /// 3. Update via rrdcached daemon
> +    ///
> +    /// # Arguments
> +    /// * `key` - RRD key (e.g., "pve2-node/node1", "pve-vm-9.0/100")
> +    /// * `data` - Raw metric data string from pvestatd (format: "skipped_fields...:ctime:val1:val2:...")
> +    pub async fn update(&mut self, key: &str, data: &str) -> Result<()> {
> +        // Parse the key to determine file path and schema
> +        let key_type = RrdKeyType::parse(key).with_context(|| format!("Invalid RRD key: {key}"))?;
> +
> +        // Get source format and target schema
> +        let source_format = key_type.source_format();
> +        let target_schema = key_type.schema();
> +        let metric_type = key_type.metric_type();
> +
> +        // Transform data from source to target format
> +        let transformed_data =
> +            Self::transform_data(data, source_format, &target_schema, metric_type)
> +                .with_context(|| format!("Failed to transform RRD data for key: {key}"))?;
> +
> +        // Get the file path (always uses current format)
> +        let file_path = key_type.file_path(&self.base_dir);
> +
> +        // Ensure the RRD file exists
> +        // Always check file existence directly - handles file deletion/rotation
> +        if !file_path.exists() {
> +            self.create_rrd_file(&key_type, &file_path).await?;

The on-disk naming convention for .rrd is inconsistent across the crate,
and I think this can break the logic here.
file_path() in key_type.rs is documented as returning paths without
.rrd, and that's what this existence check runs against. But the vendored
rrdcached client in rrdcached/client.rs and rrdcached/create.rs
appends .rrd when building the update and create commands, so the
daemon backend creates files at path.rrd. If that is the case, this
check would never see the file and create_rrd_file() would be attempted
on every update. Meanwhile, the direct backend tests in backend_direct.rs
also construct paths with .rrd explicitly.

Can we please pin down whether the rrd crate's create() / update_all()
auto-append .rrd or not, then make one consistent decision and
align file_path(), the existence check, the vendored client, and the
direct backend tests to the same convention?
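
One possible shape for a single convention, as a minimal sketch: keep
file_path() returning the suffix-less key path and derive the on-disk
name in exactly one place. The helper names on_disk_path() and
rrd_exists() and the example paths are made up for illustration. Whether
the rrd crate or rrdcached append .rrd on their own still has to be
checked first.

```rust
use std::ffi::OsString;
use std::path::{Path, PathBuf};

/// Hypothetical helper (not part of the patch): the only place that
/// knows about the ".rrd" suffix. Callers pass the suffix-less key path.
fn on_disk_path(key_path: &Path) -> PathBuf {
    // Append instead of using with_extension(), which would replace the
    // part after the last dot in names such as "node.example".
    let mut name: OsString = key_path.as_os_str().to_owned();
    name.push(".rrd");
    PathBuf::from(name)
}

/// Existence check against the same name that create/update would use.
fn rrd_exists(key_path: &Path) -> bool {
    on_disk_path(key_path).exists()
}
```

With something like this, the existence check, both backends and the
tests would all go through the same function, so they cannot drift apart.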

> +        }
> +
> +        // Update the RRD file via backend
> +        self.backend.update(&file_path, &transformed_data).await?;
> +
> +        Ok(())
> +    }
> +
> +    /// Create RRD file with appropriate schema via backend
> +    async fn create_rrd_file(&mut self, key_type: &RrdKeyType, file_path: &Path) -> Result<()> {
> +        // Ensure parent directory exists
> +        if let Some(parent) = file_path.parent() {
> +            fs::create_dir_all(parent)
> +                .with_context(|| format!("Failed to create directory: {parent:?}"))?;
> +        }
> +
> +        // Get schema for this RRD type
> +        let schema = key_type.schema();
> +
> +        // Calculate start time (at day boundary, matching C implementation)
> +        // C uses localtime() (status.c:1206-1219), not UTC
> +        let now = Local::now();
> +        let start = now
> +            .date_naive()
> +            .and_hms_opt(0, 0, 0)
> +            .expect("00:00:00 is always a valid time")
> +            .and_local_timezone(Local)
> +            .single()

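single() returns None when local midnight is ambiguous or does not
exist (e.g. a DST transition at midnight), and the expect() below would
panic in that case. Maybe earliest() would help here? It still returns
None for a nonexistent time though, so a non-panicking fallback would
be needed as well.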
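
To illustrate the three cases without pulling in chrono, here is a
self-contained sketch. LocalMapping is a local stand-in for the result
type chrono returns from and_local_timezone(), start_timestamp() is a
hypothetical helper, and the fallback to the UTC day boundary is only an
assumption; any fallback that does not panic would do.

```rust
/// Stand-in for the three outcomes of mapping a naive local time to a
/// timezone. Defined locally so the sketch compiles without chrono.
#[derive(Debug, Clone, Copy, PartialEq)]
enum LocalMapping {
    /// Exactly one instant (the common case), as a Unix timestamp.
    Single(i64),
    /// Two candidate instants, e.g. clocks set back across midnight.
    Ambiguous(i64, i64),
    /// The local time does not exist, e.g. clocks jump forward at midnight.
    Missing,
}

/// Hypothetical helper: pick a start timestamp without panicking.
fn start_timestamp(midnight: LocalMapping, now: i64) -> i64 {
    match midnight {
        LocalMapping::Single(t) => t,
        // Same choice that earliest() would make.
        LocalMapping::Ambiguous(a, b) => a.min(b),
        // Assumed fallback: round `now` down to the UTC day boundary.
        LocalMapping::Missing => now - now.rem_euclid(86_400),
    }
}
```

The Missing arm is the one that single() and earliest() both leave
uncovered.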

> +            .expect("Local midnight should have single timezone mapping");
> +        let start_timestamp = start.timestamp();
> +
> +        tracing::debug!(
> +            "Creating RRD file: {:?} with {} data sources via {}",
> +            file_path,
> +            schema.column_count(),
> +            self.backend.name()
> +        );
> +
> +        // Delegate to backend for creation
> +        self.backend
> +            .create(file_path, &schema, start_timestamp)
> +            .await?;
> +
> +        tracing::info!("Created RRD file: {:?} ({})", file_path, schema);
> +
> +        Ok(())
> +    }
> +
> +#[cfg(test)]
> +mod tests {
> +    use super::super::schema::{RrdFormat, RrdSchema};
> +    use super::*;
> +
+    #[test]
+    fn test_rrd_file_path_generation() {
+        let temp_dir = std::path::PathBuf::from("/tmp/test");
+
+        let key_node = RrdKeyType::Node {
+            nodename: "testnode".to_string(),
+            format: RrdFormat::Pve9_0,
+        };
+        let path = key_node.file_path(&temp_dir);
+        assert_eq!(path, temp_dir.join("pve-node-9.0").join("testnode"));
+    }
+
+    // ===== Format Adaptation Tests =====
+
+    #[test]
+    fn test_transform_data_node_pve2_to_pve9() {
+        // Test padding old format (12 archivable cols) to new format (19 archivable cols)
+        // pvestatd data format for node: "uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout"
+        // = 2 non-archivable + 1 timestamp + 12 archivable = 15 fields
+        let data = "1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+        // After skip(2): "1234567890:1.5:4:2.0:0.5:...:500000" = 13 fields
+        // Pad to 20 total (timestamp + 19 values): 13 + 7 "U" = 20
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1234567890", "Timestamp should be preserved");
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+        assert_eq!(parts[1], "1.5", "First value after skip should be loadavg");
+        assert_eq!(parts[2], "4", "Second value should be maxcpu");
+        assert_eq!(parts[12], "500000", "Last data value should be netout");
+
+        // Check padding (7 columns: 19 - 12 = 7)
+        for (i, item) in parts.iter().enumerate().take(20).skip(13) {
+            assert_eq!(item, &"U", "Column {} should be padded with U", i);
+        }
+    }
+
+    #[test]
+    fn test_transform_data_vm_pve2_to_pve9() {
+        // Test VM transformation with 4 columns skipped
+        // pvestatd data format for VM: "uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite"
+        // = 4 non-archivable + 1 timestamp + 10 archivable = 15 fields
+        let data = "1000:myvm:1:0:1234567890:4:2:4096:2048:100000:50000:1000:500:100:50";
+
+        let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap();
+
+        // After skip(4): "1234567890:4:2:4096:...:50" = 11 fields
+        // Pad to 18 total (timestamp + 17 values): 11 + 7 "U" = 18
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1234567890");
+        assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+        assert_eq!(parts[1], "4", "First value after skip should be maxcpu");
+        assert_eq!(parts[10], "50", "Last data value should be diskwrite");
+
+        // Check padding (7 columns: 17 - 10 = 7)
+        for (i, item) in parts.iter().enumerate().take(18).skip(11) {
+            assert_eq!(item, &"U", "Column {} should be padded", i);
+        }
+    }
+
+    #[test]
+    fn test_transform_data_no_padding_needed() {
+        // Test when source and target have same column count (Pve9_0 node: 19 archivable cols)
+        // pvestatd format: "uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout:memavail:arcsize:cpu_some:io_some:io_full:mem_some:mem_full"
+        // = 2 non-archivable + 1 timestamp + 19 archivable = 22 fields
+        let data = "1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000:7000000000:0:0.12:0.05:0.02:0.08:0.03";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();
+
+        // After skip(2): 20 fields = timestamp + 19 values (exact match, no padding)
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+        assert_eq!(parts[0], "1234567890", "Timestamp should be ctime");
+        assert_eq!(parts[1], "1.5", "First value after skip should be loadavg");
+        assert_eq!(parts[19], "0.03", "Last value should be mem_full (no padding)");
+    }
+
+    #[test]
+    fn test_transform_data_future_format_truncation() {
+        // Test truncation when a future format sends more columns than current pve9.0
+        // Simulating: uptime:sublevel:ctime:1:2:3:...:25 (2 skipped + timestamp + 25 archivable = 28 fields)
+        let data =
+            "999:0:1234567890:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20:21:22:23:24:25";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();
+
+        // After skip(2): "1234567890:1:2:...:25" = 26 fields
+        // take(20): truncate to timestamp + 19 values
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts.len(), 20, "Should truncate to timestamp + 19 values");
+        assert_eq!(parts[0], "1234567890", "Timestamp should be ctime");
+        assert_eq!(parts[1], "1", "First archivable value");
+        assert_eq!(parts[19], "19", "Last value should be column 19 (truncated)");
+    }
+
+    #[test]
+    fn test_transform_data_storage_no_change() {
+        // Storage format is same for Pve2 and Pve9_0 (2 columns, no skipping)
+        let data = "1234567890:1000000000000:500000000000";
+
+        let schema = RrdSchema::storage(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage).unwrap();
+
+        assert_eq!(result, data, "Storage data should not be transformed");
+    }
+
+    #[test]
+    fn test_metric_type_methods() {
+        assert_eq!(MetricType::Node.skip_columns(), 2);
+        assert_eq!(MetricType::Vm.skip_columns(), 4);
+        assert_eq!(MetricType::Storage.skip_columns(), 0);
+    }
+
+    #[test]
+    fn test_format_column_counts() {
+        assert_eq!(MetricType::Node.column_count(RrdFormat::Pve2), 12);
+        assert_eq!(MetricType::Node.column_count(RrdFormat::Pve9_0), 19);
+        assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve2), 10);
+        assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve9_0), 17);
+        assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve2), 2);
+        assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve9_0), 2);
+    }
+
+    // ===== Real Payload Fixtures from Production Systems =====
+    //
+    // These tests use actual RRD data captured from running PVE systems
+    // to validate transform_data() correctness against real-world payloads.
+
+    #[test]
+    fn test_real_payload_node_pve2() {
+        // Real pve2-node payload captured from PVE 6.x system
+        // Format: uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout
+        let data = "432156:0:1709123456:0.15:8:3.2:0.8:33554432000:12884901888:8589934592:0:107374182400:53687091200:1234567890:987654321";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+
+        // Verify key metrics are preserved
+        assert_eq!(parts[1], "0.15", "Load average preserved");
+        assert_eq!(parts[2], "8", "Max CPU preserved");
+        assert_eq!(parts[3], "3.2", "CPU usage preserved");
+        assert_eq!(parts[4], "0.8", "IO wait preserved");
+
+        // Verify padding for new columns (7 new columns in Pve9_0)
+        for i in 13..20 {
+            assert_eq!(parts[i], "U", "New column {} should be padded", i);
+        }
+    }
+
+    #[test]
+    fn test_real_payload_vm_pve2() {
+        // Real pve2.3-vm payload captured from PVE 6.x system
+        // Format: uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite
+        let data = "86400:vm-100-disk-0:running:0:1709123456:4:45.3:8589934592:4294967296:107374182400:32212254720:123456789:98765432:1048576:2097152";
+
+        let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+        assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+
+        // Verify key metrics are preserved
+        assert_eq!(parts[1], "4", "Max CPU preserved");
+        assert_eq!(parts[2], "45.3", "CPU usage preserved");
+        assert_eq!(parts[3], "8589934592", "Max memory preserved");
+        assert_eq!(parts[4], "4294967296", "Memory usage preserved");
+
+        // Verify padding for new columns (7 new columns in Pve9_0)
+        for i in 11..18 {
+            assert_eq!(parts[i], "U", "New column {} should be padded", i);
+        }
+    }
+
+    #[test]
+    fn test_real_payload_storage_pve2() {
+        // Real pve2-storage payload captured from PVE 6.x system
+        // Format: ctime:total:used
+        let data = "1709123456:1099511627776:549755813888";
+
+        let schema = RrdSchema::storage(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage)
+                .unwrap();
+
+        // Storage format unchanged between Pve2 and Pve9_0
+        assert_eq!(result, data, "Storage data should not be transformed");
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+        assert_eq!(parts[1], "1099511627776", "Total storage preserved");
+        assert_eq!(parts[2], "549755813888", "Used storage preserved");
+    }
+
+    #[test]
+    fn test_real_payload_node_pve9_0() {
+        // Real pve-node-9.0 payload from PVE 8.x system (already in target format)

Can we please add real binary fixtures instead?
We would catch more issues that way.
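
Independent of the fixture format, the field counts in the comments of
this test can be cross-checked with a small sketch. It restates the
skip / truncate / pad steps that the test comments describe (skip(2),
take(20), pad with "U"). transform_sketch() is a hypothetical stand-in,
not the patch's transform_data(). For the payload in this test it gives
19 input fields, 17 after skip(2) (timestamp + 16 values) and therefore
three "U" columns. That matches the three padding assertions, but not
the "2 padding" and "17 archivable" wording in the comments.

```rust
/// Hypothetical re-statement of the steps named in the test comments:
/// skip the non-archivable columns, truncate to timestamp + `archivable`
/// values, then pad missing values with "U".
fn transform_sketch(data: &str, skip: usize, archivable: usize) -> String {
    let wanted = 1 + archivable; // timestamp + values
    let mut fields: Vec<&str> = data.split(':').skip(skip).take(wanted).collect();
    // take() guarantees len <= wanted, so resize() can only pad here.
    fields.resize(wanted, "U");
    fields.join(":")
}
```

Calling transform_sketch(data, 2, 19) on the node payload of this test
yields 20 fields, with "250" at index 16 and "U" at indices 17 to 19.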

+        // Input has 19 fields, after skip(2) = 17 archivable columns
+        // Schema expects 19 archivable columns, so 2 "U" padding added
+        let data = "864321:0:1709123456:0.25:16:8.5:1.2:67108864000:25769803776:17179869184:0:214748364800:107374182400:2345678901:1876543210:x86_64:6.5.11:0.3:250";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node)
+                .unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+
+        // Verify all columns preserved
+        assert_eq!(parts[1], "0.25", "Load average preserved");
+        assert_eq!(parts[13], "x86_64", "CPU info preserved");
+        assert_eq!(parts[14], "6.5.11", "Kernel version preserved");
+        assert_eq!(parts[15], "0.3", "Wait time preserved");
+        assert_eq!(parts[16], "250", "Process count preserved");
+
+        // Last 3 columns are padding (input had 17 archivable, schema expects 19)
+        assert_eq!(parts[17], "U", "Padding column 1");
+        assert_eq!(parts[18], "U", "Padding column 2");
+        assert_eq!(parts[19], "U", "Padding column 3");
+    }
+
+    #[test]
+    fn test_real_payload_with_missing_values() {
+        // Real payload with some missing values (represented as "U")
+        // This can happen when metrics are temporarily unavailable
+        let data = "432156:0:1709123456:0.15:8:U:0.8:33554432000:12884901888:U:0:107374182400:53687091200:1234567890:987654321";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+
+        // Verify "U" values are preserved (after skip(2), positions shift)
+        assert_eq!(parts[3], "U", "Missing CPU value preserved as U");
+        assert_eq!(parts[7], "U", "Missing swap total preserved as U");
+    }
[..]



