From: "Lukas Wagner" <l.wagner@proxmox.com>
To: "Proxmox VE development discussion" <pve-devel@lists.proxmox.com>
Cc: "pve-devel" <pve-devel-bounces@lists.proxmox.com>
Subject: Re: [pve-devel] [PATCH proxmox-rrd-migration-tool v4 1/3] create proxmox-rrd-migration-tool
Date: Mon, 28 Jul 2025 16:25:15 +0200
Message-ID: <DBNR9VGV874U.3TNS24092ZP0H@proxmox.com>
In-Reply-To: <20250726010626.1496866-2-a.lauterer@proxmox.com>
Hey Aaron,
some comments inline.
On Sat Jul 26, 2025 at 3:05 AM CEST, Aaron Lauterer wrote:
> This tool is intended to migrate the Proxmox VE (PVE) RRD data files to
> the new schema.
>
> Up until PVE8 the schema has been the same for a long time. With PVE9 we
> introduced new columns to guests (vm) and nodes. We also switched all
> types (vm, node, storage) to the same aggregation schemas as we use in
> PBS.
> The result of both is a much finer resolution for long time spans, but
> also larger RRD files.
>
> * node: 79K -> 1.4M
> * vm: 66K -> 1.3M
> * storage: 14K -> 156K
>
> The old directories for VMs used to be in `/var/lib/rrdcached/db/` with
> the following sub directories:
>
> * nodes: `pve2-node`
> * guests (VM/CT): `pve2-vm`
> * storage: `pve2-storage`
>
> With this change we also introduce a new key schema that makes it
> easier to introduce new ones in the future. Instead of
> `pve{version}-{type}` we are switching to `pve-{type}-{version}`.
>
> This enables us to add new columns with a new version, without breaking
> nodes that are not yet updated. We are NOT allowed to remove or re-use
> existing columns. That would be a breaking change.
> We are currently at version 9.0. But in the future, if needed, this tool
> can be adapted to do other migrations too.
> For example, {old, 9.0} -> 9.2, should that be necessary.
>
> The actual migration is handled by `librrd` to which we pass the path to
> the old and new files, and the new RRD definitions. The `rrd_create_r2`
> call then does the hard work of migrating and converting existing data
> into the new file and aggregation schema.
>
> This can take some time. Quick tests on a Ryzen 7900X with the following
> files:
> * 1 node RRD file
> * 10k vm RRD files
> * 1 storage RRD file
>
> showed the following results:
>
> * 1 thread: 179.61s user 14.82s system 100% cpu 3:14.17 total
> * 4 threads: 187.57s user 16.98s system 399% cpu 51.198 total
>
> This is why we do not migrate inline, but have it as a separate step
> during package upgrades.
>
> Behavior: By default nothing will be changed; only a dry (test) run will
> happen.
> Only if the `--migrate` parameter is added will the actual migration be
> done.
>
> For each found RRD source file, the tool checks if a matching target
> file already exists. By default, those will be skipped to not overwrite
> target files that might already store newer data.
> With the `--force` parameter this can be changed.
>
> That means one can run the tool multiple times (without --force) and it
> will pick up where it left off, for example if the migration
> was interrupted for some reason.
>
> Once a source file has been processed it will be renamed with a `.old`
> extension. It will then be excluded from future runs, as we only pick up
> files without an extension.
>
> The tool has a simple heuristic to determine how many threads should
> be used. By default the range is 1 to 4 threads, but the
> `--threads` parameter allows a manual override.
>
> Signed-off-by: Aaron Lauterer <a.lauterer@proxmox.com>
> ---
> .cargo/config.toml | 5 +
> .gitignore | 9 +
> Cargo.toml | 20 ++
> build.rs | 29 ++
> src/lib.rs | 5 +
> src/main.rs | 567 ++++++++++++++++++++++++++++++++++++++++
> src/parallel_handler.rs | 160 ++++++++++++
> wrapper.h | 1 +
> 8 files changed, 796 insertions(+)
> create mode 100644 .cargo/config.toml
> create mode 100644 .gitignore
> create mode 100644 Cargo.toml
> create mode 100644 build.rs
> create mode 100644 src/lib.rs
> create mode 100644 src/main.rs
> create mode 100644 src/parallel_handler.rs
> create mode 100644 wrapper.h
>
> diff --git a/.cargo/config.toml b/.cargo/config.toml
> new file mode 100644
> index 0000000..3b5b6e4
> --- /dev/null
> +++ b/.cargo/config.toml
> @@ -0,0 +1,5 @@
> +[source]
> +[source.debian-packages]
> +directory = "/usr/share/cargo/registry"
> +[source.crates-io]
> +replace-with = "debian-packages"
> diff --git a/.gitignore b/.gitignore
> new file mode 100644
> index 0000000..06ac1a1
> --- /dev/null
> +++ b/.gitignore
> @@ -0,0 +1,9 @@
> +*.build
> +*.buildinfo
> +*.changes
> +*.deb
> +*.dsc
> +*.tar*
> +target/
> +/Cargo.lock
> +/proxmox-rrd-migration-tool-[0-9]*/
> diff --git a/Cargo.toml b/Cargo.toml
> new file mode 100644
> index 0000000..d3523f3
> --- /dev/null
> +++ b/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox_rrd_migration_8-9"
> +version = "0.1.0"
> +edition = "2021"
> +authors = [
> + "Aaron Lauterer <a.lauterer@proxmox.com>",
> + "Proxmox Support Team <support@proxmox.com>",
> +]
> +license = "AGPL-3"
> +homepage = "https://www.proxmox.com"
> +
> +[dependencies]
> +anyhow = "1.0.86"
> +pico-args = "0.5.0"
> +proxmox-async = "0.4"
> +crossbeam-channel = "0.5"
> +
> +[build-dependencies]
> +bindgen = "0.66.1"
> +pkg-config = "0.3"
> diff --git a/build.rs b/build.rs
> new file mode 100644
> index 0000000..56d07cc
> --- /dev/null
> +++ b/build.rs
> @@ -0,0 +1,29 @@
> +use std::env;
> +use std::path::PathBuf;
> +
> +fn main() {
> + println!("cargo:rustc-link-lib=rrd");
> +
> + println!("cargo:rerun-if-changed=wrapper.h");
> + // The bindgen::Builder is the main entry point
> + // to bindgen, and lets you build up options for
> + // the resulting bindings.
> +
> + let bindings = bindgen::Builder::default()
> + // The input header we would like to generate
> + // bindings for.
> + .header("wrapper.h")
> + // Tell cargo to invalidate the built crate whenever any of the
> + // included header files changed.
> + .parse_callbacks(Box::new(bindgen::CargoCallbacks))
> + // Finish the builder and generate the bindings.
> + .generate()
> + // Unwrap the Result and panic on failure.
> + .expect("Unable to generate bindings");
> +
> + // Write the bindings to the $OUT_DIR/bindings.rs file.
> + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
> + bindings
> + .write_to_file(out_path.join("bindings.rs"))
> + .expect("Couldn't write bindings!");
> +}
> diff --git a/src/lib.rs b/src/lib.rs
> new file mode 100644
> index 0000000..a38a13a
> --- /dev/null
> +++ b/src/lib.rs
> @@ -0,0 +1,5 @@
> +#![allow(non_upper_case_globals)]
> +#![allow(non_camel_case_types)]
> +#![allow(non_snake_case)]
> +
> +include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
> diff --git a/src/main.rs b/src/main.rs
> new file mode 100644
> index 0000000..5e6418c
> --- /dev/null
> +++ b/src/main.rs
> @@ -0,0 +1,567 @@
> +use anyhow::{bail, Error, Result};
> +use std::{
> + ffi::{CStr, CString, OsString},
> + fs,
> + os::unix::{ffi::OsStrExt, fs::PermissionsExt},
> + path::{Path, PathBuf},
> + sync::Arc,
> +};
> +
> +use proxmox_rrd_migration_tool::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error};
> +
> +use crate::parallel_handler::ParallelHandler;
> +
> +pub mod parallel_handler;
> +
> +const BASE_DIR: &str = "/var/lib/rrdcached/db";
> +const SOURCE_SUBDIR_NODE: &str = "pve2-node";
> +const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
> +const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
> +const TARGET_SUBDIR_NODE: &str = "pve-node-9.0";
> +const TARGET_SUBDIR_GUEST: &str = "pve-vm-9.0";
> +const TARGET_SUBDIR_STORAGE: &str = "pve-storage-9.0";
> +const RESOURCE_BASE_DIR: &str = "/etc/pve";
> +const MAX_THREADS: usize = 4;
> +const RRD_STEP_SIZE: usize = 60;
> +
> +type File = (CString, OsString);
Maybe use a different name here to avoid confusion with
std::fs::File? e.g. RRDFile
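A minimal sketch of what such a rename could look like (the name RRDFile is just a suggestion, and the path below is only an example):

```rust
use std::ffi::{CString, OsString};

// Hypothetical alias to avoid confusion with std::fs::File:
// the absolute path as a NUL-free C string plus the bare file name.
type RRDFile = (CString, OsString);

fn main() {
    let file: RRDFile = (
        CString::new("/var/lib/rrdcached/db/pve2-vm/100").unwrap(),
        OsString::from("100"),
    );
    println!("{}", file.0.to_string_lossy());
}
```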
> +
> +// RRAs are defined in the following way:
> +//
> +// RRA:CF:xff:step:rows
> +// CF: AVERAGE or MAX
> +// xff: 0.5
> +// steps: stepsize is defined on rrd file creation! example: with 60 seconds step size:
> +// e.g. 1 => 60 sec, 30 => 1800 seconds or 30 min
> +// rows: how many aggregated rows are kept, as in how far back in time we store data
> +//
> +// how many seconds are aggregated per RRA: steps * stepsize * rows
> +// how many hours are aggregated per RRA: steps * stepsize * rows / 3600
> +// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24
> +// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
> +
> +const RRD_VM_DEF: [&CStr; 25] = [
> + c"DS:maxcpu:GAUGE:120:0:U",
> + c"DS:cpu:GAUGE:120:0:U",
> + c"DS:maxmem:GAUGE:120:0:U",
> + c"DS:mem:GAUGE:120:0:U",
> + c"DS:maxdisk:GAUGE:120:0:U",
> + c"DS:disk:GAUGE:120:0:U",
> + c"DS:netin:DERIVE:120:0:U",
> + c"DS:netout:DERIVE:120:0:U",
> + c"DS:diskread:DERIVE:120:0:U",
> + c"DS:diskwrite:DERIVE:120:0:U",
> + c"DS:memhost:GAUGE:120:0:U",
> + c"DS:pressurecpusome:GAUGE:120:0:U",
> + c"DS:pressurecpufull:GAUGE:120:0:U",
> + c"DS:pressureiosome:GAUGE:120:0:U",
> + c"DS:pressureiofull:GAUGE:120:0:U",
> + c"DS:pressurememorysome:GAUGE:120:0:U",
> + c"DS:pressurememoryfull:GAUGE:120:0:U",
> + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
> +];
> +
> +const RRD_NODE_DEF: [&CStr; 27] = [
> + c"DS:loadavg:GAUGE:120:0:U",
> + c"DS:maxcpu:GAUGE:120:0:U",
> + c"DS:cpu:GAUGE:120:0:U",
> + c"DS:iowait:GAUGE:120:0:U",
> + c"DS:memtotal:GAUGE:120:0:U",
> + c"DS:memused:GAUGE:120:0:U",
> + c"DS:swaptotal:GAUGE:120:0:U",
> + c"DS:swapused:GAUGE:120:0:U",
> + c"DS:roottotal:GAUGE:120:0:U",
> + c"DS:rootused:GAUGE:120:0:U",
> + c"DS:netin:DERIVE:120:0:U",
> + c"DS:netout:DERIVE:120:0:U",
> + c"DS:memfree:GAUGE:120:0:U",
> + c"DS:arcsize:GAUGE:120:0:U",
> + c"DS:pressurecpusome:GAUGE:120:0:U",
> + c"DS:pressureiosome:GAUGE:120:0:U",
> + c"DS:pressureiofull:GAUGE:120:0:U",
> + c"DS:pressurememorysome:GAUGE:120:0:U",
> + c"DS:pressurememoryfull:GAUGE:120:0:U",
> + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
> +];
> +
> +const RRD_STORAGE_DEF: [&CStr; 10] = [
> + c"DS:total:GAUGE:120:0:U",
> + c"DS:used:GAUGE:120:0:U",
> + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
> +];
> +
> +const HELP: &str = "\
> +proxmox-rrd-migration tool
> +
> +Migrates existing RRD graph data to the new format.
> +
> +Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
> +
> +USAGE:
> + proxmox-rrd-migration [OPTIONS]
> +
> + FLAGS:
> + -h, --help Prints this help information
> +
> + OPTIONS:
> + --migrate Start the migration. Without it, only a dry run will be done.
> +
> + --force Migrate, even if the target already exists.
> + This will overwrite any migrated RRD files!
> +
> + --threads THREADS Number of paralell threads.
> +
> + --source <SOURCE DIR> Source base directory. Mainly for tests!
> + Default: /var/lib/rrdcached/db
> +
> + --target <TARGET DIR> Target base directory. Mainly for tests!
> + Default: /var/lib/rrdcached/db
> +
> + --resources <DIR> Directory that contains .vmlist and .member files. Mainly for tests!
> + Default: /etc/pve
> +
> +";
> +
> +#[derive(Debug)]
> +struct Args {
> + migrate: bool,
> + force: bool,
> + threads: Option<usize>,
> + source: Option<String>,
> + target: Option<String>,
> + resources: Option<String>,
> +}
> +
> +fn parse_args() -> Result<Args, Error> {
> + let mut pargs = pico_args::Arguments::from_env();
> +
> + // Help has a higher priority and should be handled separately.
> + if pargs.contains(["-h", "--help"]) {
> + print!("{}", HELP);
> + std::process::exit(0);
> + }
> +
> + let mut args = Args {
> + migrate: false,
> + threads: pargs
> + .opt_value_from_str("--threads")
> + .expect("Could not parse --threads parameter"),
> + force: false,
> + source: pargs
> + .opt_value_from_str("--source")
> + .expect("Could not parse --source parameter"),
> + target: pargs
> + .opt_value_from_str("--target")
> + .expect("Could not parse --target parameter"),
> + resources: pargs
> + .opt_value_from_str("--resources")
> + .expect("Could not parse --resources parameter"),
> + };
> +
> + if pargs.contains("--migrate") {
> + args.migrate = true;
> + }
> + if pargs.contains("--force") {
> + args.force = true;
> + }
> +
> + // It's up to the caller what to do with the remaining arguments.
> + let remaining = pargs.finish();
> + if !remaining.is_empty() {
> + bail!(format!("Warning: unused arguments left: {:?}", remaining));
No need to use format! here, bail! supports formatting natively:
bail!("Warning: .... {remaining:?}");
> + }
> +
> + Ok(args)
> +}
> +
> +fn main() {
> + let args = match parse_args() {
> + Ok(v) => v,
> + Err(e) => {
> + eprintln!("Error: {}.", e);
> + std::process::exit(1);
> + }
> + };
> +
> + let source_base_dir = match args.source {
> + Some(ref v) => v.as_str(),
> + None => BASE_DIR,
> + };
You can use this instead; it's shorter and a bit nicer to read IMO:
let source_base_dir = args.source.as_deref().unwrap_or(BASE_DIR);
> +
> + let target_base_dir = match args.target {
> + Some(ref v) => v.as_str(),
> + None => BASE_DIR,
> + };
> +
> + let resource_base_dir = match args.resources {
> + Some(ref v) => v.as_str(),
> + None => RESOURCE_BASE_DIR,
> + };
same for the previous two
> +
> + let source_dir_guests: PathBuf = [source_base_dir, SOURCE_SUBDIR_GUEST].iter().collect();
> + let target_dir_guests: PathBuf = [target_base_dir, TARGET_SUBDIR_GUEST].iter().collect();
> + let source_dir_nodes: PathBuf = [source_base_dir, SOURCE_SUBDIR_NODE].iter().collect();
> + let target_dir_nodes: PathBuf = [target_base_dir, TARGET_SUBDIR_NODE].iter().collect();
> + let source_dir_storage: PathBuf = [source_base_dir, SOURCE_SUBDIR_STORAGE].iter().collect();
> + let target_dir_storage: PathBuf = [target_base_dir, TARGET_SUBDIR_STORAGE].iter().collect();
What do you think about:
let source_base_dir = Path::new(args.source.as_deref().unwrap_or(BASE_DIR));
let target_base_dir = Path::new(args.target.as_deref().unwrap_or(BASE_DIR));
let source_dir_guests = source_base_dir.join(SOURCE_SUBDIR_GUEST);
let target_dir_guests = target_base_dir.join(TARGET_SUBDIR_GUEST);
> +
> + if !args.migrate {
> + println!("DRYRUN! Use the --migrate parameter to start the migration.");
> + }
> + if args.force {
> + println!("Force mode! Will overwrite existing target RRD files!");
> + }
> +
> + if let Err(e) = migrate_nodes(
> + source_dir_nodes,
> + target_dir_nodes,
> + resource_base_dir,
> + args.migrate,
> + args.force,
> + ) {
> + eprintln!("Error migrating nodes: {}", e);
> + std::process::exit(1);
> + }
> + if let Err(e) = migrate_storage(
> + source_dir_storage,
> + target_dir_storage,
> + args.migrate,
> + args.force,
> + ) {
> + eprintln!("Error migrating storage: {}", e);
> + std::process::exit(1);
> + }
> + if let Err(e) = migrate_guests(
> + source_dir_guests,
> + target_dir_guests,
> + resource_base_dir,
> + set_threads(&args),
> + args.migrate,
> + args.force,
> + ) {
> + eprintln!("Error migrating guests: {}", e);
> + std::process::exit(1);
> + }
Error handling in this function could be a bit cleaner if it were broken
out into a separate function that uses anyhow's .context/.with_context:
fn do_main() -> Result<(), Error> {
let args = parse_args(...).context("Could not parse args")?;
...
migrate_guests(...).context("Error migrating guests")?;
Ok(())
}
fn main() {
if let Err(e) = do_main() {
eprintln!("{e}");
std::process::exit(1);
}
}
What do you think?
> +}
> +
> +/// Set number of threads
> +///
> +/// Either a fixed parameter or determining a range between 1 to 4 threads
> +/// based on the number of CPU cores available in the system.
> +fn set_threads(args: &Args) -> usize {
I think the name 'set_threads' is rather confusing for something that
*returns* the number of threads to use. Maybe call it
'threads_from_core_count' or something similar? (This assumes you remove
the let Some(...) as suggested below. If you keep it there,
'get_threads' might be an okay choice.)
> + if let Some(threads) = args.threads {
> + return threads;
> + }
^ Personally I'd keep this part outside of the helper, but no hard
feelings.
fn do_main() {
...
let threads = args.threads.unwrap_or_else(threads_from_core_count);
migrate_guests(..., threads)?;
Ok(())
}
fn threads_from_core_count() -> usize {
...
}
> +
> + // check for a way to get physical cores and not threads?
> + let cpus: usize = String::from_utf8_lossy(
> + std::process::Command::new("nproc")
> + .output()
> + .expect("Error running nproc")
> + .stdout
> + .as_slice()
> + .trim_ascii(),
> + )
> + .parse::<usize>()
> + .expect("Could not parse nproc output");
> +
> + if cpus < 32 {
> + let threads = cpus / 8;
> + if threads == 0 {
> + return 1;
> + }
> + return threads;
> + }
> + MAX_THREADS
> +}
> +
> +/// Check if a VMID is currently configured
> +fn resource_present(path: &str, resource: &str) -> Result<bool> {
> + let resourcelist = fs::read_to_string(path)?;
> + Ok(resourcelist.contains(format!("\"{resource}\"").as_str()))
> +}
> +
> +/// Rename file to old, when migrated or resource not present at all -> old RRD file
> +fn mv_old(file: &str) -> Result<()> {
> + let old = format!("{}.old", file);
> + fs::rename(file, old)?;
> + Ok(())
> +}
> +
> +/// Colllect all RRD files in the provided directory
> +fn collect_rrd_files(location: &PathBuf) -> Result<Vec<(CString, OsString)>> {
^
Maybe use the type you've defined here? `File`, although I'd
prefer a different name to avoid confusion with std::fs::File.
> + let mut files: Vec<(CString, OsString)> = Vec::new();
> +
> + fs::read_dir(location)?
> + .filter(|f| f.is_ok())
> + .map(|f| f.unwrap().path())
You can use filter_map here, maybe like this:
fs::read_dir(location)?
.filter_map(|f| match f {
Ok(a) => Some(a.path()),
Err(e) => {
eprintln!("could not read dir entry: {e}");
None
}
})
or, if you don't want to log the error:
fs::read_dir(location)?
.filter_map(Result::ok)
.map(|entry| entry.path())
(untested, but you get the idea)
> + .filter(|f| f.is_file() && f.extension().is_none())
> + .for_each(|file| {
> + let path = CString::new(file.as_path().as_os_str().as_bytes())
> + .expect("Could not convert path to CString.");
^
> + let fname = file
> + .file_name()
> + .map(|v| v.to_os_string())
Reading the docs for the CString::new function, it should only fail if
there is a NUL byte in the string, which should AFAIK be impossible
here since the string came from the file name. Maybe express that in
some comment here?
v
> + .expect("Could not convert fname to OsString.");
> + files.push((path, fname))
> + });
> + Ok(files)
> +}
> +
> +/// Does the actual migration for the given file
> +fn do_rrd_migration(
> + file: File,
> + target_location: &Path,
> + rrd_def: &[&CStr],
> + migrate: bool,
> + force: bool,
> +) -> Result<()> {
> + if !migrate {
> + println!("would migrate but in dry run mode");
> + }
> +
> + let resource = file.1;
> + let mut target_path = target_location.to_path_buf();
Since the first thing you do with target_location is to convert it to
a PathBuf, I'd suggest just passing it as a PathBuf and letting the caller
take care of the allocation.
> + target_path.push(resource);
> +
> + if target_path.exists() && !force {
> + println!(
> + "already migrated, use --force to overwrite target file: {}",
> + target_path.display()
> + );
> + }
> +
> + if !migrate || (target_path.exists() && !force) {
> + bail!("skipping");
> + }
you could pull the 'target_path.exists() && !force' check out into a variable
so that you don't have to evaluate the same expression twice
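A hedged sketch of that deduplication (the function and its flags only mirror the patch's logic; they are not part of the patch itself):

```rust
// Sketch: compute the skip condition once and reuse it.
fn should_skip(target_exists: bool, migrate: bool, force: bool) -> bool {
    let already_migrated = target_exists && !force;
    !migrate || already_migrated
}

fn main() {
    assert!(should_skip(true, true, false)); // target exists, no --force
    assert!(!should_skip(true, true, true)); // --force overwrites
    assert!(should_skip(false, false, false)); // dry run always skips
    println!("ok");
}
```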
> +
> + let mut source: [*const i8; 2] = [std::ptr::null(); 2];
> + source[0] = file.0.as_ptr();
> +
> + let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
> +
> + unsafe {
> + rrd_get_context();
> + rrd_clear_error();
> + let res = rrd_create_r2(
> + target_path.as_ptr(),
> + RRD_STEP_SIZE as u64,
> + 0,
> + 0,
> + source.as_mut_ptr(),
> + std::ptr::null(),
> + rrd_def.len() as i32,
> + rrd_def
> + .iter()
> + .map(|v| v.as_ptr())
> + .collect::<Vec<_>>()
> + .as_mut_ptr(),
> + );
> + if res != 0 {
> + bail!(
> + "RRD create Error: {}",
> + CStr::from_ptr(rrd_get_error()).to_string_lossy()
> + );
> + }
> + }
> + Ok(())
> +}
> +
> +/// Migrate guest RRD files
> +///
> +/// In parallel to speed up the process as most time is spent on converting the
> +/// data to the new format.
> +fn migrate_guests(
> + source_dir_guests: PathBuf,
> + target_dir_guests: PathBuf,
> + resources: &str,
> + threads: usize,
> + migrate: bool,
> + force: bool,
> +) -> Result<(), Error> {
> + println!("Migrating RRD data for guests…");
> + println!("Using {} thread(s)", threads);
> +
> + let guest_source_files = collect_rrd_files(&source_dir_guests)?;
> +
> + if !target_dir_guests.exists() && migrate {
> + println!("Creating new directory: '{}'", target_dir_guests.display());
> + std::fs::create_dir(&target_dir_guests)?;
> + }
> +
> + let total_guests = guest_source_files.len();
> + let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
> + let guests2 = guests.clone();
Just FYI, when cloning an Arc it's better to use Arc::clone(&guests),
because this makes it clearer *what* you are actually cloning:
the Arc vs. the contents of the Arc.
> + let start_time = std::time::SystemTime::now();
Since you only measure the elapsed time, it might be more idiomatic to
use std::time::Instant here, but no hard feelings.
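For illustration, a self-contained sketch of the difference (the dummy workload is made up):

```rust
use std::time::Instant;

fn main() {
    // Instant is monotonic and its elapsed() cannot fail, unlike
    // SystemTime::elapsed(), which returns a Result because the
    // system clock may jump backwards.
    let start = Instant::now();
    let _work: u64 = (0..1000u64).sum();
    let elapsed = start.elapsed().as_secs_f64();
    assert!(elapsed >= 0.0);
    println!("took {elapsed:.6}s");
}
```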
> +
> + let migration_pool = ParallelHandler::new(
> + "guest rrd migration",
> + threads,
> + move |file: (CString, OsString)| {
Please add some comment regarding the .unwrap here.
> + let full_path = file.0.clone().into_string().unwrap();
> +
> + if let Ok(()) = do_rrd_migration(
> + file,
> + &target_dir_guests,
> + RRD_VM_DEF.as_slice(),
> + migrate,
> + force,
> + ) {
Since do_rrd_migration does not return any data in the Ok variant, you
could just use:
if do_rrd_migration(....).is_ok() {
    ....
}
> + mv_old(full_path.as_str())?;
> + let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
> + if current_guests > 0 && current_guests % 200 == 0 {
> + println!("Migrated {} of {} guests", current_guests, total_guests);
> + }
> + }
> + Ok(())
> + },
> + );
> + let migration_channel = migration_pool.channel();
> +
> + for file in guest_source_files {
> + let node = file.1.clone().into_string().unwrap();
> + if !resource_present(format!("{resources}/.vmlist").as_str(), node.as_str())? {
> + println!("VMID: '{node}' not present. Skip and mark as old.");
> + mv_old(format!("{}", file.0.to_string_lossy()).as_str())?;
> + }
> + let migration_channel = migration_channel.clone();
Is this clone here needed? It seems to compile fine without it...
> + migration_channel.send(file)?;
> + }
> +
> + drop(migration_channel);
> + migration_pool.complete()?;
> +
> + let elapsed = start_time.elapsed()?.as_secs_f64();
> + let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
> + println!("Migrated {} guests", guests);
> + println!("It took {:.2}s", elapsed);
> +
> + Ok(())
> +}
> +
> +/// Migrate node RRD files
> +///
> +/// In serial as the number of nodes will not be high.
> +fn migrate_nodes(
> + source_dir_nodes: PathBuf,
> + target_dir_nodes: PathBuf,
> + resources: &str,
Any reason why this one is a &str instead of a PathBuf? As far as I can
tell it is also a path (/etc/pve by default). Also, the variable's name
doesn't really make it clear that this is supposed to be a path; I only
deduced it from RESOURCE_BASE_DIR.
> + migrate: bool,
> + force: bool,
> +) -> Result<(), Error> {
> + println!("Migrating RRD data for nodes…");
> +
> + if !target_dir_nodes.exists() && migrate {
> + println!("Creating new directory: '{}'", target_dir_nodes.display());
> + std::fs::create_dir(&target_dir_nodes)?;
> + }
> +
> + let node_source_files = collect_rrd_files(&source_dir_nodes)?;
> +
> + for file in node_source_files {
> + let node = file.1.clone().into_string().unwrap();
> + let full_path = file.0.clone().into_string().unwrap();
Please add some comment why it is okay to .unwrap here (or just return
or ignore the error, if that makes more sense).
> + println!("Node: '{node}'");
> + if !resource_present(format!("{resources}/.members").as_str(), node.as_str())? {
You can just use &format!... and &node instead of the .as_str() calls
(a bit nicer to read and more idiomatic, but no hard feelings).
> + println!("Node: '{node}' not present. Skip and mark as old.");
> + mv_old(format!("{}/{}", file.0.to_string_lossy(), node).as_str())?;
> + }
> + if let Ok(()) = do_rrd_migration(
> + file,
> + &target_dir_nodes,
> + RRD_NODE_DEF.as_slice(),
> + migrate,
> + force,
> + ) {
Since do_rrd_migration does not return any data in the Ok variant, you
could just use:
if do_rrd_migration(....).is_ok() {
    ....
}
> + mv_old(full_path.as_str())?;
> + }
> + }
> + println!("Migrated all nodes");
> +
> + Ok(())
> +}
> +
> +/// Migrate storage RRD files
> +///
> +/// In serial as the number of storage will not be that high.
> +fn migrate_storage(
> + source_dir_storage: PathBuf,
> + target_dir_storage: PathBuf,
> + migrate: bool,
> + force: bool,
> +) -> Result<(), Error> {
> + println!("Migrating RRD data for storages…");
> +
> + if !target_dir_storage.exists() && migrate {
> + println!("Creating new directory: '{}'", target_dir_storage.display());
> + std::fs::create_dir(&target_dir_storage)?;
> + }
> +
> + // storage has another layer of directories per node over which we need to iterate
> + fs::read_dir(&source_dir_storage)?
> + .filter(|f| f.is_ok())
> + .map(|f| f.unwrap().path())
> + .filter(|f| f.is_dir())
you can use filter_map here, as explained in collect_rrd_files
> + .try_for_each(|node| {
> + let mut source_storage_subdir = source_dir_storage.clone();
> + source_storage_subdir.push(node.file_name().unwrap());
> +
> + let mut target_storage_subdir = target_dir_storage.clone();
> + target_storage_subdir.push(node.file_name().unwrap());
> +
> + if !target_storage_subdir.exists() && migrate {
> + fs::create_dir(target_storage_subdir.as_path())?;
You can use & here instead of .as_path() :)
> + let metadata = target_storage_subdir.metadata()?;
> + let mut permissions = metadata.permissions();
> + permissions.set_mode(0o755);
You need to actually apply the permissions to the dir; here you only set
the permission bits on the Permissions value. Use
std::fs::set_permissions(...)
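A small self-contained illustration of the missing call (the temp directory name is made up for the example):

```rust
use std::fs;
use std::os::unix::fs::PermissionsExt;

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("rrd-mig-perm-example");
    fs::create_dir_all(&dir)?;

    // Mutating the Permissions value alone does not touch the filesystem...
    let mut permissions = fs::metadata(&dir)?.permissions();
    permissions.set_mode(0o755);
    // ...only this call actually applies the new mode bits:
    fs::set_permissions(&dir, permissions)?;

    assert_eq!(fs::metadata(&dir)?.permissions().mode() & 0o777, 0o755);
    fs::remove_dir(&dir)?;
    Ok(())
}
```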
> + }
> +
> + let storage_source_files = collect_rrd_files(&source_storage_subdir)?;
> +
> + for file in storage_source_files {
> + println!(
> + "Storage: '{}/{}'",
> + node.file_name()
> + .expect("no file name present")
Same thing here regarding the potential panic
> + .to_string_lossy(),
> + PathBuf::from(file.1.clone()).display()
Starting with rustc 1.87, you can directly call file.1.display() on the underlying OsStr(ing).
> + );
> +
> + let full_path = file.0.clone().into_string().unwrap();
> + if let Ok(()) = do_rrd_migration(
> + file,
> + &target_storage_subdir,
> + RRD_STORAGE_DEF.as_slice(),
> + migrate,
> + force,
> + ) {
> + mv_old(full_path.as_str())?;
> + }
Since do_rrd_migration does not return any data in the Ok variant, you
could just use:
if do_rrd_migration(....).is_ok() {
    ....
}
> + }
> + Ok::<(), Error>(())
> + })?;
> + println!("Migrated all storages");
> +
> + Ok(())
> +}
> diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
> new file mode 100644
> index 0000000..d8ee3c7
> --- /dev/null
> +++ b/src/parallel_handler.rs
> @@ -0,0 +1,160 @@
> +//! A thread pool which run a closure in parallel.
> +
> +use std::sync::{Arc, Mutex};
> +use std::thread::JoinHandle;
> +
> +use anyhow::{bail, format_err, Error};
> +use crossbeam_channel::{bounded, Sender};
> +
> +/// A handle to send data to the worker thread (implements clone)
> +pub struct SendHandle<I> {
> + input: Sender<I>,
> + abort: Arc<Mutex<Option<String>>>,
> +}
> +
> +/// Returns the first error happened, if any
> +pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
> + let guard = abort.lock().unwrap();
> + if let Some(err_msg) = &*guard {
> + return Err(format_err!("{}", err_msg));
> + }
> + Ok(())
> +}
> +
> +impl<I: Send> SendHandle<I> {
> + /// Send data to the worker threads
> + pub fn send(&self, input: I) -> Result<(), Error> {
> + check_abort(&self.abort)?;
> + match self.input.send(input) {
> + Ok(()) => Ok(()),
> + Err(_) => bail!("send failed - channel closed"),
> + }
might be more idiomatic to use .map_err here
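For example (shown here with std::sync::mpsc instead of crossbeam so the sketch is self-contained, and with a plain String error rather than anyhow::Error):

```rust
use std::sync::mpsc::channel;

fn main() {
    let (tx, rx) = channel::<u32>();
    drop(rx); // close the receiving end so send() fails

    // Equivalent of the match arm, expressed with map_err:
    let res: Result<(), String> = tx
        .send(1)
        .map_err(|_| "send failed - channel closed".to_string());
    assert!(res.is_err());
}
```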
> + }
> +}
> +
> +/// A thread pool which run the supplied closure
> +///
> +/// The send command sends data to the worker threads. If one handler
> +/// returns an error, we mark the channel as failed and it is no
> +/// longer possible to send data.
> +///
> +/// When done, the 'complete()' method needs to be called to check for
> +/// outstanding errors.
> +pub struct ParallelHandler<I> {
> + handles: Vec<JoinHandle<()>>,
> + name: String,
> + input: Option<SendHandle<I>>,
> +}
> +
> +impl<I> Clone for SendHandle<I> {
> + fn clone(&self) -> Self {
> + Self {
> + input: self.input.clone(),
> + abort: Arc::clone(&self.abort),
> + }
> + }
> +}
> +
> +impl<I: Send + 'static> ParallelHandler<I> {
> + /// Create a new thread pool, each thread processing incoming data
> + /// with 'handler_fn'.
> + pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
> + where
> + F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
> + {
> + let mut handles = Vec::new();
> + let (input_tx, input_rx) = bounded::<I>(threads);
> +
> + let abort = Arc::new(Mutex::new(None));
> +
> + for i in 0..threads {
> + let input_rx = input_rx.clone();
> + let abort = Arc::clone(&abort);
> + let handler_fn = handler_fn.clone();
> +
> + handles.push(
> + std::thread::Builder::new()
> + .name(format!("{} ({})", name, i))
> + .spawn(move || loop {
> + let data = match input_rx.recv() {
> + Ok(data) => data,
> + Err(_) => return,
> + };
> + if let Err(err) = (handler_fn)(data) {
> + let mut guard = abort.lock().unwrap();
.unwrap on Mutex::lock is fine IMO, but should have a comment explaining
that it only panics if the mutex is poisoned.
> + if guard.is_none() {
> + *guard = Some(err.to_string());
> + }
> + }
> + })
> + .unwrap(),
This shouldn't .unwrap() IMO, rather return an error from this function.
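Something along these lines (a sketch only; the real worker closure from the patch is elided, and ParallelHandler::new would then return a Result):

```rust
use std::io;
use std::thread::{Builder, JoinHandle};

// Sketch: propagate the io::Error from spawn() instead of unwrapping.
fn spawn_worker(name: &str, i: usize) -> io::Result<JoinHandle<()>> {
    Builder::new()
        .name(format!("{name} ({i})"))
        .spawn(move || {
            // worker loop would go here
        })
}

fn main() -> io::Result<()> {
    let handle = spawn_worker("guest rrd migration", 0)?;
    handle.join().expect("worker panicked");
    Ok(())
}
```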
> + );
> + }
> + Self {
> + handles,
> + name: name.to_string(),
> + input: Some(SendHandle {
> + input: input_tx,
> + abort,
> + }),
> + }
> + }
> +
> + /// Returns a cloneable channel to send data to the worker threads
> + pub fn channel(&self) -> SendHandle<I> {
> + self.input.as_ref().unwrap().clone()
Please add a comment why .unwrap is okay here or bubble some error up
> + }
> +
> + /// Send data to the worker threads
> + pub fn send(&self, input: I) -> Result<(), Error> {
> + self.input.as_ref().unwrap().send(input)?;
Please add a comment why .unwrap is okay here or bubble some error up
> + Ok(())
> + }
> +
> + /// Wait for worker threads to complete and check for errors
> + pub fn complete(mut self) -> Result<(), Error> {
> + let input = self.input.take().unwrap();
> + let abort = Arc::clone(&input.abort);
> + check_abort(&abort)?;
> + drop(input);
> +
> + let msg_list = self.join_threads();
> +
> + // an error might be encountered while waiting for the join
> + check_abort(&abort)?;
> +
> + if msg_list.is_empty() {
> + return Ok(());
> + }
> + Err(format_err!("{}", msg_list.join("\n")))
I'd rather

    if !msg_list.is_empty() {
        bail!("{}", msg_list.join("\n"));
    }
    Ok(())

(note: join() takes a &str, so "\n" rather than '\n')
> + }
> +
> + fn join_threads(&mut self) -> Vec<String> {
> + let mut msg_list = Vec::new();
> +
> + let mut i = 0;
> + while let Some(handle) = self.handles.pop() {
> + if let Err(panic) = handle.join() {
> + if let Some(panic_msg) = panic.downcast_ref::<&str>() {
> + msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
> + } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
> + msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
> + } else {
> + msg_list.push(format!("thread {} ({i}) panicked", self.name));
> + }
> + }
> + i += 1;
> + }
> + msg_list
> + }
> +}
> +
> +// Note: We make sure that all threads will be joined
> +impl<I> Drop for ParallelHandler<I> {
> + fn drop(&mut self) {
> + drop(self.input.take());
> + while let Some(handle) = self.handles.pop() {
> + let _ = handle.join();
> + }
> + }
> +}
> diff --git a/wrapper.h b/wrapper.h
> new file mode 100644
> index 0000000..64d0aa6
> --- /dev/null
> +++ b/wrapper.h
> @@ -0,0 +1 @@
> +#include <rrd.h>
_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel