From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 55B221FF186 for ; Fri, 29 Aug 2025 16:05:35 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 667B02F918; Fri, 29 Aug 2025 16:05:45 +0200 (CEST) Mime-Version: 1.0 Date: Fri, 29 Aug 2025 16:05:41 +0200 Message-Id: From: "Lukas Wagner" To: "Dominik Csapak" , "Proxmox Datacenter Manager development discussion" X-Mailer: aerc 0.20.1-0-g2ecb8770224a References: <20250828144247.298536-1-l.wagner@proxmox.com> <20250828144247.298536-2-l.wagner@proxmox.com> <5fca5c4c-cdbb-4cbf-b3a0-475f2d7f1f44@proxmox.com> In-Reply-To: <5fca5c4c-cdbb-4cbf-b3a0-475f2d7f1f44@proxmox.com> X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1756476333151 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.026 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. 
See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [lib.rs] Subject: Re: [pdm-devel] [RFC proxmox-datacenter-manager 1/2] server: add convenience helper to fetch results from many remote nodes in parallel X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" On Fri Aug 29, 2025 at 10:57 AM CEST, Dominik Csapak wrote: > some minor comments inline, but looks good in general to me > > On 8/28/25 4:42 PM, Lukas Wagner wrote: >> Implementing this over and over again is a lot of work; many parts of >> PDM need to fetch some kind of resource from all remotes and all nodes >> of these remotes. >> >> The general approach is the same as for remote task fetching (JoinSet >> and semaphores to control concurrency), it will be refactored in a later >> commit to use this new helper. 
>> >> Signed-off-by: Lukas Wagner >> --- >> server/src/lib.rs | 1 + >> server/src/parallel_fetcher.rs | 236 +++++++++++++++++++++++++++++++++ >> 2 files changed, 237 insertions(+) >> create mode 100644 server/src/parallel_fetcher.rs >> >> diff --git a/server/src/lib.rs b/server/src/lib.rs >> index 485bc792..3f8b7708 100644 >> --- a/server/src/lib.rs >> +++ b/server/src/lib.rs >> @@ -6,6 +6,7 @@ pub mod auth; >> pub mod context; >> pub mod env; >> pub mod metric_collection; >> +pub mod parallel_fetcher; >> pub mod remote_cache; >> pub mod remote_tasks; >> pub mod resource_cache; >> diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs >> new file mode 100644 >> index 00000000..3fad68a0 >> --- /dev/null >> +++ b/server/src/parallel_fetcher.rs >> @@ -0,0 +1,236 @@ >> +use std::{ >> + collections::HashMap, >> + fmt::Debug, >> + future::Future, >> + sync::Arc, >> + time::{Duration, Instant}, >> +}; >> + >> +use anyhow::Error; >> +use pdm_api_types::remotes::{Remote, RemoteType}; >> +use pve_api_types::ClusterNodeIndexResponse; >> +use tokio::{ >> + sync::{OwnedSemaphorePermit, Semaphore}, >> + task::JoinSet, >> +}; >> + >> +use crate::connection; >> + >> +pub struct ParallelFetcher { >> + pub max_connections: usize, >> + pub max_connections_per_remote: usize, >> + pub context: C, >> +} >> + >> +pub struct FetchResults { >> + pub remote_results: HashMap, Error>>, >> +} >> + >> +impl Default for FetchResults { >> + fn default() -> Self { >> + Self { >> + remote_results: Default::default(), >> + } >> + } >> +} >> + >> +#[derive(Debug)] >> +pub struct RemoteResult { >> + pub node_results: HashMap, Error>>, >> +} >> + >> +impl Default for RemoteResult { >> + fn default() -> Self { >> + Self { >> + node_results: Default::default(), >> + } >> + } >> +} >> + >> +#[derive(Debug)] >> +pub struct NodeResults { >> + pub data: T, >> + pub api_response_time: Duration, >> +} >> + >> +impl ParallelFetcher { >> + pub fn new(context: C) -> Self { >> + Self { >> + 
max_connections: 20, >> + max_connections_per_remote: 5, > > you probably thought of it already, but we probably want to > make this settable from outside in a separate constructor, or at least > put the values in const values for now I've extracted these into constants for v2, but I think for the future a builder pattern could be nice for this one. Stefan and I are playing around with some ideas on how to make the whole thing a bit nicer to use in the call sites, but that's something for a future series; since we both need this helper for our code we've settled for something basic first, which can then be improved/refactored later. > >> + context, >> + } >> + } >> + >> + pub async fn do_for_all_remote_nodes(self, remotes: A, func: F) -> FetchResults >> + where >> + A: Iterator, >> + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, >> + Ft: Future> + Send + 'static, >> + T: Send + Debug + 'static, >> + { >> + let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections)); >> + >> + let mut remote_join_set = JoinSet::new(); >> + >> + for remote in remotes { >> + let semaphore = Arc::clone(&total_connections_semaphore); >> + >> + let f = func.clone(); >> + >> + remote_join_set.spawn(Self::fetch_remote( >> + remote, >> + self.context.clone(), >> + semaphore, >> + f, >> + self.max_connections_per_remote, >> + )); > > not sure if it would work properly, but couldn't we put the semaphores > into self, and clone that instead? > > then `fetch_remote` could become a normal method on self > and we could implement a helper for getting permits and > we wouldn't have to pass the semaphores around all the time. > > (maybe that does not work as I think though..) Good idea actually, but I'd also revisit that a bit later; I'd prefer to get this applied as soon as possible, so that Stefan and I can build on it. 
> >> + } >> + >> + let mut results = FetchResults::default(); >> + >> + while let Some(a) = remote_join_set.join_next().await { >> + match a { >> + Ok((remote_name, remote_result)) => { >> + results.remote_results.insert(remote_name, remote_result); >> + } >> + Err(err) => { >> + log::error!("join error when waiting for future: {err}") >> + } >> + } >> + } >> + >> + results >> + } >> + >> + async fn fetch_remote( >> + remote: Remote, >> + context: C, >> + semaphore: Arc, >> + func: F, >> + max_connections_per_remote: usize, >> + ) -> (String, Result, Error>) >> + where >> + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, >> + Ft: Future> + Send + 'static, >> + T: Send + Debug + 'static, >> + { >> + let mut per_remote_results = RemoteResult::default(); >> + >> + let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap()); >> + let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote)); >> + >> + match remote.ty { >> + RemoteType::Pve => { >> + let remote_clone = remote.clone(); >> + >> + let nodes = match async move { >> + let client = connection::make_pve_client(&remote_clone)?; >> + let nodes = client.list_nodes().await?; >> + >> + Ok::, Error>(nodes) >> + } >> + .await >> + { >> + Ok(nodes) => nodes, >> + Err(err) => return (remote.id.clone(), Err(err)), >> + }; >> + >> + let mut nodes_join_set = JoinSet::new(); >> + >> + for node in nodes { >> + let permit = if let Some(permit) = permit.take() { >> + permit >> + } else { >> + Arc::clone(&semaphore).acquire_owned().await.unwrap() >> + }; >> + >> + let per_remote_connections_permit = Arc::clone(&per_remote_semaphore) >> + .acquire_owned() >> + .await >> + .unwrap(); >> + >> + let func_clone = func.clone(); >> + let remote_clone = remote.clone(); >> + let node_name = node.node.clone(); >> + let context_clone = context.clone(); >> + >> + nodes_join_set.spawn(Self::fetch_pve_node( >> + func_clone, >> + context_clone, >> + remote_clone, >> + node_name, >> + permit, >> + 
per_remote_connections_permit, >> + )); >> + } >> + >> + while let Some(join_result) = nodes_join_set.join_next().await { >> + match join_result { >> + Ok((node_name, per_node_result)) => { >> + per_remote_results >> + .node_results >> + .insert(node_name, per_node_result); >> + } >> + Err(e) => { >> + log::error!("join error when waiting for future: {e}") >> + } >> + } >> + } >> + } >> + RemoteType::Pbs => { >> + let node = "localhost".to_string(); >> + >> + let now = Instant::now(); >> + let result = func(context, remote.clone(), node.clone()).await; >> + let api_response_time = now.elapsed(); >> + >> + match result { >> + Ok(data) => { >> + per_remote_results.node_results.insert( >> + node, >> + Ok(NodeResults { >> + data, >> + api_response_time, >> + }), >> + ); >> + } >> + Err(err) => { >> + per_remote_results.node_results.insert(node, Err(err)); >> + } >> + } >> + } >> + } >> + >> + (remote.id, Ok(per_remote_results)) >> + } > > probably a `fn do_for_all_remotes` that only does one thing per remote > would also be nice? 
(i think stefan said he'll need something like this) Will be added for v2, I adopted (and slightly modified, after some discussion) Stefan's patches for that >> + >> + async fn fetch_pve_node( >> + func: F, >> + context: C, >> + remote: Remote, >> + node: String, >> + _permit: OwnedSemaphorePermit, >> + _per_remote_connections_permit: OwnedSemaphorePermit, >> + ) -> (String, Result, Error>) >> + where >> + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, >> + Ft: Future> + Send + 'static, >> + T: Send + Debug + 'static, >> + { >> + let now = Instant::now(); >> + let result = func(context, remote.clone(), node.clone()).await; >> + let api_response_time = now.elapsed(); >> + >> + match result { >> + Ok(data) => ( >> + node, >> + Ok(NodeResults { >> + data, >> + api_response_time, >> + }), >> + ), >> + Err(err) => (node, Err(err)), >> + } >> + } >> +} _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel