From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 2/3] server: add helper for fetching from multiple remotes at once
Date: Fri, 29 Aug 2025 16:10:27 +0200 [thread overview]
Message-ID: <20250829141028.309835-3-l.wagner@proxmox.com> (raw)
In-Reply-To: <20250829141028.309835-1-l.wagner@proxmox.com>
There is already a helper for fetching results from all remotes and all
their nodes. In some cases (e.g. SDN) it is sufficient to fetch the
result of the API calls once from any node. Add a second helper that
executes the API requests only once per remote on any node, instead of
on all nodes.
Originally-by: Stefan Hanreich <s.hanreich@proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Changes since Stefan's RFC:
- always use 'localhost' node, instead of getting a node list first and using
the first node
- Some minor changes to make the code nicer/shorter
server/src/parallel_fetcher.rs | 93 ++++++++++++++++++++++++++--------
1 file changed, 71 insertions(+), 22 deletions(-)
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index e4a5c106..58ca1f55 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -162,13 +162,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let node_name = node.node.clone();
let context_clone = context.clone();
- nodes_join_set.spawn(Self::fetch_pve_node(
+ nodes_join_set.spawn(Self::fetch_node(
func_clone,
context_clone,
remote_clone,
node_name,
permit,
- per_remote_connections_permit,
+ Some(per_remote_connections_permit),
));
}
@@ -186,39 +186,33 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
RemoteType::Pbs => {
- let node = "localhost".to_string();
-
- let now = Instant::now();
- let result = func(context, remote.clone(), node.clone()).await;
- let api_response_time = now.elapsed();
+ let (nodename, result) = Self::fetch_node(
+ func,
+ context,
+ remote.clone(),
+ "localhost".into(),
+ permit.unwrap(), // Always set to `Some` at this point
+ None,
+ )
+ .await;
match result {
- Ok(data) => {
- per_remote_results.node_results.insert(
- node,
- Ok(NodeResults {
- data,
- api_response_time,
- }),
- );
- }
- Err(err) => {
- per_remote_results.node_results.insert(node, Err(err));
- }
- }
+ Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
+ Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
+ };
}
}
(remote.id, Ok(per_remote_results))
}
- async fn fetch_pve_node<F, Ft, T>(
+ async fn fetch_node<F, Ft, T>(
func: F,
context: C,
remote: Remote,
node: String,
_permit: OwnedSemaphorePermit,
- _per_remote_connections_permit: OwnedSemaphorePermit,
+ _per_remote_connections_permit: Option<OwnedSemaphorePermit>,
) -> (String, Result<NodeResults<T>, Error>)
where
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -240,4 +234,59 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
Err(err) => (node, Err(err)),
}
}
+
+ pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ where
+ A: Iterator<Item = Remote>,
+ F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
+ Ft: Future<Output = Result<T, Error>> + Send + 'static,
+ T: Send + Debug + 'static,
+ {
+ let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
+
+ let mut node_join_set = JoinSet::new();
+ let mut results = FetchResults::default();
+
+ for remote in remotes {
+ let total_connections_semaphore = total_connections_semaphore.clone();
+
+ let remote_id = remote.id.clone();
+ let context = self.context.clone();
+ let func = func.clone();
+
+ node_join_set.spawn(async move {
+ let permit = total_connections_semaphore.acquire_owned().await.unwrap();
+
+ (
+ remote_id,
+ Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
+ )
+ });
+ }
+
+ while let Some(a) = node_join_set.join_next().await {
+ match a {
+ Ok((remote_id, (node_id, node_result))) => {
+ let mut node_results = HashMap::new();
+ node_results.insert(node_id, node_result);
+
+ let remote_result = RemoteResult { node_results };
+
+ if results
+ .remote_results
+ .insert(remote_id, Ok(remote_result))
+ .is_some()
+ {
+ // should never happen, but log for good measure if it actually does
+ log::warn!("made multiple requests for a remote!");
+ }
+ }
+ Err(err) => {
+ log::error!("join error when waiting for future: {err}")
+ }
+ }
+ }
+
+ results
+ }
}
--
2.47.2
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
next prev parent reply other threads:[~2025-08-29 14:10 UTC|newest]
Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-08-29 14:10 [pdm-devel] [PATCH proxmox-datacenter-manager v2 0/3] add helper to fetch from many remote nodes in parallel Lukas Wagner
2025-08-29 14:10 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 1/3] server: add convenience helper to fetch results " Lukas Wagner
2025-08-29 14:10 ` Lukas Wagner [this message]
2025-08-29 14:10 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 3/3] remote task fetching: use ParallelFetcher helper Lukas Wagner
2025-09-01 10:04 ` [pdm-devel] applied: [PATCH proxmox-datacenter-manager v2 0/3] add helper to fetch from many remote nodes in parallel Dominik Csapak
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250829141028.309835-3-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.