From: Stefan Hanreich <s.hanreich@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 1/2] server: add helper for fetching results from one node per remote
Date: Fri, 29 Aug 2025 14:33:28 +0200 [thread overview]
Message-ID: <20250829123330.184123-2-s.hanreich@proxmox.com> (raw)
In-Reply-To: <20250829123330.184123-1-s.hanreich@proxmox.com>
There is already a helper for fetching results from all remotes and
all their nodes. In some cases (e.g. SDN) it is sufficient to fetch
the result of the API calls once from any node. Add a second helper
that executes the API requests only once per remote on any node,
instead of on all nodes. Currently this takes the first node returned
from the PVE API, but in the future it could be extended to retry on a
different node if the first node is not available, or to choose a random
node instead of hitting the first node all the time.
Signed-off-by: Stefan Hanreich <s.hanreich@proxmox.com>
---
server/src/parallel_fetcher.rs | 133 ++++++++++++++++++++++++++++++++-
1 file changed, 132 insertions(+), 1 deletion(-)
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 3fad68a..e55cc25 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -6,7 +6,7 @@ use std::{
time::{Duration, Instant},
};
-use anyhow::Error;
+use anyhow::{anyhow, Error};
use pdm_api_types::remotes::{Remote, RemoteType};
use pve_api_types::ClusterNodeIndexResponse;
use tokio::{
@@ -103,6 +103,107 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
results
}
+ /// Calls `func` once per remote, on a single node of that remote.
+ ///
+ /// For PVE remotes the node list is queried and `func` is run for the first node in it; PBS
+ /// remotes are handled via `fetch_pbs_node`. A remote whose node list cannot be fetched, or
+ /// whose node list is empty, gets an `Err` entry in the returned `remote_results`.
+ ///
+ /// All per-remote work, including the node list lookup, runs in its own spawned task, so a
+ /// slow or unreachable remote does not delay the others. Every task acquires a permit from a
+ /// semaphore sized `max_connections` before it contacts its remote.
+ ///
+ /// A task that fails to join (e.g. because it panicked) is only logged; its remote will have
+ /// no entry in the result.
+ pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ where
+     A: Iterator<Item = Remote>,
+     F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
+     Ft: Future<Output = Result<T, Error>> + Send + 'static,
+     T: Send + Debug + 'static,
+ {
+     let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
+
+     let mut remote_join_set = JoinSet::new();
+     let mut results = FetchResults::default();
+
+     for remote in remotes {
+         let total_connections_semaphore = total_connections_semaphore.clone();
+         let context = self.context.clone();
+         let func = func.clone();
+
+         remote_join_set.spawn(async move {
+             let remote_id = remote.id.clone();
+
+             let permit = total_connections_semaphore
+                 .acquire_owned()
+                 .await
+                 .expect("total connections semaphore is never closed");
+
+             // TODO: currently the fetch helpers require a per_remote permit, but we are only
+             // making one request per remote, so fake it here until we find a better solution
+             // to handle both cases in the helpers
+             let per_remote_permit = Arc::new(Semaphore::new(1))
+                 .acquire_owned()
+                 .await
+                 .expect("per-remote semaphore is never closed");
+
+             let node_outcome = match remote.ty {
+                 RemoteType::Pve => {
+                     // Look up the node list inside the task (and behind the connection
+                     // permit) so that remotes are not queried one after another.
+                     let nodes = match Self::fetch_pve_node_list(&remote).await {
+                         Ok(nodes) => nodes,
+                         Err(error) => return (remote_id, Err(error)),
+                     };
+
+                     let Some(first_node) = nodes.into_iter().next() else {
+                         let error = anyhow!("no node returned for remote {}", remote.id);
+                         return (remote_id, Err(error));
+                     };
+
+                     Self::fetch_pve_node(
+                         func,
+                         context,
+                         remote,
+                         first_node.node,
+                         permit,
+                         per_remote_permit,
+                     )
+                     .await
+                 }
+                 RemoteType::Pbs => {
+                     Self::fetch_pbs_node(func, context, remote, permit, per_remote_permit)
+                         .await
+                 }
+             };
+
+             (remote_id, Ok(node_outcome))
+         });
+     }
+
+     while let Some(joined) = remote_join_set.join_next().await {
+         match joined {
+             Ok((remote_id, outcome)) => {
+                 // A successful task yields exactly one node result for its remote.
+                 let remote_result = outcome.map(|(node_id, node_result)| {
+                     let mut node_results = HashMap::new();
+                     node_results.insert(node_id, node_result);
+
+                     RemoteResult { node_results }
+                 });
+
+                 if results
+                     .remote_results
+                     .insert(remote_id, remote_result)
+                     .is_some()
+                 {
+                     // should never happen, but log for good measure if it actually does
+                     log::warn!("made multiple requests for a remote!");
+                 }
+             }
+             Err(err) => {
+                 log::error!("join error when waiting for future: {err}")
+             }
+         }
+     }
+
+     results
+ }
+
async fn fetch_remote<F, Ft, T>(
remote: Remote,
context: C,
@@ -205,6 +306,36 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
(remote.id, Ok(per_remote_results))
}
+ /// Creates a PVE client for `remote` and returns the node list reported by it.
+ ///
+ /// Fails if the client cannot be created or if the node list request fails.
+ async fn fetch_pve_node_list(remote: &Remote) -> Result<Vec<ClusterNodeIndexResponse>, Error> {
+     let node_list = connection::make_pve_client(remote)?.list_nodes().await?;
+
+     Ok(node_list)
+ }
+
+ /// Runs `func` for a PBS remote and returns the node name together with the result.
+ ///
+ /// This simply forwards to `fetch_pve_node`, passing `"localhost"` as the node name, and
+ /// hands both permits on to it.
+ async fn fetch_pbs_node<F, Ft, T>(
+     func: F,
+     context: C,
+     remote: Remote,
+     permit: OwnedSemaphorePermit,
+     per_remote_connections_permit: OwnedSemaphorePermit,
+ ) -> (String, Result<NodeResults<T>, Error>)
+ where
+     F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
+     Ft: Future<Output = Result<T, Error>> + Send + 'static,
+     T: Send + Debug + 'static,
+ {
+     // The PVE and PBS code paths are currently identical apart from the node name: PVE needs
+     // one, for PBS a fixed placeholder is sufficient.
+     let node_name = "localhost".to_string();
+
+     let outcome = Self::fetch_pve_node(
+         func,
+         context,
+         remote,
+         node_name,
+         permit,
+         per_remote_connections_permit,
+     )
+     .await;
+
+     outcome
+ }
+
async fn fetch_pve_node<F, Ft, T>(
func: F,
context: C,
--
2.47.2
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
next prev parent reply other threads:[~2025-08-29 12:33 UTC|newest]
Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-08-29 12:33 [pdm-devel] [RFC proxmox-datacenter-manager 0/2] Add a helper for fetching from multiple remotes once Stefan Hanreich
2025-08-29 12:33 ` Stefan Hanreich [this message]
2025-08-29 12:33 ` [pdm-devel] [PATCH proxmox-datacenter-manager 2/2] server: parallel fetcher: refactor: use helpers for common operations Stefan Hanreich
2025-08-29 13:29 ` [pdm-devel] [RFC proxmox-datacenter-manager 0/2] Add a helper for fetching from multiple remotes once Lukas Wagner
2025-08-29 14:12 ` [pdm-devel] superseded: " Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250829123330.184123-2-s.hanreich@proxmox.com \
--to=s.hanreich@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.