From: Aaron Lauterer <a.lauterer@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH pve9-rrd-migration-tool 1/1] introduce rrd migration tool for pve8 -> pve9
Date: Fri, 23 May 2025 18:00:13 +0200 [thread overview]
Message-ID: <20250523160029.404400-4-a.lauterer@proxmox.com> (raw)
In-Reply-To: <20250523160029.404400-1-a.lauterer@proxmox.com>
Signed-off-by: Aaron Lauterer <a.lauterer@proxmox.com>
---
.cargo/config.toml | 8 +
.gitignore | 5 +
Cargo.toml | 20 ++
build.rs | 29 +++
src/lib.rs | 5 +
src/main.rs | 504 ++++++++++++++++++++++++++++++++++++++++
src/parallel_handler.rs | 162 +++++++++++++
wrapper.h | 1 +
8 files changed, 734 insertions(+)
create mode 100644 .cargo/config.toml
create mode 100644 .gitignore
create mode 100644 Cargo.toml
create mode 100644 build.rs
create mode 100644 src/lib.rs
create mode 100644 src/main.rs
create mode 100644 src/parallel_handler.rs
create mode 100644 wrapper.h
diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..a439c97
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,8 @@
+[source]
+[source.debian-packages]
+directory = "/usr/share/cargo/registry"
+[source.crates-io]
+replace-with = "debian-packages"
+
+[profile.release]
+debug=true
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7741e63
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+./target
+./build
+
+Cargo.lock
+
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..d3523f3
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox_rrd_migration_8-9"
+version = "0.1.0"
+edition = "2021"
+authors = [
+ "Aaron Lauterer <a.lauterer@proxmox.com>",
+ "Proxmox Support Team <support@proxmox.com>",
+]
+license = "AGPL-3"
+homepage = "https://www.proxmox.com"
+
+[dependencies]
+anyhow = "1.0.86"
+pico-args = "0.5.0"
+proxmox-async = "0.4"
+crossbeam-channel = "0.5"
+
+[build-dependencies]
+bindgen = "0.66.1"
+pkg-config = "0.3"
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..56d07cc
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,29 @@
+use std::env;
+use std::path::PathBuf;
+
+fn main() {
+ println!("cargo:rustc-link-lib=rrd");
+
+ println!("cargo:rerun-if-changed=wrapper.h");
+ // The bindgen::Builder is the main entry point
+ // to bindgen, and lets you build up options for
+ // the resulting bindings.
+
+ let bindings = bindgen::Builder::default()
+ // The input header we would like to generate
+ // bindings for.
+ .header("wrapper.h")
+ // Tell cargo to invalidate the built crate whenever any of the
+ // included header files changed.
+ .parse_callbacks(Box::new(bindgen::CargoCallbacks))
+ // Finish the builder and generate the bindings.
+ .generate()
+ // Unwrap the Result and panic on failure.
+ .expect("Unable to generate bindings");
+
+ // Write the bindings to the $OUT_DIR/bindings.rs file.
+ let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
+ bindings
+ .write_to_file(out_path.join("bindings.rs"))
+ .expect("Couldn't write bindings!");
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..a38a13a
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,5 @@
+#![allow(non_upper_case_globals)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+
+include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..43f181c
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,504 @@
+use anyhow::{bail, Error, Result};
+use proxmox_rrd_migration_8_9::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error};
+use std::ffi::{CStr, CString, OsString};
+use std::fs;
+use std::os::unix::ffi::OsStrExt;
+use std::os::unix::fs::PermissionsExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use crate::parallel_handler::ParallelHandler;
+
+pub mod parallel_handler;
+
+const BASE_DIR: &str = "/var/lib/rrdcached/db";
+const SOURCE_SUBDIR_NODE: &str = "pve2-node";
+const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
+const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
+const TARGET_SUBDIR_NODE: &str = "pve9-node";
+const TARGET_SUBDIR_GUEST: &str = "pve9-vm";
+const TARGET_SUBDIR_STORAGE: &str = "pve9-storage";
+const MAX_THREADS: usize = 4;
+const RRD_STEP_SIZE: usize = 60;
+
+// RRAs are defined in the following way:
+//
+// RRA:CF:xff:step:rows
+// CF: AVERAGE or MAX
+// xff: 0.5
+// steps: stepsize is defined on rrd file creation! example: with 60 seconds step size:
+// e.g. 1 => 60 sec, 30 => 1800 seconds or 30 min
+// rows: how many aggregated rows are kept, as in how far back in time we store data
+//
+// how many seconds are aggregated per RRA: steps * stepsize * rows
+// how many hours are aggregated per RRA: steps * stepsize * rows / 3600
+// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24
+// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
+
+const RRD_VM_DEF: [&CStr; 25] = [
+ c"DS:maxcpu:GAUGE:120:0:U",
+ c"DS:cpu:GAUGE:120:0:U",
+ c"DS:maxmem:GAUGE:120:0:U",
+ c"DS:mem:GAUGE:120:0:U",
+ c"DS:maxdisk:GAUGE:120:0:U",
+ c"DS:disk:GAUGE:120:0:U",
+ c"DS:netin:DERIVE:120:0:U",
+ c"DS:netout:DERIVE:120:0:U",
+ c"DS:diskread:DERIVE:120:0:U",
+ c"DS:diskwrite:DERIVE:120:0:U",
+ c"DS:memhost:GAUGE:120:0:U",
+ c"DS:pressurecpusome:GAUGE:120:0:U",
+ c"DS:pressurecpufull:GAUGE:120:0:U",
+ c"DS:pressureiosome:GAUGE:120:0:U",
+ c"DS:pressureiofull:GAUGE:120:0:U",
+ c"DS:pressurememorysome:GAUGE:120:0:U",
+ c"DS:pressurememoryfull:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const RRD_NODE_DEF: [&CStr; 29] = [
+ c"DS:loadavg:GAUGE:120:0:U",
+ c"DS:maxcpu:GAUGE:120:0:U",
+ c"DS:cpu:GAUGE:120:0:U",
+ c"DS:iowait:GAUGE:120:0:U",
+ c"DS:memtotal:GAUGE:120:0:U",
+ c"DS:memused:GAUGE:120:0:U",
+ c"DS:swaptotal:GAUGE:120:0:U",
+ c"DS:swapused:GAUGE:120:0:U",
+ c"DS:roottotal:GAUGE:120:0:U",
+ c"DS:rootused:GAUGE:120:0:U",
+ c"DS:netin:DERIVE:120:0:U",
+ c"DS:netout:DERIVE:120:0:U",
+ c"DS:memfree:GAUGE:120:0:U",
+ c"DS:membuffers:GAUGE:120:0:U",
+ c"DS:memcached:GAUGE:120:0:U",
+ c"DS:arcsize:GAUGE:120:0:U",
+ c"DS:pressurecpusome:GAUGE:120:0:U",
+ c"DS:pressureiosome:GAUGE:120:0:U",
+ c"DS:pressureiofull:GAUGE:120:0:U",
+ c"DS:pressurememorysome:GAUGE:120:0:U",
+ c"DS:pressurememoryfull:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const RRD_STORAGE_DEF: [&CStr; 10] = [
+ c"DS:total:GAUGE:120:0:U",
+ c"DS:used:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const HELP: &str = "\
+proxmox-rrd-migration tool
+
+Migrates existing RRD graph data to the new format.
+
+Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
+
+USAGE:
+ proxmox-rrd-migration [OPTIONS]
+
+ FLAGS:
+ -h, --help Prints this help information
+
+ OPTIONS:
+ --force Migrate, even if the target already exists.
+ This will overwrite any migrated RRD files!
+
+ --threads THREADS Number of paralell threads. Default from 1 to 4.
+
+ --test For internal use only.
+ Tests parallel guest migration only!
+ --source For internal use only. Source directory.
+ --target For internal use only. Target directory.
+ ";
+
+#[derive(Debug)]
+struct Args {
+ force: bool,
+ threads: Option<usize>,
+ test: bool,
+ source: Option<PathBuf>,
+ target: Option<PathBuf>,
+}
+
+fn parse_args() -> Result<Args, Error> {
+ let mut pargs = pico_args::Arguments::from_env();
+
+ // Help has a higher priority and should be handled separately.
+ if pargs.contains(["-h", "--help"]) {
+ print!("{}", HELP);
+ std::process::exit(0);
+ }
+
+ let mut args = Args {
+ threads: pargs.opt_value_from_str("--threads").unwrap(),
+ force: false,
+ test: false,
+ source: pargs.opt_value_from_str("--source").unwrap(),
+ target: pargs.opt_value_from_str("--target").unwrap(),
+ };
+
+ if pargs.contains("--test") {
+ args.test = true;
+ }
+ if pargs.contains("--force") {
+ args.force = true;
+ }
+
+ // It's up to the caller what to do with the remaining arguments.
+ let remaining = pargs.finish();
+ if !remaining.is_empty() {
+ bail!(format!("Warning: unused arguments left: {:?}", remaining));
+ }
+
+ Ok(args)
+}
+
+fn main() {
+ let args = match parse_args() {
+ Ok(v) => v,
+ Err(e) => {
+ eprintln!("Error: {}.", e);
+ std::process::exit(1);
+ }
+ };
+
+ let mut source_dir_guests: PathBuf = [BASE_DIR, SOURCE_SUBDIR_GUEST].iter().collect();
+ let mut target_dir_guests: PathBuf = [BASE_DIR, TARGET_SUBDIR_GUEST].iter().collect();
+ let source_dir_nodes: PathBuf = [BASE_DIR, SOURCE_SUBDIR_NODE].iter().collect();
+ let target_dir_nodes: PathBuf = [BASE_DIR, TARGET_SUBDIR_NODE].iter().collect();
+ let source_dir_storage: PathBuf = [BASE_DIR, SOURCE_SUBDIR_STORAGE].iter().collect();
+ let target_dir_storage: PathBuf = [BASE_DIR, TARGET_SUBDIR_STORAGE].iter().collect();
+
+ if args.test {
+ source_dir_guests = args.source.clone().unwrap();
+ target_dir_guests = args.target.clone().unwrap();
+ }
+
+ if !args.force && target_dir_guests.exists() {
+ eprintln!(
+ "Aborting! Target path for guests already exists. Use '--force' to still migrate. It will overwrite existing files!"
+ );
+ std::process::exit(1);
+ }
+ if !args.force && target_dir_nodes.exists() {
+ eprintln!(
+ "Aborting! Target path for nodes already exists. Use '--force' to still migrate. It will overwrite existing files!"
+ );
+ std::process::exit(1);
+ }
+ if !args.force && target_dir_storage.exists() {
+ eprintln!(
+ "Aborting! Target path for storages already exists. Use '--force' to still migrate. It will overwrite existing files!"
+ );
+ std::process::exit(1);
+ }
+
+ if !args.test {
+ if let Err(e) = migrate_nodes(source_dir_nodes, target_dir_nodes) {
+ eprintln!("Error migrating nodes: {}", e);
+ std::process::exit(1);
+ }
+ if let Err(e) = migrate_storage(source_dir_storage, target_dir_storage) {
+ eprintln!("Error migrating storage: {}", e);
+ std::process::exit(1);
+ }
+ }
+ if let Err(e) = migrate_guests(source_dir_guests, target_dir_guests, set_threads(&args)) {
+ eprintln!("Error migrating guests: {}", e);
+ std::process::exit(1);
+ }
+}
+
+/// Set number of threads
+///
+/// Either a fixed parameter or determining a range between 1 to 4 threads
+/// based on the number of CPU cores available in the system.
+fn set_threads(args: &Args) -> usize {
+ if args.threads.is_some() {
+ return args.threads.unwrap();
+ }
+ // check for a way to get physical cores and not threads?
+ let cpus: usize = String::from_utf8_lossy(
+ std::process::Command::new("nproc")
+ .output()
+ .expect("Error running nproc")
+ .stdout
+ .as_slice()
+ .trim_ascii(),
+ )
+ .parse::<usize>()
+ .expect("Could not parse nproc output");
+
+ if cpus < 32 {
+ let threads = cpus / 8;
+ if threads == 0 {
+ return 1;
+ }
+ return threads;
+ }
+ return MAX_THREADS;
+}
+
+/// Migrate guest RRD files
+///
+/// In parallel to speed up the process as most time is spent on converting the
+/// data to the new format.
+fn migrate_guests(
+ source_dir_guests: PathBuf,
+ target_dir_guests: PathBuf,
+ threads: usize,
+) -> Result<(), Error> {
+ println!("Migrating RRD data for guests…");
+ println!("Using {} thread(s)", threads);
+
+ let mut guest_source_files: Vec<(CString, OsString)> = Vec::new();
+
+ fs::read_dir(&source_dir_guests)?
+ .filter(|f| f.is_ok())
+ .map(|f| f.unwrap().path())
+ .filter(|f| f.is_file())
+ .for_each(|file| {
+ let path = CString::new(file.as_path().as_os_str().as_bytes())
+ .expect("Could not convert path to CString.");
+ let fname = file
+ .file_name()
+ .map(|v| v.to_os_string())
+ .expect("Could not convert fname to OsString.");
+ guest_source_files.push((path, fname))
+ });
+ if !target_dir_guests.exists() {
+ println!("Creating new directory: '{}'", target_dir_guests.display());
+ std::fs::create_dir(&target_dir_guests)?;
+ }
+
+ let total_guests = guest_source_files.len();
+ let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let guests2 = guests.clone();
+ let start_time = std::time::SystemTime::now();
+
+ let migration_pool = ParallelHandler::new(
+ "guest rrd migration",
+ threads,
+ move |(path, fname): (CString, OsString)| {
+ let mut source: [*const i8; 2] = [std::ptr::null(); 2];
+ source[0] = path.as_ptr();
+
+ let node_name = fname;
+ let mut target_path = target_dir_guests.clone();
+ target_path.push(node_name);
+
+ let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
+
+ unsafe {
+ rrd_get_context();
+ rrd_clear_error();
+ let res = rrd_create_r2(
+ target_path.as_ptr(),
+ RRD_STEP_SIZE as u64,
+ 0,
+ 0,
+ source.as_mut_ptr(),
+ std::ptr::null(),
+ RRD_VM_DEF.len() as i32,
+ RRD_VM_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+ );
+ if res != 0 {
+ bail!(
+ "RRD create Error: {}",
+ CStr::from_ptr(rrd_get_error()).to_string_lossy()
+ );
+ }
+ }
+ let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ if current_guests > 0 && current_guests % 200 == 0 {
+ println!("Migrated {} of {} guests", current_guests, total_guests);
+ }
+ Ok(())
+ },
+ );
+ let migration_channel = migration_pool.channel();
+
+ for file in guest_source_files {
+ let migration_channel = migration_channel.clone();
+ migration_channel.send(file)?;
+ }
+
+ drop(migration_channel);
+ migration_pool.complete()?;
+
+ let elapsed = start_time.elapsed()?.as_secs_f64();
+ let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
+ println!("Migrated {} guests in {:.2}s", guests, elapsed,);
+
+ Ok(())
+}
+
+/// Migrate node RRD files
+///
+/// In serial as the number of nodes will not be high.
+fn migrate_nodes(source_dir_nodes: PathBuf, target_dir_nodes: PathBuf) -> Result<(), Error> {
+ println!("Migrating RRD data for nodes…");
+
+ if !target_dir_nodes.exists() {
+ println!("Creating new directory: '{}'", target_dir_nodes.display());
+ std::fs::create_dir(&target_dir_nodes)?;
+ }
+
+ let mut node_source_files: Vec<(CString, OsString)> = Vec::new();
+ fs::read_dir(&source_dir_nodes)?
+ .filter(|f| f.is_ok())
+ .map(|f| f.unwrap().path())
+ .filter(|f| f.is_file())
+ .for_each(|file| {
+ let path = CString::new(file.as_path().as_os_str().as_bytes())
+ .expect("Could not convert path to CString.");
+ let fname = file
+ .file_name()
+ .map(|v| v.to_os_string())
+ .expect("Could not convert fname to OsString.");
+ node_source_files.push((path, fname))
+ });
+
+ for file in node_source_files {
+ println!("Node: '{}'", PathBuf::from(file.1.clone()).display());
+ let mut source: [*const i8; 2] = [std::ptr::null(); 2];
+
+ source[0] = file.0.as_ptr();
+
+ let node_name = file.1;
+ let mut target_path = target_dir_nodes.clone();
+ target_path.push(node_name);
+
+ let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
+
+ unsafe {
+ rrd_get_context();
+ rrd_clear_error();
+ let res = rrd_create_r2(
+ target_path.as_ptr(),
+ RRD_STEP_SIZE as u64,
+ 0,
+ 0,
+ source.as_mut_ptr(),
+ std::ptr::null(),
+ RRD_NODE_DEF.len() as i32,
+ RRD_NODE_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+ );
+ if res != 0 {
+ bail!(
+ "RRD create Error: {}",
+ CStr::from_ptr(rrd_get_error()).to_string_lossy()
+ );
+ }
+ }
+ }
+ println!("Migrated all nodes");
+
+ Ok(())
+}
+
+/// Migrate storage RRD files
+///
+/// In serial as the number of storage will not be that high.
+fn migrate_storage(source_dir_storage: PathBuf, target_dir_storage: PathBuf) -> Result<(), Error> {
+ println!("Migrating RRD data for storages…");
+
+ if !target_dir_storage.exists() {
+ println!("Creating new directory: '{}'", target_dir_storage.display());
+ std::fs::create_dir(&target_dir_storage)?;
+ }
+
+ // storage has another layer of directories per node over which we need to iterate
+ fs::read_dir(&source_dir_storage)?
+ .filter(|f| f.is_ok())
+ .map(|f| f.unwrap().path())
+ .filter(|f| f.is_dir())
+ .try_for_each(|node| {
+ let mut storage_source_files: Vec<(CString, OsString)> = Vec::new();
+
+ let mut source_node_subdir = source_dir_storage.clone();
+ source_node_subdir.push(&node.file_name().unwrap());
+
+ let mut target_node_subdir = target_dir_storage.clone();
+ target_node_subdir.push(&node.file_name().unwrap());
+
+ fs::create_dir(target_node_subdir.as_path())?;
+ let metadata = target_node_subdir.metadata()?;
+ let mut permissions = metadata.permissions();
+ permissions.set_mode(0o755);
+
+ fs::read_dir(&source_node_subdir)?
+ .filter(|f| f.is_ok())
+ .map(|f| f.unwrap().path())
+ .filter(|f| f.is_file())
+ .for_each(|file| {
+ let path = CString::new(file.as_path().as_os_str().as_bytes())
+ .expect("Could not convert path to CString.");
+ let fname = file
+ .file_name()
+ .map(|v| v.to_os_string())
+ .expect("Could not convert fname to OsString.");
+ storage_source_files.push((path, fname))
+ });
+
+ for file in storage_source_files {
+ println!("Storage: '{}'", PathBuf::from(file.1.clone()).display());
+ let mut source: [*const i8; 2] = [std::ptr::null(); 2];
+
+ source[0] = file.0.as_ptr();
+
+ let node_name = file.1;
+ let mut target_path = target_node_subdir.clone();
+ target_path.push(node_name);
+
+ let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
+
+ unsafe {
+ rrd_get_context();
+ rrd_clear_error();
+ let res = rrd_create_r2(
+ target_path.as_ptr(),
+ RRD_STEP_SIZE as u64,
+ 0,
+ 0,
+ source.as_mut_ptr(),
+ std::ptr::null(),
+ RRD_STORAGE_DEF.len() as i32,
+ RRD_STORAGE_DEF.map(|v| v.as_ptr()).as_mut_ptr(),
+ );
+ if res != 0 {
+ bail!(
+ "RRD create Error: {}",
+ CStr::from_ptr(rrd_get_error()).to_string_lossy()
+ );
+ }
+ }
+ }
+ Ok(())
+ })?;
+ println!("Migrated all nodes");
+
+ Ok(())
+}
diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
new file mode 100644
index 0000000..787742a
--- /dev/null
+++ b/src/parallel_handler.rs
@@ -0,0 +1,162 @@
+//! A thread pool which run a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{Error, bail, format_err};
+use crossbeam_channel::{Sender, bounded};
+
+/// A handle to send data to the worker thread (implements clone)
+pub struct SendHandle<I> {
+ input: Sender<I>,
+ abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error happened, if any
+pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
+ }
+ Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ check_abort(&self.abort)?;
+ match self.input.send(input) {
+ Ok(()) => Ok(()),
+ Err(_) => bail!("send failed - channel closed"),
+ }
+ }
+}
+
+/// A thread pool which run the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
+pub struct ParallelHandler<I> {
+ handles: Vec<JoinHandle<()>>,
+ name: String,
+ input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+ fn clone(&self) -> Self {
+ Self {
+ input: self.input.clone(),
+ abort: Arc::clone(&self.abort),
+ }
+ }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+ /// Create a new thread pool, each thread processing incoming data
+ /// with 'handler_fn'.
+ pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ where
+ F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+ {
+ let mut handles = Vec::new();
+ let (input_tx, input_rx) = bounded::<I>(threads);
+
+ let abort = Arc::new(Mutex::new(None));
+
+ for i in 0..threads {
+ let input_rx = input_rx.clone();
+ let abort = Arc::clone(&abort);
+ let handler_fn = handler_fn.clone();
+
+ handles.push(
+ std::thread::Builder::new()
+ .name(format!("{} ({})", name, i))
+ .spawn(move || {
+ loop {
+ let data = match input_rx.recv() {
+ Ok(data) => data,
+ Err(_) => return,
+ };
+ if let Err(err) = (handler_fn)(data) {
+ let mut guard = abort.lock().unwrap();
+ if guard.is_none() {
+ *guard = Some(err.to_string());
+ }
+ }
+ }
+ })
+ .unwrap(),
+ );
+ }
+ Self {
+ handles,
+ name: name.to_string(),
+ input: Some(SendHandle {
+ input: input_tx,
+ abort,
+ }),
+ }
+ }
+
+ /// Returns a cloneable channel to send data to the worker threads
+ pub fn channel(&self) -> SendHandle<I> {
+ self.input.as_ref().unwrap().clone()
+ }
+
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ self.input.as_ref().unwrap().send(input)?;
+ Ok(())
+ }
+
+ /// Wait for worker threads to complete and check for errors
+ pub fn complete(mut self) -> Result<(), Error> {
+ let input = self.input.take().unwrap();
+ let abort = Arc::clone(&input.abort);
+ check_abort(&abort)?;
+ drop(input);
+
+ let msg_list = self.join_threads();
+
+ // an error might be encountered while waiting for the join
+ check_abort(&abort)?;
+
+ if msg_list.is_empty() {
+ return Ok(());
+ }
+ Err(format_err!("{}", msg_list.join("\n")))
+ }
+
+ fn join_threads(&mut self) -> Vec<String> {
+ let mut msg_list = Vec::new();
+
+ let mut i = 0;
+ while let Some(handle) = self.handles.pop() {
+ if let Err(panic) = handle.join() {
+ if let Some(panic_msg) = panic.downcast_ref::<&str>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else {
+ msg_list.push(format!("thread {} ({i}) panicked", self.name));
+ }
+ }
+ i += 1;
+ }
+ msg_list
+ }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+ fn drop(&mut self) {
+ drop(self.input.take());
+ while let Some(handle) = self.handles.pop() {
+ let _ = handle.join();
+ }
+ }
+}
diff --git a/wrapper.h b/wrapper.h
new file mode 100644
index 0000000..64d0aa6
--- /dev/null
+++ b/wrapper.h
@@ -0,0 +1 @@
+#include <rrd.h>
--
2.39.5
_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel
next prev parent reply other threads:[~2025-05-23 16:00 UTC|newest]
Thread overview: 30+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-05-23 16:00 [pve-devel] [RFC cluster/common/container/manager/pve9-rrd-migration-tool/qemu-server/storage 00/19] Expand and migrate RRD data Aaron Lauterer
2025-05-23 16:00 ` [pve-devel] [PATCH cluster-pve8 1/2] cfs status.c: drop old pve2-vm rrd schema support Aaron Lauterer
2025-05-23 16:00 ` [pve-devel] [PATCH cluster-pve8 2/2] status: handle new pve9- metrics update data Aaron Lauterer
2025-05-23 16:35 ` Aaron Lauterer
2025-06-02 13:31 ` Thomas Lamprecht
2025-06-11 14:18 ` Aaron Lauterer
2025-05-23 16:00 ` Aaron Lauterer [this message]
2025-05-23 16:00 ` [pve-devel] [PATCH cluster 1/1] status: introduce new pve9- rrd and metric format Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH common 1/4] fix error in pressure parsing Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH common 2/4] add functions to retrieve pressures for vm/ct Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH common 3/4] add helper to fetch value from smaps_rollup for pid Aaron Lauterer
2025-06-02 14:11 ` Thomas Lamprecht
2025-05-23 16:37 ` [pve-devel] [PATCH common 4/4] metrics: add buffer and cache to meminfo Aaron Lauterer
2025-06-02 14:07 ` Thomas Lamprecht
2025-06-11 15:17 ` Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH manager 1/5] api2tools: drop old VM rrd schema Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH manager 2/5] pvestatd: collect and distribute new pve9- metrics Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH manager 3/5] api: nodes: rrd and rrddata fetch from new pve9-node rrd files if present Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH manager 4/5] api2tools: extract stats: handle existence of new pve9- data Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH manager 5/5] ui: rrdmodels: add new columns Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH storage 1/1] status: rrddata: use new pve9 rrd location if file is present Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH qemu-server 1/4] metrics: add pressure to metrics Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH qemu-server 2/4] vmstatus: add memhost for host view of vm mem consumption Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH qemu-server 3/4] vmstatus: switch mem stat to PSS of VM cgroup Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH qemu-server 4/4] rrddata: use new pve9 rrd location if file is present Aaron Lauterer
2025-05-23 16:37 ` [pve-devel] [PATCH container 1/1] " Aaron Lauterer
2025-06-02 14:39 ` [pve-devel] [PATCH common 2/4] add functions to retrieve pressures for vm/ct Thomas Lamprecht
2025-05-26 11:52 ` [pve-devel] [RFC cluster/common/container/manager/pve9-rrd-migration-tool/qemu-server/storage 00/19] Expand and migrate RRD data DERUMIER, Alexandre via pve-devel
2025-07-09 11:26 ` Aaron Lauterer
2025-07-09 11:22 [pve-devel] [PATCH many 00/19] Expand and migrate RRD data (excluding GUI) Aaron Lauterer
2025-07-09 11:22 ` [pve-devel] [PATCH pve9-rrd-migration-tool 1/1] introduce rrd migration tool for pve8 -> pve9 Aaron Lauterer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250523160029.404400-4-a.lauterer@proxmox.com \
--to=a.lauterer@proxmox.com \
--cc=pve-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.