public inbox for pdm-devel@lists.proxmox.com
* [PATCH datacenter-manager v2 0/2] improvements for ParallelFetcher
@ 2026-02-06  9:43 Lukas Wagner
  2026-02-06  9:43 ` [PATCH datacenter-manager v2 1/2] parallel fetcher: improve result type ergonomics Lukas Wagner
  2026-02-06  9:43 ` [PATCH datacenter-manager v2 2/2] parallel fetcher: add module documentation Lukas Wagner
  0 siblings, 2 replies; 3+ messages in thread
From: Lukas Wagner @ 2026-02-06  9:43 UTC (permalink / raw)
  To: pdm-devel

A couple of improvements for the parallel_fetcher module:
  - inherit log context so that logging in worker tasks works
  - response type improvements, better ergonomics for callers
  - documentation

Changes since v1:
  - Change type names from *Outcome to *Response


proxmox-datacenter-manager:

Lukas Wagner (2):
  parallel fetcher: improve result type ergonomics
  parallel fetcher: add module documentation

 server/src/api/pve/firewall.rs                |  53 +--
 server/src/api/sdn/controllers.rs             |  38 +-
 server/src/api/sdn/vnets.rs                   |  35 +-
 server/src/api/sdn/zones.rs                   |  35 +-
 .../tasks/remote_tasks.rs                     |  33 +-
 server/src/parallel_fetcher.rs                | 333 +++++++++++++-----
 server/src/remote_updates.rs                  |  36 +-
 7 files changed, 340 insertions(+), 223 deletions(-)


Summary over all repositories:
  7 files changed, 340 insertions(+), 223 deletions(-)

-- 
Generated by murpp 0.9.0





* [PATCH datacenter-manager v2 1/2] parallel fetcher: improve result type ergonomics
  2026-02-06  9:43 [PATCH datacenter-manager v2 0/2] improvements for ParallelFetcher Lukas Wagner
@ 2026-02-06  9:43 ` Lukas Wagner
  2026-02-06  9:43 ` [PATCH datacenter-manager v2 2/2] parallel fetcher: add module documentation Lukas Wagner
  1 sibling, 0 replies; 3+ messages in thread
From: Lukas Wagner @ 2026-02-06  9:43 UTC (permalink / raw)
  To: pdm-devel

Completely overhaul the *Result types:
  - use the term 'Response' instead of Result, to avoid confusion with
    Rust's Result type
  - use generics to avoid the awkwardness of accessing the node response
    of a single node when using do_for_all_remotes (we only call out to
    a single node anyway); a short caller-side sketch follows below
  - implement .iter()/.into_iter() where it makes sense for conveniently
    iterating over remote responses and node responses
  - make all members private and offer getters to access fields
  - use a sorted vec as the internal data structure for storing remote
    responses instead of a hash map. This avoids the awkwardness of having
    the remote name as the key *and* in the struct that is used as the value.
    Since the vec is sorted by remote name, we can still access specific
    remotes efficiently via binary search.
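
A minimal sketch of the new caller-side usage, mirroring the call sites
updated in this patch (fetch_status, list_status and the remote name
"pve-one" are placeholder names for illustration only):

    use anyhow::Error;
    use pdm_api_types::remotes::Remote;
    use server::parallel_fetcher::ParallelFetcher;

    // Placeholder handler; real call sites pass their own async functions.
    async fn fetch_status(_ctx: (), _remote: Remote, _node: String) -> Result<u64, Error> {
        Ok(0)
    }

    async fn list_status(remotes: Vec<Remote>) {
        let response = ParallelFetcher::new(())
            .do_for_all_remotes(remotes.into_iter(), fetch_status)
            .await;

        // Iterate over the per-remote responses directly ...
        for remote_response in &response {
            match remote_response.data() {
                Ok(status) => log::info!("{}: {status}", remote_response.remote()),
                Err(err) => log::error!("{}: {err:#}", remote_response.remote()),
            }
        }

        // ... or look up a single remote by name (binary search on the sorted vec).
        if let Some(single) = response.get_remote_response("pve-one") {
            let _ = single.data();
        }
    }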

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---

Notes:
    Changes since v1:
     - Change type names from *Outcome to *Response (thanks @Thomas)

 server/src/api/pve/firewall.rs                |  53 ++--
 server/src/api/sdn/controllers.rs             |  38 +--
 server/src/api/sdn/vnets.rs                   |  35 +--
 server/src/api/sdn/zones.rs                   |  35 +--
 .../tasks/remote_tasks.rs                     |  33 ++-
 server/src/parallel_fetcher.rs                | 268 ++++++++++++------
 server/src/remote_updates.rs                  |  36 ++-
 7 files changed, 275 insertions(+), 223 deletions(-)

diff --git a/server/src/api/pve/firewall.rs b/server/src/api/pve/firewall.rs
index d81592f7..95c29eff 100644
--- a/server/src/api/pve/firewall.rs
+++ b/server/src/api/pve/firewall.rs
@@ -271,12 +271,10 @@ pub async fn pve_firewall_status(
 
     // 2: build context with guests for each remote and fetch node-level data
     let mut guests_per_remote = HashMap::new();
-    for (remote_id, remote_result) in &cluster_results.remote_results {
-        if let Ok(remote_result) = remote_result {
-            if let Ok(node_result) = remote_result.node_results.get("localhost").unwrap() {
-                guests_per_remote
-                    .insert(remote_id.clone(), Arc::new(node_result.data.guests.clone()));
-            }
+    for remote_response in &cluster_results {
+        let remote_id = remote_response.remote().to_string();
+        if let Ok(node_result) = remote_response.data() {
+            guests_per_remote.insert(remote_id, Arc::new(node_result.guests.clone()));
         }
     }
 
@@ -298,26 +296,18 @@ pub async fn pve_firewall_status(
     let mut result = Vec::new();
     for remote in &pve_remotes {
         let mut cluster_status = cluster_results
-            .remote_results
-            .get(&remote.id)
-            .and_then(|r| r.as_ref().ok())
-            .and_then(|r| r.node_results.get("localhost"))
-            .and_then(|n| n.as_ref().ok())
-            .and_then(|n| n.data.status.clone());
+            .get_remote_response(&remote.id)
+            .and_then(|r| r.data().ok())
+            .and_then(|n| n.status.clone());
 
-        let node_fetch_result = node_results.remote_results.get(&remote.id);
+        let node_fetch_result = node_results.get_remote_response(&remote.id);
 
         let nodes = node_fetch_result
-            .and_then(|r| r.as_ref().ok())
-            .map(|r| {
-                r.node_results
-                    .values()
-                    .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
-                    .collect()
-            })
+            .and_then(|r| r.nodes().ok())
+            .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
             .unwrap_or_default();
 
-        if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+        if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
             cluster_status = None;
         }
 
@@ -389,12 +379,8 @@ pub async fn cluster_firewall_status(
         .await;
 
     let cluster_data = cluster_results
-        .remote_results
-        .get(&remote)
-        .and_then(|r| r.as_ref().ok())
-        .and_then(|r| r.node_results.get("localhost"))
-        .and_then(|n| n.as_ref().ok())
-        .map(|n| &n.data);
+        .get_remote_response(&remote)
+        .and_then(|r| r.data().ok());
 
     let (cluster_status, guests) = match cluster_data {
         Some(data) => (data.status.clone(), data.guests.clone()),
@@ -418,19 +404,14 @@ pub async fn cluster_firewall_status(
         .await;
 
     // 3: collect node results
-    let node_fetch_result = node_results.remote_results.get(&remote);
+    let node_fetch_result = node_results.get_remote_response(&remote);
 
     let nodes = node_fetch_result
-        .and_then(|r| r.as_ref().ok())
-        .map(|r| {
-            r.node_results
-                .values()
-                .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
-                .collect()
-        })
+        .and_then(|r| r.nodes().ok())
+        .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
         .unwrap_or_default();
 
-    let final_status = if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+    let final_status = if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
         None
     } else {
         cluster_status
diff --git a/server/src/api/sdn/controllers.rs b/server/src/api/sdn/controllers.rs
index 96612516..3921cf00 100644
--- a/server/src/api/sdn/controllers.rs
+++ b/server/src/api/sdn/controllers.rs
@@ -9,10 +9,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::ListControllersType;
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher};
 
 pub const ROUTER: Router = Router::new().get(&API_METHOD_LIST_CONTROLLERS);
 
@@ -105,26 +102,19 @@ pub async fn list_controllers(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|controller| ListController {
-                                remote: remote.clone(),
-                                controller,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
+    for remote_response in results {
+        let remote = remote_response.remote().to_string();
+
+        match remote_response.into_data() {
+            Ok(sdn_controllers) => {
+                vnets.extend(
+                    sdn_controllers
+                        .into_iter()
+                        .map(|controller| ListController {
+                            remote: remote.clone(),
+                            controller,
+                        }),
+                )
             }
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/vnets.rs b/server/src/api/sdn/vnets.rs
index 5e14c6ab..6f2fe6a3 100644
--- a/server/src/api/sdn/vnets.rs
+++ b/server/src/api/sdn/vnets.rs
@@ -13,11 +13,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::{CreateVnet, SdnVnetType};
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
 
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_VNETS)
@@ -105,26 +101,15 @@ async fn list_vnets(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|vnet| ListVnet {
-                                remote: remote.clone(),
-                                vnet,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
+    for remote_response in results {
+        let remote = remote_response.remote().to_string();
+
+        match remote_response.into_data() {
+            Ok(vnets_on_this_remote) => {
+                vnets.extend(vnets_on_this_remote.into_iter().map(|vnet| ListVnet {
+                    remote: remote.clone(),
+                    vnet,
+                }))
             }
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/zones.rs b/server/src/api/sdn/zones.rs
index c4552795..57d7caad 100644
--- a/server/src/api/sdn/zones.rs
+++ b/server/src/api/sdn/zones.rs
@@ -14,11 +14,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::{CreateZone, ListZonesType};
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
 
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_ZONES)
@@ -111,27 +107,14 @@ pub async fn list_zones(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|zone| ListZone {
-                                remote: remote.clone(),
-                                zone,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
-            }
+    for remote_response in results {
+        let remote = remote_response.remote().to_string();
+
+        match remote_response.into_data() {
+            Ok(zones) => vnets.extend(zones.into_iter().map(|zone| ListZone {
+                remote: remote.clone(),
+                zone,
+            })),
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
             }
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 93a0d05e..5637dc6f 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -16,7 +16,7 @@ use proxmox_section_config::typed::SectionConfigData;
 
 use server::{
     connection,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
+    parallel_fetcher::ParallelFetcher,
     pbs_client,
     remote_tasks::{
         self,
@@ -269,30 +269,39 @@ async fn fetch_remotes(
         .max_connections_per_remote(CONNECTIONS_PER_PVE_REMOTE)
         .build();
 
-    let fetch_results = fetcher
+    let fetch_response = fetcher
         .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
         .await;
 
     let mut all_tasks = Vec::new();
     let mut node_success_map = NodeFetchSuccessMap::default();
 
-    for (remote_name, result) in fetch_results.remote_results {
-        match result {
-            Ok(remote_result) => {
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            all_tasks.extend(data);
-                            node_success_map.set_node_success(remote_name.clone(), node_name);
+    for remote_response in fetch_response {
+        match remote_response.nodes() {
+            Ok(node_responses) => {
+                for node_response in node_responses {
+                    let node_name = node_response.node_name().to_string();
+
+                    match node_response.data() {
+                        Ok(data) => {
+                            all_tasks.extend(data.clone());
+                            node_success_map
+                                .set_node_success(remote_response.remote().to_string(), node_name);
                         }
                         Err(err) => {
-                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+                            log::error!(
+                                "could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}",
+                                remote_name = remote_response.remote()
+                            );
                         }
                     }
                 }
             }
             Err(err) => {
-                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+                log::error!(
+                    "could not fetch tasks from remote '{remote}': {err:#}",
+                    remote = remote_response.remote()
+                );
             }
         }
     }
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 1eded293..b6f49d6f 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::future::Future;
 use std::sync::Arc;
@@ -18,45 +17,133 @@ use crate::connection;
 pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
 pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
 
-pub struct ParallelFetcher<C> {
-    max_connections: usize,
-    max_connections_per_remote: usize,
-    context: C,
+/// Response container type produced by [`ParallelFetcher::do_for_all_remotes`] or
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+///
+/// This type contains the individual responses for each remote. These can be accessed
+/// by iterating over this type (`.iter()`, `.into_iter()`) or by calling
+/// [`FetcherResponse::get_remote_response`].
+pub struct FetcherResponse<R> {
+    // NOTE: This vector is sorted ascending by remote name.
+    remote_responses: Vec<RemoteResponse<R>>,
 }
 
-pub struct FetchResults<T> {
-    /// Per-remote results. The key in the map is the remote name.
-    pub remote_results: HashMap<String, Result<RemoteResult<T>, Error>>,
-}
+impl<R> IntoIterator for FetcherResponse<R> {
+    type Item = RemoteResponse<R>;
+    type IntoIter = std::vec::IntoIter<Self::Item>;
 
-impl<T> Default for FetchResults<T> {
-    fn default() -> Self {
-        Self {
-            remote_results: Default::default(),
-        }
+    fn into_iter(self) -> Self::IntoIter {
+        self.remote_responses.into_iter()
     }
 }
 
-#[derive(Debug)]
-pub struct RemoteResult<T> {
-    /// Per-node results. The key in the map is the node name.
-    pub node_results: HashMap<String, Result<NodeResults<T>, Error>>,
-}
+impl<'a, O> IntoIterator for &'a FetcherResponse<O> {
+    type Item = &'a RemoteResponse<O>;
+    type IntoIter = std::slice::Iter<'a, RemoteResponse<O>>;
 
-impl<T> Default for RemoteResult<T> {
-    fn default() -> Self {
-        Self {
-            node_results: Default::default(),
-        }
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
     }
 }
 
-#[derive(Debug)]
-pub struct NodeResults<T> {
-    /// The data returned from the passed function.
-    pub data: T,
-    /// Time needed waiting for the passed function to return.
-    pub api_response_time: Duration,
+impl<R> FetcherResponse<R> {
+    /// Returns an iterator over all contained [`RemoteResponse`]s.
+    pub fn iter<'a>(&'a self) -> std::slice::Iter<'a, RemoteResponse<R>> {
+        self.remote_responses.iter()
+    }
+
+    /// Get the response for a particular remote.
+    pub fn get_remote_response(&self, remote: &str) -> Option<&RemoteResponse<R>> {
+        self.remote_responses
+            .binary_search_by(|probe| probe.remote().cmp(remote))
+            .ok()
+            .map(|index| &self.remote_responses[index])
+    }
+}
+
+/// Response container for one remote.
+pub struct RemoteResponse<R> {
+    remote_name: String,
+    remote_type: RemoteType,
+    response: R,
+}
+
+impl<R> RemoteResponse<R> {
+    /// Returns the remote id.
+    pub fn remote(&self) -> &str {
+        self.remote_name.as_str()
+    }
+
+    /// Returns the type of the remote.
+    pub fn remote_type(&self) -> RemoteType {
+        self.remote_type
+    }
+}
+
+impl<T> RemoteResponse<NodeResponse<T>> {
+    /// Access the data that was returned by the handler function.
+    pub fn data(&self) -> Result<&T, &Error> {
+        self.response.data()
+    }
+
+    /// Access the data that was returned by the handler function, consuming self.
+    pub fn into_data(self) -> Result<T, Error> {
+        self.response.into_data()
+    }
+
+    /// The [`Duration`] of the handler call.
+    pub fn handler_duration(&self) -> Duration {
+        self.response.handler_duration()
+    }
+}
+
+impl<T> RemoteResponse<MultipleNodesResponse<T>> {
+    /// Access the node response.
+    ///
+    /// This returns an error if the list of nodes could not be fetched
+    /// during [`ParallelFetcher::do_for_all_remote_nodes`].
+    pub fn nodes(&self) -> Result<&Vec<NodeResponse<T>>, &Error> {
+        self.response.inner.as_ref()
+    }
+}
+
+/// Wrapper type used to contain the node responses when using
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+pub struct MultipleNodesResponse<T> {
+    inner: Result<Vec<NodeResponse<T>>, Error>,
+}
+
+/// Response for a single node.
+pub struct NodeResponse<T> {
+    node_name: String,
+    data: Result<T, Error>,
+    api_response_time: Duration,
+}
+
+impl<T> NodeResponse<T> {
+    /// Name of the node.
+    ///
+    /// At the moment, this is always `localhost` if `do_for_all_remotes` was used.
+    /// If `do_for_all_remote_nodes` is used, this is the actual nodename for PVE remotes and
+    /// `localhost` for PBS remotes.
+    pub fn node_name(&self) -> &str {
+        &self.node_name
+    }
+
+    /// Access the data that was returned by the handler function.
+    pub fn data(&self) -> Result<&T, &Error> {
+        self.data.as_ref()
+    }
+
+    /// Access the data that was returned by the handler function, consuming `self`.
+    pub fn into_data(self) -> Result<T, Error> {
+        self.data
+    }
+
+    /// The [`Duration`] of the handler call.
+    pub fn handler_duration(&self) -> Duration {
+        self.api_response_time
+    }
 }
 
 /// Builder for the [`ParallelFetcher`] struct.
@@ -101,6 +188,13 @@ impl<C> ParallelFetcherBuilder<C> {
     }
 }
 
+/// Helper for parallelizing API requests to multiple remotes/nodes.
+pub struct ParallelFetcher<C> {
+    max_connections: usize,
+    max_connections_per_remote: usize,
+    context: C,
+}
+
 impl<C: Clone + Send + 'static> ParallelFetcher<C> {
     /// Create a [`ParallelFetcher`] with default settings.
     pub fn new(context: C) -> Self {
@@ -112,7 +206,12 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         ParallelFetcherBuilder::new(context)
     }
 
-    pub async fn do_for_all_remote_nodes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+    /// Invoke a function `func` for all nodes of a given list of remotes in parallel.
+    pub async fn do_for_all_remote_nodes<A, F, T, Ft>(
+        self,
+        remotes: A,
+        func: F,
+    ) -> FetcherResponse<MultipleNodesResponse<T>>
     where
         A: Iterator<Item = Remote>,
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -142,20 +241,20 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
             }
         }
 
-        let mut results = FetchResults::default();
+        let mut remote_responses = Vec::new();
 
         while let Some(a) = remote_join_set.join_next().await {
             match a {
-                Ok((remote_name, remote_result)) => {
-                    results.remote_results.insert(remote_name, remote_result);
-                }
+                Ok(remote_response) => remote_responses.push(remote_response),
                 Err(err) => {
                     log::error!("join error when waiting for future: {err}")
                 }
             }
         }
 
-        results
+        remote_responses.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+        FetcherResponse { remote_responses }
     }
 
     async fn fetch_remote<F, Ft, T>(
@@ -164,13 +263,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         semaphore: Arc<Semaphore>,
         func: F,
         max_connections_per_remote: usize,
-    ) -> (String, Result<RemoteResult<T>, Error>)
+    ) -> RemoteResponse<MultipleNodesResponse<T>>
     where
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
         Ft: Future<Output = Result<T, Error>> + Send + 'static,
         T: Send + Debug + 'static,
     {
-        let mut per_remote_results = RemoteResult::default();
+        let mut node_responses = Vec::new();
 
         let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap());
         let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote));
@@ -188,7 +287,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 .await
                 {
                     Ok(nodes) => nodes,
-                    Err(err) => return (remote.id.clone(), Err(err)),
+                    Err(err) => {
+                        return RemoteResponse {
+                            remote_name: remote.id,
+                            remote_type: remote.ty,
+                            response: MultipleNodesResponse { inner: Err(err) },
+                        }
+                    }
                 };
 
                 let mut nodes_join_set = JoinSet::new();
@@ -228,10 +333,8 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
 
                 while let Some(join_result) = nodes_join_set.join_next().await {
                     match join_result {
-                        Ok((node_name, per_node_result)) => {
-                            per_remote_results
-                                .node_results
-                                .insert(node_name, per_node_result);
+                        Ok(node_response) => {
+                            node_responses.push(node_response);
                         }
                         Err(e) => {
                             log::error!("join error when waiting for future: {e}")
@@ -240,7 +343,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 }
             }
             RemoteType::Pbs => {
-                let (nodename, result) = Self::fetch_node(
+                let node_response = Self::fetch_node(
                     func,
                     context,
                     remote.clone(),
@@ -250,14 +353,17 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
                 )
                 .await;
 
-                match result {
-                    Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
-                    Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
-                };
+                node_responses.push(node_response)
             }
         }
 
-        (remote.id, Ok(per_remote_results))
+        RemoteResponse {
+            remote_name: remote.id,
+            remote_type: remote.ty,
+            response: MultipleNodesResponse {
+                inner: Ok(node_responses),
+            },
+        }
     }
 
     async fn fetch_node<F, Ft, T>(
@@ -267,7 +373,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         node: String,
         _permit: OwnedSemaphorePermit,
         _per_remote_connections_permit: Option<OwnedSemaphorePermit>,
-    ) -> (String, Result<NodeResults<T>, Error>)
+    ) -> NodeResponse<T>
     where
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
         Ft: Future<Output = Result<T, Error>> + Send + 'static,
@@ -277,19 +383,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         let result = func(context, remote.clone(), node.clone()).await;
         let api_response_time = now.elapsed();
 
-        match result {
-            Ok(data) => (
-                node,
-                Ok(NodeResults {
-                    data,
-                    api_response_time,
-                }),
-            ),
-            Err(err) => (node, Err(err)),
+        NodeResponse {
+            node_name: node,
+            data: result,
+            api_response_time,
         }
     }
 
-    pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+    /// Invoke a function `func` for all passed remotes in parallel.
+    pub async fn do_for_all_remotes<A, F, T, Ft>(
+        self,
+        remotes: A,
+        func: F,
+    ) -> FetcherResponse<NodeResponse<T>>
     where
         A: Iterator<Item = Remote>,
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -299,21 +405,31 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
         let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
 
         let mut node_join_set = JoinSet::new();
-        let mut results = FetchResults::default();
 
         for remote in remotes {
             let total_connections_semaphore = total_connections_semaphore.clone();
 
             let remote_id = remote.id.clone();
+            let remote_type = remote.ty;
+
             let context = self.context.clone();
             let func = func.clone();
             let future = async move {
                 let permit = total_connections_semaphore.acquire_owned().await.unwrap();
 
-                (
-                    remote_id,
-                    Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
-                )
+                RemoteResponse {
+                    remote_type,
+                    remote_name: remote_id,
+                    response: Self::fetch_node(
+                        func,
+                        context,
+                        remote,
+                        "localhost".into(),
+                        permit,
+                        None,
+                    )
+                    .await,
+                }
             };
 
             if let Some(log_context) = LogContext::current() {
@@ -323,29 +439,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
             }
         }
 
+        let mut remote_responses = Vec::new();
+
         while let Some(a) = node_join_set.join_next().await {
             match a {
-                Ok((remote_id, (node_id, node_result))) => {
-                    let mut node_results = HashMap::new();
-                    node_results.insert(node_id, node_result);
-
-                    let remote_result = RemoteResult { node_results };
-
-                    if results
-                        .remote_results
-                        .insert(remote_id, Ok(remote_result))
-                        .is_some()
-                    {
-                        // should never happen, but log for good measure if it actually does
-                        log::warn!("made multiple requests for a remote!");
-                    }
-                }
+                Ok(remote_response) => remote_responses.push(remote_response),
                 Err(err) => {
                     log::error!("join error when waiting for future: {err}")
                 }
             }
         }
 
-        results
+        remote_responses.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+        FetcherResponse { remote_responses }
     }
 }
diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs
index e772eef5..2762001e 100644
--- a/server/src/remote_updates.rs
+++ b/server/src/remote_updates.rs
@@ -15,7 +15,7 @@ use pdm_api_types::RemoteUpid;
 use pdm_buildcfg::PDM_CACHE_DIR_M;
 
 use crate::connection;
-use crate::parallel_fetcher::{NodeResults, ParallelFetcher};
+use crate::parallel_fetcher::ParallelFetcher;
 
 pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json");
 
@@ -208,36 +208,34 @@ async fn update_cached_summary_for_node(
 pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Error> {
     let fetcher = ParallelFetcher::new(());
 
-    let fetch_results = fetcher
+    let fetch_response = fetcher
         .do_for_all_remote_nodes(remotes.clone().into_iter(), fetch_available_updates)
         .await;
 
     let mut content = get_cached_summary_or_default()?;
 
-    for (remote_name, result) in fetch_results.remote_results {
+    for remote_response in fetch_response {
+        let remote_name = remote_response.remote().to_string();
+
         let entry = content
             .remotes
             .entry(remote_name.clone())
-            .or_insert_with(|| {
-                // unwrap: remote name came from the same config, should be safe.
-                // TODO: Include type in ParallelFetcher results - should be much more efficient.
-                let remote_type = remotes.iter().find(|r| r.id == remote_name).unwrap().ty;
-
-                RemoteUpdateSummary {
-                    nodes: Default::default(),
-                    remote_type,
-                    status: RemoteUpdateStatus::Success,
-                }
+            .or_insert_with(|| RemoteUpdateSummary {
+                nodes: Default::default(),
+                remote_type: remote_response.remote_type(),
+                status: RemoteUpdateStatus::Success,
             });
 
-        match result {
-            Ok(remote_result) => {
+        match remote_response.nodes() {
+            Ok(node_responses) => {
                 entry.status = RemoteUpdateStatus::Success;
 
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            entry.nodes.insert(node_name, data.into());
+                for node_response in node_responses {
+                    let node_name = node_response.node_name().to_string();
+
+                    match node_response.data() {
+                        Ok(update_info) => {
+                            entry.nodes.insert(node_name, update_info.clone().into());
                         }
                         Err(err) => {
                             // Could not fetch updates from node
-- 
2.47.3






* [PATCH datacenter-manager v2 2/2] parallel fetcher: add module documentation
  2026-02-06  9:43 [PATCH datacenter-manager v2 0/2] improvements for ParallelFetcher Lukas Wagner
  2026-02-06  9:43 ` [PATCH datacenter-manager v2 1/2] parallel fetcher: improve result type ergonomics Lukas Wagner
@ 2026-02-06  9:43 ` Lukas Wagner
  1 sibling, 0 replies; 3+ messages in thread
From: Lukas Wagner @ 2026-02-06  9:43 UTC (permalink / raw)
  To: pdm-devel

Adding a (no_run) doctest to the module-level documentation gives users
a quick idea of how to use this helper.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/parallel_fetcher.rs | 65 ++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index b6f49d6f..83f083ca 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,3 +1,66 @@
+//! Helpers that can be used to parallelize API requests to remotes.
+//!
+//! ```no_run
+//! # use anyhow::Error;
+//! #
+//! # use pdm_api_types::remotes::{RemoteType, Remote};
+//! # use server::parallel_fetcher::ParallelFetcher;
+//! #
+//! # #[tokio::main]
+//! # async fn main() -> Result<(), Error> {
+//! #   let remotes: Vec<Remote> = Vec::new();
+//! #
+//!     async fn fetch_meaning(
+//!         _context: (),
+//!         remote: Remote,
+//!         node: String,
+//!     ) -> Result<i32, Error> {
+//!         match remote.ty {
+//!             RemoteType::Pve => {
+//!                 // Perform the API request here and return some result.
+//!                 Ok(42)
+//!             },
+//!             RemoteType::Pbs => Ok(42),
+//!         }
+//!     }
+//!
+//!     // This context can be passed to the function that is executed for every remote node.
+//!     let context = ();
+//!
+//!     let fetcher = ParallelFetcher::builder(context)
+//!         .max_connections(10)
+//!         .max_connections_per_remote(2)
+//!         .build();
+//!
+//!     let fetcher_response = fetcher
+//!         .do_for_all_remote_nodes(remotes.into_iter(), fetch_meaning)
+//!         .await;
+//!
+//!     for remote_response in fetcher_response {
+//!         match remote_response.nodes() {
+//!             Ok(node_responses) => {
+//!                 for node_response in node_responses {
+//!                     match node_response.data() {
+//!                         Ok(meaning) => assert_eq!(*meaning, 42),
+//!                         Err(err) =>
+//!                             log::error!(
+//!                                 "failed to retrieve result for node {}",
+//!                                 node_response.node_name()
+//!                             ),
+//!                     }
+//!                 }
+//!             }
+//!             Err(err) => log::error!(
+//!                 "failed to connect to remote {}",
+//!                 remote_response.remote()
+//!             ),
+//!         }
+//!     }
+//!
+//! #   Ok(())
+//! # }
+//! ```
+
 use std::fmt::Debug;
 use std::future::Future;
 use std::sync::Arc;
@@ -14,7 +77,9 @@ use pdm_api_types::remotes::{Remote, RemoteType};
 
 use crate::connection;
 
+/// Maximum number of parallel outgoing API requests.
 pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
+/// Maximum number of parallel outgoing API requests to the *same* remote.
 pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
 
 /// Response container type produced by [`ParallelFetcher::do_for_all_remotes`] or
-- 
2.47.3




