From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 4/5] parallel fetcher: improve result type ergonomics
Date: Wed, 4 Feb 2026 16:27:22 +0100
Message-ID: <20260204152723.482258-5-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.47.3
In-Reply-To: <20260204152723.482258-1-l.wagner@proxmox.com>
References: <20260204152723.482258-1-l.wagner@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit

Completely overhaul the *Result types:

- use the term 'Outcome' instead of 'Result' to avoid confusion with
  Rust's Result type
- use generics to avoid the awkwardness of accessing the node outcome
  of a single node when using do_for_all_remotes (we only call out to
  a single node anyway)
- implement .iter()/.into_iter() where it makes sense, for conveniently
  iterating over remote outcomes and node outcomes
- make all members private and offer getters to access the fields
- use a sorted vec instead of a hash map as the internal data structure
  for storing remote outcomes. This avoids the awkwardness of having
  the remote name as the key *and* in the struct that is used as the
  value. Since the vec is sorted by remote name, we can still access
  specific remotes efficiently via binary search.

An illustrative usage sketch of the new API follows below.
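
The sketch below is illustrative only and not part of the patch;
`context`, `remotes` and `fetch_something` are hypothetical stand-ins.
It shows how a call site consumes the new outcome types:

    // Run the handler once per remote; do_for_all_remotes() yields a
    // FetcherOutcome<NodeOutcome<T>>, sorted ascending by remote name.
    let fetcher = ParallelFetcher::new(context);
    let outcomes = fetcher
        .do_for_all_remotes(remotes.into_iter(), |ctx, remote, node| async move {
            fetch_something(ctx, remote, node).await
        })
        .await;

    // Iterate over all remote outcomes ...
    for remote_outcome in &outcomes {
        match remote_outcome.data() {
            Ok(data) => println!("{}: {data:?}", remote_outcome.remote()),
            Err(err) => log::error!("{}: {err:#}", remote_outcome.remote()),
        }
    }

    // ... or look up one remote directly (binary search over the sorted vec).
    if let Some(outcome) = outcomes.get_remote_outcome("some-remote") {
        println!("handler took {:?}", outcome.handler_duration());
    }
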
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/api/pve/firewall.rs                     |  53 ++--
 server/src/api/sdn/controllers.rs                  |  38 +--
 server/src/api/sdn/vnets.rs                        |  35 +--
 server/src/api/sdn/zones.rs                        |  35 +--
 .../tasks/remote_tasks.rs                          |  31 +-
 server/src/parallel_fetcher.rs                     | 268 ++++++++++++------
 server/src/remote_updates.rs                       |  34 ++-
 7 files changed, 273 insertions(+), 221 deletions(-)

diff --git a/server/src/api/pve/firewall.rs b/server/src/api/pve/firewall.rs
index d81592f7..bf5df3f8 100644
--- a/server/src/api/pve/firewall.rs
+++ b/server/src/api/pve/firewall.rs
@@ -271,12 +271,10 @@ pub async fn pve_firewall_status(
 
     // 2: build context with guests for each remote and fetch node-level data
     let mut guests_per_remote = HashMap::new();
-    for (remote_id, remote_result) in &cluster_results.remote_results {
-        if let Ok(remote_result) = remote_result {
-            if let Ok(node_result) = remote_result.node_results.get("localhost").unwrap() {
-                guests_per_remote
-                    .insert(remote_id.clone(), Arc::new(node_result.data.guests.clone()));
-            }
+    for remote_outcome in &cluster_results {
+        let remote_id = remote_outcome.remote().to_string();
+        if let Ok(node_result) = remote_outcome.data() {
+            guests_per_remote.insert(remote_id, Arc::new(node_result.guests.clone()));
         }
     }
 
@@ -298,26 +296,18 @@ pub async fn pve_firewall_status(
     let mut result = Vec::new();
     for remote in &pve_remotes {
         let mut cluster_status = cluster_results
-            .remote_results
-            .get(&remote.id)
-            .and_then(|r| r.as_ref().ok())
-            .and_then(|r| r.node_results.get("localhost"))
-            .and_then(|n| n.as_ref().ok())
-            .and_then(|n| n.data.status.clone());
+            .get_remote_outcome(&remote.id)
+            .and_then(|r| r.data().ok())
+            .and_then(|n| n.status.clone());
 
-        let node_fetch_result = node_results.remote_results.get(&remote.id);
+        let node_fetch_result = node_results.get_remote_outcome(&remote.id);
 
         let nodes = node_fetch_result
-            .and_then(|r| r.as_ref().ok())
-            .map(|r| {
-                r.node_results
-                    .values()
-                    .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
-                    .collect()
-            })
+            .and_then(|r| r.nodes().ok())
+            .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
             .unwrap_or_default();
 
-        if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+        if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
             cluster_status = None;
         }
 
@@ -389,12 +379,8 @@ pub async fn cluster_firewall_status(
         .await;
 
     let cluster_data = cluster_results
-        .remote_results
-        .get(&remote)
-        .and_then(|r| r.as_ref().ok())
-        .and_then(|r| r.node_results.get("localhost"))
-        .and_then(|n| n.as_ref().ok())
-        .map(|n| &n.data);
+        .get_remote_outcome(&remote)
+        .and_then(|r| r.data().ok());
 
     let (cluster_status, guests) = match cluster_data {
         Some(data) => (data.status.clone(), data.guests.clone()),
@@ -418,19 +404,14 @@ pub async fn cluster_firewall_status(
         .await;
 
     // 3: collect node results
-    let node_fetch_result = node_results.remote_results.get(&remote);
+    let node_fetch_result = node_results.get_remote_outcome(&remote);
 
     let nodes = node_fetch_result
-        .and_then(|r| r.as_ref().ok())
-        .map(|r| {
-            r.node_results
-                .values()
-                .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
-                .collect()
-        })
+        .and_then(|r| r.nodes().ok())
+        .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
         .unwrap_or_default();
 
-    let final_status = if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+    let final_status = if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
         None
     } else {
         cluster_status
diff --git a/server/src/api/sdn/controllers.rs b/server/src/api/sdn/controllers.rs
index 96612516..50f9cb51 100644
--- a/server/src/api/sdn/controllers.rs
+++ b/server/src/api/sdn/controllers.rs
@@ -9,10 +9,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::ListControllersType;
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher};
 
 pub const ROUTER: Router = Router::new().get(&API_METHOD_LIST_CONTROLLERS);
 
@@ -105,26 +102,19 @@ pub async fn list_controllers(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|controller| ListController {
-                                remote: remote.clone(),
-                                controller,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(sdn_controllers) => {
+                vnets.extend(
+                    sdn_controllers
+                        .into_iter()
+                        .map(|controller| ListController {
+                            remote: remote.clone(),
+                            controller,
+                        }),
+                )
             }
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/vnets.rs b/server/src/api/sdn/vnets.rs
index 5e14c6ab..561b1d30 100644
--- a/server/src/api/sdn/vnets.rs
+++ b/server/src/api/sdn/vnets.rs
@@ -13,11 +13,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::{CreateVnet, SdnVnetType};
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
 
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_VNETS)
@@ -105,26 +101,15 @@ async fn list_vnets(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|vnet| ListVnet {
-                                remote: remote.clone(),
-                                vnet,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(vnets_on_this_remote) => {
+                vnets.extend(vnets_on_this_remote.into_iter().map(|vnet| ListVnet {
+                    remote: remote.clone(),
+                    vnet,
+                }))
             }
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/zones.rs b/server/src/api/sdn/zones.rs
index c4552795..b5186e27 100644
--- a/server/src/api/sdn/zones.rs
+++ b/server/src/api/sdn/zones.rs
@@ -14,11 +14,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
 use proxmox_schema::api;
 use pve_api_types::{CreateZone, ListZonesType};
 
-use crate::{
-    api::pve,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
 
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_ZONES)
@@ -111,27 +107,14 @@ pub async fn list_zones(
         })
         .await;
 
-    for (remote, remote_result) in results.remote_results.into_iter() {
-        match remote_result {
-            Ok(remote_result) => {
-                for (node, node_result) in remote_result.node_results.into_iter() {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            vnets.extend(data.into_iter().map(|zone| ListZone {
-                                remote: remote.clone(),
-                                zone,
-                            }))
-                        }
-                        Err(error) => {
-                            log::error!(
-                                "could not fetch vnets from remote {} node {}: {error:#}",
-                                remote,
-                                node
-                            );
-                        }
-                    }
-                }
-            }
+    for remote_outcome in results {
+        let remote = remote_outcome.remote().to_string();
+
+        match remote_outcome.into_data() {
+            Ok(zones) => vnets.extend(zones.into_iter().map(|zone| ListZone {
+                remote: remote.clone(),
+                zone,
+            })),
             Err(error) => {
                 log::error!("could not fetch vnets from remote {}: {error:#}", remote)
             }
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 93a0d05e..03e71a02 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -16,7 +16,7 @@ use proxmox_section_config::typed::SectionConfigData;
 
 use server::{
     connection,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
+    parallel_fetcher::ParallelFetcher,
     pbs_client,
     remote_tasks::{
         self,
@@ -276,23 +276,32 @@ async fn fetch_remotes(
     let mut all_tasks = Vec::new();
     let mut node_success_map = NodeFetchSuccessMap::default();
 
-    for (remote_name, result) in fetch_results.remote_results {
-        match result {
-            Ok(remote_result) => {
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            all_tasks.extend(data);
-                            node_success_map.set_node_success(remote_name.clone(), node_name);
+    for remote_outcome in fetch_results {
+        match remote_outcome.nodes() {
+            Ok(node_outcomes) => {
+                for node_outcome in node_outcomes {
+                    let node_name = node_outcome.node_name().to_string();
+
+                    match node_outcome.data() {
+                        Ok(data) => {
+                            all_tasks.extend(data.clone());
+                            node_success_map
+                                .set_node_success(remote_outcome.remote().to_string(), node_name);
                         }
                         Err(err) => {
-                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+                            log::error!(
+                                "could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}",
+                                remote_name = remote_outcome.remote()
+                            );
                         }
                     }
                 }
            }
             Err(err) => {
-                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+                log::error!(
+                    "could not fetch tasks from remote '{remote}': {err:#}",
+                    remote = remote_outcome.remote()
+                );
             }
         }
     }
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 1eded293..57011096 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::future::Future;
 use std::sync::Arc;
@@ -18,45 +17,133 @@ use crate::connection;
 pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
 pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
 
-pub struct ParallelFetcher<C> {
-    max_connections: usize,
-    max_connections_per_remote: usize,
-    context: C,
+/// Outcome type produced by [`ParallelFetcher::do_for_all_remotes`] or
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+///
+/// This type contains the individual outcomes for each remote. These can be accessed
+/// by iterating over this type (`.iter()`, `.into_iter()`) or by calling
+/// [`FetcherOutcome::get_remote_outcome`].
+pub struct FetcherOutcome<O> {
+    // NOTE: This vector is sorted ascending by remote name.
+    outcomes: Vec<RemoteOutcome<O>>,
 }
 
-pub struct FetchResults<T> {
-    /// Per-remote results. The key in the map is the remote name.
-    pub remote_results: HashMap<String, Result<RemoteResult<T>, Error>>,
-}
+impl<O> IntoIterator for FetcherOutcome<O> {
+    type Item = RemoteOutcome<O>;
+    type IntoIter = std::vec::IntoIter<Self::Item>;
 
-impl<T> Default for FetchResults<T> {
-    fn default() -> Self {
-        Self {
-            remote_results: Default::default(),
-        }
+    fn into_iter(self) -> Self::IntoIter {
+        self.outcomes.into_iter()
     }
 }
 
-#[derive(Debug)]
-pub struct RemoteResult<T> {
-    /// Per-node results. The key in the map is the node name.
-    pub node_results: HashMap<String, Result<NodeResults<T>, Error>>,
-}
+impl<'a, O> IntoIterator for &'a FetcherOutcome<O> {
+    type Item = &'a RemoteOutcome<O>;
+    type IntoIter = std::slice::Iter<'a, RemoteOutcome<O>>;
 
-impl<T> Default for RemoteResult<T> {
-    fn default() -> Self {
-        Self {
-            node_results: Default::default(),
-        }
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
     }
 }
 
-#[derive(Debug)]
-pub struct NodeResults<T> {
-    /// The data returned from the passed function.
-    pub data: T,
-    /// Time needed waiting for the passed function to return.
-    pub api_response_time: Duration,
+impl<O> FetcherOutcome<O> {
+    /// Create a new iterator of all contained [`RemoteOutcome`]s.
+    pub fn iter<'a>(&'a self) -> std::slice::Iter<'a, RemoteOutcome<O>> {
+        self.outcomes.iter()
+    }
+
+    /// Get the outcome for a particular remote.
+    pub fn get_remote_outcome(&self, remote: &str) -> Option<&RemoteOutcome<O>> {
+        self.outcomes
+            .binary_search_by(|probe| probe.remote().cmp(remote))
+            .ok()
+            .map(|index| &self.outcomes[index])
+    }
+}
+
+/// Outcome for one remote.
+pub struct RemoteOutcome<O> {
+    remote_name: String,
+    remote_type: RemoteType,
+    outcome: O,
+}
+
+impl<O> RemoteOutcome<O> {
+    /// Returns the remote id.
+    pub fn remote(&self) -> &str {
+        self.remote_name.as_str()
+    }
+
+    /// Returns the type of the remote.
+    pub fn remote_type(&self) -> RemoteType {
+        self.remote_type
+    }
+}
+
+impl<T> RemoteOutcome<NodeOutcome<T>> {
+    /// Access the data that was returned by the handler function.
+    pub fn data(&self) -> Result<&T, &Error> {
+        self.outcome.data()
+    }
+
+    /// Access the data that was returned by the handler function, consuming self.
+    pub fn into_data(self) -> Result<T, Error> {
+        self.outcome.into_data()
+    }
+
+    /// The [`Duration`] of the handler call.
+    pub fn handler_duration(&self) -> Duration {
+        self.outcome.handler_duration()
+    }
+}
+
+impl<T> RemoteOutcome<MultipleNodeOutcome<T>> {
+    /// Access the node results.
+    ///
+    /// This returns an error if the list of nodes could not be fetched
+    /// during [`ParallelFetcher::do_for_all_remote_nodes`].
+    pub fn nodes(&self) -> Result<&Vec<NodeOutcome<T>>, &Error> {
+        self.outcome.inner.as_ref()
+    }
+}
+
+/// Wrapper type used to contain the node results when using
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+pub struct MultipleNodeOutcome<T> {
+    inner: Result<Vec<NodeOutcome<T>>, Error>,
+}
+
+/// Outcome for a single node.
+pub struct NodeOutcome<T> {
+    node_name: String,
+    data: Result<T, Error>,
+    api_response_time: Duration,
+}
+
+impl<T> NodeOutcome<T> {
+    /// Name of the node.
+    ///
+    /// At the moment, this is always `localhost` if `do_for_all_remotes` was used.
+    /// If `do_for_all_remote_nodes` is used, this is the actual nodename for PVE remotes and
+    /// `localhost` for PBS remotes.
+    pub fn node_name(&self) -> &str {
+        &self.node_name
+    }
+
+    /// Access the data that was returned by the handler function.
+    pub fn data(&self) -> Result<&T, &Error> {
+        self.data.as_ref()
+    }
+
+    /// Access the data that was returned by the handler function, consuming `self`.
+    pub fn into_data(self) -> Result<T, Error> {
+        self.data
+    }
+
+    /// The [`Duration`] of the handler call.
+    pub fn handler_duration(&self) -> Duration {
+        self.api_response_time
+    }
 }
 
 /// Builder for the [`ParallelFetcher`] struct.
@@ -101,6 +188,13 @@ impl<C> ParallelFetcherBuilder<C> {
     }
 }
 
+/// Helper for parallelizing API requests to multiple remotes/nodes.
+pub struct ParallelFetcher<C> {
+    max_connections: usize,
+    max_connections_per_remote: usize,
+    context: C,
+}
+
 impl<C> ParallelFetcher<C> {
     /// Create a [`ParallelFetcher`] with default settings.
     pub fn new(context: C) -> Self {
@@ -112,7 +206,12 @@ impl<C> ParallelFetcher<C> {
         ParallelFetcherBuilder::new(context)
     }
 
-    pub async fn do_for_all_remote_nodes(self, remotes: A, func: F) -> FetchResults<T>
+    /// Invoke a function `func` for all nodes of a given list of remotes in parallel.
+    pub async fn do_for_all_remote_nodes(
+        self,
+        remotes: A,
+        func: F,
+    ) -> FetcherOutcome<MultipleNodeOutcome<T>>
     where
         A: Iterator<Item = Remote>,
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -142,20 +241,20 @@ impl<C> ParallelFetcher<C> {
             }
         }
 
-        let mut results = FetchResults::default();
+        let mut results = Vec::new();
 
         while let Some(a) = remote_join_set.join_next().await {
             match a {
-                Ok((remote_name, remote_result)) => {
-                    results.remote_results.insert(remote_name, remote_result);
-                }
+                Ok(remote_outcome) => results.push(remote_outcome),
                 Err(err) => {
                     log::error!("join error when waiting for future: {err}")
                 }
             }
         }
 
-        results
+        results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+        FetcherOutcome { outcomes: results }
     }
 
     async fn fetch_remote(
@@ -164,13 +263,13 @@ impl<C> ParallelFetcher<C> {
         semaphore: Arc<Semaphore>,
         func: F,
         max_connections_per_remote: usize,
-    ) -> (String, Result<RemoteResult<T>, Error>)
+    ) -> RemoteOutcome<MultipleNodeOutcome<T>>
     where
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
         Ft: Future<Output = Result<T, Error>> + Send + 'static,
         T: Send + Debug + 'static,
     {
-        let mut per_remote_results = RemoteResult::default();
+        let mut node_outcomes = Vec::new();
 
         let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap());
         let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote));
@@ -188,7 +287,13 @@ impl<C> ParallelFetcher<C> {
             .await
         {
             Ok(nodes) => nodes,
-            Err(err) => return (remote.id.clone(), Err(err)),
+            Err(err) => {
+                return RemoteOutcome {
+                    remote_name: remote.id,
+                    remote_type: remote.ty,
+                    outcome: MultipleNodeOutcome { inner: Err(err) },
+                }
+            }
         };
 
         let mut nodes_join_set = JoinSet::new();
@@ -228,10 +333,8 @@ impl<C> ParallelFetcher<C> {
 
                 while let Some(join_result) = nodes_join_set.join_next().await {
                     match join_result {
-                        Ok((node_name, per_node_result)) => {
-                            per_remote_results
-                                .node_results
-                                .insert(node_name, per_node_result);
+                        Ok(node_outcome) => {
+                            node_outcomes.push(node_outcome);
                         }
                         Err(e) => {
                             log::error!("join error when waiting for future: {e}")
@@ -240,7 +343,7 @@ impl<C> ParallelFetcher<C> {
                 }
             }
             RemoteType::Pbs => {
-                let (nodename, result) = Self::fetch_node(
+                let node_outcome = Self::fetch_node(
                     func,
                     context,
                     remote.clone(),
@@ -250,14 +353,17 @@ impl<C> ParallelFetcher<C> {
                 )
                 .await;
 
-                match result {
-                    Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
-                    Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
-                };
+                node_outcomes.push(node_outcome)
             }
         }
 
-        (remote.id, Ok(per_remote_results))
+        RemoteOutcome {
+            remote_name: remote.id,
+            remote_type: remote.ty,
+            outcome: MultipleNodeOutcome {
+                inner: Ok(node_outcomes),
+            },
+        }
     }
 
     async fn fetch_node(
@@ -267,7 +373,7 @@ impl<C> ParallelFetcher<C> {
         node: String,
         _permit: OwnedSemaphorePermit,
         _per_remote_connections_permit: Option<OwnedSemaphorePermit>,
-    ) -> (String, Result<NodeResults<T>, Error>)
+    ) -> NodeOutcome<T>
     where
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
         Ft: Future<Output = Result<T, Error>> + Send + 'static,
@@ -277,19 +383,19 @@ impl<C> ParallelFetcher<C> {
         let result = func(context, remote.clone(), node.clone()).await;
         let api_response_time = now.elapsed();
 
-        match result {
-            Ok(data) => (
-                node,
-                Ok(NodeResults {
-                    data,
-                    api_response_time,
-                }),
-            ),
-            Err(err) => (node, Err(err)),
+        NodeOutcome {
+            node_name: node,
+            data: result,
+            api_response_time,
         }
     }
 
-    pub async fn do_for_all_remotes(self, remotes: A, func: F) -> FetchResults<T>
+    /// Invoke a function `func` for all passed remotes in parallel.
+    pub async fn do_for_all_remotes(
+        self,
+        remotes: A,
+        func: F,
+    ) -> FetcherOutcome<NodeOutcome<T>>
     where
         A: Iterator<Item = Remote>,
         F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -299,21 +405,31 @@ impl<C> ParallelFetcher<C> {
         let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
 
         let mut node_join_set = JoinSet::new();
-        let mut results = FetchResults::default();
 
         for remote in remotes {
            let total_connections_semaphore = total_connections_semaphore.clone();
             let remote_id = remote.id.clone();
+            let remote_type = remote.ty;
+
             let context = self.context.clone();
             let func = func.clone();
 
             let future = async move {
                 let permit = total_connections_semaphore.acquire_owned().await.unwrap();
 
-                (
-                    remote_id,
-                    Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
-                )
+                RemoteOutcome {
+                    remote_type,
+                    remote_name: remote_id,
+                    outcome: Self::fetch_node(
+                        func,
+                        context,
+                        remote,
+                        "localhost".into(),
+                        permit,
+                        None,
+                    )
+                    .await,
+                }
             };
 
             if let Some(log_context) = LogContext::current() {
@@ -323,29 +439,19 @@ impl<C> ParallelFetcher<C> {
             }
         }
 
+        let mut results = Vec::new();
+
         while let Some(a) = node_join_set.join_next().await {
             match a {
-                Ok((remote_id, (node_id, node_result))) => {
-                    let mut node_results = HashMap::new();
-                    node_results.insert(node_id, node_result);
-
-                    let remote_result = RemoteResult { node_results };
-
-                    if results
-                        .remote_results
-                        .insert(remote_id, Ok(remote_result))
-                        .is_some()
-                    {
-                        // should never happen, but log for good measure if it actually does
-                        log::warn!("made multiple requests for a remote!");
-                    }
-                }
+                Ok(remote_outcome) => results.push(remote_outcome),
                 Err(err) => {
                     log::error!("join error when waiting for future: {err}")
                 }
             }
         }
 
-        results
+        results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+        FetcherOutcome { outcomes: results }
     }
 }
diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs
index e772eef5..460a91f2 100644
--- a/server/src/remote_updates.rs
+++ b/server/src/remote_updates.rs
@@ -15,7 +15,7 @@ use pdm_api_types::RemoteUpid;
 use pdm_buildcfg::PDM_CACHE_DIR_M;
 
 use crate::connection;
-use crate::parallel_fetcher::{NodeResults, ParallelFetcher};
+use crate::parallel_fetcher::ParallelFetcher;
 
 pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json");
 
@@ -214,30 +214,28 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er
 
     let mut content = get_cached_summary_or_default()?;
 
-    for (remote_name, result) in fetch_results.remote_results {
+    for remote_outcome in fetch_results {
+        let remote_name = remote_outcome.remote().to_string();
+
         let entry = content
             .remotes
             .entry(remote_name.clone())
-            .or_insert_with(|| {
-                // unwrap: remote name came from the same config, should be safe.
-                // TODO: Include type in ParallelFetcher results - should be much more efficient.
-                let remote_type = remotes.iter().find(|r| r.id == remote_name).unwrap().ty;
-
-                RemoteUpdateSummary {
-                    nodes: Default::default(),
-                    remote_type,
-                    status: RemoteUpdateStatus::Success,
-                }
+            .or_insert_with(|| RemoteUpdateSummary {
+                nodes: Default::default(),
+                remote_type: remote_outcome.remote_type(),
+                status: RemoteUpdateStatus::Success,
             });
 
-        match result {
-            Ok(remote_result) => {
+        match remote_outcome.nodes() {
+            Ok(node_outcomes) => {
                 entry.status = RemoteUpdateStatus::Success;
 
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            entry.nodes.insert(node_name, data.into());
+                for node_outcome in node_outcomes {
+                    let node_name = node_outcome.node_name().to_string();
+
+                    match node_outcome.data() {
+                        Ok(update_info) => {
+                            entry.nodes.insert(node_name, update_info.clone().into());
                         }
                         Err(err) => {
                             // Could not fetch updates from node
-- 
2.47.3
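
For completeness, the corresponding per-node consumption pattern for
do_for_all_remote_nodes(), mirroring the remote_tasks.rs hunk above. This
is again a hedged, illustrative sketch rather than code from the patch;
`fetcher`, `remotes` and `func` are hypothetical:

    // Each remote outcome wraps a Result over the per-node outcomes,
    // since enumerating a remote's nodes can itself fail.
    let outcomes = fetcher.do_for_all_remote_nodes(remotes.into_iter(), func).await;

    for remote_outcome in outcomes {
        match remote_outcome.nodes() {
            Ok(node_outcomes) => {
                for node_outcome in node_outcomes {
                    match node_outcome.data() {
                        Ok(data) => println!("{}: {data:?}", node_outcome.node_name()),
                        Err(err) => {
                            log::error!("node {}: {err:#}", node_outcome.node_name())
                        }
                    }
                }
            }
            Err(err) => log::error!("remote {}: {err:#}", remote_outcome.remote()),
        }
    }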