From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 0F3E01FF186 for ; Fri, 29 Aug 2025 16:10:27 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 2F5942FA42; Fri, 29 Aug 2025 16:10:35 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Date: Fri, 29 Aug 2025 16:10:27 +0200 Message-ID: <20250829141028.309835-3-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.2 In-Reply-To: <20250829141028.309835-1-l.wagner@proxmox.com> References: <20250829141028.309835-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1756476623918 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.027 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 2/3] server: add helper for fetching from multiple remotes at once X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" There is already a helper for fetching results from all remotes and all their nodes. In some cases (e.g. SDN) it is sufficient to fetch the result of the API calls once from any node. Add a second helper that executes the API requests only once per remote on any node, instead of on all nodes. Originally-by: Stefan Hanreich Signed-off-by: Lukas Wagner --- Notes: Changes since Stefan's RFC: - always use 'localhost' node, instead of getting a node list first and using the first node - Some minor changes to make the code nicer/shorter server/src/parallel_fetcher.rs | 93 ++++++++++++++++++++++++++-------- 1 file changed, 71 insertions(+), 22 deletions(-) diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs index e4a5c106..58ca1f55 100644 --- a/server/src/parallel_fetcher.rs +++ b/server/src/parallel_fetcher.rs @@ -162,13 +162,13 @@ impl ParallelFetcher { let node_name = node.node.clone(); let context_clone = context.clone(); - nodes_join_set.spawn(Self::fetch_pve_node( + nodes_join_set.spawn(Self::fetch_node( func_clone, context_clone, remote_clone, node_name, permit, - per_remote_connections_permit, + Some(per_remote_connections_permit), )); } @@ -186,39 +186,33 @@ impl ParallelFetcher { } } RemoteType::Pbs => { - let node = "localhost".to_string(); - - let now = Instant::now(); - let result = func(context, remote.clone(), node.clone()).await; - let api_response_time = now.elapsed(); + let (nodename, result) = Self::fetch_node( + func, + context, + remote.clone(), + "localhost".into(), + permit.unwrap(), // Always set to `Some` at this point + None, + ) + .await; match result { - Ok(data) => { - per_remote_results.node_results.insert( - node, - Ok(NodeResults { - data, - api_response_time, - }), - ); - } - Err(err) => { - per_remote_results.node_results.insert(node, Err(err)); - } - } + Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)), + Err(err) => per_remote_results.node_results.insert(nodename, Err(err)), + }; } } (remote.id, Ok(per_remote_results)) } - async fn fetch_pve_node( + async fn fetch_node( func: F, context: C, remote: Remote, node: String, _permit: OwnedSemaphorePermit, - _per_remote_connections_permit: OwnedSemaphorePermit, + _per_remote_connections_permit: Option, ) -> (String, Result, Error>) where F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, @@ -240,4 +234,59 @@ impl ParallelFetcher { Err(err) => (node, Err(err)), } } + + pub async fn do_for_all_remotes(self, remotes: A, func: F) -> FetchResults + where + A: Iterator, + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, + Ft: Future> + Send + 'static, + T: Send + Debug + 'static, + { + let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections)); + + let mut node_join_set = JoinSet::new(); + let mut results = FetchResults::default(); + + for remote in remotes { + let total_connections_semaphore = total_connections_semaphore.clone(); + + let remote_id = remote.id.clone(); + let context = self.context.clone(); + let func = func.clone(); + + node_join_set.spawn(async move { + let permit = total_connections_semaphore.acquire_owned().await.unwrap(); + + ( + remote_id, + Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await, + ) + }); + } + + while let Some(a) = node_join_set.join_next().await { + match a { + Ok((remote_id, (node_id, node_result))) => { + let mut node_results = HashMap::new(); + node_results.insert(node_id, node_result); + + let remote_result = RemoteResult { node_results }; + + if results + .remote_results + .insert(remote_id, Ok(remote_result)) + .is_some() + { + // should never happen, but log for good measure if it actually does + log::warn!("made multiple requests for a remote!"); + } + } + Err(err) => { + log::error!("join error when waiting for future: {err}") + } + } + } + + results + } } -- 2.47.2 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel