From: Aaron Lauterer <a.lauterer@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH proxmox-rrd-migration-tool v4 1/3] create proxmox-rrd-migration-tool
Date: Sat, 26 Jul 2025 03:05:56 +0200
Message-ID: <20250726010626.1496866-2-a.lauterer@proxmox.com>
In-Reply-To: <20250726010626.1496866-1-a.lauterer@proxmox.com>
This tool is intended to migrate the Proxmox VE (PVE) RRD data files to
the new schema.
Up until PVE 8 the schema had stayed the same for a long time. With PVE 9
we introduced new columns for guests (vm) and nodes. We also switched all
types (vm, node, storage) to the same aggregation schemas that we use in
PBS.
The result of both changes is a much finer resolution for long time
spans, but also larger RRD files:
* node: 79K -> 1.4M
* vm: 66K -> 1.3M
* storage: 14K -> 156K
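The longer retention behind the size increase follows directly from the
RRA arithmetic used in the new definitions (steps * step size * rows, with
a 60-second step size). A small sketch to sanity-check the spans; the
function name `rra_span_days` is made up for illustration, the parameters
are taken from the RRA definitions in the patch below:

```rust
/// Seconds of history covered by one RRA are steps * step_size * rows;
/// divide by 86400 to express the span in days.
fn rra_span_days(steps: u64, step_size_secs: u64, rows: u64) -> f64 {
    (steps * step_size_secs * rows) as f64 / 86_400.0
}

fn main() {
    let step = 60; // RRD_STEP_SIZE in the patch

    // RRA:AVERAGE:0.5:1:1440 -> one day at 1-minute resolution
    assert_eq!(rra_span_days(1, step, 1440), 1.0);
    // RRA:AVERAGE:0.5:30:1440 -> 30 days at 30-minute resolution
    assert_eq!(rra_span_days(30, step, 1440), 30.0);
    // RRA:AVERAGE:0.5:10080:570 -> roughly ten years at 1-week resolution
    let years = rra_span_days(10080, step, 570) / 365.0;
    assert!(years > 10.0 && years < 11.0);
}
```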
The old RRD files are located in `/var/lib/rrdcached/db/` with the
following sub-directories:
* nodes: `pve2-node`
* guests (VM/CT): `pve2-vm`
* storage: `pve2-storage`
With this change we also introduce a new key schema that makes it easier
to introduce new schemas in the future. Instead of
`pve{version}-{type}` we switch to `pve-{type}-{version}`.
This enables us to add new columns with a new version, without breaking
nodes that are not yet updated. We are NOT allowed to remove or re-use
existing columns. That would be a breaking change.
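The naming change can be illustrated with a tiny sketch; the helper
`rrd_subdir` is hypothetical and only demonstrates why the version suffix
lets new formats coexist with old ones per type:

```rust
/// Hypothetical helper showing the new key schema: the version is a
/// suffix per type instead of being baked into the prefix, so a future
/// format version can sit next to "9.0" without renaming the type.
fn rrd_subdir(ty: &str, version: &str) -> String {
    format!("pve-{ty}-{version}")
}

fn main() {
    // Old scheme: pve2-node, pve2-vm, pve2-storage (version in prefix).
    // New scheme: version appended per type.
    assert_eq!(rrd_subdir("node", "9.0"), "pve-node-9.0");
    assert_eq!(rrd_subdir("vm", "9.0"), "pve-vm-9.0");
    assert_eq!(rrd_subdir("storage", "9.0"), "pve-storage-9.0");
}
```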
We are currently at version 9.0. But if needed, this tool can be adapted
in the future to handle other migrations too, for example
{old, 9.0} -> 9.2, should that be necessary.
The actual migration is handled by `librrd`, to which we pass the paths
to the old and new files as well as the new RRD definitions. The
`rrd_create_r2` call then does the hard work of migrating and converting
existing data into the new file and aggregation schema.
This can take some time. Quick tests on a Ryzen 7900X with the following
files:
* 1 node RRD file
* 10k vm RRD files
* 1 storage RRD file
showed the following results:
* 1 thread: 179.61s user 14.82s system 100% cpu 3:14.17 total
* 4 threads: 187.57s user 16.98s system 399% cpu 51.198 total
This is why we do not migrate inline, but have it as a separate step
during package upgrades.
Behavior: By default nothing is changed and only a dry run happens.
The actual migration is performed only if the `--migrate` parameter is
given.
For each RRD source file found, the tool checks whether a matching target
file already exists. By default such files are skipped, so that target
files that might already contain newer data are not overwritten.
This can be overridden with the `--force` parameter.
This means one can run the tool multiple times (without --force) and it
will pick up where it left off, for example if the migration was
interrupted for some reason.
Once a source file has been processed, it is renamed with a `.old`
suffix. It will be excluded from future runs, as we only consider files
without an extension.
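The selection rules above can be condensed into a small sketch; the
function `should_migrate` is not part of the tool's actual control flow,
just an illustration of the skip/force/`.old` semantics:

```rust
use std::path::Path;

/// Decide whether a source RRD file should be migrated in this run.
/// Processed files carry a ".old" suffix, so only extension-less files
/// are picked up; existing targets are skipped unless forced.
fn should_migrate(source: &Path, target_exists: bool, force: bool) -> bool {
    source.extension().is_none() && (!target_exists || force)
}

fn main() {
    let fresh = Path::new("/var/lib/rrdcached/db/pve2-vm/100");
    let done = Path::new("/var/lib/rrdcached/db/pve2-vm/100.old");

    assert!(should_migrate(fresh, false, false)); // normal first run
    assert!(!should_migrate(done, false, false)); // already renamed
    assert!(!should_migrate(fresh, true, false)); // target exists, skip
    assert!(should_migrate(fresh, true, true)); // --force overwrites
}
```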
The tool uses a simple heuristic to determine how many threads should be
used. By default the range is between 1 and 4 threads, but the
`--threads` parameter allows a manual override.
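The heuristic amounts to roughly one thread per eight cores, clamped to
the 1..=4 range. A behaviorally equivalent sketch (the name
`pick_threads` is made up; the actual code lives in `set_threads` below):

```rust
/// Sketch of the thread heuristic: one thread per eight CPU cores,
/// clamped to 1..=4, unless a manual override was given via --threads.
fn pick_threads(override_threads: Option<usize>, cpus: usize) -> usize {
    override_threads.unwrap_or_else(|| (cpus / 8).clamp(1, 4))
}

fn main() {
    assert_eq!(pick_threads(None, 4), 1); // small host, still one thread
    assert_eq!(pick_threads(None, 16), 2);
    assert_eq!(pick_threads(None, 64), 4); // capped at MAX_THREADS
    assert_eq!(pick_threads(Some(8), 4), 8); // manual override wins
}
```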
Signed-off-by: Aaron Lauterer <a.lauterer@proxmox.com>
---
.cargo/config.toml | 5 +
.gitignore | 9 +
Cargo.toml | 20 ++
build.rs | 29 ++
src/lib.rs | 5 +
src/main.rs | 567 ++++++++++++++++++++++++++++++++++++++++
src/parallel_handler.rs | 160 ++++++++++++
wrapper.h | 1 +
8 files changed, 796 insertions(+)
create mode 100644 .cargo/config.toml
create mode 100644 .gitignore
create mode 100644 Cargo.toml
create mode 100644 build.rs
create mode 100644 src/lib.rs
create mode 100644 src/main.rs
create mode 100644 src/parallel_handler.rs
create mode 100644 wrapper.h
diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..3b5b6e4
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,5 @@
+[source]
+[source.debian-packages]
+directory = "/usr/share/cargo/registry"
+[source.crates-io]
+replace-with = "debian-packages"
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..06ac1a1
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,9 @@
+*.build
+*.buildinfo
+*.changes
+*.deb
+*.dsc
+*.tar*
+target/
+/Cargo.lock
+/proxmox-rrd-migration-tool-[0-9]*/
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..d3523f3
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox_rrd_migration_8-9"
+version = "0.1.0"
+edition = "2021"
+authors = [
+ "Aaron Lauterer <a.lauterer@proxmox.com>",
+ "Proxmox Support Team <support@proxmox.com>",
+]
+license = "AGPL-3"
+homepage = "https://www.proxmox.com"
+
+[dependencies]
+anyhow = "1.0.86"
+pico-args = "0.5.0"
+proxmox-async = "0.4"
+crossbeam-channel = "0.5"
+
+[build-dependencies]
+bindgen = "0.66.1"
+pkg-config = "0.3"
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..56d07cc
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,29 @@
+use std::env;
+use std::path::PathBuf;
+
+fn main() {
+ println!("cargo:rustc-link-lib=rrd");
+
+ println!("cargo:rerun-if-changed=wrapper.h");
+ // The bindgen::Builder is the main entry point
+ // to bindgen, and lets you build up options for
+ // the resulting bindings.
+
+ let bindings = bindgen::Builder::default()
+ // The input header we would like to generate
+ // bindings for.
+ .header("wrapper.h")
+ // Tell cargo to invalidate the built crate whenever any of the
+ // included header files changed.
+ .parse_callbacks(Box::new(bindgen::CargoCallbacks))
+ // Finish the builder and generate the bindings.
+ .generate()
+ // Unwrap the Result and panic on failure.
+ .expect("Unable to generate bindings");
+
+ // Write the bindings to the $OUT_DIR/bindings.rs file.
+ let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
+ bindings
+ .write_to_file(out_path.join("bindings.rs"))
+ .expect("Couldn't write bindings!");
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..a38a13a
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,5 @@
+#![allow(non_upper_case_globals)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+
+include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..5e6418c
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,567 @@
+use anyhow::{bail, Error, Result};
+use std::{
+ ffi::{CStr, CString, OsString},
+ fs,
+ os::unix::{ffi::OsStrExt, fs::PermissionsExt},
+ path::{Path, PathBuf},
+ sync::Arc,
+};
+
+use proxmox_rrd_migration_tool::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error};
+
+use crate::parallel_handler::ParallelHandler;
+
+pub mod parallel_handler;
+
+const BASE_DIR: &str = "/var/lib/rrdcached/db";
+const SOURCE_SUBDIR_NODE: &str = "pve2-node";
+const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
+const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
+const TARGET_SUBDIR_NODE: &str = "pve-node-9.0";
+const TARGET_SUBDIR_GUEST: &str = "pve-vm-9.0";
+const TARGET_SUBDIR_STORAGE: &str = "pve-storage-9.0";
+const RESOURCE_BASE_DIR: &str = "/etc/pve";
+const MAX_THREADS: usize = 4;
+const RRD_STEP_SIZE: usize = 60;
+
+type File = (CString, OsString);
+
+// RRAs are defined in the following way:
+//
+// RRA:CF:xff:step:rows
+// CF: AVERAGE or MAX
+// xff: 0.5
+// steps: stepsize is defined on rrd file creation! example: with 60 seconds step size:
+// e.g. 1 => 60 sec, 30 => 1800 seconds or 30 min
+// rows: how many aggregated rows are kept, as in how far back in time we store data
+//
+// how many seconds are aggregated per RRA: steps * stepsize * rows
+// how many hours are aggregated per RRA: steps * stepsize * rows / 3600
+// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24
+// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
+
+const RRD_VM_DEF: [&CStr; 25] = [
+ c"DS:maxcpu:GAUGE:120:0:U",
+ c"DS:cpu:GAUGE:120:0:U",
+ c"DS:maxmem:GAUGE:120:0:U",
+ c"DS:mem:GAUGE:120:0:U",
+ c"DS:maxdisk:GAUGE:120:0:U",
+ c"DS:disk:GAUGE:120:0:U",
+ c"DS:netin:DERIVE:120:0:U",
+ c"DS:netout:DERIVE:120:0:U",
+ c"DS:diskread:DERIVE:120:0:U",
+ c"DS:diskwrite:DERIVE:120:0:U",
+ c"DS:memhost:GAUGE:120:0:U",
+ c"DS:pressurecpusome:GAUGE:120:0:U",
+ c"DS:pressurecpufull:GAUGE:120:0:U",
+ c"DS:pressureiosome:GAUGE:120:0:U",
+ c"DS:pressureiofull:GAUGE:120:0:U",
+ c"DS:pressurememorysome:GAUGE:120:0:U",
+ c"DS:pressurememoryfull:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const RRD_NODE_DEF: [&CStr; 27] = [
+ c"DS:loadavg:GAUGE:120:0:U",
+ c"DS:maxcpu:GAUGE:120:0:U",
+ c"DS:cpu:GAUGE:120:0:U",
+ c"DS:iowait:GAUGE:120:0:U",
+ c"DS:memtotal:GAUGE:120:0:U",
+ c"DS:memused:GAUGE:120:0:U",
+ c"DS:swaptotal:GAUGE:120:0:U",
+ c"DS:swapused:GAUGE:120:0:U",
+ c"DS:roottotal:GAUGE:120:0:U",
+ c"DS:rootused:GAUGE:120:0:U",
+ c"DS:netin:DERIVE:120:0:U",
+ c"DS:netout:DERIVE:120:0:U",
+ c"DS:memfree:GAUGE:120:0:U",
+ c"DS:arcsize:GAUGE:120:0:U",
+ c"DS:pressurecpusome:GAUGE:120:0:U",
+ c"DS:pressureiosome:GAUGE:120:0:U",
+ c"DS:pressureiofull:GAUGE:120:0:U",
+ c"DS:pressurememorysome:GAUGE:120:0:U",
+ c"DS:pressurememoryfull:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const RRD_STORAGE_DEF: [&CStr; 10] = [
+ c"DS:total:GAUGE:120:0:U",
+ c"DS:used:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const HELP: &str = "\
+proxmox-rrd-migration tool
+
+Migrates existing RRD graph data to the new format.
+
+Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
+
+USAGE:
+ proxmox-rrd-migration [OPTIONS]
+
+ FLAGS:
+ -h, --help Prints this help information
+
+ OPTIONS:
+ --migrate Start the migration. Without it, only a dry run will be done.
+
+ --force Migrate, even if the target already exists.
+ This will overwrite any migrated RRD files!
+
+ --threads THREADS Number of parallel threads.
+
+ --source <SOURCE DIR> Source base directory. Mainly for tests!
+ Default: /var/lib/rrdcached/db
+
+ --target <TARGET DIR> Target base directory. Mainly for tests!
+ Default: /var/lib/rrdcached/db
+
+ --resources <DIR> Directory that contains .vmlist and .member files. Mainly for tests!
+ Default: /etc/pve
+
+";
+
+#[derive(Debug)]
+struct Args {
+ migrate: bool,
+ force: bool,
+ threads: Option<usize>,
+ source: Option<String>,
+ target: Option<String>,
+ resources: Option<String>,
+}
+
+fn parse_args() -> Result<Args, Error> {
+ let mut pargs = pico_args::Arguments::from_env();
+
+ // Help has a higher priority and should be handled separately.
+ if pargs.contains(["-h", "--help"]) {
+ print!("{}", HELP);
+ std::process::exit(0);
+ }
+
+ let mut args = Args {
+ migrate: false,
+ threads: pargs
+ .opt_value_from_str("--threads")
+ .expect("Could not parse --threads parameter"),
+ force: false,
+ source: pargs
+ .opt_value_from_str("--source")
+ .expect("Could not parse --source parameter"),
+ target: pargs
+ .opt_value_from_str("--target")
+ .expect("Could not parse --target parameter"),
+ resources: pargs
+ .opt_value_from_str("--resources")
+ .expect("Could not parse --resources parameter"),
+ };
+
+ if pargs.contains("--migrate") {
+ args.migrate = true;
+ }
+ if pargs.contains("--force") {
+ args.force = true;
+ }
+
+ // It's up to the caller what to do with the remaining arguments.
+ let remaining = pargs.finish();
+ if !remaining.is_empty() {
+ bail!(format!("Warning: unused arguments left: {:?}", remaining));
+ }
+
+ Ok(args)
+}
+
+fn main() {
+ let args = match parse_args() {
+ Ok(v) => v,
+ Err(e) => {
+ eprintln!("Error: {}.", e);
+ std::process::exit(1);
+ }
+ };
+
+ let source_base_dir = match args.source {
+ Some(ref v) => v.as_str(),
+ None => BASE_DIR,
+ };
+
+ let target_base_dir = match args.target {
+ Some(ref v) => v.as_str(),
+ None => BASE_DIR,
+ };
+
+ let resource_base_dir = match args.resources {
+ Some(ref v) => v.as_str(),
+ None => RESOURCE_BASE_DIR,
+ };
+
+ let source_dir_guests: PathBuf = [source_base_dir, SOURCE_SUBDIR_GUEST].iter().collect();
+ let target_dir_guests: PathBuf = [target_base_dir, TARGET_SUBDIR_GUEST].iter().collect();
+ let source_dir_nodes: PathBuf = [source_base_dir, SOURCE_SUBDIR_NODE].iter().collect();
+ let target_dir_nodes: PathBuf = [target_base_dir, TARGET_SUBDIR_NODE].iter().collect();
+ let source_dir_storage: PathBuf = [source_base_dir, SOURCE_SUBDIR_STORAGE].iter().collect();
+ let target_dir_storage: PathBuf = [target_base_dir, TARGET_SUBDIR_STORAGE].iter().collect();
+
+ if !args.migrate {
+ println!("DRYRUN! Use the --migrate parameter to start the migration.");
+ }
+ if args.force {
+ println!("Force mode! Will overwrite existing target RRD files!");
+ }
+
+ if let Err(e) = migrate_nodes(
+ source_dir_nodes,
+ target_dir_nodes,
+ resource_base_dir,
+ args.migrate,
+ args.force,
+ ) {
+ eprintln!("Error migrating nodes: {}", e);
+ std::process::exit(1);
+ }
+ if let Err(e) = migrate_storage(
+ source_dir_storage,
+ target_dir_storage,
+ args.migrate,
+ args.force,
+ ) {
+ eprintln!("Error migrating storage: {}", e);
+ std::process::exit(1);
+ }
+ if let Err(e) = migrate_guests(
+ source_dir_guests,
+ target_dir_guests,
+ resource_base_dir,
+ set_threads(&args),
+ args.migrate,
+ args.force,
+ ) {
+ eprintln!("Error migrating guests: {}", e);
+ std::process::exit(1);
+ }
+}
+
+/// Set number of threads
+///
+/// Either a fixed parameter or a heuristic choosing between 1 and 4 threads
+/// based on the number of CPU cores available in the system.
+fn set_threads(args: &Args) -> usize {
+ if let Some(threads) = args.threads {
+ return threads;
+ }
+
+ // check for a way to get physical cores and not threads?
+ let cpus: usize = String::from_utf8_lossy(
+ std::process::Command::new("nproc")
+ .output()
+ .expect("Error running nproc")
+ .stdout
+ .as_slice()
+ .trim_ascii(),
+ )
+ .parse::<usize>()
+ .expect("Could not parse nproc output");
+
+ if cpus < 32 {
+ let threads = cpus / 8;
+ if threads == 0 {
+ return 1;
+ }
+ return threads;
+ }
+ MAX_THREADS
+}
+
+/// Check if a VMID is currently configured
+fn resource_present(path: &str, resource: &str) -> Result<bool> {
+ let resourcelist = fs::read_to_string(path)?;
+ Ok(resourcelist.contains(format!("\"{resource}\"").as_str()))
+}
+
+/// Rename the file with a `.old` suffix once it has been migrated or its resource is not present anymore
+fn mv_old(file: &str) -> Result<()> {
+ let old = format!("{}.old", file);
+ fs::rename(file, old)?;
+ Ok(())
+}
+
+/// Collect all RRD files in the provided directory
+fn collect_rrd_files(location: &PathBuf) -> Result<Vec<(CString, OsString)>> {
+ let mut files: Vec<(CString, OsString)> = Vec::new();
+
+ fs::read_dir(location)?
+ .filter(|f| f.is_ok())
+ .map(|f| f.unwrap().path())
+ .filter(|f| f.is_file() && f.extension().is_none())
+ .for_each(|file| {
+ let path = CString::new(file.as_path().as_os_str().as_bytes())
+ .expect("Could not convert path to CString.");
+ let fname = file
+ .file_name()
+ .map(|v| v.to_os_string())
+ .expect("Could not convert fname to OsString.");
+ files.push((path, fname))
+ });
+ Ok(files)
+}
+
+/// Does the actual migration for the given file
+fn do_rrd_migration(
+ file: File,
+ target_location: &Path,
+ rrd_def: &[&CStr],
+ migrate: bool,
+ force: bool,
+) -> Result<()> {
+ if !migrate {
+ println!("would migrate but in dry run mode");
+ }
+
+ let resource = file.1;
+ let mut target_path = target_location.to_path_buf();
+ target_path.push(resource);
+
+ if target_path.exists() && !force {
+ println!(
+ "already migrated, use --force to overwrite target file: {}",
+ target_path.display()
+ );
+ }
+
+ if !migrate || (target_path.exists() && !force) {
+ bail!("skipping");
+ }
+
+ let mut source: [*const i8; 2] = [std::ptr::null(); 2];
+ source[0] = file.0.as_ptr();
+
+ let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
+
+ unsafe {
+ rrd_get_context();
+ rrd_clear_error();
+ let res = rrd_create_r2(
+ target_path.as_ptr(),
+ RRD_STEP_SIZE as u64,
+ 0,
+ 0,
+ source.as_mut_ptr(),
+ std::ptr::null(),
+ rrd_def.len() as i32,
+ rrd_def
+ .iter()
+ .map(|v| v.as_ptr())
+ .collect::<Vec<_>>()
+ .as_mut_ptr(),
+ );
+ if res != 0 {
+ bail!(
+ "RRD create Error: {}",
+ CStr::from_ptr(rrd_get_error()).to_string_lossy()
+ );
+ }
+ }
+ Ok(())
+}
+
+/// Migrate guest RRD files
+///
+/// In parallel to speed up the process as most time is spent on converting the
+/// data to the new format.
+fn migrate_guests(
+ source_dir_guests: PathBuf,
+ target_dir_guests: PathBuf,
+ resources: &str,
+ threads: usize,
+ migrate: bool,
+ force: bool,
+) -> Result<(), Error> {
+ println!("Migrating RRD data for guests…");
+ println!("Using {} thread(s)", threads);
+
+ let guest_source_files = collect_rrd_files(&source_dir_guests)?;
+
+ if !target_dir_guests.exists() && migrate {
+ println!("Creating new directory: '{}'", target_dir_guests.display());
+ std::fs::create_dir(&target_dir_guests)?;
+ }
+
+ let total_guests = guest_source_files.len();
+ let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let guests2 = guests.clone();
+ let start_time = std::time::SystemTime::now();
+
+ let migration_pool = ParallelHandler::new(
+ "guest rrd migration",
+ threads,
+ move |file: (CString, OsString)| {
+ let full_path = file.0.clone().into_string().unwrap();
+
+ if let Ok(()) = do_rrd_migration(
+ file,
+ &target_dir_guests,
+ RRD_VM_DEF.as_slice(),
+ migrate,
+ force,
+ ) {
+ mv_old(full_path.as_str())?;
+ let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ if current_guests > 0 && current_guests % 200 == 0 {
+ println!("Migrated {} of {} guests", current_guests, total_guests);
+ }
+ }
+ Ok(())
+ },
+ );
+ let migration_channel = migration_pool.channel();
+
+ for file in guest_source_files {
+ let node = file.1.clone().into_string().unwrap();
+ if !resource_present(format!("{resources}/.vmlist").as_str(), node.as_str())? {
+ println!("VMID: '{node}' not present. Skip and mark as old.");
+ mv_old(format!("{}", file.0.to_string_lossy()).as_str())?;
+ }
+ let migration_channel = migration_channel.clone();
+ migration_channel.send(file)?;
+ }
+
+ drop(migration_channel);
+ migration_pool.complete()?;
+
+ let elapsed = start_time.elapsed()?.as_secs_f64();
+ let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
+ println!("Migrated {} guests", guests);
+ println!("It took {:.2}s", elapsed);
+
+ Ok(())
+}
+
+/// Migrate node RRD files
+///
+/// In serial as the number of nodes will not be high.
+fn migrate_nodes(
+ source_dir_nodes: PathBuf,
+ target_dir_nodes: PathBuf,
+ resources: &str,
+ migrate: bool,
+ force: bool,
+) -> Result<(), Error> {
+ println!("Migrating RRD data for nodes…");
+
+ if !target_dir_nodes.exists() && migrate {
+ println!("Creating new directory: '{}'", target_dir_nodes.display());
+ std::fs::create_dir(&target_dir_nodes)?;
+ }
+
+ let node_source_files = collect_rrd_files(&source_dir_nodes)?;
+
+ for file in node_source_files {
+ let node = file.1.clone().into_string().unwrap();
+ let full_path = file.0.clone().into_string().unwrap();
+ println!("Node: '{node}'");
+ if !resource_present(format!("{resources}/.members").as_str(), node.as_str())? {
+ println!("Node: '{node}' not present. Skip and mark as old.");
+ mv_old(format!("{}/{}", file.0.to_string_lossy(), node).as_str())?;
+ }
+ if let Ok(()) = do_rrd_migration(
+ file,
+ &target_dir_nodes,
+ RRD_NODE_DEF.as_slice(),
+ migrate,
+ force,
+ ) {
+ mv_old(full_path.as_str())?;
+ }
+ }
+ println!("Migrated all nodes");
+
+ Ok(())
+}
+
+/// Migrate storage RRD files
+///
+/// In serial as the number of storages will not be that high.
+fn migrate_storage(
+ source_dir_storage: PathBuf,
+ target_dir_storage: PathBuf,
+ migrate: bool,
+ force: bool,
+) -> Result<(), Error> {
+ println!("Migrating RRD data for storages…");
+
+ if !target_dir_storage.exists() && migrate {
+ println!("Creating new directory: '{}'", target_dir_storage.display());
+ std::fs::create_dir(&target_dir_storage)?;
+ }
+
+ // storage has another layer of directories per node over which we need to iterate
+ fs::read_dir(&source_dir_storage)?
+ .filter(|f| f.is_ok())
+ .map(|f| f.unwrap().path())
+ .filter(|f| f.is_dir())
+ .try_for_each(|node| {
+ let mut source_storage_subdir = source_dir_storage.clone();
+ source_storage_subdir.push(node.file_name().unwrap());
+
+ let mut target_storage_subdir = target_dir_storage.clone();
+ target_storage_subdir.push(node.file_name().unwrap());
+
+ if !target_storage_subdir.exists() && migrate {
+ fs::create_dir(target_storage_subdir.as_path())?;
+ let metadata = target_storage_subdir.metadata()?;
+ let mut permissions = metadata.permissions();
+ permissions.set_mode(0o755);
+ }
+
+ let storage_source_files = collect_rrd_files(&source_storage_subdir)?;
+
+ for file in storage_source_files {
+ println!(
+ "Storage: '{}/{}'",
+ node.file_name()
+ .expect("no file name present")
+ .to_string_lossy(),
+ PathBuf::from(file.1.clone()).display()
+ );
+
+ let full_path = file.0.clone().into_string().unwrap();
+ if let Ok(()) = do_rrd_migration(
+ file,
+ &target_storage_subdir,
+ RRD_STORAGE_DEF.as_slice(),
+ migrate,
+ force,
+ ) {
+ mv_old(full_path.as_str())?;
+ }
+ }
+ Ok::<(), Error>(())
+ })?;
+ println!("Migrated all storages");
+
+ Ok(())
+}
diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
new file mode 100644
index 0000000..d8ee3c7
--- /dev/null
+++ b/src/parallel_handler.rs
@@ -0,0 +1,160 @@
+//! A thread pool which runs a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::{bounded, Sender};
+
+/// A handle to send data to the worker thread (implements clone)
+pub struct SendHandle<I> {
+ input: Sender<I>,
+ abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error happened, if any
+pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
+ }
+ Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ check_abort(&self.abort)?;
+ match self.input.send(input) {
+ Ok(()) => Ok(()),
+ Err(_) => bail!("send failed - channel closed"),
+ }
+ }
+}
+
+/// A thread pool which runs the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
+pub struct ParallelHandler<I> {
+ handles: Vec<JoinHandle<()>>,
+ name: String,
+ input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+ fn clone(&self) -> Self {
+ Self {
+ input: self.input.clone(),
+ abort: Arc::clone(&self.abort),
+ }
+ }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+ /// Create a new thread pool, each thread processing incoming data
+ /// with 'handler_fn'.
+ pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ where
+ F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+ {
+ let mut handles = Vec::new();
+ let (input_tx, input_rx) = bounded::<I>(threads);
+
+ let abort = Arc::new(Mutex::new(None));
+
+ for i in 0..threads {
+ let input_rx = input_rx.clone();
+ let abort = Arc::clone(&abort);
+ let handler_fn = handler_fn.clone();
+
+ handles.push(
+ std::thread::Builder::new()
+ .name(format!("{} ({})", name, i))
+ .spawn(move || loop {
+ let data = match input_rx.recv() {
+ Ok(data) => data,
+ Err(_) => return,
+ };
+ if let Err(err) = (handler_fn)(data) {
+ let mut guard = abort.lock().unwrap();
+ if guard.is_none() {
+ *guard = Some(err.to_string());
+ }
+ }
+ })
+ .unwrap(),
+ );
+ }
+ Self {
+ handles,
+ name: name.to_string(),
+ input: Some(SendHandle {
+ input: input_tx,
+ abort,
+ }),
+ }
+ }
+
+ /// Returns a cloneable channel to send data to the worker threads
+ pub fn channel(&self) -> SendHandle<I> {
+ self.input.as_ref().unwrap().clone()
+ }
+
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ self.input.as_ref().unwrap().send(input)?;
+ Ok(())
+ }
+
+ /// Wait for worker threads to complete and check for errors
+ pub fn complete(mut self) -> Result<(), Error> {
+ let input = self.input.take().unwrap();
+ let abort = Arc::clone(&input.abort);
+ check_abort(&abort)?;
+ drop(input);
+
+ let msg_list = self.join_threads();
+
+ // an error might be encountered while waiting for the join
+ check_abort(&abort)?;
+
+ if msg_list.is_empty() {
+ return Ok(());
+ }
+ Err(format_err!("{}", msg_list.join("\n")))
+ }
+
+ fn join_threads(&mut self) -> Vec<String> {
+ let mut msg_list = Vec::new();
+
+ let mut i = 0;
+ while let Some(handle) = self.handles.pop() {
+ if let Err(panic) = handle.join() {
+ if let Some(panic_msg) = panic.downcast_ref::<&str>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else {
+ msg_list.push(format!("thread {} ({i}) panicked", self.name));
+ }
+ }
+ i += 1;
+ }
+ msg_list
+ }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+ fn drop(&mut self) {
+ drop(self.input.take());
+ while let Some(handle) = self.handles.pop() {
+ let _ = handle.join();
+ }
+ }
+}
diff --git a/wrapper.h b/wrapper.h
new file mode 100644
index 0000000..64d0aa6
--- /dev/null
+++ b/wrapper.h
@@ -0,0 +1 @@
+#include <rrd.h>
--
2.39.5
_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel