From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager v2 1/2] parallel fetcher: improve result type ergonomics
Date: Fri, 6 Feb 2026 10:43:03 +0100 [thread overview]
Message-ID: <20260206094304.117465-2-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260206094304.117465-1-l.wagner@proxmox.com>
Completely overhaul the *Result types:
- use the term 'Response' instead of Result, to avoid confusion with
Rust's Result type
- use generics to avoid the awkwardness of accessing the node response
of a single node when using do_for_all_remotes (we only call out to
a single node anyways)
- implement .iter()/.into_iter() where it makes sense for conveniently
iterating over remote responses and node responses
- make all members private and offer getters to access fields
- use a sorted vec as an internal data structure for storing remote
responses instead a hash map. This avoids the awkwardness of having
the remote name as key *and* in the struct that is used as a value.
Since the vec is sorted by the remote name, we still can access
specific remotes quite effectively via binary search.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Changes since v2:
- Change type names from *Outcome to *Response (Thx @Thomas)
server/src/api/pve/firewall.rs | 53 ++--
server/src/api/sdn/controllers.rs | 38 +--
server/src/api/sdn/vnets.rs | 35 +--
server/src/api/sdn/zones.rs | 35 +--
.../tasks/remote_tasks.rs | 33 ++-
server/src/parallel_fetcher.rs | 268 ++++++++++++------
server/src/remote_updates.rs | 36 ++-
7 files changed, 275 insertions(+), 223 deletions(-)
diff --git a/server/src/api/pve/firewall.rs b/server/src/api/pve/firewall.rs
index d81592f7..95c29eff 100644
--- a/server/src/api/pve/firewall.rs
+++ b/server/src/api/pve/firewall.rs
@@ -271,12 +271,10 @@ pub async fn pve_firewall_status(
// 2: build context with guests for each remote and fetch node-level data
let mut guests_per_remote = HashMap::new();
- for (remote_id, remote_result) in &cluster_results.remote_results {
- if let Ok(remote_result) = remote_result {
- if let Ok(node_result) = remote_result.node_results.get("localhost").unwrap() {
- guests_per_remote
- .insert(remote_id.clone(), Arc::new(node_result.data.guests.clone()));
- }
+ for remote_response in &cluster_results {
+ let remote_id = remote_response.remote().to_string();
+ if let Ok(node_result) = remote_response.data() {
+ guests_per_remote.insert(remote_id, Arc::new(node_result.guests.clone()));
}
}
@@ -298,26 +296,18 @@ pub async fn pve_firewall_status(
let mut result = Vec::new();
for remote in &pve_remotes {
let mut cluster_status = cluster_results
- .remote_results
- .get(&remote.id)
- .and_then(|r| r.as_ref().ok())
- .and_then(|r| r.node_results.get("localhost"))
- .and_then(|n| n.as_ref().ok())
- .and_then(|n| n.data.status.clone());
+ .get_remote_response(&remote.id)
+ .and_then(|r| r.data().ok())
+ .and_then(|n| n.status.clone());
- let node_fetch_result = node_results.remote_results.get(&remote.id);
+ let node_fetch_result = node_results.get_remote_response(&remote.id);
let nodes = node_fetch_result
- .and_then(|r| r.as_ref().ok())
- .map(|r| {
- r.node_results
- .values()
- .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
- .collect()
- })
+ .and_then(|r| r.nodes().ok())
+ .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
.unwrap_or_default();
- if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+ if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
cluster_status = None;
}
@@ -389,12 +379,8 @@ pub async fn cluster_firewall_status(
.await;
let cluster_data = cluster_results
- .remote_results
- .get(&remote)
- .and_then(|r| r.as_ref().ok())
- .and_then(|r| r.node_results.get("localhost"))
- .and_then(|n| n.as_ref().ok())
- .map(|n| &n.data);
+ .get_remote_response(&remote)
+ .and_then(|r| r.data().ok());
let (cluster_status, guests) = match cluster_data {
Some(data) => (data.status.clone(), data.guests.clone()),
@@ -418,19 +404,14 @@ pub async fn cluster_firewall_status(
.await;
// 3: collect node results
- let node_fetch_result = node_results.remote_results.get(&remote);
+ let node_fetch_result = node_results.get_remote_response(&remote);
let nodes = node_fetch_result
- .and_then(|r| r.as_ref().ok())
- .map(|r| {
- r.node_results
- .values()
- .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
- .collect()
- })
+ .and_then(|r| r.nodes().ok())
+ .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
.unwrap_or_default();
- let final_status = if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+ let final_status = if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
None
} else {
cluster_status
diff --git a/server/src/api/sdn/controllers.rs b/server/src/api/sdn/controllers.rs
index 96612516..3921cf00 100644
--- a/server/src/api/sdn/controllers.rs
+++ b/server/src/api/sdn/controllers.rs
@@ -9,10 +9,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::ListControllersType;
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher};
pub const ROUTER: Router = Router::new().get(&API_METHOD_LIST_CONTROLLERS);
@@ -105,26 +102,19 @@ pub async fn list_controllers(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|controller| ListController {
- remote: remote.clone(),
- controller,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
+ for remote_response in results {
+ let remote = remote_response.remote().to_string();
+
+ match remote_response.into_data() {
+ Ok(sdn_controllers) => {
+ vnets.extend(
+ sdn_controllers
+ .into_iter()
+ .map(|controller| ListController {
+ remote: remote.clone(),
+ controller,
+ }),
+ )
}
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/vnets.rs b/server/src/api/sdn/vnets.rs
index 5e14c6ab..6f2fe6a3 100644
--- a/server/src/api/sdn/vnets.rs
+++ b/server/src/api/sdn/vnets.rs
@@ -13,11 +13,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::{CreateVnet, SdnVnetType};
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
- sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
pub const ROUTER: Router = Router::new()
.get(&API_METHOD_LIST_VNETS)
@@ -105,26 +101,15 @@ async fn list_vnets(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|vnet| ListVnet {
- remote: remote.clone(),
- vnet,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
+ for remote_response in results {
+ let remote = remote_response.remote().to_string();
+
+ match remote_response.into_data() {
+ Ok(vnets_on_this_remote) => {
+ vnets.extend(vnets_on_this_remote.into_iter().map(|vnet| ListVnet {
+ remote: remote.clone(),
+ vnet,
+ }))
}
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/zones.rs b/server/src/api/sdn/zones.rs
index c4552795..57d7caad 100644
--- a/server/src/api/sdn/zones.rs
+++ b/server/src/api/sdn/zones.rs
@@ -14,11 +14,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::{CreateZone, ListZonesType};
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
- sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
pub const ROUTER: Router = Router::new()
.get(&API_METHOD_LIST_ZONES)
@@ -111,27 +107,14 @@ pub async fn list_zones(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|zone| ListZone {
- remote: remote.clone(),
- zone,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
- }
+ for remote_response in results {
+ let remote = remote_response.remote().to_string();
+
+ match remote_response.into_data() {
+ Ok(zones) => vnets.extend(zones.into_iter().map(|zone| ListZone {
+ remote: remote.clone(),
+ zone,
+ })),
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
}
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 93a0d05e..5637dc6f 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -16,7 +16,7 @@ use proxmox_section_config::typed::SectionConfigData;
use server::{
connection,
- parallel_fetcher::{NodeResults, ParallelFetcher},
+ parallel_fetcher::ParallelFetcher,
pbs_client,
remote_tasks::{
self,
@@ -269,30 +269,39 @@ async fn fetch_remotes(
.max_connections_per_remote(CONNECTIONS_PER_PVE_REMOTE)
.build();
- let fetch_results = fetcher
+ let fetch_response = fetcher
.do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
.await;
let mut all_tasks = Vec::new();
let mut node_success_map = NodeFetchSuccessMap::default();
- for (remote_name, result) in fetch_results.remote_results {
- match result {
- Ok(remote_result) => {
- for (node_name, node_result) in remote_result.node_results {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- all_tasks.extend(data);
- node_success_map.set_node_success(remote_name.clone(), node_name);
+ for remote_response in fetch_response {
+ match remote_response.nodes() {
+ Ok(node_responses) => {
+ for node_response in node_responses {
+ let node_name = node_response.node_name().to_string();
+
+ match node_response.data() {
+ Ok(data) => {
+ all_tasks.extend(data.clone());
+ node_success_map
+ .set_node_success(remote_response.remote().to_string(), node_name);
}
Err(err) => {
- log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+ log::error!(
+ "could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}",
+ remote_name = remote_response.remote()
+ );
}
}
}
}
Err(err) => {
- log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+ log::error!(
+ "could not fetch tasks from remote '{remote}': {err:#}",
+ remote = remote_response.remote()
+ );
}
}
}
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 1eded293..b6f49d6f 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
@@ -18,45 +17,133 @@ use crate::connection;
pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
-pub struct ParallelFetcher<C> {
- max_connections: usize,
- max_connections_per_remote: usize,
- context: C,
+/// Response container type produced by [`ParallelFetcher::do_for_all_remotes`] or
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+///
+/// This type contains the individual responses for each remote. These can be accessed
+/// by iterating over this type (`.iter()`, `.into_iter()`) or by calling
+/// [`FetcherResponse::get_remote_response`].
+pub struct FetcherReponse<R> {
+ // NOTE: This vector is sorted ascending by remote name.
+ remote_responses: Vec<RemoteResponse<R>>,
}
-pub struct FetchResults<T> {
- /// Per-remote results. The key in the map is the remote name.
- pub remote_results: HashMap<String, Result<RemoteResult<T>, Error>>,
-}
+impl<R> IntoIterator for FetcherReponse<R> {
+ type Item = RemoteResponse<R>;
+ type IntoIter = std::vec::IntoIter<Self::Item>;
-impl<T> Default for FetchResults<T> {
- fn default() -> Self {
- Self {
- remote_results: Default::default(),
- }
+ fn into_iter(self) -> Self::IntoIter {
+ self.remote_responses.into_iter()
}
}
-#[derive(Debug)]
-pub struct RemoteResult<T> {
- /// Per-node results. The key in the map is the node name.
- pub node_results: HashMap<String, Result<NodeResults<T>, Error>>,
-}
+impl<'a, O> IntoIterator for &'a FetcherReponse<O> {
+ type Item = &'a RemoteResponse<O>;
+ type IntoIter = std::slice::Iter<'a, RemoteResponse<O>>;
-impl<T> Default for RemoteResult<T> {
- fn default() -> Self {
- Self {
- node_results: Default::default(),
- }
+ fn into_iter(self) -> Self::IntoIter {
+ self.iter()
}
}
-#[derive(Debug)]
-pub struct NodeResults<T> {
- /// The data returned from the passed function.
- pub data: T,
- /// Time needed waiting for the passed function to return.
- pub api_response_time: Duration,
+impl<R> FetcherReponse<R> {
+ /// Create a new iterator of all contained [`RemoteResponse`]s.
+ pub fn iter<'a>(&'a self) -> std::slice::Iter<'a, RemoteResponse<R>> {
+ self.remote_responses.iter()
+ }
+
+ /// Get the response for a particular remote.
+ pub fn get_remote_response(&self, remote: &str) -> Option<&RemoteResponse<R>> {
+ self.remote_responses
+ .binary_search_by(|probe| probe.remote().cmp(remote))
+ .ok()
+ .map(|index| &self.remote_responses[index])
+ }
+}
+
+/// Response container for one remote.
+pub struct RemoteResponse<R> {
+ remote_name: String,
+ remote_type: RemoteType,
+ response: R,
+}
+
+impl<R> RemoteResponse<R> {
+ /// Returns the remote id.
+ pub fn remote(&self) -> &str {
+ self.remote_name.as_str()
+ }
+
+ /// Returns the type of the remote.
+ pub fn remote_type(&self) -> RemoteType {
+ self.remote_type
+ }
+}
+
+impl<T> RemoteResponse<NodeResponse<T>> {
+ /// Access the data that was returned by the handler function.
+ pub fn data(&self) -> Result<&T, &Error> {
+ self.response.data()
+ }
+
+ /// Access the data that was returned by the handler function, consuming self.
+ pub fn into_data(self) -> Result<T, Error> {
+ self.response.into_data()
+ }
+
+ /// The [`Duration`] of the handler call.
+ pub fn handler_duration(&self) -> Duration {
+ self.response.handler_duration()
+ }
+}
+
+impl<T> RemoteResponse<MultipleNodesReponse<T>> {
+ /// Access the node response.
+ ///
+ /// This returns an error if the list of nodes could not be fetched
+ /// during [`ParallelFetcher::do_for_all_remote_nodes`].
+ pub fn nodes(&self) -> Result<&Vec<NodeResponse<T>>, &Error> {
+ self.response.inner.as_ref()
+ }
+}
+
+/// Wrapper type used to contain the node responses when using
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+pub struct MultipleNodesReponse<T> {
+ inner: Result<Vec<NodeResponse<T>>, Error>,
+}
+
+/// Response for a single node.
+pub struct NodeResponse<T> {
+ node_name: String,
+ data: Result<T, Error>,
+ api_response_time: Duration,
+}
+
+impl<T> NodeResponse<T> {
+ /// Name of the node.
+ ///
+ /// At the moment, this is always `localhost` if `do_for_all_remotes` was used.
+ /// If `do_for_all_remote_nodes` is used, this is the actual nodename for PVE remotes and
+ /// `localhost` for PBS remotes.
+ pub fn node_name(&self) -> &str {
+ &self.node_name
+ }
+
+ /// Access the data that was returned by the handler function.
+ pub fn data(&self) -> Result<&T, &Error> {
+ self.data.as_ref()
+ }
+
+ /// Access the data that was returned by the handler function, consuming `self`.
+ pub fn into_data(self) -> Result<T, Error> {
+ self.data
+ }
+
+ /// The [`Duration`] of the handler call.
+ pub fn handler_duration(&self) -> Duration {
+ self.api_response_time
+ }
}
/// Builder for the [`ParallelFetcher`] struct.
@@ -101,6 +188,13 @@ impl<C> ParallelFetcherBuilder<C> {
}
}
+/// Helper for parallelizing API requests to multiple remotes/nodes.
+pub struct ParallelFetcher<C> {
+ max_connections: usize,
+ max_connections_per_remote: usize,
+ context: C,
+}
+
impl<C: Clone + Send + 'static> ParallelFetcher<C> {
/// Create a [`ParallelFetcher`] with default settings.
pub fn new(context: C) -> Self {
@@ -112,7 +206,12 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
ParallelFetcherBuilder::new(context)
}
- pub async fn do_for_all_remote_nodes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ /// Invoke a function `func` for all nodes of a given list of remotes in parallel.
+ pub async fn do_for_all_remote_nodes<A, F, T, Ft>(
+ self,
+ remotes: A,
+ func: F,
+ ) -> FetcherReponse<MultipleNodesReponse<T>>
where
A: Iterator<Item = Remote>,
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -142,20 +241,20 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
- let mut results = FetchResults::default();
+ let mut remote_responses = Vec::new();
while let Some(a) = remote_join_set.join_next().await {
match a {
- Ok((remote_name, remote_result)) => {
- results.remote_results.insert(remote_name, remote_result);
- }
+ Ok(remote_response) => remote_responses.push(remote_response),
Err(err) => {
log::error!("join error when waiting for future: {err}")
}
}
}
- results
+ remote_responses.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+ FetcherReponse { remote_responses }
}
async fn fetch_remote<F, Ft, T>(
@@ -164,13 +263,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
semaphore: Arc<Semaphore>,
func: F,
max_connections_per_remote: usize,
- ) -> (String, Result<RemoteResult<T>, Error>)
+ ) -> RemoteResponse<MultipleNodesReponse<T>>
where
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
Ft: Future<Output = Result<T, Error>> + Send + 'static,
T: Send + Debug + 'static,
{
- let mut per_remote_results = RemoteResult::default();
+ let mut node_responses = Vec::new();
let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap());
let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote));
@@ -188,7 +287,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
.await
{
Ok(nodes) => nodes,
- Err(err) => return (remote.id.clone(), Err(err)),
+ Err(err) => {
+ return RemoteResponse {
+ remote_name: remote.id,
+ remote_type: remote.ty,
+ response: MultipleNodesReponse { inner: Err(err) },
+ }
+ }
};
let mut nodes_join_set = JoinSet::new();
@@ -228,10 +333,8 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
while let Some(join_result) = nodes_join_set.join_next().await {
match join_result {
- Ok((node_name, per_node_result)) => {
- per_remote_results
- .node_results
- .insert(node_name, per_node_result);
+ Ok(node_response) => {
+ node_responses.push(node_response);
}
Err(e) => {
log::error!("join error when waiting for future: {e}")
@@ -240,7 +343,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
RemoteType::Pbs => {
- let (nodename, result) = Self::fetch_node(
+ let node_response = Self::fetch_node(
func,
context,
remote.clone(),
@@ -250,14 +353,17 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
)
.await;
- match result {
- Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
- Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
- };
+ node_responses.push(node_response)
}
}
- (remote.id, Ok(per_remote_results))
+ RemoteResponse {
+ remote_name: remote.id,
+ remote_type: remote.ty,
+ response: MultipleNodesReponse {
+ inner: Ok(node_responses),
+ },
+ }
}
async fn fetch_node<F, Ft, T>(
@@ -267,7 +373,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
node: String,
_permit: OwnedSemaphorePermit,
_per_remote_connections_permit: Option<OwnedSemaphorePermit>,
- ) -> (String, Result<NodeResults<T>, Error>)
+ ) -> NodeResponse<T>
where
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
Ft: Future<Output = Result<T, Error>> + Send + 'static,
@@ -277,19 +383,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let result = func(context, remote.clone(), node.clone()).await;
let api_response_time = now.elapsed();
- match result {
- Ok(data) => (
- node,
- Ok(NodeResults {
- data,
- api_response_time,
- }),
- ),
- Err(err) => (node, Err(err)),
+ NodeResponse {
+ node_name: node,
+ data: result,
+ api_response_time,
}
}
- pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ /// Invoke a function `func` for all passed remotes in parallel.
+ pub async fn do_for_all_remotes<A, F, T, Ft>(
+ self,
+ remotes: A,
+ func: F,
+ ) -> FetcherReponse<NodeResponse<T>>
where
A: Iterator<Item = Remote>,
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -299,21 +405,31 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
let mut node_join_set = JoinSet::new();
- let mut results = FetchResults::default();
for remote in remotes {
let total_connections_semaphore = total_connections_semaphore.clone();
let remote_id = remote.id.clone();
+ let remote_type = remote.ty;
+
let context = self.context.clone();
let func = func.clone();
let future = async move {
let permit = total_connections_semaphore.acquire_owned().await.unwrap();
- (
- remote_id,
- Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
- )
+ RemoteResponse {
+ remote_type,
+ remote_name: remote_id,
+ response: Self::fetch_node(
+ func,
+ context,
+ remote,
+ "localhost".into(),
+ permit,
+ None,
+ )
+ .await,
+ }
};
if let Some(log_context) = LogContext::current() {
@@ -323,29 +439,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
+ let mut remote_responses = Vec::new();
+
while let Some(a) = node_join_set.join_next().await {
match a {
- Ok((remote_id, (node_id, node_result))) => {
- let mut node_results = HashMap::new();
- node_results.insert(node_id, node_result);
-
- let remote_result = RemoteResult { node_results };
-
- if results
- .remote_results
- .insert(remote_id, Ok(remote_result))
- .is_some()
- {
- // should never happen, but log for good measure if it actually does
- log::warn!("made multiple requests for a remote!");
- }
- }
+ Ok(remote_response) => remote_responses.push(remote_response),
Err(err) => {
log::error!("join error when waiting for future: {err}")
}
}
}
- results
+ remote_responses.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+ FetcherReponse { remote_responses }
}
}
diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs
index e772eef5..2762001e 100644
--- a/server/src/remote_updates.rs
+++ b/server/src/remote_updates.rs
@@ -15,7 +15,7 @@ use pdm_api_types::RemoteUpid;
use pdm_buildcfg::PDM_CACHE_DIR_M;
use crate::connection;
-use crate::parallel_fetcher::{NodeResults, ParallelFetcher};
+use crate::parallel_fetcher::ParallelFetcher;
pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json");
@@ -208,36 +208,34 @@ async fn update_cached_summary_for_node(
pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Error> {
let fetcher = ParallelFetcher::new(());
- let fetch_results = fetcher
+ let fetch_response = fetcher
.do_for_all_remote_nodes(remotes.clone().into_iter(), fetch_available_updates)
.await;
let mut content = get_cached_summary_or_default()?;
- for (remote_name, result) in fetch_results.remote_results {
+ for remote_response in fetch_response {
+ let remote_name = remote_response.remote().to_string();
+
let entry = content
.remotes
.entry(remote_name.clone())
- .or_insert_with(|| {
- // unwrap: remote name came from the same config, should be safe.
- // TODO: Include type in ParallelFetcher results - should be much more efficient.
- let remote_type = remotes.iter().find(|r| r.id == remote_name).unwrap().ty;
-
- RemoteUpdateSummary {
- nodes: Default::default(),
- remote_type,
- status: RemoteUpdateStatus::Success,
- }
+ .or_insert_with(|| RemoteUpdateSummary {
+ nodes: Default::default(),
+ remote_type: remote_response.remote_type(),
+ status: RemoteUpdateStatus::Success,
});
- match result {
- Ok(remote_result) => {
+ match remote_response.nodes() {
+ Ok(node_responses) => {
entry.status = RemoteUpdateStatus::Success;
- for (node_name, node_result) in remote_result.node_results {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- entry.nodes.insert(node_name, data.into());
+ for node_response in node_responses {
+ let node_name = node_response.node_name().to_string();
+
+ match node_response.data() {
+ Ok(update_info) => {
+ entry.nodes.insert(node_name, update_info.clone().into());
}
Err(err) => {
// Could not fetch updates from node
--
2.47.3
next prev parent reply other threads:[~2026-02-06 9:43 UTC|newest]
Thread overview: 3+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-02-06 9:43 [PATCH datacenter-manager v2 0/2] improvements for ParallelFetcher Lukas Wagner
2026-02-06 9:43 ` Lukas Wagner [this message]
2026-02-06 9:43 ` [PATCH datacenter-manager v2 2/2] parallel fetcher: add module documentation Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260206094304.117465-2-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.