* [PATCH datacenter-manager 0/5] improvements for ParallelFetcher
@ 2026-02-04 15:27 Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 1/5] parallel fetcher: clean up imports Lukas Wagner
` (4 more replies)
0 siblings, 5 replies; 6+ messages in thread
From: Lukas Wagner @ 2026-02-04 15:27 UTC (permalink / raw)
To: pdm-devel
A couple of improvements for the parallel_fetcher module:
- inherit log context so that logging in worker tasks works
- outcome type improvements, better ergonomics for callers
- documentation
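After this series, a typical call site looks roughly like this (a
sketch based on patches 3-5; `context`, `remotes` and `fetch_fn` stand
in for caller-provided values):

    let fetcher = ParallelFetcher::builder(context)
        .max_connections(10)
        .max_connections_per_remote(2)
        .build();

    let outcomes = fetcher
        .do_for_all_remote_nodes(remotes.into_iter(), fetch_fn)
        .await;

    for remote_outcome in outcomes {
        match remote_outcome.nodes() {
            // each NodeOutcome exposes node_name() and data()
            Ok(node_outcomes) => { /* inspect per-node data */ }
            Err(err) => log::error!(
                "failed to connect to remote {}: {err:#}",
                remote_outcome.remote()
            ),
        }
    }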
proxmox-datacenter-manager:
Lukas Wagner (5):
parallel fetcher: clean up imports
parallel fetcher: make sure to inherit log context
parallel fetcher: add builder and make struct members private
parallel fetcher: improve result type ergonomics
parallel fetcher: add module documentation
server/src/api/pve/firewall.rs | 53 +-
server/src/api/sdn/controllers.rs | 38 +-
server/src/api/sdn/vnets.rs | 35 +-
server/src/api/sdn/zones.rs | 35 +-
.../tasks/remote_tasks.rs | 40 +-
server/src/parallel_fetcher.rs | 458 +++++++++++++-----
server/src/remote_updates.rs | 34 +-
7 files changed, 434 insertions(+), 259 deletions(-)
Summary over all repositories:
7 files changed, 434 insertions(+), 259 deletions(-)
--
Generated by murpp 0.9.0
* [PATCH datacenter-manager 1/5] parallel fetcher: clean up imports
2026-02-04 15:27 [PATCH datacenter-manager 0/5] improvements for ParallelFetcher Lukas Wagner
@ 2026-02-04 15:27 ` Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 2/5] parallel fetcher: make sure to inherit log context Lukas Wagner
` (3 subsequent siblings)
4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2026-02-04 15:27 UTC (permalink / raw)
To: pdm-devel
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/parallel_fetcher.rs | 22 ++++++++++------------
1 file changed, 10 insertions(+), 12 deletions(-)
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 58ca1f55..7fb0d162 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,18 +1,16 @@
-use std::{
- collections::HashMap,
- fmt::Debug,
- future::Future,
- sync::Arc,
- time::{Duration, Instant},
-};
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::future::Future;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
use anyhow::Error;
-use pdm_api_types::remotes::{Remote, RemoteType};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+use tokio::task::JoinSet;
+
use pve_api_types::ClusterNodeIndexResponse;
-use tokio::{
- sync::{OwnedSemaphorePermit, Semaphore},
- task::JoinSet,
-};
+
+use pdm_api_types::remotes::{Remote, RemoteType};
use crate::connection;
--
2.47.3
* [PATCH datacenter-manager 2/5] parallel fetcher: make sure to inherit log context
2026-02-04 15:27 [PATCH datacenter-manager 0/5] improvements for ParallelFetcher Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 1/5] parallel fetcher: clean up imports Lukas Wagner
@ 2026-02-04 15:27 ` Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 3/5] parallel fetcher: add builder and make struct members private Lukas Wagner
` (2 subsequent siblings)
4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2026-02-04 15:27 UTC (permalink / raw)
To: pdm-devel
If ParallelFetcher is used in a worker task, we need to make sure that
the spawned fetcher tasks inherit the log context of the current task;
otherwise, log messages emitted in the handler function do not end up
in the task log.
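The pattern applied at each spawn site looks like this (a sketch; the
actual join set variable differs per call site, see the hunks below):

    use proxmox_log::LogContext;

    // Run the worker future inside the current log context, if there
    // is one, so that its log output ends up in the task log.
    if let Some(log_context) = LogContext::current() {
        join_set.spawn(log_context.scope(future));
    } else {
        join_set.spawn(future);
    }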
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/parallel_fetcher.rs | 33 +++++++++++++++++++++++++--------
1 file changed, 25 insertions(+), 8 deletions(-)
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 7fb0d162..f07a2de3 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -8,6 +8,7 @@ use anyhow::Error;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinSet;
+use proxmox_log::LogContext;
use pve_api_types::ClusterNodeIndexResponse;
use pdm_api_types::remotes::{Remote, RemoteType};
@@ -82,14 +83,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let semaphore = Arc::clone(&total_connections_semaphore);
let f = func.clone();
-
- remote_join_set.spawn(Self::fetch_remote(
+ let future = Self::fetch_remote(
remote,
self.context.clone(),
semaphore,
f,
self.max_connections_per_remote,
- ));
+ );
+
+ if let Some(log_context) = LogContext::current() {
+ remote_join_set.spawn(log_context.scope(future));
+ } else {
+ remote_join_set.spawn(future);
+ }
}
let mut results = FetchResults::default();
@@ -160,14 +166,20 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let node_name = node.node.clone();
let context_clone = context.clone();
- nodes_join_set.spawn(Self::fetch_node(
+ let future = Self::fetch_node(
func_clone,
context_clone,
remote_clone,
node_name,
permit,
Some(per_remote_connections_permit),
- ));
+ );
+
+ if let Some(log_context) = LogContext::current() {
+ nodes_join_set.spawn(log_context.scope(future));
+ } else {
+ nodes_join_set.spawn(future);
+ }
}
while let Some(join_result) = nodes_join_set.join_next().await {
@@ -251,15 +263,20 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let remote_id = remote.id.clone();
let context = self.context.clone();
let func = func.clone();
-
- node_join_set.spawn(async move {
+ let future = async move {
let permit = total_connections_semaphore.acquire_owned().await.unwrap();
(
remote_id,
Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
)
- });
+ };
+
+ if let Some(log_context) = LogContext::current() {
+ node_join_set.spawn(log_context.scope(future));
+ } else {
+ node_join_set.spawn(future);
+ }
}
while let Some(a) = node_join_set.join_next().await {
--
2.47.3
* [PATCH datacenter-manager 3/5] parallel fetcher: add builder and make struct members private
2026-02-04 15:27 [PATCH datacenter-manager 0/5] improvements for ParallelFetcher Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 1/5] parallel fetcher: clean up imports Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 2/5] parallel fetcher: make sure to inherit log context Lukas Wagner
@ 2026-02-04 15:27 ` Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 4/5] parallel fetcher: improve result type ergonomics Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 5/5] parallel fetcher: add module documentation Lukas Wagner
4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2026-02-04 15:27 UTC (permalink / raw)
To: pdm-devel
ParallelFetcher is likely to be used in many places in the PDM
codebase, so it makes sense to make its usage as convenient as
possible. It also makes sense to hide its internals as much as
possible, so that we can make future changes without having to modify
any callers.
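With the builder, a call site now reads like this (taken from the
remote_tasks.rs hunk below); settings that are not set explicitly fall
back to DEFAULT_MAX_CONNECTIONS and DEFAULT_MAX_CONNECTIONS_PER_REMOTE:

    let fetcher = ParallelFetcher::builder(cache_state)
        .max_connections(MAX_CONNECTIONS)
        .max_connections_per_remote(CONNECTIONS_PER_PVE_REMOTE)
        .build();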
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
.../tasks/remote_tasks.rs | 9 ++-
server/src/parallel_fetcher.rs | 58 ++++++++++++++++---
2 files changed, 55 insertions(+), 12 deletions(-)
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index c71a0894..93a0d05e 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -264,11 +264,10 @@ async fn fetch_remotes(
remotes: Vec<Remote>,
cache_state: Arc<State>,
) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
- let fetcher = ParallelFetcher {
- max_connections: MAX_CONNECTIONS,
- max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
- context: cache_state,
- };
+ let fetcher = ParallelFetcher::builder(cache_state)
+ .max_connections(MAX_CONNECTIONS)
+ .max_connections_per_remote(CONNECTIONS_PER_PVE_REMOTE)
+ .build();
let fetch_results = fetcher
.do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index f07a2de3..1eded293 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -19,9 +19,9 @@ pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
pub struct ParallelFetcher<C> {
- pub max_connections: usize,
- pub max_connections_per_remote: usize,
- pub context: C,
+ max_connections: usize,
+ max_connections_per_remote: usize,
+ context: C,
}
pub struct FetchResults<T> {
@@ -59,15 +59,59 @@ pub struct NodeResults<T> {
pub api_response_time: Duration,
}
-impl<C: Clone + Send + 'static> ParallelFetcher<C> {
- pub fn new(context: C) -> Self {
+/// Builder for the [`ParallelFetcher`] struct.
+pub struct ParallelFetcherBuilder<C> {
+ max_connections: Option<usize>,
+ max_connections_per_remote: Option<usize>,
+ context: C,
+}
+
+impl<C> ParallelFetcherBuilder<C> {
+ fn new(context: C) -> Self {
Self {
- max_connections: DEFAULT_MAX_CONNECTIONS,
- max_connections_per_remote: DEFAULT_MAX_CONNECTIONS_PER_REMOTE,
context,
+ max_connections: None,
+ max_connections_per_remote: None,
}
}
+ /// Set the maximum number of parallel connections.
+ pub fn max_connections(mut self, limit: usize) -> Self {
+ self.max_connections = Some(limit);
+ self
+ }
+
+ /// Set the maximum number of parallel connections per remote.
+ ///
+ /// This only really affects PVE remotes with multiple cluster members.
+ pub fn max_connections_per_remote(mut self, limit: usize) -> Self {
+ self.max_connections_per_remote = Some(limit);
+ self
+ }
+
+ /// Build the [`ParallelFetcher`] instance.
+ pub fn build(self) -> ParallelFetcher<C> {
+ ParallelFetcher {
+ max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
+ max_connections_per_remote: self
+ .max_connections_per_remote
+ .unwrap_or(DEFAULT_MAX_CONNECTIONS_PER_REMOTE),
+ context: self.context,
+ }
+ }
+}
+
+impl<C: Clone + Send + 'static> ParallelFetcher<C> {
+ /// Create a [`ParallelFetcher`] with default settings.
+ pub fn new(context: C) -> Self {
+ Self::builder(context).build()
+ }
+
+ /// Create the builder for constructing a [`ParallelFetcher`] with custom settings.
+ pub fn builder(context: C) -> ParallelFetcherBuilder<C> {
+ ParallelFetcherBuilder::new(context)
+ }
+
pub async fn do_for_all_remote_nodes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
where
A: Iterator<Item = Remote>,
--
2.47.3
* [PATCH datacenter-manager 4/5] parallel fetcher: improve result type ergonomics
2026-02-04 15:27 [PATCH datacenter-manager 0/5] improvements for ParallelFetcher Lukas Wagner
` (2 preceding siblings ...)
2026-02-04 15:27 ` [PATCH datacenter-manager 3/5] parallel fetcher: add builder and make struct members private Lukas Wagner
@ 2026-02-04 15:27 ` Lukas Wagner
2026-02-04 15:27 ` [PATCH datacenter-manager 5/5] parallel fetcher: add module documentation Lukas Wagner
4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2026-02-04 15:27 UTC (permalink / raw)
To: pdm-devel
Completely overhaul the *Result types:
- use the term 'Outcome' instead of 'Result' to avoid confusion with
  Rust's Result type
- use generics to avoid the awkwardness of accessing the node outcome
  of a single node when using do_for_all_remotes (we only call out to
  a single node anyway)
- implement .iter()/.into_iter() where it makes sense, for conveniently
  iterating over remote outcomes and node outcomes (see the sketch
  after this list)
- make all members private and offer getters to access the fields
- use a sorted vec instead of a hash map as the internal data structure
  for storing remote outcomes. This avoids the awkwardness of having
  the remote name as the key *and* in the struct used as the value.
  Since the vec is sorted by remote name, specific remotes can still be
  accessed efficiently via binary search.
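For example, consuming the outcome of do_for_all_remotes now looks
like this (a sketch based on the sdn hunks below; `handle` is a
placeholder for caller-specific processing):

    for remote_outcome in results {
        let remote = remote_outcome.remote().to_string();

        match remote_outcome.into_data() {
            // data is whatever the handler function returned
            Ok(data) => handle(remote.clone(), data),
            Err(error) => {
                log::error!("could not fetch from remote {remote}: {error:#}")
            }
        }
    }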
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/api/pve/firewall.rs | 53 ++--
server/src/api/sdn/controllers.rs | 38 +--
server/src/api/sdn/vnets.rs | 35 +--
server/src/api/sdn/zones.rs | 35 +--
.../tasks/remote_tasks.rs | 31 +-
server/src/parallel_fetcher.rs | 268 ++++++++++++------
server/src/remote_updates.rs | 34 ++-
7 files changed, 273 insertions(+), 221 deletions(-)
diff --git a/server/src/api/pve/firewall.rs b/server/src/api/pve/firewall.rs
index d81592f7..bf5df3f8 100644
--- a/server/src/api/pve/firewall.rs
+++ b/server/src/api/pve/firewall.rs
@@ -271,12 +271,10 @@ pub async fn pve_firewall_status(
// 2: build context with guests for each remote and fetch node-level data
let mut guests_per_remote = HashMap::new();
- for (remote_id, remote_result) in &cluster_results.remote_results {
- if let Ok(remote_result) = remote_result {
- if let Ok(node_result) = remote_result.node_results.get("localhost").unwrap() {
- guests_per_remote
- .insert(remote_id.clone(), Arc::new(node_result.data.guests.clone()));
- }
+ for remote_outcome in &cluster_results {
+ let remote_id = remote_outcome.remote().to_string();
+ if let Ok(node_result) = remote_outcome.data() {
+ guests_per_remote.insert(remote_id, Arc::new(node_result.guests.clone()));
}
}
@@ -298,26 +296,18 @@ pub async fn pve_firewall_status(
let mut result = Vec::new();
for remote in &pve_remotes {
let mut cluster_status = cluster_results
- .remote_results
- .get(&remote.id)
- .and_then(|r| r.as_ref().ok())
- .and_then(|r| r.node_results.get("localhost"))
- .and_then(|n| n.as_ref().ok())
- .and_then(|n| n.data.status.clone());
+ .get_remote_outcome(&remote.id)
+ .and_then(|r| r.data().ok())
+ .and_then(|n| n.status.clone());
- let node_fetch_result = node_results.remote_results.get(&remote.id);
+ let node_fetch_result = node_results.get_remote_outcome(&remote.id);
let nodes = node_fetch_result
- .and_then(|r| r.as_ref().ok())
- .map(|r| {
- r.node_results
- .values()
- .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
- .collect()
- })
+ .and_then(|r| r.nodes().ok())
+ .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
.unwrap_or_default();
- if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+ if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
cluster_status = None;
}
@@ -389,12 +379,8 @@ pub async fn cluster_firewall_status(
.await;
let cluster_data = cluster_results
- .remote_results
- .get(&remote)
- .and_then(|r| r.as_ref().ok())
- .and_then(|r| r.node_results.get("localhost"))
- .and_then(|n| n.as_ref().ok())
- .map(|n| &n.data);
+ .get_remote_outcome(&remote)
+ .and_then(|r| r.data().ok());
let (cluster_status, guests) = match cluster_data {
Some(data) => (data.status.clone(), data.guests.clone()),
@@ -418,19 +404,14 @@ pub async fn cluster_firewall_status(
.await;
// 3: collect node results
- let node_fetch_result = node_results.remote_results.get(&remote);
+ let node_fetch_result = node_results.get_remote_outcome(&remote);
let nodes = node_fetch_result
- .and_then(|r| r.as_ref().ok())
- .map(|r| {
- r.node_results
- .values()
- .filter_map(|n| n.as_ref().ok().map(|n| n.data.clone()))
- .collect()
- })
+ .and_then(|r| r.nodes().ok())
+ .map(|r| r.iter().filter_map(|n| n.data().ok().cloned()).collect())
.unwrap_or_default();
- let final_status = if node_fetch_result.and_then(|r| r.as_ref().err()).is_some() {
+ let final_status = if node_fetch_result.and_then(|r| r.nodes().err()).is_some() {
None
} else {
cluster_status
diff --git a/server/src/api/sdn/controllers.rs b/server/src/api/sdn/controllers.rs
index 96612516..50f9cb51 100644
--- a/server/src/api/sdn/controllers.rs
+++ b/server/src/api/sdn/controllers.rs
@@ -9,10 +9,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::ListControllersType;
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher};
pub const ROUTER: Router = Router::new().get(&API_METHOD_LIST_CONTROLLERS);
@@ -105,26 +102,19 @@ pub async fn list_controllers(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|controller| ListController {
- remote: remote.clone(),
- controller,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
+ for remote_outcome in results {
+ let remote = remote_outcome.remote().to_string();
+
+ match remote_outcome.into_data() {
+ Ok(sdn_controllers) => {
+ vnets.extend(
+ sdn_controllers
+ .into_iter()
+ .map(|controller| ListController {
+ remote: remote.clone(),
+ controller,
+ }),
+ )
}
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/vnets.rs b/server/src/api/sdn/vnets.rs
index 5e14c6ab..561b1d30 100644
--- a/server/src/api/sdn/vnets.rs
+++ b/server/src/api/sdn/vnets.rs
@@ -13,11 +13,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::{CreateVnet, SdnVnetType};
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
- sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
pub const ROUTER: Router = Router::new()
.get(&API_METHOD_LIST_VNETS)
@@ -105,26 +101,15 @@ async fn list_vnets(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|vnet| ListVnet {
- remote: remote.clone(),
- vnet,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
+ for remote_outcome in results {
+ let remote = remote_outcome.remote().to_string();
+
+ match remote_outcome.into_data() {
+ Ok(vnets_on_this_remote) => {
+ vnets.extend(vnets_on_this_remote.into_iter().map(|vnet| ListVnet {
+ remote: remote.clone(),
+ vnet,
+ }))
}
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
diff --git a/server/src/api/sdn/zones.rs b/server/src/api/sdn/zones.rs
index c4552795..b5186e27 100644
--- a/server/src/api/sdn/zones.rs
+++ b/server/src/api/sdn/zones.rs
@@ -14,11 +14,7 @@ use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
use proxmox_schema::api;
use pve_api_types::{CreateZone, ListZonesType};
-use crate::{
- api::pve,
- parallel_fetcher::{NodeResults, ParallelFetcher},
- sdn_client::LockedSdnClients,
-};
+use crate::{api::pve, parallel_fetcher::ParallelFetcher, sdn_client::LockedSdnClients};
pub const ROUTER: Router = Router::new()
.get(&API_METHOD_LIST_ZONES)
@@ -111,27 +107,14 @@ pub async fn list_zones(
})
.await;
- for (remote, remote_result) in results.remote_results.into_iter() {
- match remote_result {
- Ok(remote_result) => {
- for (node, node_result) in remote_result.node_results.into_iter() {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- vnets.extend(data.into_iter().map(|zone| ListZone {
- remote: remote.clone(),
- zone,
- }))
- }
- Err(error) => {
- log::error!(
- "could not fetch vnets from remote {} node {}: {error:#}",
- remote,
- node
- );
- }
- }
- }
- }
+ for remote_outcome in results {
+ let remote = remote_outcome.remote().to_string();
+
+ match remote_outcome.into_data() {
+ Ok(zones) => vnets.extend(zones.into_iter().map(|zone| ListZone {
+ remote: remote.clone(),
+ zone,
+ })),
Err(error) => {
log::error!("could not fetch vnets from remote {}: {error:#}", remote)
}
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 93a0d05e..03e71a02 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -16,7 +16,7 @@ use proxmox_section_config::typed::SectionConfigData;
use server::{
connection,
- parallel_fetcher::{NodeResults, ParallelFetcher},
+ parallel_fetcher::ParallelFetcher,
pbs_client,
remote_tasks::{
self,
@@ -276,23 +276,32 @@ async fn fetch_remotes(
let mut all_tasks = Vec::new();
let mut node_success_map = NodeFetchSuccessMap::default();
- for (remote_name, result) in fetch_results.remote_results {
- match result {
- Ok(remote_result) => {
- for (node_name, node_result) in remote_result.node_results {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- all_tasks.extend(data);
- node_success_map.set_node_success(remote_name.clone(), node_name);
+ for remote_outcome in fetch_results {
+ match remote_outcome.nodes() {
+ Ok(node_outcomes) => {
+ for node_outcome in node_outcomes {
+ let node_name = node_outcome.node_name().to_string();
+
+ match node_outcome.data() {
+ Ok(data) => {
+ all_tasks.extend(data.clone());
+ node_success_map
+ .set_node_success(remote_outcome.remote().to_string(), node_name);
}
Err(err) => {
- log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+ log::error!(
+ "could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}",
+ remote_name = remote_outcome.remote()
+ );
}
}
}
}
Err(err) => {
- log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+ log::error!(
+ "could not fetch tasks from remote '{remote}': {err:#}",
+ remote = remote_outcome.remote()
+ );
}
}
}
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 1eded293..57011096 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
@@ -18,45 +17,133 @@ use crate::connection;
pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
-pub struct ParallelFetcher<C> {
- max_connections: usize,
- max_connections_per_remote: usize,
- context: C,
+/// Outcome type produced by [`ParallelFetcher::do_for_all_remotes`] or
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+///
+/// This type contains the individual outcomes for each remote. These can be accessed
+/// by iterating over this type (`.iter()`, `.into_iter()`) or by calling
+/// [`FetcherOutcome::get_remote_outcome`].
+pub struct FetcherOutcome<O> {
+ // NOTE: This vector is sorted ascending by remote name.
+ outcomes: Vec<RemoteOutcome<O>>,
}
-pub struct FetchResults<T> {
- /// Per-remote results. The key in the map is the remote name.
- pub remote_results: HashMap<String, Result<RemoteResult<T>, Error>>,
-}
+impl<O> IntoIterator for FetcherOutcome<O> {
+ type Item = RemoteOutcome<O>;
+ type IntoIter = std::vec::IntoIter<Self::Item>;
-impl<T> Default for FetchResults<T> {
- fn default() -> Self {
- Self {
- remote_results: Default::default(),
- }
+ fn into_iter(self) -> Self::IntoIter {
+ self.outcomes.into_iter()
}
}
-#[derive(Debug)]
-pub struct RemoteResult<T> {
- /// Per-node results. The key in the map is the node name.
- pub node_results: HashMap<String, Result<NodeResults<T>, Error>>,
-}
+impl<'a, O> IntoIterator for &'a FetcherOutcome<O> {
+ type Item = &'a RemoteOutcome<O>;
+ type IntoIter = std::slice::Iter<'a, RemoteOutcome<O>>;
-impl<T> Default for RemoteResult<T> {
- fn default() -> Self {
- Self {
- node_results: Default::default(),
- }
+ fn into_iter(self) -> Self::IntoIter {
+ self.iter()
}
}
-#[derive(Debug)]
-pub struct NodeResults<T> {
- /// The data returned from the passed function.
- pub data: T,
- /// Time needed waiting for the passed function to return.
- pub api_response_time: Duration,
+impl<O> FetcherOutcome<O> {
+ /// Create a new iterator of all contained [`RemoteOutcome`]s.
+ pub fn iter<'a>(&'a self) -> std::slice::Iter<'a, RemoteOutcome<O>> {
+ self.outcomes.iter()
+ }
+
+ /// Get the outcome for a particular remote.
+ pub fn get_remote_outcome(&self, remote: &str) -> Option<&RemoteOutcome<O>> {
+ self.outcomes
+ .binary_search_by(|probe| probe.remote().cmp(remote))
+ .ok()
+ .map(|index| &self.outcomes[index])
+ }
+}
+
+/// Outcome for one remote.
+pub struct RemoteOutcome<O> {
+ remote_name: String,
+ remote_type: RemoteType,
+ outcome: O,
+}
+
+impl<O> RemoteOutcome<O> {
+ /// Returns the remote id.
+ pub fn remote(&self) -> &str {
+ self.remote_name.as_str()
+ }
+
+ /// Returns the type of the remote.
+ pub fn remote_type(&self) -> RemoteType {
+ self.remote_type
+ }
+}
+
+impl<T> RemoteOutcome<NodeOutcome<T>> {
+ /// Access the data that was returned by the handler function.
+ pub fn data(&self) -> Result<&T, &Error> {
+ self.outcome.data()
+ }
+
+ /// Access the data that was returned by the handler function, consuming self.
+ pub fn into_data(self) -> Result<T, Error> {
+ self.outcome.into_data()
+ }
+
+ /// The [`Duration`] of the handler call.
+ pub fn handler_duration(&self) -> Duration {
+ self.outcome.handler_duration()
+ }
+}
+
+impl<T> RemoteOutcome<MultipleNodeOutcome<T>> {
+ /// Access the node results.
+ ///
+ /// This returns an error if the list of nodes could not be fetched
+ /// during [`ParallelFetcher::do_for_all_remote_nodes`].
+ pub fn nodes(&self) -> Result<&Vec<NodeOutcome<T>>, &Error> {
+ self.outcome.inner.as_ref()
+ }
+}
+
+/// Wrapper type used to contain the node results when using
+/// [`ParallelFetcher::do_for_all_remote_nodes`].
+pub struct MultipleNodeOutcome<T> {
+ inner: Result<Vec<NodeOutcome<T>>, Error>,
+}
+
+/// Outcome for a single node.
+pub struct NodeOutcome<T> {
+ node_name: String,
+ data: Result<T, Error>,
+ api_response_time: Duration,
+}
+
+impl<T> NodeOutcome<T> {
+ /// Name of the node.
+ ///
+ /// At the moment, this is always `localhost` if `do_for_all_remotes` was used.
+ /// If `do_for_all_remote_nodes` is used, this is the actual nodename for PVE remotes and
+ /// `localhost` for PBS remotes.
+ pub fn node_name(&self) -> &str {
+ &self.node_name
+ }
+
+ /// Access the data that was returned by the handler function.
+ pub fn data(&self) -> Result<&T, &Error> {
+ self.data.as_ref()
+ }
+
+ /// Access the data that was returned by the handler function, consuming `self`.
+ pub fn into_data(self) -> Result<T, Error> {
+ self.data
+ }
+
+ /// The [`Duration`] of the handler call.
+ pub fn handler_duration(&self) -> Duration {
+ self.api_response_time
+ }
}
/// Builder for the [`ParallelFetcher`] struct.
@@ -101,6 +188,13 @@ impl<C> ParallelFetcherBuilder<C> {
}
}
+/// Helper for parallelizing API requests to multiple remotes/nodes.
+pub struct ParallelFetcher<C> {
+ max_connections: usize,
+ max_connections_per_remote: usize,
+ context: C,
+}
+
impl<C: Clone + Send + 'static> ParallelFetcher<C> {
/// Create a [`ParallelFetcher`] with default settings.
pub fn new(context: C) -> Self {
@@ -112,7 +206,12 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
ParallelFetcherBuilder::new(context)
}
- pub async fn do_for_all_remote_nodes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ /// Invoke a function `func` for all nodes of a given list of remotes in parallel.
+ pub async fn do_for_all_remote_nodes<A, F, T, Ft>(
+ self,
+ remotes: A,
+ func: F,
+ ) -> FetcherOutcome<MultipleNodeOutcome<T>>
where
A: Iterator<Item = Remote>,
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -142,20 +241,20 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
- let mut results = FetchResults::default();
+ let mut results = Vec::new();
while let Some(a) = remote_join_set.join_next().await {
match a {
- Ok((remote_name, remote_result)) => {
- results.remote_results.insert(remote_name, remote_result);
- }
+ Ok(remote_outcome) => results.push(remote_outcome),
Err(err) => {
log::error!("join error when waiting for future: {err}")
}
}
}
- results
+ results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+ FetcherOutcome { outcomes: results }
}
async fn fetch_remote<F, Ft, T>(
@@ -164,13 +263,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
semaphore: Arc<Semaphore>,
func: F,
max_connections_per_remote: usize,
- ) -> (String, Result<RemoteResult<T>, Error>)
+ ) -> RemoteOutcome<MultipleNodeOutcome<T>>
where
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
Ft: Future<Output = Result<T, Error>> + Send + 'static,
T: Send + Debug + 'static,
{
- let mut per_remote_results = RemoteResult::default();
+ let mut node_outcomes = Vec::new();
let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap());
let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote));
@@ -188,7 +287,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
.await
{
Ok(nodes) => nodes,
- Err(err) => return (remote.id.clone(), Err(err)),
+ Err(err) => {
+ return RemoteOutcome {
+ remote_name: remote.id,
+ remote_type: remote.ty,
+ outcome: MultipleNodeOutcome { inner: Err(err) },
+ }
+ }
};
let mut nodes_join_set = JoinSet::new();
@@ -228,10 +333,8 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
while let Some(join_result) = nodes_join_set.join_next().await {
match join_result {
- Ok((node_name, per_node_result)) => {
- per_remote_results
- .node_results
- .insert(node_name, per_node_result);
+ Ok(node_outcome) => {
+ node_outcomes.push(node_outcome);
}
Err(e) => {
log::error!("join error when waiting for future: {e}")
@@ -240,7 +343,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
RemoteType::Pbs => {
- let (nodename, result) = Self::fetch_node(
+ let node_outcome = Self::fetch_node(
func,
context,
remote.clone(),
@@ -250,14 +353,17 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
)
.await;
- match result {
- Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
- Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
- };
+ node_outcomes.push(node_outcome)
}
}
- (remote.id, Ok(per_remote_results))
+ RemoteOutcome {
+ remote_name: remote.id,
+ remote_type: remote.ty,
+ outcome: MultipleNodeOutcome {
+ inner: Ok(node_outcomes),
+ },
+ }
}
async fn fetch_node<F, Ft, T>(
@@ -267,7 +373,7 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
node: String,
_permit: OwnedSemaphorePermit,
_per_remote_connections_permit: Option<OwnedSemaphorePermit>,
- ) -> (String, Result<NodeResults<T>, Error>)
+ ) -> NodeOutcome<T>
where
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
Ft: Future<Output = Result<T, Error>> + Send + 'static,
@@ -277,19 +383,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let result = func(context, remote.clone(), node.clone()).await;
let api_response_time = now.elapsed();
- match result {
- Ok(data) => (
- node,
- Ok(NodeResults {
- data,
- api_response_time,
- }),
- ),
- Err(err) => (node, Err(err)),
+ NodeOutcome {
+ node_name: node,
+ data: result,
+ api_response_time,
}
}
- pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ /// Invoke a function `func` for all passed remotes in parallel.
+ pub async fn do_for_all_remotes<A, F, T, Ft>(
+ self,
+ remotes: A,
+ func: F,
+ ) -> FetcherOutcome<NodeOutcome<T>>
where
A: Iterator<Item = Remote>,
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -299,21 +405,31 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
let mut node_join_set = JoinSet::new();
- let mut results = FetchResults::default();
for remote in remotes {
let total_connections_semaphore = total_connections_semaphore.clone();
let remote_id = remote.id.clone();
+ let remote_type = remote.ty;
+
let context = self.context.clone();
let func = func.clone();
let future = async move {
let permit = total_connections_semaphore.acquire_owned().await.unwrap();
- (
- remote_id,
- Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
- )
+ RemoteOutcome {
+ remote_type,
+ remote_name: remote_id,
+ outcome: Self::fetch_node(
+ func,
+ context,
+ remote,
+ "localhost".into(),
+ permit,
+ None,
+ )
+ .await,
+ }
};
if let Some(log_context) = LogContext::current() {
@@ -323,29 +439,19 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
+ let mut results = Vec::new();
+
while let Some(a) = node_join_set.join_next().await {
match a {
- Ok((remote_id, (node_id, node_result))) => {
- let mut node_results = HashMap::new();
- node_results.insert(node_id, node_result);
-
- let remote_result = RemoteResult { node_results };
-
- if results
- .remote_results
- .insert(remote_id, Ok(remote_result))
- .is_some()
- {
- // should never happen, but log for good measure if it actually does
- log::warn!("made multiple requests for a remote!");
- }
- }
+ Ok(remote_outcome) => results.push(remote_outcome),
Err(err) => {
log::error!("join error when waiting for future: {err}")
}
}
}
- results
+ results.sort_by(|a, b| a.remote().cmp(b.remote()));
+
+ FetcherOutcome { outcomes: results }
}
}
diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs
index e772eef5..460a91f2 100644
--- a/server/src/remote_updates.rs
+++ b/server/src/remote_updates.rs
@@ -15,7 +15,7 @@ use pdm_api_types::RemoteUpid;
use pdm_buildcfg::PDM_CACHE_DIR_M;
use crate::connection;
-use crate::parallel_fetcher::{NodeResults, ParallelFetcher};
+use crate::parallel_fetcher::ParallelFetcher;
pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json");
@@ -214,30 +214,28 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er
let mut content = get_cached_summary_or_default()?;
- for (remote_name, result) in fetch_results.remote_results {
+ for remote_outcome in fetch_results {
+ let remote_name = remote_outcome.remote().to_string();
+
let entry = content
.remotes
.entry(remote_name.clone())
- .or_insert_with(|| {
- // unwrap: remote name came from the same config, should be safe.
- // TODO: Include type in ParallelFetcher results - should be much more efficient.
- let remote_type = remotes.iter().find(|r| r.id == remote_name).unwrap().ty;
-
- RemoteUpdateSummary {
- nodes: Default::default(),
- remote_type,
- status: RemoteUpdateStatus::Success,
- }
+ .or_insert_with(|| RemoteUpdateSummary {
+ nodes: Default::default(),
+ remote_type: remote_outcome.remote_type(),
+ status: RemoteUpdateStatus::Success,
});
- match result {
- Ok(remote_result) => {
+ match remote_outcome.nodes() {
+ Ok(node_outcomes) => {
entry.status = RemoteUpdateStatus::Success;
- for (node_name, node_result) in remote_result.node_results {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- entry.nodes.insert(node_name, data.into());
+ for node_outcome in node_outcomes {
+ let node_name = node_outcome.node_name().to_string();
+
+ match node_outcome.data() {
+ Ok(update_info) => {
+ entry.nodes.insert(node_name, update_info.clone().into());
}
Err(err) => {
// Could not fetch updates from node
--
2.47.3
* [PATCH datacenter-manager 5/5] parallel fetcher: add module documentation
2026-02-04 15:27 [PATCH datacenter-manager 0/5] improvements for ParallelFetcher Lukas Wagner
` (3 preceding siblings ...)
2026-02-04 15:27 ` [PATCH datacenter-manager 4/5] parallel fetcher: improve result type ergonomics Lukas Wagner
@ 2026-02-04 15:27 ` Lukas Wagner
4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2026-02-04 15:27 UTC (permalink / raw)
To: pdm-devel
Adding a (no_run) doctest to the module-level documentation gives users
a quick idea of how to use this helper.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/parallel_fetcher.rs | 65 ++++++++++++++++++++++++++++++++++
1 file changed, 65 insertions(+)
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 57011096..0abd9a50 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -1,3 +1,66 @@
+//! Helpers that can be used to parallelize API requests to remotes.
+//!
+//! ```no_run
+//! # use anyhow::Error;
+//! #
+//! # use pdm_api_types::remotes::{RemoteType, Remote};
+//! # use server::parallel_fetcher::ParallelFetcher;
+//! #
+//! # #[tokio::main]
+//! # async fn main() -> Result<(), Error> {
+//! # let remotes: Vec<Remote> = Vec::new();
+//! #
+//! async fn fetch_meaning(
+//! _context: (),
+//! remote: Remote,
+//! node: String,
+//! ) -> Result<i32, Error> {
+//! match remote.ty {
+//! RemoteType::Pve => {
+//! // Perform the API request here and return some result.
+//! Ok(42)
+//! },
+//! RemoteType::Pbs => Ok(42),
+//! }
+//! }
+//!
+//! // This context is passed to the function that is executed for every remote node.
+//! let context = ();
+//!
+//! let fetcher = ParallelFetcher::builder(context)
+//! .max_connections(10)
+//! .max_connections_per_remote(2)
+//! .build();
+//!
+//! let fetch_result = fetcher
+//! .do_for_all_remote_nodes(remotes.into_iter(), fetch_meaning)
+//! .await;
+//!
+//! for remote_outcome in fetch_result {
+//! match remote_outcome.nodes() {
+//! Ok(node_outcomes) => {
+//! for node_outcome in node_outcomes {
+//! match node_outcome.data() {
+//! Ok(meaning) => assert_eq!(*meaning, 42),
+//! Err(err) =>
+//! log::error!(
+//! "failed to retrieve result for node {}",
+//! node_outcome.node_name()
+//! ),
+//! }
+//! }
+//! }
+//! Err(err) => log::error!(
+//! "failed to connect to remote {}",
+//! remote_outcome.remote()
+//! ),
+//! }
+//! }
+//!
+//! # Ok(())
+//! # }
+//! ```
+
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
@@ -14,7 +77,9 @@ use pdm_api_types::remotes::{Remote, RemoteType};
use crate::connection;
+/// Maximum number of parallel outgoing API requests.
pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
+/// Maximum number of parallel outgoing API requests to the *same* remote.
pub const DEFAULT_MAX_CONNECTIONS_PER_REMOTE: usize = 5;
/// Outcome type produced by [`ParallelFetcher::do_for_all_remotes`] or
--
2.47.3