From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 4/5] parallel fetcher: improve result type ergonomics
Date: Wed, 4 Feb 2026 16:27:22 +0100 [thread overview]
Message-ID: <20260204152723.482258-5-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260204152723.482258-1-l.wagner@proxmox.com>
Completely overhaul the *Result types:
- use the term 'Outcome' instead of result, to avoid confusion with
Rust's Result type
- use generics to avoid the awkwardness of accessing the node outcome
of a single node when using do_for_all_remotes (we only call out to
a single node anyway)
- implement .iter()/.into_iter() where it makes sense for conveniently
iterating over remote outcomes and node outcomes
- make all members private and offer getters to access fields
- use a sorted vec as an internal data structure for storing
remote outcomes instead of a hash map. This avoids the awkwardness of
having the remote name as key *and* in the struct that is used as a
value. Since the vec is sorted by the remote name, we can still
access specific remotes quite efficiently via binary search.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/api/pve/firewall.rs | 53 ++--
server/src/api/sdn/controllers.rs | 38 +--
server/src/api/sdn/vnets.rs | 35 +--
server/src/api/sdn/zones.rs | 35 +--
.../tasks/remote_tasks.rs | 31 +-
server/src/parallel_fetcher.rs | 268 ++++++++++++------
server/src/remote_updates.rs | 34 ++-
7 files changed, 273 insertions(+), 221 deletions(-)
diff --git a/server/src/api/pve/firewall.rs b/server/src/api/pve/firewall.rs
index d81592f7..bf5df3f8 100644
--- a/server/src/api/pve/firewall.rs
+++ b/server/src/api/pve/firewall.rs
@@ -271,12 +271,10 @@ pub async fn pve_firewall_status(
// 2: build context with guests for each remote and fetch node-level data
let mut guests_per_remote = HashMap::new();
- for (remote_id, remote_result) in &cluster_results.remote_results {
- if let Ok(remote_result) = remote_result {
- if let Ok(node_result) = remote_result.node_results.get("localhost").unwrap() {
- guests_per_remote
- .insert(remote_id.clone(), Arc::new(node_result.data.guests.clone()));
- }
+ for remote_outcome in &cluster_results {
+ let remote_id = remote_outcome.remote().to_string();
+ if let Ok(node_result) = remote_outcome.data() {
+ guests_per_remote.insert(remote_id, Arc::new(node_result.guests.clone()));
}
}
@@ -298,26 +296,18 @@ pub async fn pve_firewall_status(
let mut result = Vec::new();
for remote in &pve_remotes {
let mut cluster_status = cluster_results
- .remote_results
- .get(&remote.id)
- .and_then(|r| r.as_ref().ok())
- .and_then(|r| r.node_results.get("localhost"))
- .and_then(|n| n.as_ref().ok())
- .and_then(|n| n.data.status.clone());
+ .get_remote_outcome(&remote.id)
+ .and_then(|r| r.data().ok())
+ .and_then(|n| n.status.clone());
- let node_fetch_result = node_results.remote_results.get(&remote.id);
+ let node_fetch_result = node_results.get_remote_outcome(&remote.id);
let nodes = node_fetch_result
- .and_then(|r| r.as_ref().ok())
- .map(|r| {
- r.node_results
- .values()
- .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
- .collect()
- })
+ .and_then(|r| r.nodes().ok())
+ .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
.unwrap_or_default();
- if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+ if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
cluster_status = None;
}
@@ -389,12 +379,8 @@ pub async fn cluster_firewall_status(
.await;
let cluster_data = cluster_results
- .remote_results
- .get(&remote)
- .and_then(|r| r.as_ref().ok())
- .and_then(|r| r.node_results.get("localhost"))
- .and_then(|n| n.as_ref().ok())
- .map(|n| &n.data);
+ .get_remote_outcome(&remote)
+ .and_then(|r| r.data().ok());
let (cluster_status, guests) = match cluster_data {
Some(data) => (data.status.clone(), data.guests.clone()),
@@ -418,19 +404,14 @@ pub async fn cluster_firewall_status(
.await;
// 3: collect node results
- let node_fetch_result = node_results.remote_results.get(&remote);
+ let node_fetch_result = node_results.get_remote_outcome(&remote);
let nodes = node_fetch_result
- .and_then(|r| r.as_ref().ok())
- .map(|r| {
- r.node_results
- .values()
- .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
- .collect()
- })
+ .and_then(|r| r.nodes().ok())
+ .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
.unwrap_or_default();
- let final_status = if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+ let final_status = if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
None
} else {
cluster_status
diff --git a/server/src/api/sdn/controllers.rs b/server/src/api/sdn/controllers.rs
index 96612516..50f9cb51 100644
--- a/server/src/api/sdn/controllers.rs
+++ b/server/src/api/sdn/controllers.rs
@@ -9,10 +9,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::ListControllersType;
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher};
pub const ROUTER: Router = Router::new().get(&API_METHOD_LIST_CONTROLLERS);
@@ -105,26 +102,19 @@ pub async fn list_controllers(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|controller| ListController {
- remote: remote.clone(),
- controller,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
+ for remote_outcome in results {
+ let remote = remote_outcome.remote().to_string();
+
+ match remote_outcome.into_data() {
+ Ok(sdn_controllers) => {
+ vnets.extend(
+ sdn_controllers
+ .into_iter()
+ .map(|controller| ListController {
+ remote: remote.clone(),
+ controller,
+ }),
+ )
}
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/vnets.rs b/server/src/api/sdn/vnets.rs
index 5e14c6ab..561b1d30 100644
--- a/server/src/api/sdn/vnets.rs
+++ b/server/src/api/sdn/vnets.rs
@@ -13,11 +13,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::{CreateVnet, SdnVnetType};
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
- sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
pub const ROUTER: Router = Router::new()
.get(&API_METHOD_LIST_VNETS)
@@ -105,26 +101,15 @@ async fn list_vnets(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|vnet| ListVnet {
- remote: remote.clone(),
- vnet,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
+ for remote_outcome in results {
+ let remote = remote_outcome.remote().to_string();
+
+ match remote_outcome.into_data() {
+ Ok(vnets_on_this_remote) => {
+ vnets.extend(vnets_on_this_remote.into_iter().map(|vnet| ListVnet {
+ remote: remote.clone(),
+ vnet,
+ }))
}
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/zones.rs b/server/src/api/sdn/zones.rs
index c4552795..b5186e27 100644
--- a/server/src/api/sdn/zones.rs
+++ b/server/src/api/sdn/zones.rs
@@ -14,11 +14,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::{CreateZone, ListZonesType};
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
- sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
pub const ROUTER: Router = Router::new()
.get(&API_METHOD_LIST_ZONES)
@@ -111,27 +107,14 @@ pub async fn list_zones(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|zone| ListZone {
- remote: remote.clone(),
- zone,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
- }
+ for remote_outcome in results {
+ let remote = remote_outcome.remote().to_string();
+
+ match remote_outcome.into_data() {
+ Ok(zones) => vnets.extend(zones.into_iter().map(|zone| ListZone {
+ remote: remote.clone(),
+ zone,
+ })),
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
}
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 93a0d05e..03e71a02 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -16,7 +16,7 @@ use proxmox_section_config::typed::SectionConfigData;
use server::{
connection,
- parallel_fetcher::{NodeResults, ParallelFetcher},
+ parallel_fetcher::ParallelFetcher,
pbs_client,
remote_tasks::{
self,
@@ -276,23 +276,32 @@ async fn fetch_remotes(
let mut all_tasks = Vec::new();
let mut node_success_map = NodeFetchSuccessMap::default();
- for (remote_name, result) in fetch_results.remote_results {
- match result {
- Ok(remote_result) => {
- for (node_name, node_result) in remote_result.node_results {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- all_tasks.extend(data);
- node_success_map.set_node_success(remote_name.clone(), node_name);
+ for remote_outcome in fetch_results {
+ match remote_outcome.nodes() {
+ Ok(node_outcomes) => {
+ for node_outcome in node_outcomes {
+ let node_name = node_outcome.node_name().to_string();
+
+ match node_outcome.data() {
+ Ok(data) => {
+ all_tasks.extend(data.clone());
+ node_success_map
+ .set_node_success(remote_outcome.remote().to_string(), node_name);
}
Err(err) => {
- log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+ log::error!(
+ "could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}",
+ remote_name = remote_outcome.remote()
+ );
}
}
}
}
Err(err) => {
- log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+ log::error!(
+ "could not fetch tasks from remote '{remote}': {err:#}",
+ remote = remote_outcome.remote()
+ );
}
}
}
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 1eded293..57011096 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
@@ -18,45 +17,133 @@ use crate::connection;
pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
-pub struct ParallelFetcher<C> {
- max_connections: usize,
- max_connections_per_remote: usize,
- context: C,
+/// Outcome type produced by [`ParallelFetcher::do_for_all_remotes`] or
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+///
+/// This type contains the individual outcomes for each remote. These can be accessed
+/// by iterating over this type (`.iter()`, `.into_iter()`) or by calling
+/// [`FetcherOutcome::get_remote_outcome`].
+pub struct FetcherOutcome<O> {
+ // NOTE: This vector is sorted ascending by remote name.
+ outcomes: Vec<RemoteOutcome<O>>,
}
-pub struct FetchResults<T> {
- /// Per-remote results. The key in the map is the remote name.
- pub remote_results: HashMap<String, Result<RemoteResult<T>, Error>>,
-}
+impl<O> IntoIterator for FetcherOutcome<O> {
+ type Item = RemoteOutcome<O>;
+ type IntoIter = std::vec::IntoIter<Self::Item>;
-impl<T> Default for FetchResults<T> {
- fn default() -> Self {
- Self {
- remote_results: Default::default(),
- }
+ fn into_iter(self) -> Self::IntoIter {
+ self.outcomes.into_iter()
}
}
-#[derive(Debug)]
-pub struct RemoteResult<T> {
- /// Per-node results. The key in the map is the node name.
- pub node_results: HashMap<String, Result<NodeResults<T>, Error>>,
-}
+impl<'a, O> IntoIterator for &'a FetcherOutcome<O> {
+ type Item = &'a RemoteOutcome<O>;
+ type IntoIter = std::slice::Iter<'a, RemoteOutcome<O>>;
-impl<T> Default for RemoteResult<T> {
- fn default() -> Self {
- Self {
- node_results: Default::default(),
- }
+ fn into_iter(self) -> Self::IntoIter {
+ self.iter()
}
}
-#[derive(Debug)]
-pub struct NodeResults<T> {
- /// The data returned from the passed function.
- pub data: T,
- /// Time needed waiting for the passed function to return.
- pub api_response_time: Duration,
+impl<O> FetcherOutcome<O> {
+ /// Create a new iterator of all contained [`RemoteOutcome`]s.
+ pub fn iter<'a>(&'a self) -> std::slice::Iter<'a, RemoteOutcome<O>> {
+ self.outcomes.iter()
+ }
+
+ /// Get the outcome for a particular remote.
+ pub fn get_remote_outcome(&self, remote: &str) -> Option<&RemoteOutcome<O>> {
+ self.outcomes
+ .binary_search_by(|probe| probe.remote().cmp(remote))
+ .ok()
+ .map(|index| &self.outcomes[index])
+ }
+}
+
+/// Outcome for one remote.
+pub struct RemoteOutcome<O> {
+ remote_name: String,
+ remote_type: RemoteType,
+ outcome: O,
+}
+
+impl<O> RemoteOutcome<O> {
+ /// Returns the remote id.
+ pub fn remote(&self) -> &str {
+ self.remote_name.as_str()
+ }
+
+ /// Returns the type of the remote.
+ pub fn remote_type(&self) -> RemoteType {
+ self.remote_type
+ }
+}
+
+impl<T> RemoteOutcome<NodeOutcome<T>> {
+ /// Access the data that was returned by the handler function.
+ pub fn data(&self) -> Result<&T, &Error> {
+ self.outcome.data()
+ }
+
+ /// Access the data that was returned by the handler function, consuming self.
+ pub fn into_data(self) -> Result<T, Error> {
+ self.outcome.into_data()
+ }
+
+ /// The [`Duration`] of the handler call.
+ pub fn handler_duration(&self) -> Duration {
+ self.outcome.handler_duration()
+ }
+}
+
+impl<T> RemoteOutcome<MultipleNodeOutcome<T>> {
+ /// Access the node results.
+ ///
+ /// This returns an error if the list of nodes could not be fetched
+ /// during [`ParallelFetcher::do_for_all_remote_nodes`].
+ pub fn nodes(&self) -> Result<&Vec<NodeOutcome<T>>, &Error> {
+ self.outcome.inner.as_ref()
+ }
+}
+
+/// Wrapper type used to contain the node results when using
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+pub struct MultipleNodeOutcome<T> {
+ inner: Result<Vec<NodeOutcome<T>>, Error>,
+}
+
+/// Outcome for a single node.
+pub struct NodeOutcome<T> {
+ node_name: String,
+ data: Result<T, Error>,
+ api_response_time: Duration,
+}
+
+impl<T> NodeOutcome<T> {
+ /// Name of the node.
+ ///
+ /// At the moment, this is always `localhost` if `do_for_all_remotes` was used.
+ /// If `do_for_all_remote_nodes` is used, this is the actual nodename for PVE remotes and
+ /// `localhost` for PBS remotes.
+ pub fn node_name(&self) -> &str {
+ &self.node_name
+ }
+
+ /// Access the data that was returned by the handler function.
+ pub fn data(&self) -> Result<&T, &Error> {
+ self.data.as_ref()
+ }
+
+ /// Access the data that was returned by the handler function, consuming `self`.
+ pub fn into_data(self) -> Result<T, Error> {
+ self.data
+ }
+
+ /// The [`Duration`] of the handler call.
+ pub fn handler_duration(&self) -> Duration {
+ self.api_response_time
+ }
}
/// Builder for the [`ParallelFetcher`] struct.
@@ -101,6 +188,13 @@ impl<C> ParallelFetcherBuilder<C> {
}
}
+/// Helper for parallelizing API requests to multiple remotes/nodes.
+pub struct ParallelFetcher<C> {
+ max_connections: usize,
+ max_connections_per_remote: usize,
+ context: C,
+}
+
impl<C: Clone + Send + 'static> ParallelFetcher<C> {
/// Create a [`ParallelFetcher`] with default settings.
pub fn new(context: C) -> Self {
@@ -112,7 +206,12 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
ParallelFetcherBuilder::new(context)
}
- pub async fn do_for_all_remote_nodes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ /// Invoke a function `func` for all nodes of a given list of remotes in parallel.
+ pub async fn do_for_all_remote_nodes<A, F, T, Ft>(
+ self,
+ remotes: A,
+ func: F,
+ ) -> FetcherOutcome<MultipleNodeOutcome<T>>
where
A: Iterator<Item = Remote>,
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -142,20 +241,20 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
- let mut results = FetchResults::default();
+ let mut results = Vec::new();
while let Some(a) = remote_join_set.join_next().await {
match a {
- Ok((remote_name, remote_result)) => {
- results.remote_results.insert(remote_name, remote_result);
- }
+ Ok(remote_outcome) => results.push(remote_outcome),
Err(err) => {
log::error!("join error when waiting for future: {err}")
}
}
}
- results
+ results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+ FetcherOutcome { outcomes: results }
}
async fn fetch_remote<F, Ft, T>(
@@ -164,13 +263,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
semaphore: Arc<Semaphore>,
func: F,
max_connections_per_remote: usize,
- ) -> (String, Result<RemoteResult<T>, Error>)
+ ) -> RemoteOutcome<MultipleNodeOutcome<T>>
where
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
Ft: Future<Output = Result<T, Error>> + Send + 'static,
T: Send + Debug + 'static,
{
- let mut per_remote_results = RemoteResult::default();
+ let mut node_outcomes = Vec::new();
let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap());
let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote));
@@ -188,7 +287,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
.await
{
Ok(nodes) => nodes,
- Err(err) => return (remote.id.clone(), Err(err)),
+ Err(err) => {
+ return RemoteOutcome {
+ remote_name: remote.id,
+ remote_type: remote.ty,
+ outcome: MultipleNodeOutcome { inner: Err(err) },
+ }
+ }
};
let mut nodes_join_set = JoinSet::new();
@@ -228,10 +333,8 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
while let Some(join_result) = nodes_join_set.join_next().await {
match join_result {
- Ok((node_name, per_node_result)) => {
- per_remote_results
- .node_results
- .insert(node_name, per_node_result);
+ Ok(node_outcome) => {
+ node_outcomes.push(node_outcome);
}
Err(e) => {
log::error!("join error when waiting for future: {e}")
@@ -240,7 +343,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
RemoteType::Pbs => {
- let (nodename, result) = Self::fetch_node(
+ let node_outcome = Self::fetch_node(
func,
context,
remote.clone(),
@@ -250,14 +353,17 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
)
.await;
- match result {
- Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
- Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
- };
+ node_outcomes.push(node_outcome)
}
}
- (remote.id, Ok(per_remote_results))
+ RemoteOutcome {
+ remote_name: remote.id,
+ remote_type: remote.ty,
+ outcome: MultipleNodeOutcome {
+ inner: Ok(node_outcomes),
+ },
+ }
}
async fn fetch_node<F, Ft, T>(
@@ -267,7 +373,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
node: String,
_permit: OwnedSemaphorePermit,
_per_remote_connections_permit: Option<OwnedSemaphorePermit>,
- ) -> (String, Result<NodeResults<T>, Error>)
+ ) -> NodeOutcome<T>
where
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
Ft: Future<Output = Result<T, Error>> + Send + 'static,
@@ -277,19 +383,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let result = func(context, remote.clone(), node.clone()).await;
let api_response_time = now.elapsed();
- match result {
- Ok(data) => (
- node,
- Ok(NodeResults {
- data,
- api_response_time,
- }),
- ),
- Err(err) => (node, Err(err)),
+ NodeOutcome {
+ node_name: node,
+ data: result,
+ api_response_time,
}
}
- pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ /// Invoke a function `func` for all passed remotes in parallel.
+ pub async fn do_for_all_remotes<A, F, T, Ft>(
+ self,
+ remotes: A,
+ func: F,
+ ) -> FetcherOutcome<NodeOutcome<T>>
where
A: Iterator<Item = Remote>,
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -299,21 +405,31 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
let mut node_join_set = JoinSet::new();
- let mut results = FetchResults::default();
for remote in remotes {
let total_connections_semaphore = total_connections_semaphore.clone();
let remote_id = remote.id.clone();
+ let remote_type = remote.ty;
+
let context = self.context.clone();
let func = func.clone();
let future = async move {
let permit = total_connections_semaphore.acquire_owned().await.unwrap();
- (
- remote_id,
- Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
- )
+ RemoteOutcome {
+ remote_type,
+ remote_name: remote_id,
+ outcome: Self::fetch_node(
+ func,
+ context,
+ remote,
+ "localhost".into(),
+ permit,
+ None,
+ )
+ .await,
+ }
};
if let Some(log_context) = LogContext::current() {
@@ -323,29 +439,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
+ let mut results = Vec::new();
+
while let Some(a) = node_join_set.join_next().await {
match a {
- Ok((remote_id, (node_id, node_result))) => {
- let mut node_results = HashMap::new();
- node_results.insert(node_id, node_result);
-
- let remote_result = RemoteResult { node_results };
-
- if results
- .remote_results
- .insert(remote_id, Ok(remote_result))
- .is_some()
- {
- // should never happen, but log for good measure if it actually does
- log::warn!("made multiple requests for a remote!");
- }
- }
+ Ok(remote_outcome) => results.push(remote_outcome),
Err(err) => {
log::error!("join error when waiting for future: {err}")
}
}
}
- results
+ results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+ FetcherOutcome { outcomes: results }
}
}
diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs
index e772eef5..460a91f2 100644
--- a/server/src/remote_updates.rs
+++ b/server/src/remote_updates.rs
@@ -15,7 +15,7 @@ use pdm_api_types::RemoteUpid;
use pdm_buildcfg::PDM_CACHE_DIR_M;
use crate::connection;
-use crate::parallel_fetcher::{NodeResults, ParallelFetcher};
+use crate::parallel_fetcher::ParallelFetcher;
pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json");
@@ -214,30 +214,28 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er
let mut content = get_cached_summary_or_default()?;
- for (remote_name, result) in fetch_results.remote_results {
+ for remote_outcome in fetch_results {
+ let remote_name = remote_outcome.remote().to_string();
+
let entry = content
.remotes
.entry(remote_name.clone())
- .or_insert_with(|| {
- // unwrap: remote name came from the same config, should be safe.
- // TODO: Include type in ParallelFetcher results - should be much more efficient.
- let remote_type = remotes.iter().find(|r| r.id == remote_name).unwrap().ty;
-
- RemoteUpdateSummary {
- nodes: Default::default(),
- remote_type,
- status: RemoteUpdateStatus::Success,
- }
+ .or_insert_with(|| RemoteUpdateSummary {
+ nodes: Default::default(),
+ remote_type: remote_outcome.remote_type(),
+ status: RemoteUpdateStatus::Success,
});
- match result {
- Ok(remote_result) => {
+ match remote_outcome.nodes() {
+ Ok(node_outcomes) => {
entry.status = RemoteUpdateStatus::Success;
- for (node_name, node_result) in remote_result.node_results {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- entry.nodes.insert(node_name, data.into());
+ for node_outcome in node_outcomes {
+ let node_name = node_outcome.node_name().to_string();
+
+ match node_outcome.data() {
+ Ok(update_info) => {
+ entry.nodes.insert(node_name, update_info.clone().into());
}
Err(err) => {
// Could not fetch updates from node
--
2.47.3
next prev parent reply other threads:[~2026-02-04 15:27 UTC|newest]
Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-02-04 15:27 [PATCH datacenter-manager 0/5] improvements for ParallelFetcher Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 1/5] parallel fetcher: clean up imports Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 2/5] parallel fetcher: make sure to inherit log context Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 3/5] parallel fetcher: add builder and make struct members private Lukas Wagner
2026-02-04 15:27 ` Lukas Wagner [this message]
2026-02-04 15:27 ` [PATCH datacenter-manager 5/5] parallel fetcher: add module documentation Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260204152723.482258-5-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox