From: Aaron Lauterer <a.lauterer@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH pve9-rrd-migration-tool v3 1/1] introduce rrd migration tool for pve8 -> pve9
Date: Tue, 15 Jul 2025 16:31:49 +0200
Message-ID: <20250715143218.1548306-6-a.lauterer@proxmox.com>
In-Reply-To: <20250715143218.1548306-1-a.lauterer@proxmox.com>

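This tool converts existing RRD files from the PVE 8 directories
(pve2-node, pve2-vm, pve2-storage) to the new PVE 9 schema and
locations (pve-node-9.0, pve-vm-9.0, pve-storage-9.0) by calling
librrd's rrd_create_r2() with the old file as data source. Guest
files are migrated in parallel, as they make up the bulk of the
data; node and storage files are handled serially.
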
Signed-off-by: Aaron Lauterer <a.lauterer@proxmox.com>
---
 .cargo/config.toml      |   5 +
 .gitignore              |   5 +
 Cargo.toml              |  20 ++
 build.rs                |  29 +++
 src/lib.rs              |   5 +
 src/main.rs             | 502 ++++++++++++++++++++++++++++++++++++++++
 src/parallel_handler.rs | 162 +++++++++++++
 wrapper.h               |   1 +
 8 files changed, 729 insertions(+)
 create mode 100644 .cargo/config.toml
 create mode 100644 .gitignore
 create mode 100644 Cargo.toml
 create mode 100644 build.rs
 create mode 100644 src/lib.rs
 create mode 100644 src/main.rs
 create mode 100644 src/parallel_handler.rs
 create mode 100644 wrapper.h

diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..3b5b6e4
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,5 @@
+[source]
+[source.debian-packages]
+directory = "/usr/share/cargo/registry"
+[source.crates-io]
+replace-with = "debian-packages"
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7741e63
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+/target
+/build
+
+Cargo.lock
+
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..d3523f3
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox_rrd_migration_8-9"
+version = "0.1.0"
+edition = "2021"
+authors = [
+    "Aaron Lauterer <a.lauterer@proxmox.com>",
+    "Proxmox Support Team <support@proxmox.com>",
+]
+license = "AGPL-3"
+homepage = "https://www.proxmox.com"
+
+[dependencies]
+anyhow = "1.0.86"
+pico-args = "0.5.0"
+proxmox-async = "0.4"
+crossbeam-channel = "0.5"
+
+[build-dependencies]
+bindgen = "0.66.1"
+pkg-config = "0.3"
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..56d07cc
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,29 @@
+use std::env;
+use std::path::PathBuf;
+
+fn main() {
+    println!("cargo:rustc-link-lib=rrd");
+
+    println!("cargo:rerun-if-changed=wrapper.h");
+    // The bindgen::Builder is the main entry point
+    // to bindgen, and lets you build up options for
+    // the resulting bindings.
+
+    let bindings = bindgen::Builder::default()
+        // The input header we would like to generate
+        // bindings for.
+        .header("wrapper.h")
+        // Tell cargo to invalidate the built crate whenever any of the
+        // included header files changed.
+        .parse_callbacks(Box::new(bindgen::CargoCallbacks))
+        // Finish the builder and generate the bindings.
+        .generate()
+        // Unwrap the Result and panic on failure.
+        .expect("Unable to generate bindings");
+
+    // Write the bindings to the $OUT_DIR/bindings.rs file.
+    let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
+    bindings
+        .write_to_file(out_path.join("bindings.rs"))
+        .expect("Couldn't write bindings!");
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..a38a13a
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,5 @@
+#![allow(non_upper_case_globals)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+
+include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..6e6f91c
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,502 @@
+use anyhow::{bail, Error, Result};
+use proxmox_rrd_migration_8_9::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error};
+use std::ffi::{CStr, CString, OsString};
+use std::fs;
+use std::os::unix::ffi::OsStrExt;
+use std::os::unix::fs::PermissionsExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use crate::parallel_handler::ParallelHandler;
+
+pub mod parallel_handler;
+
+const BASE_DIR: &str = "/var/lib/rrdcached/db";
+const SOURCE_SUBDIR_NODE: &str = "pve2-node";
+const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
+const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
+const TARGET_SUBDIR_NODE: &str = "pve-node-9.0";
+const TARGET_SUBDIR_GUEST: &str = "pve-vm-9.0";
+const TARGET_SUBDIR_STORAGE: &str = "pve-storage-9.0";
+const MAX_THREADS: usize = 4;
+const RRD_STEP_SIZE: usize = 60;
+
+// RRAs are defined in the following way:
+//
+// RRA:CF:xff:steps:rows
+// CF: consolidation function, AVERAGE or MAX
+// xff: 0.5
+// steps: number of base steps aggregated into one row; the step size itself
+//	is defined at RRD file creation. Example: with a 60 second step size,
+//	1 => 60 sec, 30 => 1800 seconds or 30 min
+// rows: how many aggregated rows are kept, i.e. how far back in time we store data
+//
+// seconds covered per RRA: steps * stepsize * rows
+// hours covered per RRA: steps * stepsize * rows / 3600
+// days covered per RRA: steps * stepsize * rows / 3600 / 24
+// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
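+//
+// Worked example: with the 60 second step size used below,
+// "RRA:AVERAGE:0.5:30:1440" keeps 1440 rows of 30-step (30 minute) averages,
+// i.e. 30 * 60 * 1440 = 2592000 seconds = 30 days of history.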
+
+const RRD_VM_DEF: [&CStr; 25] = [
+    c"DS:maxcpu:GAUGE:120:0:U",
+    c"DS:cpu:GAUGE:120:0:U",
+    c"DS:maxmem:GAUGE:120:0:U",
+    c"DS:mem:GAUGE:120:0:U",
+    c"DS:maxdisk:GAUGE:120:0:U",
+    c"DS:disk:GAUGE:120:0:U",
+    c"DS:netin:DERIVE:120:0:U",
+    c"DS:netout:DERIVE:120:0:U",
+    c"DS:diskread:DERIVE:120:0:U",
+    c"DS:diskwrite:DERIVE:120:0:U",
+    c"DS:memhost:GAUGE:120:0:U",
+    c"DS:pressurecpusome:GAUGE:120:0:U",
+    c"DS:pressurecpufull:GAUGE:120:0:U",
+    c"DS:pressureiosome:GAUGE:120:0:U",
+    c"DS:pressureiofull:GAUGE:120:0:U",
+    c"DS:pressurememorysome:GAUGE:120:0:U",
+    c"DS:pressurememoryfull:GAUGE:120:0:U",
+    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
+    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
+    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
+    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
+    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
+];
+
+const RRD_NODE_DEF: [&CStr; 27] = [
+    c"DS:loadavg:GAUGE:120:0:U",
+    c"DS:maxcpu:GAUGE:120:0:U",
+    c"DS:cpu:GAUGE:120:0:U",
+    c"DS:iowait:GAUGE:120:0:U",
+    c"DS:memtotal:GAUGE:120:0:U",
+    c"DS:memused:GAUGE:120:0:U",
+    c"DS:swaptotal:GAUGE:120:0:U",
+    c"DS:swapused:GAUGE:120:0:U",
+    c"DS:roottotal:GAUGE:120:0:U",
+    c"DS:rootused:GAUGE:120:0:U",
+    c"DS:netin:DERIVE:120:0:U",
+    c"DS:netout:DERIVE:120:0:U",
+    c"DS:memfree:GAUGE:120:0:U",
+    c"DS:arcsize:GAUGE:120:0:U",
+    c"DS:pressurecpusome:GAUGE:120:0:U",
+    c"DS:pressureiosome:GAUGE:120:0:U",
+    c"DS:pressureiofull:GAUGE:120:0:U",
+    c"DS:pressurememorysome:GAUGE:120:0:U",
+    c"DS:pressurememoryfull:GAUGE:120:0:U",
+    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
+    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
+    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
+    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
+    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
+];
+
+const RRD_STORAGE_DEF: [&CStr; 10] = [
+    c"DS:total:GAUGE:120:0:U",
+    c"DS:used:GAUGE:120:0:U",
+    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
+    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
+    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
+    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
+    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
+    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
+];
+
+const HELP: &str = "\
+proxmox-rrd-migration tool
+
+Migrates existing RRD graph data to the new format.
+
+Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
+
+USAGE:
+    proxmox-rrd-migration [OPTIONS]
+
+    FLAGS:
+        -h, --help                  Prints this help information
+
+    OPTIONS:
+        --force                     Migrate, even if the target already exists.
+                                    This will overwrite any migrated RRD files!
+
+        --threads THREADS           Number of parallel threads. Defaults to a
+                                    value between 1 and 4, based on CPU count.
+
+        --test                      For internal use only.
+                                    Tests parallel guest migration only!
+        --source                    For internal use only. Source directory.
+        --target                    For internal use only. Target directory.
+      ";
+
+#[derive(Debug)]
+struct Args {
+    force: bool,
+    threads: Option<usize>,
+    test: bool,
+    source: Option<PathBuf>,
+    target: Option<PathBuf>,
+}
+
+fn parse_args() -> Result<Args, Error> {
+    let mut pargs = pico_args::Arguments::from_env();
+
+    // Help has a higher priority and should be handled separately.
+    if pargs.contains(["-h", "--help"]) {
+        print!("{}", HELP);
+        std::process::exit(0);
+    }
+
+    let mut args = Args {
+        threads: pargs.opt_value_from_str("--threads")?,
+        force: false,
+        test: false,
+        source: pargs.opt_value_from_str("--source")?,
+        target: pargs.opt_value_from_str("--target")?,
+    };
+
+    if pargs.contains("--test") {
+        args.test = true;
+    }
+    if pargs.contains("--force") {
+        args.force = true;
+    }
+
+    // Reject any arguments we did not recognize instead of silently ignoring them.
+    let remaining = pargs.finish();
+    if !remaining.is_empty() {
+        bail!("unexpected extra arguments: {:?}", remaining);
+    }
+
+    Ok(args)
+}
+
+fn main() {
+    let args = match parse_args() {
+        Ok(v) => v,
+        Err(e) => {
+            eprintln!("Error: {}.", e);
+            std::process::exit(1);
+        }
+    };
+
+    let mut source_dir_guests: PathBuf = [BASE_DIR, SOURCE_SUBDIR_GUEST].iter().collect();
+    let mut target_dir_guests: PathBuf = [BASE_DIR, TARGET_SUBDIR_GUEST].iter().collect();
+    let source_dir_nodes: PathBuf = [BASE_DIR, SOURCE_SUBDIR_NODE].iter().collect();
+    let target_dir_nodes: PathBuf = [BASE_DIR, TARGET_SUBDIR_NODE].iter().collect();
+    let source_dir_storage: PathBuf = [BASE_DIR, SOURCE_SUBDIR_STORAGE].iter().collect();
+    let target_dir_storage: PathBuf = [BASE_DIR, TARGET_SUBDIR_STORAGE].iter().collect();
+
+    if args.test {
+        source_dir_guests = args.source.clone().expect("--test requires --source");
+        target_dir_guests = args.target.clone().expect("--test requires --target");
+    }
+
+    if !args.force && target_dir_guests.exists() {
+        eprintln!(
+            "Aborting! Target path for guests already exists. Use '--force' to still migrate. It will overwrite existing files!"
+        );
+        std::process::exit(1);
+    }
+    if !args.force && target_dir_nodes.exists() {
+        eprintln!(
+            "Aborting! Target path for nodes already exists. Use '--force' to still migrate. It will overwrite existing files!"
+        );
+        std::process::exit(1);
+    }
+    if !args.force && target_dir_storage.exists() {
+        eprintln!(
+            "Aborting! Target path for storages already exists. Use '--force' to still migrate. It will overwrite existing files!"
+        );
+        std::process::exit(1);
+    }
+
+    if !args.test {
+        if let Err(e) = migrate_nodes(source_dir_nodes, target_dir_nodes) {
+            eprintln!("Error migrating nodes: {}", e);
+            std::process::exit(1);
+        }
+        if let Err(e) = migrate_storage(source_dir_storage, target_dir_storage) {
+            eprintln!("Error migrating storage: {}", e);
+            std::process::exit(1);
+        }
+    }
+    if let Err(e) = migrate_guests(source_dir_guests, target_dir_guests, set_threads(&args)) {
+        eprintln!("Error migrating guests: {}", e);
+        std::process::exit(1);
+    }
+}
+
+/// Set number of threads
+///
+/// Either a fixed parameter or determining a range between 1 to 4 threads
+///  based on the number of CPU cores available in the system.
+fn set_threads(args: &Args) -> usize {
+    if let Some(threads) = args.threads {
+        return threads;
+    }
+    // check for a way to get physical cores and not threads?
+    let cpus: usize = String::from_utf8_lossy(
+        std::process::Command::new("nproc")
+            .output()
+            .expect("Error running nproc")
+            .stdout
+            .as_slice()
+            .trim_ascii(),
+    )
+    .parse::<usize>()
+    .expect("Could not parse nproc output");
+
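+    // Heuristic: roughly one thread per 8 cores, capped at MAX_THREADS.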
+    if cpus < 32 {
+        (cpus / 8).max(1)
+    } else {
+        MAX_THREADS
+    }
+}
+
+/// Migrate guest RRD files
+///
+/// In parallel to speed up the process as most time is spent on converting the
+/// data to the new format.
+fn migrate_guests(
+    source_dir_guests: PathBuf,
+    target_dir_guests: PathBuf,
+    threads: usize,
+) -> Result<(), Error> {
+    println!("Migrating RRD data for guests…");
+    println!("Using {} thread(s)", threads);
+
+    let mut guest_source_files: Vec<(CString, OsString)> = Vec::new();
+
+    fs::read_dir(&source_dir_guests)?
+        .filter(|f| f.is_ok())
+        .map(|f| f.unwrap().path())
+        .filter(|f| f.is_file())
+        .for_each(|file| {
+            let path = CString::new(file.as_path().as_os_str().as_bytes())
+                .expect("Could not convert path to CString.");
+            let fname = file
+                .file_name()
+                .map(|v| v.to_os_string())
+                .expect("Could not convert fname to OsString.");
+            guest_source_files.push((path, fname))
+        });
+    if !target_dir_guests.exists() {
+        println!("Creating new directory: '{}'", target_dir_guests.display());
+        std::fs::create_dir(&target_dir_guests)?;
+    }
+
+    let total_guests = guest_source_files.len();
+    let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+    let guests2 = guests.clone();
+    let start_time = std::time::SystemTime::now();
+
+    let migration_pool = ParallelHandler::new(
+        "guest rrd migration",
+        threads,
+        move |(path, fname): (CString, OsString)| {
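+            // rrd_create_r2() takes the source files as a NULL-terminated
+            // array, so the second slot stays null as the terminator.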
+            let mut source: [*const std::os::raw::c_char; 2] = [std::ptr::null(); 2];
+            source[0] = path.as_ptr();
+
+            let guest_name = fname;
+            let mut target_path = target_dir_guests.clone();
+            target_path.push(guest_name);
+
+            let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
+
+            unsafe {
+                rrd_get_context();
+                rrd_clear_error();
+                let res = rrd_create_r2(
+                    target_path.as_ptr(),
+                    RRD_STEP_SIZE as u64,
+                    0,
+                    0,
+                    source.as_mut_ptr(),
+                    std::ptr::null(),
+                    RRD_VM_DEF.len() as i32,
+                    RRD_VM_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+                );
+                if res != 0 {
+                    bail!(
+                        "RRD create Error: {}",
+                        CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                    );
+                }
+            }
+            let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            if current_guests > 0 && current_guests % 200 == 0 {
+                println!("Migrated {} of {} guests", current_guests, total_guests);
+            }
+            Ok(())
+        },
+    );
+    let migration_channel = migration_pool.channel();
+
+    for file in guest_source_files {
+        let migration_channel = migration_channel.clone();
+        migration_channel.send(file)?;
+    }
+
+    drop(migration_channel);
+    migration_pool.complete()?;
+
+    let elapsed = start_time.elapsed()?.as_secs_f64();
+    let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
+    println!("Migrated {} guests in {:.2}s", guests, elapsed,);
+
+    Ok(())
+}
+
+/// Migrate node RRD files
+///
+/// In serial, as the number of nodes will not be high.
+fn migrate_nodes(source_dir_nodes: PathBuf, target_dir_nodes: PathBuf) -> Result<(), Error> {
+    println!("Migrating RRD data for nodes…");
+
+    if !target_dir_nodes.exists() {
+        println!("Creating new directory: '{}'", target_dir_nodes.display());
+        std::fs::create_dir(&target_dir_nodes)?;
+    }
+
+    let mut node_source_files: Vec<(CString, OsString)> = Vec::new();
+    fs::read_dir(&source_dir_nodes)?
+        .filter(|f| f.is_ok())
+        .map(|f| f.unwrap().path())
+        .filter(|f| f.is_file())
+        .for_each(|file| {
+            let path = CString::new(file.as_path().as_os_str().as_bytes())
+                .expect("Could not convert path to CString.");
+            let fname = file
+                .file_name()
+                .map(|v| v.to_os_string())
+                .expect("Could not convert fname to OsString.");
+            node_source_files.push((path, fname))
+        });
+
+    for file in node_source_files {
+        println!("Node: '{}'", PathBuf::from(file.1.clone()).display());
+        let mut source: [*const std::os::raw::c_char; 2] = [std::ptr::null(); 2];
+
+        source[0] = file.0.as_ptr();
+
+        let node_name = file.1;
+        let mut target_path = target_dir_nodes.clone();
+        target_path.push(node_name);
+
+        let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
+
+        unsafe {
+            rrd_get_context();
+            rrd_clear_error();
+            let res = rrd_create_r2(
+                target_path.as_ptr(),
+                RRD_STEP_SIZE as u64,
+                0,
+                0,
+                source.as_mut_ptr(),
+                std::ptr::null(),
+                RRD_NODE_DEF.len() as i32,
+                RRD_NODE_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+            );
+            if res != 0 {
+                bail!(
+                    "RRD create Error: {}",
+                    CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                );
+            }
+        }
+    }
+    println!("Migrated all nodes");
+
+    Ok(())
+}
+
+/// Migrate storage RRD files
+///
+/// In serial, as the number of storages will not be that high.
+fn migrate_storage(source_dir_storage: PathBuf, target_dir_storage: PathBuf) -> Result<(), Error> {
+    println!("Migrating RRD data for storages…");
+
+    if !target_dir_storage.exists() {
+        println!("Creating new directory: '{}'", target_dir_storage.display());
+        std::fs::create_dir(&target_dir_storage)?;
+    }
+
+    // storage data has an additional per-node directory layer over which we need to iterate
+    fs::read_dir(&source_dir_storage)?
+        .filter(|f| f.is_ok())
+        .map(|f| f.unwrap().path())
+        .filter(|f| f.is_dir())
+        .try_for_each(|node| {
+            let mut storage_source_files: Vec<(CString, OsString)> = Vec::new();
+
+            let mut source_node_subdir = source_dir_storage.clone();
+            source_node_subdir.push(&node.file_name().unwrap());
+
+            let mut target_node_subdir = target_dir_storage.clone();
+            target_node_subdir.push(&node.file_name().unwrap());
+
+            fs::create_dir(target_node_subdir.as_path())?;
+            let metadata = target_node_subdir.metadata()?;
+            let mut permissions = metadata.permissions();
+            permissions.set_mode(0o755);
+            // set_mode() only changes the in-memory value; apply it to the
+            // directory explicitly.
+            fs::set_permissions(target_node_subdir.as_path(), permissions)?;
+
+            fs::read_dir(&source_node_subdir)?
+                .filter(|f| f.is_ok())
+                .map(|f| f.unwrap().path())
+                .filter(|f| f.is_file())
+                .for_each(|file| {
+                    let path = CString::new(file.as_path().as_os_str().as_bytes())
+                        .expect("Could not convert path to CString.");
+                    let fname = file
+                        .file_name()
+                        .map(|v| v.to_os_string())
+                        .expect("Could not convert fname to OsString.");
+                    storage_source_files.push((path, fname))
+                });
+
+            for file in storage_source_files {
+                println!("Storage: '{}'", PathBuf::from(file.1.clone()).display());
+                let mut source: [*const std::os::raw::c_char; 2] = [std::ptr::null(); 2];
+
+                source[0] = file.0.as_ptr();
+
+                let storage_name = file.1;
+                let mut target_path = target_node_subdir.clone();
+                target_path.push(storage_name);
+
+                let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
+
+                unsafe {
+                    rrd_get_context();
+                    rrd_clear_error();
+                    let res = rrd_create_r2(
+                        target_path.as_ptr(),
+                        RRD_STEP_SIZE as u64,
+                        0,
+                        0,
+                        source.as_mut_ptr(),
+                        std::ptr::null(),
+                        RRD_STORAGE_DEF.len() as i32,
+                        RRD_STORAGE_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+                    );
+                    if res != 0 {
+                        bail!(
+                            "RRD create Error: {}",
+                            CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                        );
+                    }
+                }
+            }
+            Ok(())
+        })?;
+    println!("Migrated all nodes");
+
+    Ok(())
+}
diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
new file mode 100644
index 0000000..787742a
--- /dev/null
+++ b/src/parallel_handler.rs
@@ -0,0 +1,162 @@
+//! A thread pool which runs a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{Error, bail, format_err};
+use crossbeam_channel::{Sender, bounded};
+
+/// A handle to send data to the worker threads (implements Clone)
+pub struct SendHandle<I> {
+    input: Sender<I>,
+    abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error that happened, if any
+pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+    let guard = abort.lock().unwrap();
+    if let Some(err_msg) = &*guard {
+        return Err(format_err!("{}", err_msg));
+    }
+    Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+    /// Send data to the worker threads
+    pub fn send(&self, input: I) -> Result<(), Error> {
+        check_abort(&self.abort)?;
+        match self.input.send(input) {
+            Ok(()) => Ok(()),
+            Err(_) => bail!("send failed - channel closed"),
+        }
+    }
+}
+
+/// A thread pool which runs the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
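+///
+/// A minimal usage sketch (the payload type and closure body are made up
+/// for illustration):
+///
+/// ```ignore
+/// let pool = ParallelHandler::new("example", 4, |n: u64| {
+///     println!("handling {n}");
+///     Ok(())
+/// });
+/// for n in 0..10u64 {
+///     pool.send(n)?;
+/// }
+/// pool.complete()?;
+/// ```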
+pub struct ParallelHandler<I> {
+    handles: Vec<JoinHandle<()>>,
+    name: String,
+    input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+    fn clone(&self) -> Self {
+        Self {
+            input: self.input.clone(),
+            abort: Arc::clone(&self.abort),
+        }
+    }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+    /// Create a new thread pool, each thread processing incoming data
+    /// with 'handler_fn'.
+    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+    where
+        F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+    {
+        let mut handles = Vec::new();
+        let (input_tx, input_rx) = bounded::<I>(threads);
+
+        let abort = Arc::new(Mutex::new(None));
+
+        for i in 0..threads {
+            let input_rx = input_rx.clone();
+            let abort = Arc::clone(&abort);
+            let handler_fn = handler_fn.clone();
+
+            handles.push(
+                std::thread::Builder::new()
+                    .name(format!("{} ({})", name, i))
+                    .spawn(move || {
+                        loop {
+                            let data = match input_rx.recv() {
+                                Ok(data) => data,
+                                Err(_) => return,
+                            };
+                            if let Err(err) = (handler_fn)(data) {
+                                let mut guard = abort.lock().unwrap();
+                                if guard.is_none() {
+                                    *guard = Some(err.to_string());
+                                }
+                            }
+                        }
+                    })
+                    .unwrap(),
+            );
+        }
+        Self {
+            handles,
+            name: name.to_string(),
+            input: Some(SendHandle {
+                input: input_tx,
+                abort,
+            }),
+        }
+    }
+
+    /// Returns a cloneable channel to send data to the worker threads
+    pub fn channel(&self) -> SendHandle<I> {
+        self.input.as_ref().unwrap().clone()
+    }
+
+    /// Send data to the worker threads
+    pub fn send(&self, input: I) -> Result<(), Error> {
+        self.input.as_ref().unwrap().send(input)?;
+        Ok(())
+    }
+
+    /// Wait for worker threads to complete and check for errors
+    pub fn complete(mut self) -> Result<(), Error> {
+        let input = self.input.take().unwrap();
+        let abort = Arc::clone(&input.abort);
+        check_abort(&abort)?;
+        drop(input);
+
+        let msg_list = self.join_threads();
+
+        // an error might be encountered while waiting for the join
+        check_abort(&abort)?;
+
+        if msg_list.is_empty() {
+            return Ok(());
+        }
+        Err(format_err!("{}", msg_list.join("\n")))
+    }
+
+    fn join_threads(&mut self) -> Vec<String> {
+        let mut msg_list = Vec::new();
+
+        let mut i = 0;
+        while let Some(handle) = self.handles.pop() {
+            if let Err(panic) = handle.join() {
+                if let Some(panic_msg) = panic.downcast_ref::<&str>() {
+                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+                } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
+                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+                } else {
+                    msg_list.push(format!("thread {} ({i}) panicked", self.name));
+                }
+            }
+            i += 1;
+        }
+        msg_list
+    }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+    fn drop(&mut self) {
+        drop(self.input.take());
+        while let Some(handle) = self.handles.pop() {
+            let _ = handle.join();
+        }
+    }
+}
diff --git a/wrapper.h b/wrapper.h
new file mode 100644
index 0000000..64d0aa6
--- /dev/null
+++ b/wrapper.h
@@ -0,0 +1 @@
+#include <rrd.h>
-- 
2.39.5



_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel
