From: Samuel Rufinatscha <s.rufinatscha@proxmox.com>
To: Kefu Chai <k.chai@proxmox.com>, pve-devel@lists.proxmox.com
Subject: Re: [PATCH pve-cluster 05/14 v2] pmxcfs-rs: add pmxcfs-rrd crate
Date: Fri, 13 Mar 2026 15:09:52 +0100
Message-ID: <5b4be846-d09b-4306-8e5c-f52cbfbd4797@proxmox.com>
In-Reply-To: <20260213094119.2379288-6-k.chai@proxmox.com>
Thanks for this patch as well, and for working through and incorporating
most of my v1 suggestions.
Please see my comments inline.
On 2/13/26 10:47 AM, Kefu Chai wrote:
> Add RRD (Round-Robin Database) file persistence system:
> - RrdWriter: Main API for RRD operations
> - Schema definitions for CPU, memory, network metrics
> - Format migration support (v1/v2/v3)
> - rrdcached integration for batched writes
> - Data transformation for legacy formats
>
> This is an independent crate with no internal dependencies,
> only requiring external RRD libraries (rrd, rrdcached-client)
> and tokio for async operations. It handles time-series data
> storage compatible with the C implementation.
>
> Includes comprehensive unit tests for data transformation,
> schema generation, and multi-source data processing.
>
> Signed-off-by: Kefu Chai <k.chai@proxmox.com>
> ---
> src/pmxcfs-rs/Cargo.toml | 12 +
> src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml | 23 +
> src/pmxcfs-rs/pmxcfs-rrd/README.md | 119 ++++
> src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs | 62 ++
> .../pmxcfs-rrd/src/backend/backend_daemon.rs | 184 ++++++
> .../pmxcfs-rrd/src/backend/backend_direct.rs | 586 ++++++++++++++++++
> .../src/backend/backend_fallback.rs | 212 +++++++
> src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs | 140 +++++
This file doesn't seem to be compiled, because there is no `mod daemon;`
declaration for it. Please remove it if it's not needed. The README
module table lists it too.
> src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs | 408 ++++++++++++
> src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs | 23 +
> src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs | 124 ++++
> .../pmxcfs-rrd/src/rrdcached/LICENSE | 21 +
> .../pmxcfs-rrd/src/rrdcached/client.rs | 208 +++++++
> .../src/rrdcached/consolidation_function.rs | 30 +
> .../pmxcfs-rrd/src/rrdcached/create.rs | 410 ++++++++++++
> .../pmxcfs-rrd/src/rrdcached/errors.rs | 29 +
> src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs | 45 ++
> src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs | 18 +
> .../pmxcfs-rrd/src/rrdcached/parsers.rs | 65 ++
> .../pmxcfs-rrd/src/rrdcached/sanitisation.rs | 100 +++
> src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs | 577 +++++++++++++++++
> src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs | 582 +++++++++++++++++
> 22 files changed, 3978 insertions(+)
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/README.md
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
> create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
>
> diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
> index d26fac04c..2457fe368 100644
> --- a/src/pmxcfs-rs/Cargo.toml
> +++ b/src/pmxcfs-rs/Cargo.toml
> @@ -4,6 +4,7 @@ members = [
> "pmxcfs-api-types", # Shared types and error definitions
> "pmxcfs-config", # Configuration management
> "pmxcfs-logger", # Cluster log with ring buffer and deduplication
> + "pmxcfs-rrd", # RRD (Round-Robin Database) persistence
> ]
> resolver = "2"
>
> @@ -20,16 +21,27 @@ rust-version = "1.85"
> pmxcfs-api-types = { path = "pmxcfs-api-types" }
> pmxcfs-config = { path = "pmxcfs-config" }
> pmxcfs-logger = { path = "pmxcfs-logger" }
> +pmxcfs-rrd = { path = "pmxcfs-rrd" }
> +
> +# Core async runtime
> +tokio = { version = "1.35", features = ["full"] }
>
> # Error handling
> +anyhow = "1.0"
> thiserror = "1.0"
>
> +# Logging and tracing
> +tracing = "0.1"
> +
> # Concurrency primitives
> parking_lot = "0.12"
>
> # System integration
> libc = "0.2"
>
> +# Development dependencies
> +tempfile = "3.8"
> +
> [workspace.lints.clippy]
> uninlined_format_args = "warn"
>
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
> new file mode 100644
> index 000000000..33c87ec91
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
> @@ -0,0 +1,23 @@
> +[package]
> +name = "pmxcfs-rrd"
> +version.workspace = true
> +edition.workspace = true
> +authors.workspace = true
> +license.workspace = true
> +
> +[features]
> +default = ["rrdcached"]
> +rrdcached = []
> +
> +[dependencies]
> +anyhow.workspace = true
> +async-trait = "0.1"
> +chrono = { version = "0.4", default-features = false, features = ["clock"] }
> +nom = "8.0"
Do we actually need this extra dependency?
It seems we only use it for basic string parsing, which the standard
library's `str` methods can handle.
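For illustration, here is a minimal std-only sketch of parsing an rrdcached status line. I'm assuming this is the kind of parsing nom is used for here, and `parse_status_line()` is a hypothetical name. My reading of the rrdcached protocol documentation is that a response starts with a numeric status, which is negative on error, followed by a message:

```rust
/// Hypothetical helper: parse an rrdcached response header such as
/// "0 Success" or "-1 No such file" using only std string methods.
fn parse_status_line(line: &str) -> Result<(i64, &str), String> {
    // Strip the trailing line terminator, if any.
    let line = line.trim_end_matches(['\r', '\n']);
    // The status code and the message are separated by the first space.
    let (code, message) = line.split_once(' ').unwrap_or((line, ""));
    let code: i64 = code
        .parse()
        .map_err(|e| format!("invalid status code {code:?}: {e}"))?;
    Ok((code, message))
}
```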
> +rrd = "0.2"
> +thiserror = "2.0"
The workspace already defines thiserror = "1.0".
Please align accordingly, e.g. via `thiserror.workspace = true`.
> +tokio.workspace = true
> +tracing.workspace = true
> +
> +[dev-dependencies]
> +tempfile.workspace = true
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/README.md b/src/pmxcfs-rs/pmxcfs-rrd/README.md
> new file mode 100644
> index 000000000..d6f6ad9b1
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/README.md
> @@ -0,0 +1,119 @@
> +# pmxcfs-rrd
> +
> +RRD (Round-Robin Database) persistence for pmxcfs performance metrics.
> +
> +## Overview
> +
> +This crate provides RRD file management for storing time-series performance data from Proxmox nodes and VMs. It handles file creation, updates, and integration with rrdcached daemon for efficient writes.
> +
> +### Key Features
> +
> +- RRD file creation with schema-based initialization
> +- RRD updates (write metrics to disk)
> +- rrdcached integration for batched writes
> +- Support for both legacy and current schema versions (v1/v2/v3)
> +- Type-safe key parsing and validation
> +- Compatible with existing C-created RRD files
> +
> +## Usage Flow
> +
> +The typical data flow through this crate:
> +
> +1. **Metrics Collection**: pmxcfs-status collects performance metrics (CPU, memory, network, etc.)
> +2. **Key Generation**: Metrics are organized by key type (node, VM, storage)
> +3. **Schema Selection**: Appropriate RRD schema is selected based on key type and version
> +4. **Data Transformation**: Legacy data (v1/v2) is transformed to current format (v3) if needed
> +5. **Backend Selection**:
> + - **Daemon backend**: Preferred for performance, batches writes via rrdcached
> + - **Direct backend**: Fallback using librrd directly when daemon unavailable
> + - **Fallback backend**: Tries daemon first, falls back to direct on failure
> +6. **File Operations**: Create RRD files if needed, update with new data points
> +
> +### Data Transformation
> +
> +The crate handles migration between schema versions:
> +- **v1 → v2**: Adds additional data sources for extended metrics
> +- **v2 → v3**: Consolidates and optimizes data sources
> +- **Transform logic**: `schema.rs:transform_data()` handles conversion, skipping incompatible entries
> +
> +### Backend Differences
> +
> +- **Daemon Backend** (`backend_daemon.rs`):
> + - Uses vendored rrdcached client for async communication
> + - Batches multiple updates for efficiency
> + - Requires rrdcached daemon running
> + - Best for high-frequency updates
Also: the C code tries rrdc_update() on every call and falls back to
rrd_update_r() only for that individual call if it fails. It never
permanently disables the daemon path. This is another behavioral
difference, so it should either be documented or fixed.
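To match C, the fallback could simply try the daemon on every call. Here is a rough sketch. It uses a simplified synchronous stand-in for the async `RrdBackend` trait, and `PerCallFallback` and `Scripted` are hypothetical names:

```rust
/// Simplified, synchronous stand-in for the crate's async `RrdBackend` trait.
trait Backend {
    fn update(&mut self, file: &str, data: &str) -> Result<(), String>;
    fn name(&self) -> &str;
}

/// Tries the daemon on every call and falls back to the direct backend only
/// for the call that failed. The daemon path is never disabled.
struct PerCallFallback<D: Backend, F: Backend> {
    daemon: D,
    direct: F,
}

impl<D: Backend, F: Backend> PerCallFallback<D, F> {
    /// Returns which backend handled the update.
    fn update(&mut self, file: &str, data: &str) -> Result<&'static str, String> {
        match self.daemon.update(file, data) {
            Ok(()) => Ok("daemon"),
            Err(err) => {
                // Fall back for this update only; the next call tries the daemon again.
                eprintln!("{} update failed ({err}), using direct write", self.daemon.name());
                self.direct.update(file, data).map(|()| "direct")
            }
        }
    }
}

/// Test double that replays scripted results and counts calls.
struct Scripted {
    label: &'static str,
    results: Vec<Result<(), String>>,
    calls: usize,
}

impl Backend for Scripted {
    fn update(&mut self, _file: &str, _data: &str) -> Result<(), String> {
        self.calls += 1;
        // Replay the script in order, and succeed once it is exhausted.
        if self.results.is_empty() {
            Ok(())
        } else {
            self.results.remove(0)
        }
    }

    fn name(&self) -> &str {
        self.label
    }
}
```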
> +
> +- **Direct Backend** (`backend_direct.rs`):
> + - Uses rrd crate (librrd FFI bindings) directly
> + - Synchronous file operations
> + - No external daemon required
> + - Reliable fallback option
> +
> +- **Fallback Backend** (`backend_fallback.rs`):
> + - Composite pattern: tries daemon, falls back to direct
> + - Matches C implementation behavior
> + - Provides best of both worlds
> +
> +## Module Structure
> +
> +| Module | Purpose |
> +|--------|---------|
> +| `writer.rs` | Main RrdWriter API - high-level interface for RRD operations |
> +| `schema.rs` | RRD schema definitions (DS, RRA) and data transformation logic |
> +| `key_type.rs` | RRD key parsing, validation, and path sanitization |
> +| `daemon.rs` | rrdcached daemon client wrapper |
> +| `backend.rs` | Backend trait and implementations (daemon/direct/fallback) |
> +| `rrdcached/` | Vendored rrdcached client implementation (adapted from rrdcached-client v0.1.5) |
> +
> +## Usage Example
> +
> +```rust
> +use pmxcfs_rrd::{RrdWriter, RrdFallbackBackend};
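RrdFallbackBackend is not exported from lib.rs. The example below also
doesn't match the current API:
- in writer.rs, RrdFallbackBackend::new() is awaited without `?`, so it
  doesn't return a Result;
- RrdWriter::new() is async and takes the base directory, not a backend;
- update() takes `&mut self`, a key such as "pve2-node/node1" and the raw
  pvestatd data string;
- file creation happens implicitly in update() via the private
  create_rrd_file().
Please verify and update the example.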
> +
> +// Create writer with fallback backend
> +let backend = RrdFallbackBackend::new("/var/run/rrdcached.sock").await?;
> +let writer = RrdWriter::new(backend);
> +
> +// Update node CPU metrics
> +writer.update(
> + "pve/nodes/node1/cpu",
> + &[0.45, 0.52, 0.38, 0.61], // CPU usage values
> + None, // Use current timestamp
> +).await?;
> +
> +// Create new RRD file for VM
> +writer.create(
> + "pve/qemu/100/cpu",
> + 1704067200, // Start timestamp
> +).await?;
> +```
> +
> +## External Dependencies
> +
> +- **rrd crate**: Provides Rust bindings to librrd (RRDtool C library)
> +- **rrdcached client**: Vendored and adapted from rrdcached-client v0.1.5 (Apache-2.0 license)
> + - Original source: https://github.com/SINTEF/rrdcached-client
> + - Vendored to gain full control and adapt to our specific needs
> + - Can be disabled via the `rrdcached` feature flag
> +
> +## Testing
> +
> +Unit tests verify:
> +- Schema generation and validation
> +- Key parsing for different RRD types (node, VM, storage)
> +- RRD file creation and update operations
> +- rrdcached client connection and fallback behavior
> +
> +Run tests with:
> +```bash
> +cargo test -p pmxcfs-rrd
> +```
> +
> +## References
> +
> +- **C Implementation**: `src/pmxcfs/status.c` (RRD code embedded)
> +- **Related Crates**:
> + - `pmxcfs-status` - Uses RrdWriter for metrics persistence
> + - `pmxcfs` - FUSE `.rrd` plugin reads RRD files
> +- **RRDtool Documentation**: https://oss.oetiker.ch/rrdtool/
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
> new file mode 100644
> index 000000000..2fa4fa39d
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
> @@ -0,0 +1,62 @@
> +/// RRD Backend Trait and Implementations
> +///
> +/// This module provides an abstraction over different RRD writing mechanisms:
> +/// - Daemon-based (via rrdcached) for performance and batching
> +/// - Direct file writing for reliability and fallback scenarios
> +/// - Fallback composite that tries daemon first, then falls back to direct
> +///
> +/// This design matches the C implementation's behavior in status.c where
> +/// it attempts daemon update first, then falls back to direct file writes.
> +use super::schema::RrdSchema;
> +use anyhow::Result;
> +use async_trait::async_trait;
> +use std::path::Path;
> +
> +/// Constants for RRD configuration
> +pub const DEFAULT_SOCKET_PATH: &str = "/var/run/rrdcached.sock";
> +pub const RRD_STEP_SECONDS: u64 = 60;
> +
> +/// Trait for RRD backend implementations
> +///
> +/// Provides abstraction over different RRD writing mechanisms.
> +/// All methods are async to support both async (daemon) and sync (direct file) operations.
> +#[async_trait]
> +pub trait RrdBackend: Send + Sync {
> + /// Update RRD file with new data
> + ///
> + /// # Arguments
> + /// * `file_path` - Full path to the RRD file
> + /// * `data` - Update data in format "timestamp:value1:value2:..."
> + async fn update(&mut self, file_path: &Path, data: &str) -> Result<()>;
> +
> + /// Create new RRD file with schema
> + ///
> + /// # Arguments
> + /// * `file_path` - Full path where RRD file should be created
> + /// * `schema` - RRD schema defining data sources and archives
> + /// * `start_timestamp` - Start time for the RRD file (Unix timestamp)
> + async fn create(
> + &mut self,
> + file_path: &Path,
> + schema: &RrdSchema,
> + start_timestamp: i64,
> + ) -> Result<()>;
> +
> + /// Flush pending updates to disk
> + ///
> + /// For daemon backends, this sends a FLUSH command.
> + /// For direct backends, this is a no-op (writes are immediate).
> + async fn flush(&mut self) -> Result<()>;
> +
> + /// Get a human-readable name for this backend
> + fn name(&self) -> &str;
> +}
> +
> +// Backend implementations
> +mod backend_daemon;
The rrdcached module is conditional, but the daemon backend is always
included. Please feature gate this too.
> +mod backend_direct;
> +mod backend_fallback;
> +
> +pub use backend_daemon::RrdCachedBackend;
Shouldn't this be gated as well?
Similarly, please gate the daemon usage in backend_fallback.rs and in
writer.rs, where the fallback backend tries to connect to the daemon.
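Roughly what I have in mind, as a minimal sketch. The module contents are placeholders, and `default_backend_name()` is a hypothetical stand-in for the daemon usage in writer.rs:

```rust
// backend.rs: only compile the daemon backend when the feature is enabled.
#[cfg(feature = "rrdcached")]
mod backend_daemon {
    /// Placeholder for the real rrdcached-backed implementation.
    pub struct RrdCachedBackend;
}

#[cfg(feature = "rrdcached")]
pub use backend_daemon::RrdCachedBackend;

// writer.rs / backend_fallback.rs: gate every code path that touches the
// daemon, and provide a direct-only variant when the feature is off.
#[cfg(feature = "rrdcached")]
pub fn default_backend_name() -> &'static str {
    let _daemon = RrdCachedBackend; // only valid because this fn is gated too
    "fallback (rrdcached + direct)"
}

#[cfg(not(feature = "rrdcached"))]
pub fn default_backend_name() -> &'static str {
    "direct"
}
```

Built without `--features rrdcached`, no daemon code is compiled at all, so a missing gate shows up as a compile error rather than dead code.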
> +pub use backend_direct::RrdDirectBackend;
> +pub use backend_fallback::RrdFallbackBackend;
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
[..]
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
> new file mode 100644
> index 000000000..fabe7e669
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
> @@ -0,0 +1,408 @@
> +/// RRD Key Type Parsing and Path Resolution
> +///
> +/// This module handles parsing RRD status update keys and mapping them
> +/// to the appropriate file paths and schemas.
> +use super::schema::{RrdFormat, RrdSchema};
> +use anyhow::{Context, Result};
> +use std::path::{Path, PathBuf};
> +
> +/// Metric type for determining column skipping rules
> +#[derive(Debug, Clone, Copy, PartialEq, Eq)]
> +pub enum MetricType {
> + Node,
> + Vm,
> + Storage,
> +}
> +
> +impl MetricType {
> + /// Number of non-archivable columns to skip from the start of the data string
> + ///
> + /// The data from pvestatd has non-archivable fields at the beginning:
> + /// - Node: skip 2 (uptime, sublevel) - then ctime:loadavg:maxcpu:...
> + /// - VM: skip 4 (uptime, name, status, template) - then ctime:maxcpu:cpu:...
> + /// - Storage: skip 0 - data starts with ctime:total:used
> + ///
> + /// C implementation: status.c:1300 (node skip=2), status.c:1335 (VM skip=4)
> + pub fn skip_columns(self) -> usize {
> + match self {
> + MetricType::Node => 2,
> + MetricType::Vm => 4,
> + MetricType::Storage => 0,
> + }
> + }
> +
> + /// Get column count for a specific RRD format
> + #[allow(dead_code)]
> + pub fn column_count(self, format: RrdFormat) -> usize {
> + match (format, self) {
> + (RrdFormat::Pve2, MetricType::Node) => 12,
> + (RrdFormat::Pve9_0, MetricType::Node) => 19,
> + (RrdFormat::Pve2, MetricType::Vm) => 10,
> + (RrdFormat::Pve9_0, MetricType::Vm) => 17,
> + (_, MetricType::Storage) => 2, // Same for both formats
> + }
> + }
> +}
> +
> +/// RRD key types for routing to correct schema and path
> +///
> +/// This enum represents the different types of RRD metrics that pmxcfs tracks:
> +/// - Node metrics (CPU, memory, network for a node)
> +/// - VM metrics (CPU, memory, disk, network for a VM/CT)
> +/// - Storage metrics (total/used space for a storage)
> +#[derive(Debug, Clone, PartialEq, Eq)]
> +pub(crate) enum RrdKeyType {
> + /// Node metrics: pve2-node/{nodename} or pve-node-9.0/{nodename}
> + Node { nodename: String, format: RrdFormat },
> + /// VM metrics: pve2.3-vm/{vmid} or pve-vm-9.0/{vmid}
> + Vm { vmid: String, format: RrdFormat },
> + /// Storage metrics: pve2-storage/{node}/{storage} or pve-storage-9.0/{node}/{storage}
> + Storage {
> + nodename: String,
> + storage: String,
> + format: RrdFormat,
> + },
> +}
> +
> +impl RrdKeyType {
> + /// Parse RRD key from status update key
> + ///
> + /// Supported formats:
> + /// - "pve2-node/node1" → Node { nodename: "node1", format: Pve2 }
> + /// - "pve-node-9.0/node1" → Node { nodename: "node1", format: Pve9_0 }
> + /// - "pve2.3-vm/100" → Vm { vmid: "100", format: Pve2 }
> + /// - "pve-storage-9.0/node1/local" → Storage { nodename: "node1", storage: "local", format: Pve9_0 }
> + ///
> + /// # Security
> + ///
> + /// Path components are validated to prevent directory traversal attacks:
> + /// - Rejects paths containing ".."
> + /// - Rejects absolute paths
> + /// - Rejects paths with special characters that could be exploited
> + pub(crate) fn parse(key: &str) -> Result<Self> {
> + let parts: Vec<&str> = key.split('/').collect();
> +
> + if parts.is_empty() {
> + anyhow::bail!("Empty RRD key");
> + }
> +
> + // Validate all path components for security
> + for part in &parts[1..] {
> + Self::validate_path_component(part)?;
> + }
> +
> + match parts[0] {
> + "pve2-node" => {
> + let nodename = parts.get(1).context("Missing nodename")?.to_string();
> + Ok(RrdKeyType::Node {
> + nodename,
> + format: RrdFormat::Pve2,
> + })
> + }
> + prefix if prefix.starts_with("pve-node-") => {
> + let nodename = parts.get(1).context("Missing nodename")?.to_string();
> + Ok(RrdKeyType::Node {
> + nodename,
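> + format: RrdFormat::Pve9_0,
"pve-node-9.0" matches here, but so do "pve-node-9.1" and
"pve-node-10.0". All of them would be treated as Pve9_0. The same
applies to the "pve-vm-" and "pve-storage-" arms below. I think we
should match the exact prefix, or parse the version suffix, instead.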
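Here is a sketch of exact matching, using simplified stand-ins for RrdFormat and MetricType. `classify_prefix()` is hypothetical. Whether unknown future versions should be rejected or handled some other way is still open. This version rejects them:

```rust
/// Simplified stand-ins for the crate's `RrdFormat` and `MetricType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RrdFormat {
    Pve2,
    Pve9_0,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MetricType {
    Node,
    Vm,
    Storage,
}

/// Map the first key component to its metric type and format,
/// accepting only the exact prefixes we know about.
fn classify_prefix(prefix: &str) -> Result<(MetricType, RrdFormat), String> {
    match prefix {
        "pve2-node" => Ok((MetricType::Node, RrdFormat::Pve2)),
        "pve-node-9.0" => Ok((MetricType::Node, RrdFormat::Pve9_0)),
        "pve2.3-vm" => Ok((MetricType::Vm, RrdFormat::Pve2)),
        "pve-vm-9.0" => Ok((MetricType::Vm, RrdFormat::Pve9_0)),
        "pve2-storage" => Ok((MetricType::Storage, RrdFormat::Pve2)),
        "pve-storage-9.0" => Ok((MetricType::Storage, RrdFormat::Pve9_0)),
        // e.g. "pve-node-9.1" or "pve-node-10.0" end up here instead of
        // being silently treated as Pve9_0.
        other => Err(format!("Unknown RRD key prefix: {other}")),
    }
}
```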
> + })
> + }
> + "pve2.3-vm" => {
> + let vmid = parts.get(1).context("Missing vmid")?.to_string();
> + Ok(RrdKeyType::Vm {
> + vmid,
> + format: RrdFormat::Pve2,
> + })
> + }
> + prefix if prefix.starts_with("pve-vm-") => {
> + let vmid = parts.get(1).context("Missing vmid")?.to_string();
> + Ok(RrdKeyType::Vm {
> + vmid,
> + format: RrdFormat::Pve9_0,
> + })
> + }
> + "pve2-storage" => {
> + let nodename = parts.get(1).context("Missing nodename")?.to_string();
> + let storage = parts.get(2).context("Missing storage")?.to_string();
> + Ok(RrdKeyType::Storage {
> + nodename,
> + storage,
> + format: RrdFormat::Pve2,
> + })
> + }
> + prefix if prefix.starts_with("pve-storage-") => {
> + let nodename = parts.get(1).context("Missing nodename")?.to_string();
> + let storage = parts.get(2).context("Missing storage")?.to_string();
> + Ok(RrdKeyType::Storage {
> + nodename,
> + storage,
> + format: RrdFormat::Pve9_0,
> + })
> + }
> + _ => anyhow::bail!("Unknown RRD key format: {key}"),
> + }
> + }
> +
> + /// Validate a path component for security
> + ///
> + /// Prevents directory traversal attacks by rejecting:
> + /// - ".." (parent directory)
> + /// - Absolute paths (starting with "/")
> + /// - Empty components
> + /// - Components with null bytes or other dangerous characters
> + fn validate_path_component(component: &str) -> Result<()> {
> + if component.is_empty() {
> + anyhow::bail!("Empty path component");
> + }
> +
> + if component == ".." {
> + anyhow::bail!("Path traversal attempt: '..' not allowed");
> + }
> +
> + if component.starts_with('/') {
> + anyhow::bail!("Absolute paths not allowed");
> + }
> +
> + if component.contains('\0') {
> + anyhow::bail!("Null byte in path component");
> + }
> +
> + // Reject other potentially dangerous characters
> + if component.contains(['\\', '\n', '\r']) {
> + anyhow::bail!("Invalid characters in path component");
> + }
> +
> + Ok(())
> + }
> +
> + /// Get the RRD file path for this key type
> + ///
> + /// Always returns paths using the current format (9.0), regardless of the input format.
> + /// This enables transparent format migration: old PVE8 nodes can send `pve2-node/` keys,
> + /// and they'll be written to `pve-node-9.0/` files automatically.
> + ///
> + /// # Format Migration Strategy
> + ///
> + /// Returns the file path for this RRD key (without .rrd extension)
> + ///
> + /// The C implementation always creates files in the current format directory
> + /// (see status.c:1287). This Rust implementation follows the same approach:
> + /// - Input: `pve2-node/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1`
> + /// - Input: `pve-node-9.0/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1`
> + ///
> + /// This allows rolling upgrades where old and new nodes coexist in the same cluster.
> + ///
> + /// Note: The path does NOT include .rrd extension, matching C implementation.
> + /// The librrd functions (rrd_create_r, rrdc_update) add .rrd internally.
> + pub(crate) fn file_path(&self, base_dir: &Path) -> PathBuf {
> + match self {
> + RrdKeyType::Node { nodename, .. } => {
> + // Always use current format path
> + base_dir.join("pve-node-9.0").join(nodename)
> + }
> + RrdKeyType::Vm { vmid, .. } => {
> + // Always use current format path
> + base_dir.join("pve-vm-9.0").join(vmid)
> + }
> + RrdKeyType::Storage {
> + nodename, storage, ..
> + } => {
> + // Always use current format path
> + base_dir
> + .join("pve-storage-9.0")
> + .join(nodename)
> + .join(storage)
> + }
> + }
> + }
> +
> + /// Get the source format from the input key
> + ///
> + /// This is used for data transformation (padding/truncation).
> + pub(crate) fn source_format(&self) -> RrdFormat {
> + match self {
> + RrdKeyType::Node { format, .. }
> + | RrdKeyType::Vm { format, .. }
> + | RrdKeyType::Storage { format, .. } => *format,
> + }
> + }
> +
> + /// Get the target RRD schema (always current format)
> + ///
> + /// Files are always created using the current format (Pve9_0),
> + /// regardless of the source format in the key.
> + pub(crate) fn schema(&self) -> RrdSchema {
> + match self {
> + RrdKeyType::Node { .. } => RrdSchema::node(RrdFormat::Pve9_0),
> + RrdKeyType::Vm { .. } => RrdSchema::vm(RrdFormat::Pve9_0),
> + RrdKeyType::Storage { .. } => RrdSchema::storage(RrdFormat::Pve9_0),
> + }
> + }
> +
> + /// Get the metric type for this key
> + pub(crate) fn metric_type(&self) -> MetricType {
> + match self {
> + RrdKeyType::Node { .. } => MetricType::Node,
> + RrdKeyType::Vm { .. } => MetricType::Vm,
> + RrdKeyType::Storage { .. } => MetricType::Storage,
> + }
> + }
> +}
> +
> +#[cfg(test)]
> +mod tests {
[..]
> +}
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/parse.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/LICENSE
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/client.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/consolidation_function.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/create.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/errors.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/mod.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/now.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/parsers.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
> new file mode 100644
> index 000000000..8da6b633d
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/rrdcached/sanitisation.rs
> @@ -0,0 +1,100 @@
> +use super::errors::RRDCachedClientError;
> +
> +pub fn check_data_source_name(name: &str) -> Result<(), RRDCachedClientError> {
> + if name.is_empty() || name.len() > 64 {
> + return Err(RRDCachedClientError::InvalidDataSourceName(
> + "name must be between 1 and 64 characters".to_string(),
> + ));
> + }
> + if !name
> + .chars()
> + .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
> + {
> + return Err(RRDCachedClientError::InvalidDataSourceName(
> + "name must only contain alphanumeric characters and underscores".to_string(),
> + ));
> + }
> + Ok(())
> +}
> +
> +pub fn check_rrd_path(name: &str) -> Result<(), RRDCachedClientError> {
> + if name.is_empty() || name.len() > 64 {
> + return Err(RRDCachedClientError::InvalidCreateDataSerie(
> + "name must be between 1 and 64 characters".to_string(),
> + ));
> + }
> + if !name
> + .chars()
> + .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
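This rejects "/" and ".", but we pass full filesystem paths to it. Valid
paths such as /var/lib/rrdcached/db/pve-node-9.0/node1 would therefore
fail. Please also check the 64-character limit above, because full paths
can easily exceed it.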
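One possible direction, assuming we keep passing absolute paths: validate the path structurally instead of character by character. This is only a sketch, and the `&Path` signature is a hypothetical replacement. The whitespace check assumes the vendored client writes paths verbatim, without escaping, into the line-based, space-separated rrdcached commands:

```rust
use std::path::{Component, Path};

/// Hypothetical replacement for `check_rrd_path()` that accepts absolute
/// filesystem paths instead of bare names.
fn check_rrd_path(path: &Path) -> Result<(), String> {
    let s = path.to_str().ok_or("RRD path is not valid UTF-8")?;
    if !path.is_absolute() {
        return Err(format!("RRD path must be absolute: {s}"));
    }
    // Reject anything that could escape the base directory.
    if path.components().any(|c| c == Component::ParentDir) {
        return Err(format!("'..' not allowed in RRD path: {s}"));
    }
    // Assumption: the path is written unescaped into a line-based,
    // space-separated command, so whitespace or control chars would break it.
    if s.chars().any(|c| c.is_whitespace() || c.is_control()) {
        return Err(format!("invalid character in RRD path: {s:?}"));
    }
    Ok(())
}
```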
> + {
> + return Err(RRDCachedClientError::InvalidCreateDataSerie(
> + "name must only contain alphanumeric characters and underscores".to_string(),
> + ));
> + }
> + Ok(())
> +}
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
[..]
> diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
> new file mode 100644
> index 000000000..6c48940be
> --- /dev/null
> +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
> @@ -0,0 +1,582 @@
> +/// RRD File Writer
> +///
> +/// Handles creating and updating RRD files via pluggable backends.
> +/// Supports daemon-based (rrdcached) and direct file writing modes.
> +use super::backend::{DEFAULT_SOCKET_PATH, RrdFallbackBackend};
> +use super::key_type::{MetricType, RrdKeyType};
> +use super::schema::{RrdFormat, RrdSchema};
> +use anyhow::{Context, Result};
> +use chrono::Local;
> +use std::fs;
> +use std::path::{Path, PathBuf};
> +
> +
> +/// RRD writer for persistent metric storage
> +///
> +/// Uses pluggable backends (daemon, direct, or fallback) for RRD operations.
> +pub struct RrdWriter {
> + /// Base directory for RRD files (default: /var/lib/rrdcached/db)
> + base_dir: PathBuf,
> + /// Backend for RRD operations (daemon, direct, or fallback)
> + backend: Box<dyn super::backend::RrdBackend>,
> +}
> +
> +impl RrdWriter {
> + /// Create new RRD writer with default fallback backend
> + ///
> + /// Uses the fallback backend that tries daemon first, then falls back to direct file writes.
> + /// This matches the C implementation's behavior.
> + ///
> + /// # Arguments
> + /// * `base_dir` - Base directory for RRD files
> + pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
> + let backend = Self::default_backend().await?;
> + Self::with_backend(base_dir, backend).await
> + }
> +
> + /// Create new RRD writer with specific backend
> + ///
> + /// # Arguments
> + /// * `base_dir` - Base directory for RRD files
> + /// * `backend` - RRD backend to use (daemon, direct, or fallback)
> + pub(crate) async fn with_backend<P: AsRef<Path>>(
> + base_dir: P,
> + backend: Box<dyn super::backend::RrdBackend>,
> + ) -> Result<Self> {
> + let base_dir = base_dir.as_ref().to_path_buf();
> +
> + // Create base directory if it doesn't exist
> + fs::create_dir_all(&base_dir)
> + .with_context(|| format!("Failed to create RRD base directory: {base_dir:?}"))?;
> +
> + tracing::info!("RRD writer using backend: {}", backend.name());
> +
> + Ok(Self { base_dir, backend })
> + }
> +
> + /// Create default backend (fallback: daemon + direct)
> + ///
> + /// This matches the C implementation's behavior:
> + /// - Tries rrdcached daemon first for performance
> + /// - Falls back to direct file writes if daemon fails
> + async fn default_backend() -> Result<Box<dyn super::backend::RrdBackend>> {
> + let backend = RrdFallbackBackend::new(DEFAULT_SOCKET_PATH).await;
> + Ok(Box::new(backend))
> + }
> +
> + /// Update RRD file with metric data
> + ///
> + /// This will:
> + /// 1. Transform data from source format to target format (padding/truncation/column skipping)
> + /// 2. Create the RRD file if it doesn't exist
> + /// 3. Update via rrdcached daemon
> + ///
> + /// # Arguments
> + /// * `key` - RRD key (e.g., "pve2-node/node1", "pve-vm-9.0/100")
> + /// * `data` - Raw metric data string from pvestatd (format: "skipped_fields...:ctime:val1:val2:...")
> + pub async fn update(&mut self, key: &str, data: &str) -> Result<()> {
> + // Parse the key to determine file path and schema
> + let key_type = RrdKeyType::parse(key).with_context(|| format!("Invalid RRD key: {key}"))?;
> +
> + // Get source format and target schema
> + let source_format = key_type.source_format();
> + let target_schema = key_type.schema();
> + let metric_type = key_type.metric_type();
> +
> + // Transform data from source to target format
> + let transformed_data =
> + Self::transform_data(data, source_format, &target_schema, metric_type)
> + .with_context(|| format!("Failed to transform RRD data for key: {key}"))?;
> +
> + // Get the file path (always uses current format)
> + let file_path = key_type.file_path(&self.base_dir);
> +
> + // Ensure the RRD file exists
> + // Always check file existence directly - handles file deletion/rotation
> + if !file_path.exists() {
> + self.create_rrd_file(&key_type, &file_path).await?;
The on-disk naming convention for .rrd is inconsistent across the crate,
and I think this can break the logic here.
file_path() in key_type.rs is documented as returning paths without
.rrd, and this existence check runs against that path. However, the
vendored rrdcached client in rrdcached/client.rs and rrdcached/create.rs
appends .rrd when building the update and create commands. The daemon
backend therefore creates files at path.rrd, which this check never
sees, so we would try to re-create the file on every update. The direct
backend tests in backend_direct.rs also construct paths with .rrd
explicitly.
Can we please pin down whether the rrd crate's create() / update_all()
append .rrd automatically? Then we can make one consistent decision and
align file_path(), the existence check, the vendored client and the
direct backend tests with that convention.
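Whichever convention we pick, it would help to derive the on-disk name in exactly one helper. The existence check, the vendored client and the direct backend would all use it. Here is a minimal sketch, where `on_disk_path()` is a hypothetical name and "nfs.backup" a made-up storage ID. If we do append `.rrd`, `Path::with_extension()` would be the wrong tool, because it replaces everything after the last dot of a name containing a dot:

```rust
use std::ffi::OsString;
use std::path::{Path, PathBuf};

/// Hypothetical helper: the one place that maps a key path (as returned by
/// `file_path()`) to the file name actually used on disk. `suffix` would be a
/// single crate-wide constant, either "" or ".rrd".
fn on_disk_path(key_path: &Path, suffix: &str) -> PathBuf {
    // Append rather than use `with_extension()`, which would replace
    // everything after the last dot in the final component.
    let mut name: OsString = key_path.as_os_str().to_owned();
    name.push(suffix);
    PathBuf::from(name)
}
```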
> + }
> +
> + // Update the RRD file via backend
> + self.backend.update(&file_path, &transformed_data).await?;
> +
> + Ok(())
> + }
> +
> + /// Create RRD file with appropriate schema via backend
> + async fn create_rrd_file(&mut self, key_type: &RrdKeyType, file_path: &Path) -> Result<()> {
> + // Ensure parent directory exists
> + if let Some(parent) = file_path.parent() {
> + fs::create_dir_all(parent)
> + .with_context(|| format!("Failed to create directory: {parent:?}"))?;
> + }
> +
> + // Get schema for this RRD type
> + let schema = key_type.schema();
> +
> + // Calculate start time (at day boundary, matching C implementation)
> + // C uses localtime() (status.c:1206-1219), not UTC
> + let now = Local::now();
> + let start = now
> + .date_naive()
> + .and_hms_opt(0, 0, 0)
> + .expect("00:00:00 is always a valid time")
> + .and_local_timezone(Local)
> + .single()
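single() returns None if local midnight is ambiguous or doesn't exist
(e.g. a DST transition at midnight), and the expect() below would then
panic. Maybe earliest() would help here? It covers the ambiguous case,
but it still returns None for a nonexistent midnight, so we'd need a
fallback for that case too.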
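To illustrate the remaining gap, here is a std-only sketch that mirrors the semantics of chrono's LocalResult::earliest() with a stand-in enum. In the real code this would be `.and_local_timezone(Local).earliest()` plus a fallback. The fallback of one day before `now` is an assumption. rrdcreate rejects updates timed at or before --start, so any fallback must lie safely before the first update:

```rust
/// Stand-in for chrono's `LocalResult`, reduced to Unix timestamps.
enum LocalMapping {
    /// The local time was skipped, e.g. by a DST jump at midnight.
    None,
    Single(i64),
    /// The local time occurs twice: (earliest, latest).
    Ambiguous(i64, i64),
}

impl LocalMapping {
    /// Same semantics as chrono's `LocalResult::earliest()`.
    fn earliest(&self) -> Option<i64> {
        match *self {
            LocalMapping::None => None,
            LocalMapping::Single(t) | LocalMapping::Ambiguous(t, _) => Some(t),
        }
    }
}

/// Assumed fallback offset when local midnight does not exist.
const FALLBACK_OFFSET_SECS: i64 = 24 * 60 * 60;

/// RRD start timestamp: local midnight if it exists, else a point safely before `now`.
fn rrd_start_timestamp(midnight: LocalMapping, now: i64) -> i64 {
    midnight.earliest().unwrap_or(now - FALLBACK_OFFSET_SECS)
}
```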
> + .expect("Local midnight should have single timezone mapping");
> + let start_timestamp = start.timestamp();
> +
> + tracing::debug!(
> + "Creating RRD file: {:?} with {} data sources via {}",
> + file_path,
> + schema.column_count(),
> + self.backend.name()
> + );
> +
> + // Delegate to backend for creation
> + self.backend
> + .create(file_path, &schema, start_timestamp)
> + .await?;
> +
> + tracing::info!("Created RRD file: {:?} ({})", file_path, schema);
> +
> + Ok(())
> + }
> +#[cfg(test)]
> +mod tests {
> + use super::super::schema::{RrdFormat, RrdSchema};
> + use super::*;
> +
+ #[test]
+ fn test_rrd_file_path_generation() {
+ let temp_dir = std::path::PathBuf::from("/tmp/test");
+
+ let key_node = RrdKeyType::Node {
+ nodename: "testnode".to_string(),
+ format: RrdFormat::Pve9_0,
+ };
+ let path = key_node.file_path(&temp_dir);
+ assert_eq!(path, temp_dir.join("pve-node-9.0").join("testnode"));
+ }
+
+ // ===== Format Adaptation Tests =====
+
+ #[test]
+ fn test_transform_data_node_pve2_to_pve9() {
+ // Test padding old format (12 archivable cols) to new format (19 archivable cols)
+ // pvestatd data format for node: "uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout"
+ // = 2 non-archivable + 1 timestamp + 12 archivable = 15 fields
+ let data = "1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+ // After skip(2): "1234567890:1.5:4:2.0:0.5:...:500000" = 13 fields
+ // Pad to 20 total (timestamp + 19 values): 13 + 7 "U" = 20
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1234567890", "Timestamp should be preserved");
+ assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+ assert_eq!(parts[1], "1.5", "First value after skip should be loadavg");
+ assert_eq!(parts[2], "4", "Second value should be maxcpu");
+ assert_eq!(parts[12], "500000", "Last data value should be netout");
+
+ // Check padding (7 columns: 19 - 12 = 7)
+ for (i, item) in parts.iter().enumerate().take(20).skip(13) {
+ assert_eq!(item, &"U", "Column {} should be padded with U", i);
+ }
+ }
+
+ #[test]
+ fn test_transform_data_vm_pve2_to_pve9() {
+ // Test VM transformation with 4 columns skipped
+ // pvestatd data format for VM: "uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite"
+ // = 4 non-archivable + 1 timestamp + 10 archivable = 15 fields
+ let data = "1000:myvm:1:0:1234567890:4:2:4096:2048:100000:50000:1000:500:100:50";
+
+ let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap();
+
+ // After skip(4): "1234567890:4:2:4096:...:50" = 11 fields
+ // Pad to 18 total (timestamp + 17 values): 11 + 7 "U" = 18
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1234567890");
+ assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+ assert_eq!(parts[1], "4", "First value after skip should be maxcpu");
+ assert_eq!(parts[10], "50", "Last data value should be diskwrite");
+
+ // Check padding (7 columns: 17 - 10 = 7)
+ for (i, item) in parts.iter().enumerate().take(18).skip(11) {
+ assert_eq!(item, &"U", "Column {} should be padded", i);
+ }
+ }
+
+ #[test]
+ fn test_transform_data_no_padding_needed() {
+ // Test when source and target have same column count (Pve9_0 node: 19 archivable cols)
+ // pvestatd format: "uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:root_t:root_u:netin:netout:memavail:arcsize:cpu_some:io_some:io_full:mem_some:mem_full"
+ // = 2 non-archivable + 1 timestamp + 19 archivable = 22 fields
+ let data = "1000:0:1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000:7000000000:0:0.12:0.05:0.02:0.08:0.03";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();
+
+ // After skip(2): 20 fields = timestamp + 19 values (exact match, no padding)
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+ assert_eq!(parts[0], "1234567890", "Timestamp should be ctime");
+ assert_eq!(parts[1], "1.5", "First value after skip should be loadavg");
+ assert_eq!(parts[19], "0.03", "Last value should be mem_full (no padding)");
+ }
+
+ #[test]
+ fn test_transform_data_future_format_truncation() {
+ // Test truncation when a future format sends more columns than current pve9.0
+ // Simulating: uptime:sublevel:ctime:1:2:3:...:25 (2 skipped + timestamp + 25 archivable = 28 fields)
+ let data = "999:0:1234567890:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20:21:22:23:24:25";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();
+
+ // After skip(2): "1234567890:1:2:...:25" = 26 fields
+ // take(20): truncate to timestamp + 19 values
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts.len(), 20, "Should truncate to timestamp + 19 values");
+ assert_eq!(parts[0], "1234567890", "Timestamp should be ctime");
+ assert_eq!(parts[1], "1", "First archivable value");
+ assert_eq!(parts[19], "19", "Last value should be column 19 (truncated)");
+ }
+
+ #[test]
+ fn test_transform_data_storage_no_change() {
+ // Storage format is same for Pve2 and Pve9_0 (2 columns, no skipping)
+ let data = "1234567890:1000000000000:500000000000";
+
+ let schema = RrdSchema::storage(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage).unwrap();
+
+ assert_eq!(result, data, "Storage data should not be transformed");
+ }
+
+ #[test]
+ fn test_metric_type_methods() {
+ assert_eq!(MetricType::Node.skip_columns(), 2);
+ assert_eq!(MetricType::Vm.skip_columns(), 4);
+ assert_eq!(MetricType::Storage.skip_columns(), 0);
+ }
+
+ #[test]
+ fn test_format_column_counts() {
+ assert_eq!(MetricType::Node.column_count(RrdFormat::Pve2), 12);
+ assert_eq!(MetricType::Node.column_count(RrdFormat::Pve9_0), 19);
+ assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve2), 10);
+ assert_eq!(MetricType::Vm.column_count(RrdFormat::Pve9_0), 17);
+ assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve2), 2);
+ assert_eq!(MetricType::Storage.column_count(RrdFormat::Pve9_0), 2);
+ }
+
+ // ===== Real Payload Fixtures from Production Systems =====
+ //
+ // These tests use actual RRD data captured from running PVE systems
+ // to validate transform_data() correctness against real-world payloads.
+
+ #[test]
+ fn test_real_payload_node_pve2() {
+ // Real pve2-node payload captured from PVE 6.x system
+ // Format: uptime:sublevel:ctime:loadavg:maxcpu:cpu:iowait:memtotal:memused:swaptotal:swapused:roottotal:rootused:netin:netout
+ let data = "432156:0:1709123456:0.15:8:3.2:0.8:33554432000:12884901888:8589934592:0:107374182400:53687091200:1234567890:987654321";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+ assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+
+ // Verify key metrics are preserved
+ assert_eq!(parts[1], "0.15", "Load average preserved");
+ assert_eq!(parts[2], "8", "Max CPU preserved");
+ assert_eq!(parts[3], "3.2", "CPU usage preserved");
+ assert_eq!(parts[4], "0.8", "IO wait preserved");
+
+ // Verify padding for new columns (7 new columns in Pve9_0)
+ for i in 13..20 {
+ assert_eq!(parts[i], "U", "New column {} should be padded", i);
+ }
+ }
+
+ #[test]
+ fn test_real_payload_vm_pve2() {
+ // Real pve2.3-vm payload captured from PVE 6.x system
+ // Format: uptime:name:status:template:ctime:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite
+ let data = "86400:vm-100-disk-0:running:0:1709123456:4:45.3:8589934592:4294967296:107374182400:32212254720:123456789:98765432:1048576:2097152";
+
+ let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap();
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+ assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+
+ // Verify key metrics are preserved
+ assert_eq!(parts[1], "4", "Max CPU preserved");
+ assert_eq!(parts[2], "45.3", "CPU usage preserved");
+ assert_eq!(parts[3], "8589934592", "Max memory preserved");
+ assert_eq!(parts[4], "4294967296", "Memory usage preserved");
+
+ // Verify padding for new columns (7 new columns in Pve9_0)
+ for i in 11..18 {
+ assert_eq!(parts[i], "U", "New column {} should be padded", i);
+ }
+ }
+
+ #[test]
+ fn test_real_payload_storage_pve2() {
+ // Real pve2-storage payload captured from PVE 6.x system
+ // Format: ctime:total:used
+ let data = "1709123456:1099511627776:549755813888";
+
+ let schema = RrdSchema::storage(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage)
+ .unwrap();
+
+ // Storage format unchanged between Pve2 and Pve9_0
+ assert_eq!(result, data, "Storage data should not be transformed");
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+ assert_eq!(parts[1], "1099511627776", "Total storage preserved");
+ assert_eq!(parts[2], "549755813888", "Used storage preserved");
+ }
+
+ #[test]
+ fn test_real_payload_node_pve9_0() {
+ // Real pve-node-9.0 payload from PVE 8.x system (already in target format)
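Can we please add real binary fixtures instead? Those would catch issues
that hand-written strings can hide: this payload, for instance, contains
x86_64 and 6.5.11, which are not numeric and would be rejected by an RRD
update.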
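Independently of fixtures, the counts in this test's comments are off:
after skip(2) the 19-field input leaves the timestamp plus 16 values, not
17, so 3 "U" columns get padded (which is what the assertions below
check), not 2. A tiny reference model of the skip/pad/truncate rule makes
such counts easy to double-check. transform_sketch() is hypothetical and
only mirrors the behaviour described in the test comments; it is not the
crate's transform_data():

```rust
/// Hypothetical reference model of the skip/pad/truncate rule described in
/// the test comments; it is not the crate's transform_data().
fn transform_sketch(data: &str, skip: usize, value_count: usize) -> String {
    // Drop the non-archivable leading columns (2 for node, 4 for VM).
    let mut fields: Vec<&str> = data.split(':').skip(skip).collect();
    // Keep the timestamp plus at most `value_count` values...
    fields.truncate(1 + value_count);
    // ...and fill any missing trailing values with "U" (unknown).
    fields.resize(1 + value_count, "U");
    fields.join(":")
}

fn main() {
    let data = "864321:0:1709123456:0.25:16:8.5:1.2:67108864000:25769803776:17179869184:0:214748364800:107374182400:2345678901:1876543210:x86_64:6.5.11:0.3:250";
    let out = transform_sketch(data, 2, 19);
    let parts: Vec<&str> = out.split(':').collect();
    let padded = parts.iter().filter(|p| **p == "U").count();
    // prints "values: 16, padded: 3"
    println!("values: {}, padded: {}", parts.len() - 1 - padded, padded);
}
```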
+ // Input has 19 fields, after skip(2) = 17 archivable columns
+ // Schema expects 19 archivable columns, so 2 "U" padding added
+ let data = "864321:0:1709123456:0.25:16:8.5:1.2:67108864000:25769803776:17179869184:0:214748364800:107374182400:2345678901:1876543210:x86_64:6.5.11:0.3:250";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node)
+ .unwrap();
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+ assert_eq!(parts.len(), 20, "Should have timestamp + 19 values");
+
+ // Verify all columns preserved
+ assert_eq!(parts[1], "0.25", "Load average preserved");
+ assert_eq!(parts[13], "x86_64", "CPU info preserved");
+ assert_eq!(parts[14], "6.5.11", "Kernel version preserved");
+ assert_eq!(parts[15], "0.3", "Wait time preserved");
+ assert_eq!(parts[16], "250", "Process count preserved");
+
+ // Last 3 columns are padding (input had 17 archivable, schema expects 19)
+ assert_eq!(parts[17], "U", "Padding column 1");
+ assert_eq!(parts[18], "U", "Padding column 2");
+ assert_eq!(parts[19], "U", "Padding column 3");
+ }
+
+ #[test]
+ fn test_real_payload_with_missing_values() {
+ // Real payload with some missing values (represented as "U")
+ // This can happen when metrics are temporarily unavailable
+ let data = "432156:0:1709123456:0.15:8:U:0.8:33554432000:12884901888:U:0:107374182400:53687091200:1234567890:987654321";
+
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let result =
+ RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+ let parts: Vec<&str> = result.split(':').collect();
+ assert_eq!(parts[0], "1709123456", "Timestamp preserved");
+
+ // Verify "U" values are preserved (after skip(2), positions shift)
+ assert_eq!(parts[3], "U", "Missing CPU value preserved as U");
+ assert_eq!(parts[7], "U", "Missing swap total preserved as U");
+ }
[..]
Thread overview: 19+ messages
2026-02-13 9:33 [PATCH pve-cluster 00/14 v2] Rewrite pmxcfs with Rust Kefu Chai
2026-02-13 9:33 ` [PATCH pve-cluster 01/14 v2] pmxcfs-rs: add Rust workspace configuration Kefu Chai
2026-02-18 10:41 ` Samuel Rufinatscha
2026-02-13 9:33 ` [PATCH pve-cluster 02/14 v2] pmxcfs-rs: add pmxcfs-api-types crate Kefu Chai
2026-02-18 15:06 ` Samuel Rufinatscha
2026-02-13 9:33 ` [PATCH pve-cluster 03/14 v2] pmxcfs-rs: add pmxcfs-config crate Kefu Chai
2026-02-18 16:41 ` Samuel Rufinatscha
2026-02-13 9:33 ` [PATCH pve-cluster 04/14 v2] pmxcfs-rs: add pmxcfs-logger crate Kefu Chai
2026-02-24 16:17 ` Samuel Rufinatscha
2026-02-13 9:33 ` [PATCH pve-cluster 05/14 v2] pmxcfs-rs: add pmxcfs-rrd crate Kefu Chai
2026-03-13 14:09 ` Samuel Rufinatscha [this message]
2026-02-13 9:33 ` [PATCH pve-cluster 06/14 v2] pmxcfs-rs: add pmxcfs-memdb crate Kefu Chai
2026-02-13 9:33 ` [PATCH pve-cluster 07/14 v2] pmxcfs-rs: add pmxcfs-status and pmxcfs-test-utils crates Kefu Chai
2026-02-13 9:33 ` [PATCH pve-cluster 08/14 v2] pmxcfs-rs: add pmxcfs-services crate Kefu Chai
2026-02-13 9:33 ` [PATCH pve-cluster 09/14 v2] pmxcfs-rs: add pmxcfs-ipc crate Kefu Chai
2026-02-13 9:33 ` [PATCH pve-cluster 10/14 v2] pmxcfs-rs: add pmxcfs-dfsm crate Kefu Chai
2026-02-13 9:33 ` [PATCH pve-cluster 11/14 v2] pmxcfs-rs: vendor patched rust-corosync for CPG compatibility Kefu Chai
2026-02-13 9:33 ` [PATCH pve-cluster 12/14 v2] pmxcfs-rs: add pmxcfs main daemon binary Kefu Chai
2026-02-13 9:33 ` [PATCH pve-cluster 14/14 v2] pmxcfs-rs: add project documentation Kefu Chai