From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id A6E101FF186 for ; Fri, 29 Aug 2025 14:33:23 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 8C4892DDCB; Fri, 29 Aug 2025 14:33:33 +0200 (CEST) From: Stefan Hanreich To: pdm-devel@lists.proxmox.com Date: Fri, 29 Aug 2025 14:33:28 +0200 Message-ID: <20250829123330.184123-2-s.hanreich@proxmox.com> X-Mailer: git-send-email 2.47.2 In-Reply-To: <20250829123330.184123-1-s.hanreich@proxmox.com> References: <20250829123330.184123-1-s.hanreich@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL -0.194 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_LAZY_DOMAIN_SECURITY 1 Sending domain does not have any anti-forgery methods RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RDNS_NONE 0.793 Delivered to internal network by a host with no rDNS SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_NONE 0.001 SPF: sender does not publish an SPF Record Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 1/2] server: add helper for fetching results from one node per remote X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" There is already a helper for fetching results from all remotes and all their nodes. In some cases (e.g. SDN) it is sufficient to fetch the result of the API calls once from any node. Add a second helper that executes the API requests only once per remote on any node, instead of on all nodes. Currently this takes the first node returned from the PVE API, but in the future could be extended to re-try on a different node if the first node is not available or choose a random node instead of hitting the first node all the time. Signed-off-by: Stefan Hanreich --- server/src/parallel_fetcher.rs | 133 ++++++++++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 1 deletion(-) diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs index 3fad68a..e55cc25 100644 --- a/server/src/parallel_fetcher.rs +++ b/server/src/parallel_fetcher.rs @@ -6,7 +6,7 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::Error; +use anyhow::{anyhow, Error}; use pdm_api_types::remotes::{Remote, RemoteType}; use pve_api_types::ClusterNodeIndexResponse; use tokio::{ @@ -103,6 +103,107 @@ impl ParallelFetcher { results } + pub async fn do_for_all_remotes(self, remotes: A, func: F) -> FetchResults + where + A: Iterator, + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, + Ft: Future> + Send + 'static, + T: Send + Debug + 'static, + { + let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections)); + + let mut node_join_set = JoinSet::new(); + let mut results = FetchResults::default(); + + for remote in remotes { + // TODO: currently the fetch helpers require a per_remote permit, but we are only making one + // request per remote, so fake it here until we find a better solution to handle both + // cases in the helpers + let per_remote_semaphore = Arc::new(Semaphore::new(1)); + let total_connections_semaphore = total_connections_semaphore.clone(); + + let remote_id = remote.id.clone(); + let context = self.context.clone(); + let func = func.clone(); + + match remote.ty { + RemoteType::Pve => { + let nodes = match Self::fetch_pve_node_list(&remote).await { + Ok(nodes) => nodes, + Err(error) => { + results.remote_results.insert(remote.id.clone(), Err(error)); + continue; + } + }; + + let Some(first_node) = nodes.into_iter().next() else { + results.remote_results.insert(remote.id.clone(), Err(anyhow!("no node returned for remote {}", remote.id))); + continue; + }; + + node_join_set.spawn(async move { + let permit = total_connections_semaphore.acquire_owned().await.unwrap(); + + let per_remote_permit = + per_remote_semaphore.clone().acquire_owned().await.unwrap(); + + ( + remote_id, + Self::fetch_pve_node( + func, + context, + remote, + first_node.node, + permit, + per_remote_permit, + ) + .await, + ) + }); + } + RemoteType::Pbs => { + node_join_set.spawn(async move { + let permit = total_connections_semaphore.acquire_owned().await.unwrap(); + + let per_remote_permit = + per_remote_semaphore.clone().acquire_owned().await.unwrap(); + + ( + remote_id, + Self::fetch_pbs_node(func, context, remote, permit, per_remote_permit) + .await, + ) + }); + } + } + } + + while let Some(a) = node_join_set.join_next().await { + match a { + Ok((remote_id, (node_id, node_result))) => { + let mut node_results = HashMap::new(); + node_results.insert(node_id, node_result); + + let remote_result = RemoteResult { node_results }; + + if results + .remote_results + .insert(remote_id, Ok(remote_result)) + .is_some() + { + // should never happen, but log for good measure if it actually does + log::warn!("made multiple requests for a remote!"); + } + } + Err(err) => { + log::error!("join error when waiting for future: {err}") + } + } + } + + results + } + async fn fetch_remote( remote: Remote, context: C, @@ -205,6 +306,36 @@ impl ParallelFetcher { (remote.id, Ok(per_remote_results)) } + async fn fetch_pve_node_list(remote: &Remote) -> Result, Error> { + let client = connection::make_pve_client(remote)?; + Ok(client.list_nodes().await?) + } + + async fn fetch_pbs_node( + func: F, + context: C, + remote: Remote, + permit: OwnedSemaphorePermit, + per_remote_connections_permit: OwnedSemaphorePermit, + ) -> (String, Result, Error>) + where + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, + Ft: Future> + Send + 'static, + T: Send + Debug + 'static, + { + // implementation is currently the same for PVE / PBS, except that PVE requires a node + // name, which we can hardcode to localhost for PBS + Self::fetch_pve_node( + func, + context, + remote, + "localhost".to_string(), + permit, + per_remote_connections_permit, + ) + .await + } + async fn fetch_pve_node( func: F, context: C, -- 2.47.2 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel