From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 4/5] parallel fetcher: improve result type ergonomics
Date: Wed,  4 Feb 2026 16:27:22 +0100
Message-ID: <20260204152723.482258-5-l.wagner@proxmox.com>
In-Reply-To: <20260204152723.482258-1-l.wagner@proxmox.com>

Completely overhaul the *Result types:
  - use the term 'Outcome' instead of 'Result' to avoid confusion with
    Rust's Result type
  - use generics to avoid the awkwardness of accessing the node outcome
    of a single node when using do_for_all_remotes (we only call out to
    a single node anyway)
  - implement .iter()/.into_iter() where it makes sense for conveniently
    iterating over remote outcomes and node outcomes
  - make all members private and offer getters to access fields
  - use a sorted vec instead of a hash map as the internal data
    structure for storing remote outcomes. This avoids the awkwardness
    of having the remote name as the key *and* in the struct that is
    used as the value. Since the vec is sorted by remote name, we can
    still access specific remotes efficiently via binary search.
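
A rough usage sketch of the new API (the fetcher setup, the remote
lists and the `fetch_data` handler are placeholders; the accessors are
the ones introduced in this patch):

    // do_for_all_remote_nodes: one outcome per remote, each wrapping
    // either the per-node outcomes or the error encountered while
    // fetching the remote's node list.
    let results = ParallelFetcher::new(context)
        .do_for_all_remote_nodes(remotes.into_iter(), fetch_data)
        .await;

    for remote_outcome in &results {
        match remote_outcome.nodes() {
            Ok(node_outcomes) => {
                for node_outcome in node_outcomes {
                    if let Ok(data) = node_outcome.data() {
                        // use data together with remote_outcome.remote()
                        // and node_outcome.node_name()
                    }
                }
            }
            Err(err) => log::error!("fetching node list failed: {err:#}"),
        }
    }

    // The sorted vec still allows direct per-remote lookup (binary search):
    let _outcome = results.get_remote_outcome("some-remote");

    // do_for_all_remotes: the generic outcome type exposes the single
    // node's data directly, without going through the former
    // node_results.get("localhost"):
    let results = ParallelFetcher::new(other_context)
        .do_for_all_remotes(other_remotes.into_iter(), fetch_data)
        .await;
    if let Some(outcome) = results.get_remote_outcome("some-remote") {
        let _data = outcome.data(); // Result<&T, &Error>
    }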

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/api/pve/firewall.rs                |  53 ++--
 server/src/api/sdn/controllers.rs             |  38 +--
 server/src/api/sdn/vnets.rs                   |  35 +--
 server/src/api/sdn/zones.rs                   |  35 +--
 .../tasks/remote_tasks.rs                     |  31 +-
 server/src/parallel_fetcher.rs                | 268 ++++++++++++------
 server/src/remote_updates.rs                  |  34 ++-
 7 files changed, 273 insertions(+), 221 deletions(-)

diff --git a/server/src/api/pve/firewall.rs b/server/src/api/pve/firewall.rs
index d81592f7..bf5df3f8 100644
--- a/server/src/api/pve/firewall.rs
+++ b/server/src/api/pve/firewall.rs
@@ -271,12 +271,10 @@ pub async fn pve_firewall_status(
 
     // 2: build context with guests for each remote and fetch node-level data
     let mut guests_per_remote = HashMap::new();
-    for (remote_id, remote_result) in &cluster_results.remote_results {
-        if let Ok(remote_result) = remote_result {
-            if let Ok(node_result) = remote_result.node_results.get("localhost").unwrap() {
-                guests_per_remote
-                    .insert(remote_id.clone(), Arc::new(node_result.data.guests.clone()));
-            }
+    for remote_outcome in &cluster_results {
+        let remote_id = remote_outcome.remote().to_string();
+        if let Ok(node_result) = remote_outcome.data() {
+            guests_per_remote.insert(remote_id, Arc::new(node_result.guests.clone()));
         }
     }
 
@@ -298,26 +296,18 @@ pub async fn pve_firewall_status(
     let mut result = Vec::new();
     for remote in &pve_remotes {
         let mut cluster_status = cluster_results
-            .remote_results
-            .get(&remote.id)
-            .and_then(|r| r.as_ref().ok())
-            .and_then(|r| r.node_results.get("localhost"))
-            .and_then(|n| n.as_ref().ok())
-            .and_then(|n| n.data.status.clone());
+            .get_remote_outcome(&remote.id)
+            .and_then(|r| r.data().ok())
+            .and_then(|n| n.status.clone());
 
-        let node_fetch_result = node_results.remote_results.get(&remote.id);
+        let node_fetch_result = node_results.get_remote_outcome(&remote.id);
 
         let nodes = node_fetch_result
-            .and_then(|r| r.as_ref().ok())
-            .map(|r| {
-                r.node_results
-                    .values()
-                    .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
-                    .collect()
-            })
+            .and_then(|r| r.nodes().ok())
+            .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
             .unwrap_or_default();
 
-        if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+        if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
             cluster_status = None;
         }
 
@@ -389,12 +379,8 @@ pub async fn cluster_firewall_status(
         .await;
 
     let cluster_data = cluster_results
-        .remote_results
-        .get(&remote)
-        .and_then(|r| r.as_ref().ok())
-        .and_then(|r| r.node_results.get("localhost"))
-        .and_then(|n| n.as_ref().ok())
-        .map(|n| &n.data);
+        .get_remote_outcome(&remote)
+        .and_then(|r| r.data().ok());
 
     let (cluster_status, guests) = match cluster_data {
         Some(data) => (data.status.clone(), data.guests.clone()),
@@ -418,19 +404,14 @@ pub async fn cluster_firewall_status(
         .await;
 
     // 3: collect node results
-    let node_fetch_result = node_results.remote_results.get(&remote);
+    let node_fetch_result = node_results.get_remote_outcome(&remote);
 
     let nodes = node_fetch_result
-        .and_then(|r| r.as_ref().ok())
-        .map(|r| {
-            r.node_results
-                .values()
-                .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
-                .collect()
-        })
+        .and_then(|r| r.nodes().ok())
+        .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
         .unwrap_or_default();
 
-    let final_status = if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+    let final_status = if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
         None
     } else {
         cluster_status
diff --git a/server/src/api/sdn/controllers.rs b/server/src/api/sdn/controllers.rs
index 96612516..50f9cb51 100644
--- a/server/src/api/sdn/controllers.rs
+++ b/server/src/api/sdn/controllers.rs
@@ -9,10 +9,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::ListControllersType;
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher};
 
 pub const ROUTER: Router = Router::new().get(&API_METHOD_LIST_CONTROLLERS);
 
@@ -105,26 +102,19 @@ pub async fn list_controllers(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|controller| ListController {
-                                remote: remote.clone(),
-                                controller,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(sdn_controllers) => {
+                vnets.extend(
+                    sdn_controllers
+                        .into_iter()
+                        .map(|controller| ListController {
+                            remote: remote.clone(),
+                            controller,
+                        }),
+                )
             }
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/vnets.rs b/server/src/api/sdn/vnets.rs
index 5e14c6ab..561b1d30 100644
--- a/server/src/api/sdn/vnets.rs
+++ b/server/src/api/sdn/vnets.rs
@@ -13,11 +13,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::{CreateVnet, SdnVnetType};
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
 
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_VNETS)
@@ -105,26 +101,15 @@ async fn list_vnets(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|vnet| ListVnet {
-                                remote: remote.clone(),
-                                vnet,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(vnets_on_this_remote) => {
+                vnets.extend(vnets_on_this_remote.into_iter().map(|vnet| ListVnet {
+                    remote: remote.clone(),
+                    vnet,
+                }))
             }
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/zones.rs b/server/src/api/sdn/zones.rs
index c4552795..b5186e27 100644
--- a/server/src/api/sdn/zones.rs
+++ b/server/src/api/sdn/zones.rs
@@ -14,11 +14,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::{CreateZone, ListZonesType};
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
 
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_ZONES)
@@ -111,27 +107,14 @@ pub async fn list_zones(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|zone| ListZone {
-                                remote: remote.clone(),
-                                zone,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
-            }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(zones) => vnets.extend(zones.into_iter().map(|zone| ListZone {
+                remote: remote.clone(),
+                zone,
+            })),
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
             }
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 93a0d05e..03e71a02 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -16,7 +16,7 @@ use proxmox_section_config::typed::SectionConfigData;
 
 use server::{
     connection,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
+    parallel_fetcher::ParallelFetcher,
     pbs_client,
     remote_tasks::{
         self,
@@ -276,23 +276,32 @@ async fn fetch_remotes(
     let mut all_tasks = Vec::new();
     let mut node_success_map = NodeFetchSuccessMap::default();
 
-    for (remote_name, result) in fetch_results.remote_results {
-        match result {
-            Ok(remote_result) => {
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            all_tasks.extend(data);
-                            node_success_map.set_node_success(remote_name.clone(), node_name);
+    for remote_outcome in fetch_results {
+        match remote_outcome.nodes() {
+            Ok(node_outcomes) => {
+                for node_outcome in node_outcomes {
+                    let node_name = node_outcome.node_name().to_string();
+
+                    match node_outcome.data() {
+                        Ok(data) => {
+                            all_tasks.extend(data.clone());
+                            node_success_map
+                                .set_node_success(remote_outcome.remote().to_string(), node_name);
                         }
                         Err(err) => {
-                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+                            log::error!(
+                                "could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}",
+                                remote_name = remote_outcome.remote()
+                            );
                         }
                     }
                 }
             }
             Err(err) => {
-                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+                log::error!(
+                    "could not fetch tasks from remote '{remote}': {err:#}",
+                    remote = remote_outcome.remote()
+                );
             }
         }
     }
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 1eded293..57011096 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::future::Future;
 use std::sync::Arc;
@@ -18,45 +17,133 @@ use crate::connection;
 pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
 pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
 
-pub struct ParallelFetcher<C> {
-    max_connections: usize,
-    max_connections_per_remote: usize,
-    context: C,
+/// Outcome type produced by [`ParallelFetcher::do_for_all_remotes`] or
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+///
+/// This type contains the individual outcomes for each remote. These can be accessed
+/// by iterating over this type (`.iter()`, `.into_iter()`) or by calling
+/// [`FetcherOutcome::get_remote_outcome`].
+pub struct FetcherOutcome<O> {
+    // NOTE: This vector is sorted ascending by remote name.
+    outcomes: Vec<RemoteOutcome<O>>,
 }
 
-pub struct FetchResults<T> {
-    /// Per-remote results. The key in the map is the remote name.
-    pub remote_results: HashMap<String, Result<RemoteResult<T>, Error>>,
-}
+impl<O> IntoIterator for FetcherOutcome<O> {
+    type Item = RemoteOutcome<O>;
+    type IntoIter = std::vec::IntoIter<Self::Item>;
 
-impl<T> Default for FetchResults<T> {
-    fn default() -> Self {
-        Self {
-            remote_results: Default::default(),
-        }
+    fn into_iter(self) -> Self::IntoIter {
+        self.outcomes.into_iter()
     }
 }
 
-#[derive(Debug)]
-pub struct RemoteResult<T> {
-    /// Per-node results. The key in the map is the node name.
-    pub node_results: HashMap<String, Result<NodeResults<T>, Error>>,
-}
+impl<'a, O> IntoIterator for &'a FetcherOutcome<O> {
+    type Item = &'a RemoteOutcome<O>;
+    type IntoIter = std::slice::Iter<'a, RemoteOutcome<O>>;
 
-impl<T> Default for RemoteResult<T> {
-    fn default() -> Self {
-        Self {
-            node_results: Default::default(),
-        }
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
     }
 }
 
-#[derive(Debug)]
-pub struct NodeResults<T> {
-    /// The data returned from the passed function.
-    pub data: T,
-    /// Time needed waiting for the passed function to return.
-    pub api_response_time: Duration,
+impl<O> FetcherOutcome<O> {
+    /// Returns an iterator over all contained [`RemoteOutcome`]s.
+    pub fn iter<'a>(&'a self) -> std::slice::Iter<'a, RemoteOutcome<O>> {
+        self.outcomes.iter()
+    }
+
+    /// Get the outcome for a particular remote.
+    pub fn get_remote_outcome(&self, remote: &str) -> Option<&RemoteOutcome<O>> {
+        self.outcomes
+            .binary_search_by(|probe| probe.remote().cmp(remote))
+            .ok()
+            .map(|index| &self.outcomes[index])
+    }
+}
+
+/// Outcome for one remote.
+pub struct RemoteOutcome<O> {
+    remote_name: String,
+    remote_type: RemoteType,
+    outcome: O,
+}
+
+impl<O> RemoteOutcome<O> {
+    /// Returns the remote id.
+    pub fn remote(&self) -> &str {
+        self.remote_name.as_str()
+    }
+
+    /// Returns the type of the remote.
+    pub fn remote_type(&self) -> RemoteType {
+        self.remote_type
+    }
+}
+
+impl<T> RemoteOutcome<NodeOutcome<T>> {
+    /// Access the data that was returned by the handler function.
+    pub fn data(&self) -> Result<&T, &Error> {
+        self.outcome.data()
+    }
+
+    /// Access the data that was returned by the handler function, consuming self.
+    pub fn into_data(self) -> Result<T, Error> {
+        self.outcome.into_data()
+    }
+
+    /// The [`Duration`] of the handler call.
+    pub fn handler_duration(&self) -> Duration {
+        self.outcome.handler_duration()
+    }
+}
+
+impl<T> RemoteOutcome<MultipleNodeOutcome<T>> {
+    /// Access the node results.
+    ///
+    /// This returns an error if the list of nodes could not be fetched
+    /// during [`ParallelFetcher::do_for_all_remote_nodes`].
+    pub fn nodes(&self) -> Result<&Vec<NodeOutcome<T>>, &Error> {
+        self.outcome.inner.as_ref()
+    }
+}
+
+/// Wrapper type used to contain the node results when using
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+pub struct MultipleNodeOutcome<T> {
+    inner: Result<Vec<NodeOutcome<T>>, Error>,
+}
+
+/// Outcome for a single node.
+pub struct NodeOutcome<T> {
+    node_name: String,
+    data: Result<T, Error>,
+    api_response_time: Duration,
+}
+
+impl<T> NodeOutcome<T> {
+    /// Name of the node.
+    ///
+    /// At the moment, this is always `localhost` if `do_for_all_remotes` was used.
+    /// If `do_for_all_remote_nodes` is used, this is the actual node name for PVE remotes and
+    /// `localhost` for PBS remotes.
+    pub fn node_name(&self) -> &str {
+        &self.node_name
+    }
+
+    /// Access the data that was returned by the handler function.
+    pub fn data(&self) -> Result<&T, &Error> {
+        self.data.as_ref()
+    }
+
+    /// Access the data that was returned by the handler function, consuming `self`.
+    pub fn into_data(self) -> Result<T, Error> {
+        self.data
+    }
+
+    /// The [`Duration`] of the handler call.
+    pub fn handler_duration(&self) -> Duration {
+        self.api_response_time
+    }
 }
 
 /// Builder for the [`ParallelFetcher`] struct.
@@ -101,6 +188,13 @@ impl<C> ParallelFetcherBuilder<C> {
     }
 }
 
+/// Helper for parallelizing API requests to multiple remotes/nodes.
+pub struct ParallelFetcher<C> {
+    max_connections: usize,
+    max_connections_per_remote: usize,
+    context: C,
+}
+
 impl<C: Clone + Send + 'static> ParallelFetcher<C> {
     /// Create a [`ParallelFetcher`] with default settings.
     pub fn new(context: C) -> Self {
@@ -112,7 +206,12 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         ParallelFetcherBuilder::new(context)
     }
 
-    pub async fn do_for_all_remote_nodes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+    /// Invoke a function `func` for all nodes of a given list of remotes in parallel.
+    pub async fn do_for_all_remote_nodes<A, F, T, Ft>(
+        self,
+        remotes: A,
+        func: F,
+    ) -> FetcherOutcome<MultipleNodeOutcome<T>>
     where
         A: Iterator<Item = Remote>,
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -142,20 +241,20 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
             }
         }
 
-        let mut results = FetchResults::default();
+        let mut results = Vec::new();
 
         while let Some(a) = remote_join_set.join_next().await {
             match a {
-                Ok((remote_name, remote_result)) => {
-                    results.remote_results.insert(remote_name, remote_result);
-                }
+                Ok(remote_outcome) => results.push(remote_outcome),
                 Err(err) => {
                     log::error!("join error when waiting for future: {err}")
                 }
             }
         }
 
-        results
+        results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+        FetcherOutcome { outcomes: results }
     }
 
     async fn fetch_remote<F, Ft, T>(
@@ -164,13 +263,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         semaphore: Arc<Semaphore>,
         func: F,
         max_connections_per_remote: usize,
-    ) -> (String, Result<RemoteResult<T>, Error>)
+    ) -> RemoteOutcome<MultipleNodeOutcome<T>>
     where
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
         Ft: Future<Output = Result<T, Error>> + Send + 'static,
         T: Send + Debug + 'static,
     {
-        let mut per_remote_results = RemoteResult::default();
+        let mut node_outcomes = Vec::new();
 
         let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap());
         let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote));
@@ -188,7 +287,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 .await
                 {
                     Ok(nodes) => nodes,
-                    Err(err) => return (remote.id.clone(), Err(err)),
+                    Err(err) => {
+                        return RemoteOutcome {
+                            remote_name: remote.id,
+                            remote_type: remote.ty,
+                            outcome: MultipleNodeOutcome { inner: Err(err) },
+                        }
+                    }
                 };
 
                 let mut nodes_join_set = JoinSet::new();
@@ -228,10 +333,8 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
 
                 while let Some(join_result) = nodes_join_set.join_next().await {
                     match join_result {
-                        Ok((node_name, per_node_result)) => {
-                            per_remote_results
-                                .node_results
-                                .insert(node_name, per_node_result);
+                        Ok(node_outcome) => {
+                            node_outcomes.push(node_outcome);
                         }
                         Err(e) => {
                             log::error!("join error when waiting for future: {e}")
@@ -240,7 +343,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 }
             }
             RemoteType::Pbs => {
-                let (nodename, result) = Self::fetch_node(
+                let node_outcome = Self::fetch_node(
                     func,
                     context,
                     remote.clone(),
@@ -250,14 +353,17 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 )
                 .await;
 
-                match result {
-                    Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
-                    Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
-                };
+                node_outcomes.push(node_outcome)
             }
         }
 
-        (remote.id, Ok(per_remote_results))
+        RemoteOutcome {
+            remote_name: remote.id,
+            remote_type: remote.ty,
+            outcome: MultipleNodeOutcome {
+                inner: Ok(node_outcomes),
+            },
+        }
     }
 
     async fn fetch_node<F, Ft, T>(
@@ -267,7 +373,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         node: String,
         _permit: OwnedSemaphorePermit,
         _per_remote_connections_permit: Option<OwnedSemaphorePermit>,
-    ) -> (String, Result<NodeResults<T>, Error>)
+    ) -> NodeOutcome<T>
     where
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
         Ft: Future<Output = Result<T, Error>> + Send + 'static,
@@ -277,19 +383,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         let result = func(context, remote.clone(), node.clone()).await;
         let api_response_time = now.elapsed();
 
-        match result {
-            Ok(data) => (
-                node,
-                Ok(NodeResults {
-                    data,
-                    api_response_time,
-                }),
-            ),
-            Err(err) => (node, Err(err)),
+        NodeOutcome {
+            node_name: node,
+            data: result,
+            api_response_time,
         }
     }
 
-    pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+    /// Invoke a function `func` for all passed remotes in parallel.
+    pub async fn do_for_all_remotes<A, F, T, Ft>(
+        self,
+        remotes: A,
+        func: F,
+    ) -> FetcherOutcome<NodeOutcome<T>>
     where
         A: Iterator<Item = Remote>,
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -299,21 +405,31 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
 
         let mut node_join_set = JoinSet::new();
-        let mut results = FetchResults::default();
 
         for remote in remotes {
             let total_connections_semaphore = total_connections_semaphore.clone();
 
             let remote_id = remote.id.clone();
+            let remote_type = remote.ty;
+
             let context = self.context.clone();
             let func = func.clone();
             let future = async move {
                 let permit = total_connections_semaphore.acquire_owned().await.unwrap();
 
-                (
-                    remote_id,
-                    Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
-                )
+                RemoteOutcome {
+                    remote_type,
+                    remote_name: remote_id,
+                    outcome: Self::fetch_node(
+                        func,
+                        context,
+                        remote,
+                        "localhost".into(),
+                        permit,
+                        None,
+                    )
+                    .await,
+                }
             };
 
             if let Some(log_context) = LogContext::current() {
@@ -323,29 +439,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
             }
         }
 
+        let mut results = Vec::new();
+
         while let Some(a) = node_join_set.join_next().await {
             match a {
-                Ok((remote_id, (node_id, node_result))) => {
-                    let mut node_results = HashMap::new();
-                    node_results.insert(node_id, node_result);
-
-                    let remote_result = RemoteResult { node_results };
-
-                    if results
-                        .remote_results
-                        .insert(remote_id, Ok(remote_result))
-                        .is_some()
-                    {
-                        // should never happen, but log for good measure if it actually does
-                        log::warn!("made multiple requests for a remote!");
-                    }
-                }
+                Ok(remote_outcome) => results.push(remote_outcome),
                 Err(err) => {
                     log::error!("join error when waiting for future: {err}")
                 }
             }
         }
 
-        results
+        results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+        FetcherOutcome { outcomes: results }
     }
 }
diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs
index e772eef5..460a91f2 100644
--- a/server/src/remote_updates.rs
+++ b/server/src/remote_updates.rs
@@ -15,7 +15,7 @@ use pdm_api_types::RemoteUpid;
 use pdm_buildcfg::PDM_CACHE_DIR_M;
 
 use crate::connection;
-use crate::parallel_fetcher::{NodeResults, ParallelFetcher};
+use crate::parallel_fetcher::ParallelFetcher;
 
 pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json");
 
@@ -214,30 +214,28 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er
 
     let mut content = get_cached_summary_or_default()?;
 
-    for (remote_name, result) in fetch_results.remote_results {
+    for remote_outcome in fetch_results {
+        let remote_name = remote_outcome.remote().to_string();
+
         let entry = content
             .remotes
             .entry(remote_name.clone())
-            .or_insert_with(|| {
-                // unwrap: remote name came from the same config, should be safe.
-                // TODO: Include type in ParallelFetcher results - should be much more efficient.
-                let remote_type = remotes.iter().find(|r| r.id == remote_name).unwrap().ty;
-
-                RemoteUpdateSummary {
-                    nodes: Default::default(),
-                    remote_type,
-                    status: RemoteUpdateStatus::Success,
-                }
+            .or_insert_with(|| RemoteUpdateSummary {
+                nodes: Default::default(),
+                remote_type: remote_outcome.remote_type(),
+                status: RemoteUpdateStatus::Success,
             });
 
-        match result {
-            Ok(remote_result) => {
+        match remote_outcome.nodes() {
+            Ok(node_outcomes) => {
                 entry.status = RemoteUpdateStatus::Success;
 
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            entry.nodes.insert(node_name, data.into());
+                for node_outcome in node_outcomes {
+                    let node_name = node_outcome.node_name().to_string();
+
+                    match node_outcome.data() {
+                        Ok(update_info) => {
+                            entry.nodes.insert(node_name, update_info.clone().into());
                         }
                         Err(err) => {
                             // Could not fetch updates from node
-- 
2.47.3