From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 177D01FF144 for ; Tue, 24 Mar 2026 19:32:51 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 4710F1AC9E; Tue, 24 Mar 2026 19:31:33 +0100 (CET) From: Daniel Kral To: pve-devel@lists.proxmox.com Subject: [PATCH proxmox v2 05/40] resource-scheduling: implement generic cluster usage implementation Date: Tue, 24 Mar 2026 19:29:49 +0100 Message-ID: <20260324183029.1274972-6-d.kral@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260324183029.1274972-1-d.kral@proxmox.com> References: <20260324183029.1274972-1-d.kral@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1774376987912 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.057 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 2FNMSM2OI25YX2TTEB3EQAH7AQNKSMBO X-Message-ID-Hash: 2FNMSM2OI25YX2TTEB3EQAH7AQNKSMBO X-MailFrom: d.kral@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox VE development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: This is a more generic version of the `Usage` implementation from the pve_static bindings in the pve_rs repository. 
As the upcoming load balancing scheduler actions and dynamic resource scheduler will need more information about each resource, this further improves on the state tracking of each resource: In this implementation, a resource is composed of its usage statistics and its two essential states: the running state and the node placement. The non_exhaustive attribute ensures that usages need to construct the a Resource instance through its API. Users can repeatedly use the current state of Usage to make scheduling decisions with the to_scheduler() method. This method takes an implementation of UsageAggregator, which dictates how the usage information is represented to the Scheduler. Signed-off-by: Daniel Kral --- changes v1 -> v2: - new! This patch is added to move the handling of specific usage stats and their (de)serialization to the pve-rs bindings and have the general functionality in this crate. proxmox-resource-scheduling/src/lib.rs | 1 + proxmox-resource-scheduling/src/node.rs | 40 +++++ proxmox-resource-scheduling/src/resource.rs | 119 +++++++++++++ proxmox-resource-scheduling/src/usage.rs | 183 ++++++++++++++++++++ proxmox-resource-scheduling/tests/usage.rs | 153 ++++++++++++++++ 5 files changed, 496 insertions(+) create mode 100644 proxmox-resource-scheduling/src/usage.rs create mode 100644 proxmox-resource-scheduling/tests/usage.rs diff --git a/proxmox-resource-scheduling/src/lib.rs b/proxmox-resource-scheduling/src/lib.rs index 12b743fe..99ca16d8 100644 --- a/proxmox-resource-scheduling/src/lib.rs +++ b/proxmox-resource-scheduling/src/lib.rs @@ -3,6 +3,7 @@ pub mod topsis; pub mod node; pub mod resource; +pub mod usage; pub mod scheduler; diff --git a/proxmox-resource-scheduling/src/node.rs b/proxmox-resource-scheduling/src/node.rs index e6227eda..be462782 100644 --- a/proxmox-resource-scheduling/src/node.rs +++ b/proxmox-resource-scheduling/src/node.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use crate::resource::ResourceStats; /// Usage statistics of 
a node. @@ -37,3 +39,41 @@ impl NodeStats { self.mem as f64 / self.maxmem as f64 } } + +/// A node in the cluster context. +#[derive(Clone, Debug)] +pub struct Node { + /// Base stats of the node. + stats: NodeStats, + /// The identifiers of the resources assigned to the node. + resources: HashSet, +} + +impl Node { + pub fn new(stats: NodeStats) -> Self { + Self { + stats, + resources: HashSet::new(), + } + } + + pub fn add_resource(&mut self, sid: &str) -> bool { + self.resources.insert(sid.to_string()) + } + + pub fn remove_resource(&mut self, sid: &str) -> bool { + self.resources.remove(sid) + } + + pub fn stats(&self) -> NodeStats { + self.stats + } + + pub fn resources_iter(&self) -> impl Iterator { + self.resources.iter() + } + + pub fn contains_resource(&self, sid: &str) -> bool { + self.resources.contains(sid) + } +} diff --git a/proxmox-resource-scheduling/src/resource.rs b/proxmox-resource-scheduling/src/resource.rs index 1eb9d15e..2aa16a51 100644 --- a/proxmox-resource-scheduling/src/resource.rs +++ b/proxmox-resource-scheduling/src/resource.rs @@ -1,5 +1,7 @@ use std::{iter::Sum, ops::Add}; +use anyhow::{bail, Error}; + /// Usage statistics for a resource. #[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Default)] pub struct ResourceStats { @@ -31,3 +33,120 @@ impl Sum for ResourceStats { iter.fold(Self::default(), |a, b| a + b) } } + +/// Execution state of a resource. +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +#[non_exhaustive] +pub enum ResourceState { + /// The resource is stopped. + Stopped, + /// The resource is scheduled to start. + Starting, + /// The resource is started and currently running. + Started, +} + +/// Placement of a resource. +#[derive(Clone, PartialEq, Eq, Debug)] +#[non_exhaustive] +pub enum ResourcePlacement { + /// The resource is on `current_node`. + Stationary { current_node: String }, + /// The resource is being moved from `current_node` to `target_node`. 
+ Moving { + current_node: String, + target_node: String, + }, +} + +impl ResourcePlacement { + fn nodenames(&self) -> Vec<&str> { + match self { + ResourcePlacement::Stationary { current_node } => vec![¤t_node], + ResourcePlacement::Moving { + current_node, + target_node, + } => vec![¤t_node, &target_node], + } + } +} + +/// A resource in the cluster context. +#[derive(Clone, Debug)] +#[non_exhaustive] +pub struct Resource { + /// The usage statistics of the resource. + stats: ResourceStats, + /// The execution state of the resource. + state: ResourceState, + /// The placement of the resource. + placement: ResourcePlacement, +} + +impl Resource { + pub fn new(stats: ResourceStats, state: ResourceState, placement: ResourcePlacement) -> Self { + Self { + stats, + state, + placement, + } + } + + /// Put the resource into a moving state with `target_node`. + /// + /// This method fails if the resource is already moving. + pub fn moving_to(&mut self, target_node: String) -> Result<(), Error> { + match &self.placement { + ResourcePlacement::Stationary { current_node } => { + self.placement = ResourcePlacement::Moving { + current_node: current_node.to_string(), + target_node, + }; + } + ResourcePlacement::Moving { .. } => bail!("resource is already moving"), + }; + + Ok(()) + } + + /// Handles the external removal of a node. + /// + /// Returns whether the resource does not have any node left. 
+ pub fn remove_node(&mut self, nodename: &str) -> bool { + match &self.placement { + ResourcePlacement::Stationary { current_node } => current_node == nodename, + ResourcePlacement::Moving { + current_node, + target_node, + } => { + if current_node == nodename { + self.placement = ResourcePlacement::Stationary { + current_node: target_node.to_string(), + }; + } else if target_node == nodename { + self.placement = ResourcePlacement::Stationary { + current_node: current_node.to_string(), + }; + } + + false + } + } + } + + pub fn state(&self) -> ResourceState { + self.state + } + + pub fn stats(&self) -> ResourceStats { + self.stats + } + + pub fn placement(&self) -> &ResourcePlacement { + &self.placement + } + + pub fn nodenames(&self) -> Vec<&str> { + self.placement.nodenames() + } +} diff --git a/proxmox-resource-scheduling/src/usage.rs b/proxmox-resource-scheduling/src/usage.rs new file mode 100644 index 00000000..78ccc453 --- /dev/null +++ b/proxmox-resource-scheduling/src/usage.rs @@ -0,0 +1,183 @@ +use anyhow::{bail, Error}; + +use std::collections::HashMap; + +use crate::{ + node::{Node, NodeStats}, + resource::{Resource, ResourcePlacement, ResourceState, ResourceStats}, + scheduler::{NodeUsage, Scheduler}, +}; + +/// The state of the usage in the cluster. +/// +/// The cluster usage represents the current state of the assignments between nodes and resources +/// and their usage statistics. A resource can be placed on these nodes according to their +/// placement state. See [`crate::resource::Resource`] for more information. +/// +/// The cluster usage state can be used to build a current state for the [`Scheduler`]. +#[derive(Default)] +pub struct Usage { + nodes: HashMap, + resources: HashMap, +} + +/// An aggregator for the [`Usage`] maps the cluster usage to node usage statistics that are +/// relevant for the scheduler. +pub trait UsageAggregator { + fn aggregate(usage: &Usage) -> Vec; +} + +impl Usage { + /// Instantiate an empty cluster usage. 
+ pub fn new() -> Self { + Self::default() + } + + /// Add a node to the cluster usage. + /// + /// This method fails if a node with the same `nodename` already exists. + pub fn add_node(&mut self, nodename: String, stats: NodeStats) -> Result<(), Error> { + if self.nodes.contains_key(&nodename) { + bail!("node '{}' already exists", nodename); + } + + self.nodes.insert(nodename, Node::new(stats)); + + Ok(()) + } + + /// Remove a node from the cluster usage. + pub fn remove_node(&mut self, nodename: &str) { + if let Some(node) = self.nodes.remove(nodename) { + node.resources_iter().for_each(|sid| { + if let Some(resource) = self.resources.get_mut(sid) + && resource.remove_node(nodename) + { + self.resources.remove(sid); + } + }); + } + } + + /// Returns a reference to the [`Node`] with the identifier `nodename`. + pub fn get_node(&self, nodename: &str) -> Option<&Node> { + self.nodes.get(nodename) + } + + /// Returns an iterator for the cluster usage's nodes. + pub fn nodes_iter(&self) -> impl Iterator { + self.nodes.iter() + } + + /// Returns an iterator for the cluster usage's nodes. + pub fn nodenames_iter(&self) -> impl Iterator { + self.nodes.keys() + } + + /// Returns whether the node with the identifier `nodename` is present in the cluster usage. 
+ pub fn contains_node(&self, nodename: &str) -> bool { + self.nodes.contains_key(nodename) + } + + fn add_resource_to_nodes(&mut self, sid: &str, nodenames: Vec<&str>) -> Result<(), Error> { + if nodenames + .iter() + .any(|nodename| !self.nodes.contains_key(*nodename)) + { + bail!("resource nodes do not exist"); + } + + nodenames.iter().for_each(|nodename| { + if let Some(node) = self.nodes.get_mut(*nodename) { + node.add_resource(sid); + } + }); + + Ok(()) + } + + fn remove_resource_from_nodes(&mut self, sid: &str, nodenames: &[&str]) { + nodenames.iter().for_each(|nodename| { + if let Some(node) = self.nodes.get_mut(*nodename) { + node.remove_resource(sid); + } + }); + } + + /// Add `resource` with identifier `sid` to cluster usage. + /// + /// This method fails if a resource with the same `sid` already exists or the resource's nodes + /// do not exist in the cluster usage. + pub fn add_resource(&mut self, sid: String, resource: Resource) -> Result<(), Error> { + if self.resources.contains_key(&sid) { + bail!("resource '{}' already exists", sid); + } + + self.add_resource_to_nodes(&sid, resource.nodenames())?; + + self.resources.insert(sid.to_string(), resource); + + Ok(()) + } + + /// Add `stats` from resource with identifier `sid` to node `nodename` in cluster usage. + /// + /// For the first call, the resource is assumed to be started and stationary on the given node. + /// If there was no intermediate call to remove the resource, the second call will assume that + /// the given node is the target node and the resource is being moved there. The second call + /// will ignore the value of `stats`. + #[deprecated = "only for backwards compatibility, use add_resource(...) 
instead"] + pub fn add_resource_usage_to_node( + &mut self, + nodename: &str, + sid: &str, + stats: ResourceStats, + ) -> Result<(), Error> { + if let Some(resource) = self.resources.get_mut(sid) { + resource.moving_to(nodename.to_string())?; + + self.add_resource_to_nodes(sid, vec![nodename]) + } else { + let placement = ResourcePlacement::Stationary { + current_node: nodename.to_string(), + }; + let resource = Resource::new(stats, ResourceState::Started, placement); + + self.add_resource(sid.to_string(), resource) + } + } + + /// Remove resource with identifier `sid` from cluster usage. + pub fn remove_resource(&mut self, sid: &str) { + if let Some(resource) = self.resources.remove(sid) { + match resource.placement() { + ResourcePlacement::Stationary { current_node } => { + self.remove_resource_from_nodes(sid, &[current_node]); + } + ResourcePlacement::Moving { + current_node, + target_node, + } => { + self.remove_resource_from_nodes(sid, &[current_node, target_node]); + } + } + } + } + + /// Returns a reference to the [`Resource`] with the identifier `sid`. + pub fn get_resource(&self, sid: &str) -> Option<&Resource> { + self.resources.get(sid) + } + + /// Returns an iterator for the cluster usage's resources. + pub fn resources_iter(&self) -> impl Iterator { + self.resources.iter() + } + + /// Use the current cluster usage as a base for a scheduling action. 
pub fn to_scheduler<F: UsageAggregator>(&self) -> Scheduler {
ResourceStats::default()) + .is_ok() + { + bail!("add_resource_usage_to_node() allows adding resource to more than two nodes"); + } + + Ok(()) +} + +#[test] +fn test_add_remove_stationary_resource() -> Result<(), Error> { + let mut usage = Usage::new(); + + let (sid, nodename) = ("vm:101", "node1"); + + usage.add_node(nodename.to_string(), NodeStats::default())?; + + let placement = ResourcePlacement::Stationary { + current_node: nodename.to_string(), + }; + let resource = Resource::new(ResourceStats::default(), ResourceState::Stopped, placement); + + usage.add_resource(sid.to_string(), resource)?; + + match (usage.get_resource(sid), usage.get_node(nodename)) { + (Some(_), Some(node)) => { + if !node.contains_resource(sid) { + bail!("resource '{sid}' was not added to node '{nodename}'"); + } + } + _ => bail!("resource '{sid}' or node '{nodename}' were not added"), + } + + usage.remove_resource(sid); + + match (usage.get_resource(sid), usage.get_node(nodename)) { + (None, Some(node)) => { + if node.contains_resource(sid) { + bail!("resource '{sid}' was not removed from node '{nodename}'"); + } + } + _ => bail!("resource '{sid}' was not removed"), + } + + Ok(()) +} + +#[test] +fn test_add_remove_moving_resource() -> Result<(), Error> { + let mut usage = Usage::new(); + + let (sid, current_nodename, target_nodename) = ("vm:101", "node1", "node2"); + + usage.add_node(current_nodename.to_string(), NodeStats::default())?; + usage.add_node(target_nodename.to_string(), NodeStats::default())?; + + let placement = ResourcePlacement::Moving { + current_node: current_nodename.to_string(), + target_node: target_nodename.to_string(), + }; + let resource = Resource::new(ResourceStats::default(), ResourceState::Stopped, placement); + + usage.add_resource(sid.to_string(), resource)?; + + match ( + usage.get_resource(sid), + usage.get_node(current_nodename), + usage.get_node(target_nodename), + ) { + (Some(_), Some(current_node), Some(target_node)) => { + if 
!current_node.contains_resource("vm:101") { + bail!("resource '{sid}' was not added to current node '{current_nodename}'"); + } + + if !target_node.contains_resource("vm:101") { + bail!("resource '{sid}' was not added to target node '{target_nodename}'"); + } + } + _ => bail!("resource '{sid}' or nodes were not added"), + } + + usage.remove_resource(sid); + + match ( + usage.get_resource(sid), + usage.get_node(current_nodename), + usage.get_node(target_nodename), + ) { + (None, Some(current_node), Some(target_node)) => { + if current_node.contains_resource(sid) { + bail!("resource '{sid}' was not removed from current node '{current_nodename}'"); + } + + if target_node.contains_resource(sid) { + bail!("resource '{sid}' was not removed from target node '{target_nodename}'"); + } + } + _ => bail!("resource '{sid}' was not removed"), + } + + Ok(()) +} -- 2.47.3