public inbox for pdm-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 4/5] parallel fetcher: improve result type ergonomics
Date: Wed,  4 Feb 2026 16:27:22 +0100	[thread overview]
Message-ID: <20260204152723.482258-5-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260204152723.482258-1-l.wagner@proxmox.com>

Completely overhaul the *Result types:
  - use the term 'Outcome' instead of result, to avoid confusion with
    Rust's Result type
  - use generics to avoid the awkwardness of accessing the node outcome
    of a single node when using do_for_all_remotes (we only call out to
    a single node anyway)
  - implement .iter()/.into_iter() where it makes sense for conveniently
    iterating over remote outcomes and node outcomes
  - make all members private and offer getters to access fields
  - use a sorted vec as an internal data structure for storing
    remote outcomes instead of a hash map. This avoids the awkwardness of
    having the remote name as key *and* in the struct that is used as a
    value. Since the vec is sorted by the remote name, we can still
    access specific remotes quite efficiently via binary search.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/api/pve/firewall.rs                |  53 ++--
 server/src/api/sdn/controllers.rs             |  38 +--
 server/src/api/sdn/vnets.rs                   |  35 +--
 server/src/api/sdn/zones.rs                   |  35 +--
 .../tasks/remote_tasks.rs                     |  31 +-
 server/src/parallel_fetcher.rs                | 268 ++++++++++++------
 server/src/remote_updates.rs                  |  34 ++-
 7 files changed, 273 insertions(+), 221 deletions(-)

