From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id A945C1FF137 for ; Tue, 17 Feb 2026 15:17:16 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 78A3F491C; Tue, 17 Feb 2026 15:15:53 +0100 (CET) From: Daniel Kral To: pve-devel@lists.proxmox.com Subject: [RFC perl-rs 6/6] pve-rs: resource scheduling: implement pve_dynamic bindings Date: Tue, 17 Feb 2026 15:14:05 +0100 Message-ID: <20260217141437.584852-12-d.kral@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260217141437.584852-1-d.kral@proxmox.com> References: <20260217141437.584852-1-d.kral@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1771337676736 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.019 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 5PZSJ3ZUTZNFZASUUPY2Y4VAA4XSXDGH X-Message-ID-Hash: 5PZSJ3ZUTZNFZASUUPY2Y4VAA4XSXDGH X-MailFrom: d.kral@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox VE development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: The implementation is similar to pve_static, but extends the node and service stats with the current dynamic resource usages and the node usage is derived from the node 
itself instead of the sum of service stats, which are assigned to the node. The CompactMigrationCandidate struct is shared between the pve_static and pve_dynamic implementations. Signed-off-by: Daniel Kral --- pve-rs/Makefile | 1 + .../src/bindings/resource_scheduling/mod.rs | 16 + .../resource_scheduling/pve_dynamic.rs | 349 ++++++++++++++++++ .../resource_scheduling/pve_static.rs | 15 +- pve-rs/test/resource_scheduling.pl | 1 + 5 files changed, 369 insertions(+), 13 deletions(-) create mode 100644 pve-rs/src/bindings/resource_scheduling/pve_dynamic.rs diff --git a/pve-rs/Makefile b/pve-rs/Makefile index aa7181e..19695a1 100644 --- a/pve-rs/Makefile +++ b/pve-rs/Makefile @@ -30,6 +30,7 @@ PERLMOD_PACKAGES := \ PVE::RS::OCI \ PVE::RS::OpenId \ PVE::RS::ResourceScheduling::Static \ + PVE::RS::ResourceScheduling::Dynamic \ PVE::RS::SDN::Fabrics \ PVE::RS::TFA diff --git a/pve-rs/src/bindings/resource_scheduling/mod.rs b/pve-rs/src/bindings/resource_scheduling/mod.rs index af1fb6b..ff8d94b 100644 --- a/pve-rs/src/bindings/resource_scheduling/mod.rs +++ b/pve-rs/src/bindings/resource_scheduling/mod.rs @@ -2,3 +2,19 @@ mod pve_static; pub use pve_static::pve_rs_resource_scheduling_static; + +mod pve_dynamic; +pub use pve_dynamic::pve_rs_resource_scheduling_dynamic; + +use serde::{Deserialize, Serialize}; + +/// A compact representation of MigrationCandidate. +#[derive(Serialize, Deserialize)] +pub struct CompactMigrationCandidate { + /// The identifier of the leading service. + pub leader: String, + /// The services which are part of the leading service's bundle. + pub services: Vec, + /// The nodes, which are possible to migrate to for the services. 
+ pub nodes: Vec, +} diff --git a/pve-rs/src/bindings/resource_scheduling/pve_dynamic.rs b/pve-rs/src/bindings/resource_scheduling/pve_dynamic.rs new file mode 100644 index 0000000..7c8ffce --- /dev/null +++ b/pve-rs/src/bindings/resource_scheduling/pve_dynamic.rs @@ -0,0 +1,349 @@ +#[perlmod::package(name = "PVE::RS::ResourceScheduling::Dynamic", lib = "pve_rs")] +pub mod pve_rs_resource_scheduling_dynamic { + //! The `PVE::RS::ResourceScheduling::Dynamic` package. + //! + //! Provides bindings for the dynamic resource scheduling module. + //! + //! See [`proxmox_resource_scheduling`]. + + use std::collections::{HashMap, HashSet}; + use std::sync::Mutex; + + use anyhow::{Context, Error, bail}; + + use perlmod::Value; + use proxmox_resource_scheduling::pve_dynamic::{DynamicNodeStats, DynamicServiceStats}; + use proxmox_resource_scheduling::scheduler::{ + ClusterUsage, MigrationCandidate, NodeUsage, ScoredMigration, + }; + + use crate::bindings::resource_scheduling::CompactMigrationCandidate; + + perlmod::declare_magic!(Box : &Scheduler as "PVE::RS::ResourceScheduling::Dynamic"); + + struct DynamicNodeInfo { + stats: DynamicNodeStats, + services: HashSet, + } + + struct DynamicServiceInfo { + stats: DynamicServiceStats, + nodes: HashSet, + } + + struct Usage { + nodes: HashMap, + services: HashMap, + } + + /// A scheduler instance contains the resource usage by node. + pub struct Scheduler { + inner: Mutex, + } + + /// Class method: Create a new [`Scheduler`] instance. + #[export(raw_return)] + pub fn new(#[raw] class: Value) -> Result { + let inner = Usage { + nodes: HashMap::new(), + services: HashMap::new(), + }; + + Ok(perlmod::instantiate_magic!( + &class, MAGIC => Box::new(Scheduler { inner: Mutex::new(inner) }) + )) + } + + /// Method: Add a node with its basic CPU and memory info. + /// + /// This inserts a [`DynamicNodeInfo`] entry for the node into the scheduler instance. 
+ #[export] + pub fn add_node( + #[try_from_ref] this: &Scheduler, + nodename: String, + stats: DynamicNodeStats, + ) -> Result<(), Error> { + let mut usage = this.inner.lock().unwrap(); + + if usage.nodes.contains_key(&nodename) { + bail!("node {} already added", nodename); + } + + let node = DynamicNodeInfo { + stats, + services: HashSet::new(), + }; + + usage.nodes.insert(nodename, node); + Ok(()) + } + + /// Method: Remove a node from the scheduler. + #[export] + pub fn remove_node(#[try_from_ref] this: &Scheduler, nodename: &str) -> Result<(), Error> { + let mut usage = this.inner.lock().unwrap(); + + if let Some(node) = usage.nodes.remove(nodename) { + for sid in node.services.iter() { + match usage.services.get_mut(sid) { + Some(service) => { + service.nodes.remove(nodename); + } + None => bail!( + "service '{}' not present in services hashmap while removing node '{}'", + sid, + nodename + ), + } + } + } + + Ok(()) + } + + /// Method: Get a list of all the nodes in the scheduler. + #[export] + pub fn list_nodes(#[try_from_ref] this: &Scheduler) -> Vec { + let usage = this.inner.lock().unwrap(); + + usage + .nodes + .keys() + .map(|nodename| nodename.to_string()) + .collect() + } + + /// Method: Check whether a node exists in the scheduler. + #[export] + pub fn contains_node(#[try_from_ref] this: &Scheduler, nodename: &str) -> bool { + let usage = this.inner.lock().unwrap(); + + usage.nodes.contains_key(nodename) + } + + /// Method: Add service `sid` and its `service_usage` to the node. 
+ #[export] + pub fn add_service_usage_to_node( + #[try_from_ref] this: &Scheduler, + nodename: &str, + sid: &str, + service_usage: DynamicServiceStats, + ) -> Result<(), Error> { + let mut usage = this.inner.lock().unwrap(); + + match usage.nodes.get_mut(nodename) { + Some(node) => { + if node.services.contains(sid) { + bail!("service '{}' already added to node '{}'", sid, nodename); + } + + node.services.insert(sid.to_string()); + } + None => bail!("node '{}' not present in usage hashmap", nodename), + } + + if let Some(service) = usage.services.get_mut(sid) { + if service.nodes.contains(nodename) { + bail!("node '{}' already added to service '{}'", nodename, sid); + } + + service.nodes.insert(nodename.to_string()); + } else { + let service = DynamicServiceInfo { + stats: service_usage, + nodes: HashSet::from([nodename.to_string()]), + }; + + usage.services.insert(sid.to_string(), service); + } + + Ok(()) + } + + /// Method: Remove service `sid` and its usage from all assigned nodes. + #[export] + fn remove_service_usage(#[try_from_ref] this: &Scheduler, sid: &str) -> Result<(), Error> { + let mut usage = this.inner.lock().unwrap(); + + if let Some(service) = usage.services.remove(sid) { + for nodename in &service.nodes { + match usage.nodes.get_mut(nodename) { + Some(node) => { + node.services.remove(sid); + } + None => bail!( + "node '{}' not present in usage hashmap while removing service '{}'", + nodename, + sid + ), + } + } + } + + Ok(()) + } + + fn as_cluster_usage(usage: &Usage) -> ClusterUsage { + let nodes = usage + .nodes + .iter() + .map(|(nodename, node)| NodeUsage { + name: nodename.to_string(), + stats: node.stats.into(), + }) + .collect::>(); + + ClusterUsage::from_nodes(nodes) + } + + /// Method: Calculates the loads for each node. 
+ #[export] + pub fn calculate_node_loads(#[try_from_ref] this: &Scheduler) -> Vec<(String, f64)> { + let usage = this.inner.lock().unwrap(); + let cluster_usage = as_cluster_usage(&usage); + + cluster_usage.node_loads() + } + + /// Method: Calculates the imbalance among the nodes. + #[export] + pub fn calculate_node_imbalance(#[try_from_ref] this: &Scheduler) -> f64 { + let usage = this.inner.lock().unwrap(); + let cluster_usage = as_cluster_usage(&usage); + + cluster_usage.node_imbalance() + } + + fn generate_migration_candidates_from( + usage: &Usage, + candidates: Vec, + ) -> Result, Error> { + let mut migration_candidates = Vec::new(); + + for candidate in candidates.into_iter() { + let leader_sid = candidate.leader; + let leader = usage.services.get(&leader_sid).with_context(|| { + format!( + "leader {} is not present in services usage hashmap", + leader_sid + ) + })?; + let source_node = leader.nodes.iter().next().with_context(|| format!("leader {leader_sid} is not assigned to any node"))?; + + let mut service_candidates = Vec::new(); + + for sid in candidate.services.iter() { + let service = usage + .services + .get(sid) + .with_context(|| format!("service {} is not present in usage hashmap", sid))?; + let service_nodes = &service.nodes; + + if service_nodes.len() > 1 { + bail!("service {sid} is on multiple nodes"); + } + + if !service_nodes.contains(source_node) { + bail!("service {sid} is not on common source node {source_node}"); + } + + service_candidates.push(service); + } + + let bundle_stats = service_candidates + .into_iter() + .fold(DynamicServiceStats::default(), |total_stats, service| { + total_stats + service.stats + }); + + for target_node in candidate.nodes.into_iter() { + migration_candidates.push(MigrationCandidate { + sid: leader_sid.to_string(), + source_node: source_node.to_string(), + target_node, + stats: bundle_stats.into(), + }); + } + } + + Ok(migration_candidates) + } + + /// Method: Score the service motions by the best node imbalance improvement with exhaustive search. 
+ #[export] + pub fn score_best_balancing_migrations( + #[try_from_ref] this: &Scheduler, + candidates: Vec, + limit: usize, + ) -> Result, Error> { + let usage = this.inner.lock().unwrap(); + + let cluster_usage = as_cluster_usage(&usage); + let candidates = generate_migration_candidates_from(&usage, candidates)?; + + cluster_usage.score_best_balancing_migrations(candidates, limit) + } + + /// Method: Select the service motion with the best node imbalance improvement with exhaustive search. + #[export] + pub fn select_best_balancing_migration( + #[try_from_ref] this: &Scheduler, + candidates: Vec, + ) -> Result, Error> { + let usage = this.inner.lock().unwrap(); + + let cluster_usage = as_cluster_usage(&usage); + let candidates = generate_migration_candidates_from(&usage, candidates)?; + + cluster_usage.select_best_balancing_migration(candidates) + } + + /// Method: Score the service motions by the best node imbalance improvement with the TOPSIS method. + #[export] + pub fn score_best_balancing_migrations_topsis( + #[try_from_ref] this: &Scheduler, + candidates: Vec, + limit: usize, + ) -> Result, Error> { + let usage = this.inner.lock().unwrap(); + + let cluster_usage = as_cluster_usage(&usage); + let candidates = generate_migration_candidates_from(&usage, candidates)?; + + cluster_usage.score_best_balancing_migrations_topsis(&candidates, limit) + } + + /// Method: Select the service motion with the best node imbalance improvement with the TOPSIS method. + #[export] + pub fn select_best_balancing_migration_topsis( + #[try_from_ref] this: &Scheduler, + candidates: Vec, + ) -> Result, Error> { + let usage = this.inner.lock().unwrap(); + + let cluster_usage = as_cluster_usage(&usage); + let candidates = generate_migration_candidates_from(&usage, candidates)?; + + cluster_usage.select_best_balancing_migration_topsis(&candidates) + } + + /// Scores all previously added nodes for starting a `service` on. 
+ /// + /// Scoring is done according to the dynamic memory and CPU usages of the nodes as if the + /// service would already be running on each. + /// + /// Returns a vector of (nodename, score) pairs. Scores are between 0.0 and 1.0 and a higher + /// score is better. + /// + /// See [`proxmox_resource_scheduling::pve_dynamic::score_nodes_to_start_service`]. + #[export] + pub fn score_nodes_to_start_service( + #[try_from_ref] this: &Scheduler, + service: DynamicServiceStats, + ) -> Result, Error> { + let usage = this.inner.lock().unwrap(); + let cluster_usage = as_cluster_usage(&usage); + + cluster_usage.score_nodes_to_start_service(service) + } +} diff --git a/pve-rs/src/bindings/resource_scheduling/pve_static.rs b/pve-rs/src/bindings/resource_scheduling/pve_static.rs index 44ea851..f7440d2 100644 --- a/pve-rs/src/bindings/resource_scheduling/pve_static.rs +++ b/pve-rs/src/bindings/resource_scheduling/pve_static.rs @@ -9,8 +9,6 @@ pub mod pve_rs_resource_scheduling_static { use std::collections::{HashMap, HashSet}; use std::sync::Mutex; - use serde::{Deserialize, Serialize}; - use anyhow::{Context, Error, bail}; use perlmod::Value; @@ -19,6 +17,8 @@ pub mod pve_rs_resource_scheduling_static { ClusterUsage, MigrationCandidate, ScoredMigration, }; + use crate::bindings::resource_scheduling::CompactMigrationCandidate; + perlmod::declare_magic!(Box : &Scheduler as "PVE::RS::ResourceScheduling::Static"); struct StaticNodeInfo { @@ -230,17 +230,6 @@ pub mod pve_rs_resource_scheduling_static { cluster_usage.node_imbalance() } - /// A compact representation of MigationCandidate. - #[derive(Serialize, Deserialize)] - pub struct CompactMigrationCandidate { - /// The identifier of the leading service. - pub leader: String, - /// The services which are part of the leading service's bundle. - pub services: Vec, - /// The nodes, which are possible to migrate to for the services. 
- pub nodes: Vec, - } - fn generate_migration_candidates_from( usage: &Usage, candidates: Vec, diff --git a/pve-rs/test/resource_scheduling.pl b/pve-rs/test/resource_scheduling.pl index a332269..3775242 100755 --- a/pve-rs/test/resource_scheduling.pl +++ b/pve-rs/test/resource_scheduling.pl @@ -6,6 +6,7 @@ use warnings; use Test::More; use PVE::RS::ResourceScheduling::Static; +use PVE::RS::ResourceScheduling::Dynamic; my sub score_nodes { my ($static, $service) = @_; -- 2.47.3