From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 1D4D81FF141 for ; Mon, 30 Mar 2026 16:43:43 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id E0238373E8; Mon, 30 Mar 2026 16:42:03 +0200 (CEST) From: Daniel Kral To: pve-devel@lists.proxmox.com Subject: [PATCH proxmox v3 05/40] resource-scheduling: implement generic cluster usage implementation Date: Mon, 30 Mar 2026 16:30:14 +0200 Message-ID: <20260330144101.668747-6-d.kral@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260330144101.668747-1-d.kral@proxmox.com> References: <20260330144101.668747-1-d.kral@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1774881613158 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.065 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: TQFUHZSMI2DHK4CRSWWO2VERQFJ3DFEP X-Message-ID-Hash: TQFUHZSMI2DHK4CRSWWO2VERQFJ3DFEP X-MailFrom: d.kral@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox VE development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: This is a more generic version of the `Usage` implementation from the pve_static bindings in the pve_rs repository. 
As the upcoming load balancing scheduler actions and dynamic resource scheduler will need more information about each resource, this further improves on the state tracking of each resource: In this implementation, a resource is composed of its usage statistics and its two essential states: the running state and the node placement. The non_exhaustive attribute ensures that users need to construct a Resource instance through its API. Users can repeatedly use the current state of Usage to make scheduling decisions with the to_scheduler() method. This method takes an implementation of UsageAggregator, which dictates how the usage information is represented to the Scheduler. Signed-off-by: Daniel Kral --- changes v2 -> v3: - inline bail! formatting variables - s/to_string/to_owned/ where reasonable - make Node::resources_iter(&self) return &str Iterator impl - drop add_resource_to_nodes() and remove_resource_from_nodes() - drop ResourcePlacement::nodenames() and Resource::nodenames() - drop Resource::moving_to() - fix behavior of add_resource_usage_to_node() for already added resources: if the next nodename is non-existing, the resource would still be put into moving but then not add the resource to the nodes; this is fixed now by improving the handling - inline behavior of add_resource() to be more concise in how both placement strategies are handled - no change in Resource::remove_node() documentation as I did not find a better description in the meantime, but as it's internal it can be improved later on as well test changes v2 -> v3: - use assertions whether nodes were added correctly in test cases - use assertions whether resources were added correctly in test cases - additionally assert that a resource cannot be added to a non-existing node with add_resource_usage_to_node() and that this does not alter the state of the Resource in the meantime as it did in v2 - use assert!() instead of bail!() in test cases as much as appropriate 
proxmox-resource-scheduling/src/lib.rs | 1 + proxmox-resource-scheduling/src/node.rs | 40 ++++ proxmox-resource-scheduling/src/resource.rs | 84 ++++++++ proxmox-resource-scheduling/src/usage.rs | 208 ++++++++++++++++++++ proxmox-resource-scheduling/tests/usage.rs | 181 +++++++++++++++++ 5 files changed, 514 insertions(+) create mode 100644 proxmox-resource-scheduling/src/usage.rs create mode 100644 proxmox-resource-scheduling/tests/usage.rs diff --git a/proxmox-resource-scheduling/src/lib.rs b/proxmox-resource-scheduling/src/lib.rs index 12b743fe..99ca16d8 100644 --- a/proxmox-resource-scheduling/src/lib.rs +++ b/proxmox-resource-scheduling/src/lib.rs @@ -3,6 +3,7 @@ pub mod topsis; pub mod node; pub mod resource; +pub mod usage; pub mod scheduler; diff --git a/proxmox-resource-scheduling/src/node.rs b/proxmox-resource-scheduling/src/node.rs index e6227eda..304582ee 100644 --- a/proxmox-resource-scheduling/src/node.rs +++ b/proxmox-resource-scheduling/src/node.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use crate::resource::ResourceStats; /// Usage statistics of a node. @@ -37,3 +39,41 @@ impl NodeStats { self.mem as f64 / self.maxmem as f64 } } + +/// A node in the cluster context. +#[derive(Clone, Debug)] +pub struct Node { + /// Base stats of the node. + stats: NodeStats, + /// The identifiers of the resources assigned to the node. 
+ resources: HashSet, +} + +impl Node { + pub fn new(stats: NodeStats) -> Self { + Self { + stats, + resources: HashSet::new(), + } + } + + pub fn add_resource(&mut self, sid: String) -> bool { + self.resources.insert(sid) + } + + pub fn remove_resource(&mut self, sid: &str) -> bool { + self.resources.remove(sid) + } + + pub fn stats(&self) -> NodeStats { + self.stats + } + + pub fn resources_iter(&self) -> impl Iterator { + self.resources.iter().map(String::as_str) + } + + pub fn contains_resource(&self, sid: &str) -> bool { + self.resources.contains(sid) + } +} diff --git a/proxmox-resource-scheduling/src/resource.rs b/proxmox-resource-scheduling/src/resource.rs index 1eb9d15e..2dbe6fa4 100644 --- a/proxmox-resource-scheduling/src/resource.rs +++ b/proxmox-resource-scheduling/src/resource.rs @@ -31,3 +31,87 @@ impl Sum for ResourceStats { iter.fold(Self::default(), |a, b| a + b) } } + +/// Execution state of a resource. +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +#[non_exhaustive] +pub enum ResourceState { + /// The resource is stopped. + Stopped, + /// The resource is scheduled to start. + Starting, + /// The resource is started and currently running. + Started, +} + +/// Placement of a resource. +#[derive(Clone, PartialEq, Eq, Debug)] +#[non_exhaustive] +pub enum ResourcePlacement { + /// The resource is on `current_node`. + Stationary { current_node: String }, + /// The resource is being moved from `current_node` to `target_node`. + Moving { + current_node: String, + target_node: String, + }, +} + +/// A resource in the cluster context. +#[derive(Clone, Debug)] +#[non_exhaustive] +pub struct Resource { + /// The usage statistics of the resource. + stats: ResourceStats, + /// The execution state of the resource. + state: ResourceState, + /// The placement of the resource. 
+ placement: ResourcePlacement, +} + +impl Resource { + pub fn new(stats: ResourceStats, state: ResourceState, placement: ResourcePlacement) -> Self { + Self { + stats, + state, + placement, + } + } + + /// Handles the external removal of a node. + /// + /// Returns whether the resource does not have any node left. + pub fn remove_node(&mut self, nodename: &str) -> bool { + match &self.placement { + ResourcePlacement::Stationary { current_node } => current_node == nodename, + ResourcePlacement::Moving { + current_node, + target_node, + } => { + if current_node == nodename { + self.placement = ResourcePlacement::Stationary { + current_node: target_node.to_owned(), + }; + } else if target_node == nodename { + self.placement = ResourcePlacement::Stationary { + current_node: current_node.to_owned(), + }; + } + + false + } + } + } + + pub fn state(&self) -> ResourceState { + self.state + } + + pub fn stats(&self) -> ResourceStats { + self.stats + } + + pub fn placement(&self) -> &ResourcePlacement { + &self.placement + } +} diff --git a/proxmox-resource-scheduling/src/usage.rs b/proxmox-resource-scheduling/src/usage.rs new file mode 100644 index 00000000..81b88452 --- /dev/null +++ b/proxmox-resource-scheduling/src/usage.rs @@ -0,0 +1,208 @@ +use anyhow::{bail, Error}; + +use std::collections::HashMap; + +use crate::{ + node::{Node, NodeStats}, + resource::{Resource, ResourcePlacement, ResourceState, ResourceStats}, + scheduler::{NodeUsage, Scheduler}, +}; + +/// The state of the usage in the cluster. +/// +/// The cluster usage represents the current state of the assignments between nodes and resources +/// and their usage statistics. A resource can be placed on these nodes according to their +/// placement state. See [`crate::resource::Resource`] for more information. +/// +/// The cluster usage state can be used to build a current state for the [`Scheduler`]. 
+#[derive(Default)] +pub struct Usage { + nodes: HashMap, + resources: HashMap, +} + +/// An aggregator for the [`Usage`] maps the cluster usage to node usage statistics that are +/// relevant for the scheduler. +pub trait UsageAggregator { + fn aggregate(usage: &Usage) -> Vec; +} + +impl Usage { + /// Instantiate an empty cluster usage. + pub fn new() -> Self { + Self::default() + } + + /// Add a node to the cluster usage. + /// + /// This method fails if a node with the same `nodename` already exists. + pub fn add_node(&mut self, nodename: String, stats: NodeStats) -> Result<(), Error> { + if self.nodes.contains_key(&nodename) { + bail!("node '{nodename}' already exists"); + } + + self.nodes.insert(nodename, Node::new(stats)); + + Ok(()) + } + + /// Remove a node from the cluster usage. + pub fn remove_node(&mut self, nodename: &str) { + if let Some(node) = self.nodes.remove(nodename) { + node.resources_iter().for_each(|sid| { + if let Some(resource) = self.resources.get_mut(sid) + && resource.remove_node(nodename) + { + self.resources.remove(sid); + } + }); + } + } + + /// Returns a reference to the [`Node`] with the identifier `nodename`. + pub fn get_node(&self, nodename: &str) -> Option<&Node> { + self.nodes.get(nodename) + } + + /// Returns an iterator for the cluster usage's nodes. + pub fn nodes_iter(&self) -> impl Iterator { + self.nodes.iter() + } + + /// Returns an iterator for the cluster usage's nodes. + pub fn nodenames_iter(&self) -> impl Iterator { + self.nodes.keys() + } + + /// Returns whether the node with the identifier `nodename` is present in the cluster usage. + pub fn contains_node(&self, nodename: &str) -> bool { + self.nodes.contains_key(nodename) + } + + /// Add `resource` with identifier `sid` to cluster usage. + /// + /// This method fails if a resource with the same `sid` already exists or the resource's nodes + /// do not exist in the cluster usage. 
+ pub fn add_resource(&mut self, sid: String, resource: Resource) -> Result<(), Error> { + if self.resources.contains_key(&sid) { + bail!("resource '{sid}' already exists"); + } + + match resource.placement() { + ResourcePlacement::Stationary { current_node } => { + match self.nodes.get_mut(current_node) { + Some(current_node) => { + current_node.add_resource(sid.to_owned()); + } + _ => bail!("current node for resource '{sid}' does not exist"), + } + } + ResourcePlacement::Moving { + current_node, + target_node, + } => { + if current_node == target_node { + bail!("resource '{sid}' has the same current and target node"); + } + + match self.nodes.get_disjoint_mut([current_node, target_node]) { + [Some(current_node), Some(target_node)] => { + current_node.add_resource(sid.to_owned()); + target_node.add_resource(sid.to_owned()); + } + _ => bail!("nodes for resource '{sid}' do not exist"), + } + } + } + + self.resources.insert(sid, resource); + + Ok(()) + } + + /// Add `stats` from resource with identifier `sid` to node `nodename` in cluster usage. + /// + /// For the first call, the resource is assumed to be started and stationary on the given node. + /// If there was no intermediate call to remove the resource, the second call will assume that + /// the given node is the target node and the resource is being moved there. The second call + /// will ignore the value of `stats`. + #[deprecated = "only for backwards compatibility, use add_resource(...) 
instead"] + pub fn add_resource_usage_to_node( + &mut self, + nodename: &str, + sid: &str, + stats: ResourceStats, + ) -> Result<(), Error> { + if let Some(resource) = self.resources.remove(sid) { + match resource.placement() { + ResourcePlacement::Stationary { current_node } => { + let placement = ResourcePlacement::Moving { + current_node: current_node.to_owned(), + target_node: nodename.to_owned(), + }; + let new_resource = Resource::new(resource.stats(), resource.state(), placement); + + if let Err(err) = self.add_resource(sid.to_owned(), new_resource) { + self.add_resource(sid.to_owned(), resource)?; + + bail!(err); + } + + Ok(()) + } + ResourcePlacement::Moving { target_node, .. } => { + bail!("resource '{sid}' is already moving to target node '{target_node}'") + } + } + } else { + let placement = ResourcePlacement::Stationary { + current_node: nodename.to_owned(), + }; + let resource = Resource::new(stats, ResourceState::Started, placement); + + self.add_resource(sid.to_owned(), resource) + } + } + + /// Remove resource with identifier `sid` from cluster usage. + pub fn remove_resource(&mut self, sid: &str) { + if let Some(resource) = self.resources.remove(sid) { + match resource.placement() { + ResourcePlacement::Stationary { current_node } => { + if let Some(current_node) = self.nodes.get_mut(current_node) { + current_node.remove_resource(sid); + } + } + ResourcePlacement::Moving { + current_node, + target_node, + } => { + if let Some(current_node) = self.nodes.get_mut(current_node) { + current_node.remove_resource(sid); + } + + if let Some(target_node) = self.nodes.get_mut(target_node) { + target_node.remove_resource(sid); + } + } + } + } + } + + /// Returns a reference to the [`Resource`] with the identifier `sid`. + pub fn get_resource(&self, sid: &str) -> Option<&Resource> { + self.resources.get(sid) + } + + /// Returns an iterator for the cluster usage's resources. 
+ pub fn resources_iter(&self) -> impl Iterator { + self.resources.iter() + } + + /// Use the current cluster usage as a base for a scheduling action. + pub fn to_scheduler(&self) -> Scheduler { + let node_usages = F::aggregate(self); + + Scheduler::from_nodes(node_usages) + } +} diff --git a/proxmox-resource-scheduling/tests/usage.rs b/proxmox-resource-scheduling/tests/usage.rs new file mode 100644 index 00000000..b6cb5a6e --- /dev/null +++ b/proxmox-resource-scheduling/tests/usage.rs @@ -0,0 +1,181 @@ +use proxmox_resource_scheduling::{ + node::NodeStats, + resource::{Resource, ResourcePlacement, ResourceState, ResourceStats}, + usage::Usage, +}; + +#[test] +fn test_no_duplicate_nodes() { + let mut usage = Usage::new(); + + assert!(usage + .add_node("node1".to_owned(), NodeStats::default()) + .is_ok()); + + assert!( + usage + .add_node("node1".to_owned(), NodeStats::default()) + .is_err(), + "cluster usage does allow duplicate node entries" + ); +} + +#[test] +fn test_no_duplicate_resources() { + let mut usage = Usage::new(); + + assert!(usage + .add_node("node1".to_owned(), NodeStats::default()) + .is_ok()); + + let placement = ResourcePlacement::Stationary { + current_node: "node1".to_owned(), + }; + let resource = Resource::new(ResourceStats::default(), ResourceState::Stopped, placement); + + assert!(usage + .add_resource("vm:101".to_owned(), resource.clone()) + .is_ok()); + + assert!( + usage.add_resource("vm:101".to_owned(), resource).is_err(), + "cluster usage does allow duplicate resource entries" + ); +} + +fn assert_add_node(usage: &mut Usage, nodename: &str) { + assert!(usage + .add_node(nodename.to_owned(), NodeStats::default()) + .is_ok()); + + assert!( + usage.get_node(nodename).is_some(), + "node '{nodename}' was not added" + ); +} + +fn assert_add_resource(usage: &mut Usage, sid: &str, resource: Resource) { + assert!(usage.add_resource(sid.to_owned(), resource).is_ok()); + + assert!( + usage.get_resource(sid).is_some(), + "resource '{sid}' was not 
added" + ); +} + +#[test] +#[allow(deprecated)] +fn test_add_resource_usage_to_node() { + let mut usage = Usage::new(); + + assert_add_node(&mut usage, "node1"); + assert_add_node(&mut usage, "node2"); + assert_add_node(&mut usage, "node3"); + + assert!(usage + .add_resource_usage_to_node("node1", "vm:101", ResourceStats::default()) + .is_ok()); + + assert!( + usage + .add_resource_usage_to_node("node4", "vm:101", ResourceStats::default()) + .is_err(), + "add_resource_usage_to_node() allows adding non-existent nodes" + ); + + assert!(usage + .add_resource_usage_to_node("node2", "vm:101", ResourceStats::default()) + .is_ok()); + + assert!( + usage + .add_resource_usage_to_node("node3", "vm:101", ResourceStats::default()) + .is_err(), + "add_resource_usage_to_node() allows adding resources to more than two nodes" + ); +} + +#[test] +fn test_add_remove_stationary_resource() { + let mut usage = Usage::new(); + + let (sid, nodename) = ("vm:101", "node1"); + + assert_add_node(&mut usage, nodename); + + let placement = ResourcePlacement::Stationary { + current_node: nodename.to_owned(), + }; + let resource = Resource::new(ResourceStats::default(), ResourceState::Stopped, placement); + + assert_add_resource(&mut usage, sid, resource); + + if let Some(node) = usage.get_node(nodename) { + assert!( + node.contains_resource(sid), + "resource '{sid}' was not added from node '{nodename}'" + ); + } + + usage.remove_resource(sid); + + assert!( + usage.get_resource(sid).is_none(), + "resource '{sid}' was not removed" + ); + + if let Some(node) = usage.get_node(nodename) { + assert!( + !node.contains_resource(sid), + "resource '{sid}' was not removed from node '{nodename}'" + ); + } +} + +#[test] +fn test_add_remove_moving_resource() { + let mut usage = Usage::new(); + + let (sid, current_nodename, target_nodename) = ("vm:101", "node1", "node2"); + + assert_add_node(&mut usage, current_nodename); + assert_add_node(&mut usage, target_nodename); + + let placement = 
ResourcePlacement::Moving { + current_node: current_nodename.to_owned(), + target_node: target_nodename.to_owned(), + }; + let resource = Resource::new(ResourceStats::default(), ResourceState::Stopped, placement); + + assert_add_resource(&mut usage, sid, resource); + + if let Some(current_node) = usage.get_node(current_nodename) { + assert!( + current_node.contains_resource(sid), + "resource '{sid}' was not added to current node '{current_nodename}'" + ); + } + + if let Some(target_node) = usage.get_node(target_nodename) { + assert!( + target_node.contains_resource(sid), + "resource '{sid}' was not added to target node '{target_nodename}'" + ); + } + + usage.remove_resource(sid); + + if let Some(current_node) = usage.get_node(current_nodename) { + assert!( + !current_node.contains_resource(sid), + "resource '{sid}' was not removed from current node '{current_nodename}'" + ); + } + + if let Some(target_node) = usage.get_node(target_nodename) { + assert!( + !target_node.contains_resource(sid), + "resource '{sid}' was not removed from target node '{target_nodename}'" + ); + } +} -- 2.47.3