diff --git a/server/src/api/pve/firewall.rs b/server/src/api/pve/firewall.rs
index d81592f7..bf5df3f8 100644
--- a/server/src/api/pve/firewall.rs
+++ b/server/src/api/pve/firewall.rs
@@ -271,12 +271,10 @@ pub async fn pve_firewall_status(
 
     // 2: build context with guests for each remote and fetch node-level data
     let mut guests_per_remote = HashMap::new();
-    for (remote_id, remote_result) in &cluster_results.remote_results {
-        if let Ok(remote_result) = remote_result {
-            if let Ok(node_result) = remote_result.node_results.get("localhost").unwrap() {
-                guests_per_remote
-                    .insert(remote_id.clone(), Arc::new(node_result.data.guests.clone()));
-            }
+    for remote_outcome in &cluster_results {
+        let remote_id = remote_outcome.remote().to_string();
+        if let Ok(node_result) = remote_outcome.data() {
+            guests_per_remote.insert(remote_id, Arc::new(node_result.guests.clone()));
         }
     }
 
@@ -298,26 +296,18 @@ pub async fn pve_firewall_status(
     let mut result = Vec::new();
     for remote in &pve_remotes {
         let mut cluster_status = cluster_results
-            .remote_results
-            .get(&remote.id)
-            .and_then(|r| r.as_ref().ok())
-            .and_then(|r| r.node_results.get("localhost"))
-            .and_then(|n| n.as_ref().ok())
-            .and_then(|n| n.data.status.clone());
+            .get_remote_outcome(&remote.id)
+            .and_then(|r| r.data().ok())
+            .and_then(|n| n.status.clone());
 
-        let node_fetch_result = node_results.remote_results.get(&remote.id);
+        let node_fetch_result = node_results.get_remote_outcome(&remote.id);
 
         let nodes = node_fetch_result
-            .and_then(|r| r.as_ref().ok())
-            .map(|r| {
-                r.node_results
-                    .values()
-                    .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
-                    .collect()
-            })
+            .and_then(|r| r.nodes().ok())
+            .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
             .unwrap_or_default();
 
-        if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+        if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
             cluster_status = None;
         }
 
@@ -389,12 +379,8 @@ pub async fn cluster_firewall_status(
         .await;
 
     let cluster_data = cluster_results
-        .remote_results
-        .get(&remote)
-        .and_then(|r| r.as_ref().ok())
-        .and_then(|r| r.node_results.get("localhost"))
-        .and_then(|n| n.as_ref().ok())
-        .map(|n| &n.data);
+        .get_remote_outcome(&remote)
+        .and_then(|r| r.data().ok());
 
     let (cluster_status, guests) = match cluster_data {
         Some(data) => (data.status.clone(), data.guests.clone()),
@@ -418,19 +404,14 @@ pub async fn cluster_firewall_status(
         .await;
 
     // 3: collect node results
-    let node_fetch_result = node_results.remote_results.get(&remote);
+    let node_fetch_result = node_results.get_remote_outcome(&remote);
 
     let nodes = node_fetch_result
-        .and_then(|r| r.as_ref().ok())
-        .map(|r| {
-            r.node_results
-                .values()
-                .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
-                .collect()
-        })
+        .and_then(|r| r.nodes().ok())
+        .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
         .unwrap_or_default();
 
-    let final_status = if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+    let final_status = if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
         None
     } else {
         cluster_status
diff --git a/server/src/api/sdn/controllers.rs b/server/src/api/sdn/controllers.rs
index 96612516..50f9cb51 100644
--- a/server/src/api/sdn/controllers.rs
+++ b/server/src/api/sdn/controllers.rs
@@ -9,10 +9,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::ListControllersType;
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher};
 
 pub const ROUTER: Router = Router::new().get(&API_METHOD_LIST_CONTROLLERS);
 
@@ -105,26 +102,19 @@ pub async fn list_controllers(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|controller| ListController {
-                                remote: remote.clone(),
-                                controller,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(sdn_controllers) => {
+                vnets.extend(
+                    sdn_controllers
+                        .into_iter()
+                        .map(|controller| ListController {
+                            remote: remote.clone(),
+                            controller,
+                        }),
+                )
             }
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/vnets.rs b/server/src/api/sdn/vnets.rs
index 5e14c6ab..561b1d30 100644
--- a/server/src/api/sdn/vnets.rs
+++ b/server/src/api/sdn/vnets.rs
@@ -13,11 +13,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::{CreateVnet, SdnVnetType};
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
 
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_VNETS)
@@ -105,26 +101,15 @@ async fn list_vnets(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|vnet| ListVnet {
-                                remote: remote.clone(),
-                                vnet,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(vnets_on_this_remote) => {
+                vnets.extend(vnets_on_this_remote.into_iter().map(|vnet| ListVnet {
+                    remote: remote.clone(),
+                    vnet,
+                }))
             }
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/zones.rs b/server/src/api/sdn/zones.rs
index c4552795..b5186e27 100644
--- a/server/src/api/sdn/zones.rs
+++ b/server/src/api/sdn/zones.rs
@@ -14,11 +14,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::{CreateZone, ListZonesType};
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
 
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_ZONES)
@@ -111,27 +107,14 @@ pub async fn list_zones(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|zone| ListZone {
-                                remote: remote.clone(),
-                                zone,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
-            }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(zones) => vnets.extend(zones.into_iter().map(|zone| ListZone {
+                remote: remote.clone(),
+                zone,
+            })),
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
             }
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 93a0d05e..03e71a02 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -16,7 +16,7 @@ use proxmox_section_config::typed::SectionConfigData;
 
 use server::{
     connection,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
+    parallel_fetcher::ParallelFetcher,
     pbs_client,
     remote_tasks::{
         self,
@@ -276,23 +276,32 @@ async fn fetch_remotes(
     let mut all_tasks = Vec::new();
     let mut node_success_map = NodeFetchSuccessMap::default();
 
-    for (remote_name, result) in fetch_results.remote_results {
-        match result {
-            Ok(remote_result) => {
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            all_tasks.extend(data);
-                            node_success_map.set_node_success(remote_name.clone(), node_name);
+    for remote_outcome in fetch_results {
+        match remote_outcome.nodes() {
+            Ok(node_outcomes) => {
+                for node_outcome in node_outcomes {
+                    let node_name = node_outcome.node_name().to_string();
+
+                    match node_outcome.data() {
+                        Ok(data) => {
+                            all_tasks.extend(data.clone());
+                            node_success_map
+                                .set_node_success(remote_outcome.remote().to_string(), node_name);
                         }
                         Err(err) => {
-                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+                            log::error!(
+                                "could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}",
+                                remote_name = remote_outcome.remote()
+                            );
                         }
                     }
                 }
             }
             Err(err) => {
-                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+                log::error!(
+                    "could not fetch tasks from remote '{remote}': {err:#}",
+                    remote = remote_outcome.remote()
+                );
             }
         }
     }
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 1eded293..57011096 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::future::Future;
 use std::sync::Arc;
@@ -18,45 +17,133 @@ use crate::connection;
 pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
 pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
 
-pub struct ParallelFetcher<C> {
-    max_connections: usize,
-    max_connections_per_remote: usize,
-    context: C,
+/// Outcome type produced by [`ParallelFetcher::do_for_all_remotes`] or
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+///
+/// This type contains the individual outcomes for each remote. These can be accessed
+/// by iterating over this type (`.iter()`, `.into_iter()`) or by calling
+/// [`FetcherOutcome::get_remote_outcome`].
+pub struct FetcherOutcome<O> {
+    // NOTE: This vector is sorted ascending by remote name.
+    outcomes: Vec<RemoteOutcome<O>>,
 }
 
-pub struct FetchResults<T> {
-    /// Per-remote results. The key in the map is the remote name.
-    pub remote_results: HashMap<String, Result<RemoteResult<T>, Error>>,
-}
+impl<O> IntoIterator for FetcherOutcome<O> {
+    type Item = RemoteOutcome<O>;
+    type IntoIter = std::vec::IntoIter<Self::Item>;
 
-impl<T> Default for FetchResults<T> {
-    fn default() -> Self {
-        Self {
-            remote_results: Default::default(),
-        }
+    fn into_iter(self) -> Self::IntoIter {
+        self.outcomes.into_iter()
     }
 }
 
-#[derive(Debug)]
-pub struct RemoteResult<T> {
-    /// Per-node results. The key in the map is the node name.
-    pub node_results: HashMap<String, Result<NodeResults<T>, Error>>,
-}
+impl<'a, O> IntoIterator for &'a FetcherOutcome<O> {
+    type Item = &'a RemoteOutcome<O>;
+    type IntoIter = std::slice::Iter<'a, RemoteOutcome<O>>;
 
-impl<T> Default for RemoteResult<T> {
-    fn default() -> Self {
-        Self {
-            node_results: Default::default(),
-        }
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
     }
 }
 
-#[derive(Debug)]
-pub struct NodeResults<T> {
-    /// The data returned from the passed function.
-    pub data: T,
-    /// Time needed waiting for the passed function to return.
-    pub api_response_time: Duration,
+impl<O> FetcherOutcome<O> {
+    /// Create a new iterator of all contained [`RemoteOutcome`]s.
+    pub fn iter<'a>(&'a self) -> std::slice::Iter<'a, RemoteOutcome<O>> {
+        self.outcomes.iter()
+    }
+
+    /// Get the outcome for a particular remote.
+    pub fn get_remote_outcome(&self, remote: &str) -> Option<&RemoteOutcome<O>> {
+        self.outcomes
+            .binary_search_by(|probe| probe.remote().cmp(remote))
+            .ok()
+            .map(|index| &self.outcomes[index])
+    }
+}
+
+/// Outcome for one remote.
+pub struct RemoteOutcome<O> {
+    remote_name: String,
+    remote_type: RemoteType,
+    outcome: O,
+}
+
+impl<O> RemoteOutcome<O> {
+    /// Returns the remote id.
+    pub fn remote(&self) -> &str {
+        self.remote_name.as_str()
+    }
+
+    /// Returns the type of the remote.
+    pub fn remote_type(&self) -> RemoteType {
+        self.remote_type
+    }
+}
+
+impl<T> RemoteOutcome<NodeOutcome<T>> {
+    /// Access the data that was returned by the handler function.
+    pub fn data(&self) -> Result<&T, &Error> {
+        self.outcome.data()
+    }
+
+    /// Access the data that was returned by the handler function, consuming self.
+    pub fn into_data(self) -> Result<T, Error> {
+        self.outcome.into_data()
+    }
+
+    /// The [`Duration`] of the handler call.
+    pub fn handler_duration(&self) -> Duration {
+        self.outcome.handler_duration()
+    }
+}
+
+impl<T> RemoteOutcome<MultipleNodeOutcome<T>> {
+    /// Access the node results.
+    ///
+    /// This returns an error if the list of nodes could not be fetched
+    /// during [`ParallelFetcher::do_for_all_remote_nodes`].
+    pub fn nodes(&self) -> Result<&Vec<NodeOutcome<T>>, &Error> {
+        self.outcome.inner.as_ref()
+    }
+}
+
+/// Wrapper type used to contain the node results when using
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+pub struct MultipleNodeOutcome<T> {
+    inner: Result<Vec<NodeOutcome<T>>, Error>,
+}
+
+/// Outcome for a single node.
+pub struct NodeOutcome<T> {
+    node_name: String,
+    data: Result<T, Error>,
+    api_response_time: Duration,
+}
+
+impl<T> NodeOutcome<T> {
+    /// Name of the node.
+    ///
+    /// At the moment, this is always `localhost` if `do_for_all_remotes` was used.
+    /// If `do_for_all_remote_nodes` is used, this is the actual nodename for PVE remotes and
+    /// `localhost` for PBS remotes.
+    pub fn node_name(&self) -> &str {
+        &self.node_name
+    }
+
+    /// Access the data that was returned by the handler function.
+    pub fn data(&self) -> Result<&T, &Error> {
+        self.data.as_ref()
+    }
+
+    /// Access the data that was returned by the handler function, consuming `self`.
+    pub fn into_data(self) -> Result<T, Error> {
+        self.data
+    }
+
+    /// The [`Duration`] of the handler call.
+    pub fn handler_duration(&self) -> Duration {
+        self.api_response_time
+    }
 }
 
 /// Builder for the [`ParallelFetcher`] struct.
@@ -101,6 +188,13 @@ impl<C> ParallelFetcherBuilder<C> {
     }
 }
 
+/// Helper for parallelizing API requests to multiple remotes/nodes.
+pub struct ParallelFetcher<C> {
+    max_connections: usize,
+    max_connections_per_remote: usize,
+    context: C,
+}
+
 impl<C: Clone + Send + 'static> ParallelFetcher<C> {
     /// Create a [`ParallelFetcher`] with default settings.
     pub fn new(context: C) -> Self {
@@ -112,7 +206,12 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         ParallelFetcherBuilder::new(context)
     }
 
-    pub async fn do_for_all_remote_nodes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+    /// Invoke a function `func` for all nodes of a given list of remotes in parallel.
+    pub async fn do_for_all_remote_nodes<A, F, T, Ft>(
+        self,
+        remotes: A,
+        func: F,
+    ) -> FetcherOutcome<MultipleNodeOutcome<T>>
     where
         A: Iterator<Item = Remote>,
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -142,20 +241,20 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
             }
         }
 
-        let mut results = FetchResults::default();
+        let mut results = Vec::new();
 
         while let Some(a) = remote_join_set.join_next().await {
             match a {
-                Ok((remote_name, remote_result)) => {
-                    results.remote_results.insert(remote_name, remote_result);
-                }
+                Ok(remote_outcome) => results.push(remote_outcome),
                 Err(err) => {
                     log::error!("join error when waiting for future: {err}")
                 }
             }
         }
 
-        results
+        results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+        FetcherOutcome { outcomes: results }
     }
 
     async fn fetch_remote<F, Ft, T>(
@@ -164,13 +263,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         semaphore: Arc<Semaphore>,
         func: F,
         max_connections_per_remote: usize,
-    ) -> (String, Result<RemoteResult<T>, Error>)
+    ) -> RemoteOutcome<MultipleNodeOutcome<T>>
     where
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
         Ft: Future<Output = Result<T, Error>> + Send + 'static,
         T: Send + Debug + 'static,
     {
-        let mut per_remote_results = RemoteResult::default();
+        let mut node_outcomes = Vec::new();
 
         let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap());
         let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote));
@@ -188,7 +287,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 .await
                 {
                     Ok(nodes) => nodes,
-                    Err(err) => return (remote.id.clone(), Err(err)),
+                    Err(err) => {
+                        return RemoteOutcome {
+                            remote_name: remote.id,
+                            remote_type: remote.ty,
+                            outcome: MultipleNodeOutcome { inner: Err(err) },
+                        }
+                    }
                 };
 
                 let mut nodes_join_set = JoinSet::new();
@@ -228,10 +333,8 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
 
                 while let Some(join_result) = nodes_join_set.join_next().await {
                     match join_result {
-                        Ok((node_name, per_node_result)) => {
-                            per_remote_results
-                                .node_results
-                                .insert(node_name, per_node_result);
+                        Ok(node_outcome) => {
+                            node_outcomes.push(node_outcome);
                         }
                         Err(e) => {
                             log::error!("join error when waiting for future: {e}")
@@ -240,7 +343,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 }
             }
             RemoteType::Pbs => {
-                let (nodename, result) = Self::fetch_node(
+                let node_outcome = Self::fetch_node(
                     func,
                     context,
                     remote.clone(),
@@ -250,14 +353,17 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 )
                 .await;
 
-                match result {
-                    Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
-                    Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
-                };
+                node_outcomes.push(node_outcome)
             }
         }
 
-        (remote.id, Ok(per_remote_results))
+        RemoteOutcome {
+            remote_name: remote.id,
+            remote_type: remote.ty,
+            outcome: MultipleNodeOutcome {
+                inner: Ok(node_outcomes),
+            },
+        }
     }
 
     async fn fetch_node<F, Ft, T>(
@@ -267,7 +373,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         node: String,
         _permit: OwnedSemaphorePermit,
         _per_remote_connections_permit: Option<OwnedSemaphorePermit>,
-    ) -> (String, Result<NodeResults<T>, Error>)
+    ) -> NodeOutcome<T>
     where
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
         Ft: Future<Output = Result<T, Error>> + Send + 'static,
@@ -277,19 +383,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         let result = func(context, remote.clone(), node.clone()).await;
         let api_response_time = now.elapsed();
 
-        match result {
-            Ok(data) => (
-                node,
-                Ok(NodeResults {
-                    data,
-                    api_response_time,
-                }),
-            ),
-            Err(err) => (node, Err(err)),
+        NodeOutcome {
+            node_name: node,
+            data: result,
+            api_response_time,
         }
     }
 
-    pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+    /// Invoke a function `func` for all passed remotes in parallel.
+    pub async fn do_for_all_remotes<A, F, T, Ft>(
+        self,
+        remotes: A,
+        func: F,
+    ) -> FetcherOutcome<NodeOutcome<T>>
     where
         A: Iterator<Item = Remote>,
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -299,21 +405,31 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
 
         let mut node_join_set = JoinSet::new();
-        let mut results = FetchResults::default();
 
         for remote in remotes {
             let total_connections_semaphore = total_connections_semaphore.clone();
 
             let remote_id = remote.id.clone();
+            let remote_type = remote.ty;
+
             let context = self.context.clone();
             let func = func.clone();
             let future = async move {
                 let permit = total_connections_semaphore.acquire_owned().await.unwrap();
 
-                (
-                    remote_id,
-                    Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
-                )
+                RemoteOutcome {
+                    remote_type,
+                    remote_name: remote_id,
+                    outcome: Self::fetch_node(
+                        func,
+                        context,
+                        remote,
+                        "localhost".into(),
+                        permit,
+                        None,
+                    )
+                    .await,
+                }
             };
 
             if let Some(log_context) = LogContext::current() {
@@ -323,29 +439,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
             }
         }
 
+        let mut results = Vec::new();
+
         while let Some(a) = node_join_set.join_next().await {
             match a {
-                Ok((remote_id, (node_id, node_result))) => {
-                    let mut node_results = HashMap::new();
-                    node_results.insert(node_id, node_result);
-
-                    let remote_result = RemoteResult { node_results };
-
-                    if results
-                        .remote_results
-                        .insert(remote_id, Ok(remote_result))
-                        .is_some()
-                    {
-                        // should never happen, but log for good measure if it actually does
-                        log::warn!("made multiple requests for a remote!");
-                    }
-                }
+                Ok(remote_outcome) => results.push(remote_outcome),
                 Err(err) => {
                     log::error!("join error when waiting for future: {err}")
                 }
             }
         }
 
-        results
+        results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+        FetcherOutcome { outcomes: results }
     }
 }
diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs
index e772eef5..460a91f2 100644
--- a/server/src/remote_updates.rs
+++ b/server/src/remote_updates.rs
@@ -15,7 +15,7 @@ use pdm_api_types::RemoteUpid;
 use pdm_buildcfg::PDM_CACHE_DIR_M;
 
 use crate::connection;
-use crate::parallel_fetcher::{NodeResults, ParallelFetcher};
+use crate::parallel_fetcher::ParallelFetcher;
 
 pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json");
 
@@ -214,30 +214,28 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er
 
     let mut content = get_cached_summary_or_default()?;
 
-    for (remote_name, result) in fetch_results.remote_results {
+    for remote_outcome in fetch_results {
+        let remote_name = remote_outcome.remote().to_string();
+
         let entry = content
             .remotes
             .entry(remote_name.clone())
-            .or_insert_with(|| {
-                // unwrap: remote name came from the same config, should be safe.
-                // TODO: Include type in ParallelFetcher results - should be much more efficient.
-                let remote_type = remotes.iter().find(|r| r.id == remote_name).unwrap().ty;
-
-                RemoteUpdateSummary {
-                    nodes: Default::default(),
-                    remote_type,
-                    status: RemoteUpdateStatus::Success,
-                }
+            .or_insert_with(|| RemoteUpdateSummary {
+                nodes: Default::default(),
+                remote_type: remote_outcome.remote_type(),
+                status: RemoteUpdateStatus::Success,
             });
 
-        match result {
-            Ok(remote_result) => {
+        match remote_outcome.nodes() {
+            Ok(node_outcomes) => {
                 entry.status = RemoteUpdateStatus::Success;
 
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            entry.nodes.insert(node_name, data.into());
+                for node_outcome in node_outcomes {
+                    let node_name = node_outcome.node_name().to_string();
+
+                    match node_outcome.data() {
+                        Ok(update_info) => {
+                            entry.nodes.insert(node_name, update_info.clone().into());
                         }
                         Err(err) => {
                             // Could not fetch updates from node
-- 
2.47.3





  parent reply	other threads:[~2026-02-04 15:27 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-02-04 15:27 [PATCH datacenter-manager 0/5] improvements for ParallelFetcher Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 1/5] parallel fetcher: clean up imports Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 2/5] parallel fetcher: make sure to inherit log context Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 3/5] parallel fetcher: add builder and make struct members private Lukas Wagner
2026-02-04 15:27 ` Lukas Wagner [this message]
2026-02-04 15:27 ` [PATCH datacenter-manager 5/5] parallel fetcher: add module documentation Lukas Wagner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260204152723.482258-5-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal