From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id CFD171FF186 for ; Fri, 29 Aug 2025 10:58:23 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id DDFA82859F; Fri, 29 Aug 2025 10:58:33 +0200 (CEST) Message-ID: <5fca5c4c-cdbb-4cbf-b3a0-475f2d7f1f44@proxmox.com> Date: Fri, 29 Aug 2025 10:57:49 +0200 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird Beta To: Proxmox Datacenter Manager development discussion , Lukas Wagner References: <20250828144247.298536-1-l.wagner@proxmox.com> <20250828144247.298536-2-l.wagner@proxmox.com> Content-Language: en-US From: Dominik Csapak In-Reply-To: <20250828144247.298536-2-l.wagner@proxmox.com> X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1756457869357 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.021 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. 
See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [lib.rs] Subject: Re: [pdm-devel] [RFC proxmox-datacenter-manager 1/2] server: add convenience helper to fetch results from many remote nodes in parallel X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Content-Transfer-Encoding: 7bit Content-Type: text/plain; charset="us-ascii"; Format="flowed" Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" some minor comments inline, but looks good in general to me On 8/28/25 4:42 PM, Lukas Wagner wrote: > Implementing this over and over again is a lot of work; many parts of > PDM need to fetch some kind of resource from all remotes and all nodes > of these remotes. > > The general approach is the same as for remote task fetching (JoinSet > and semaphores to control concurrency), it will be refactored in a later > commit to use this new helper. 
> > Signed-off-by: Lukas Wagner > --- > server/src/lib.rs | 1 + > server/src/parallel_fetcher.rs | 236 +++++++++++++++++++++++++++++++++ > 2 files changed, 237 insertions(+) > create mode 100644 server/src/parallel_fetcher.rs > > diff --git a/server/src/lib.rs b/server/src/lib.rs > index 485bc792..3f8b7708 100644 > --- a/server/src/lib.rs > +++ b/server/src/lib.rs > @@ -6,6 +6,7 @@ pub mod auth; > pub mod context; > pub mod env; > pub mod metric_collection; > +pub mod parallel_fetcher; > pub mod remote_cache; > pub mod remote_tasks; > pub mod resource_cache; > diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs > new file mode 100644 > index 00000000..3fad68a0 > --- /dev/null > +++ b/server/src/parallel_fetcher.rs > @@ -0,0 +1,236 @@ > +use std::{ > + collections::HashMap, > + fmt::Debug, > + future::Future, > + sync::Arc, > + time::{Duration, Instant}, > +}; > + > +use anyhow::Error; > +use pdm_api_types::remotes::{Remote, RemoteType}; > +use pve_api_types::ClusterNodeIndexResponse; > +use tokio::{ > + sync::{OwnedSemaphorePermit, Semaphore}, > + task::JoinSet, > +}; > + > +use crate::connection; > + > +pub struct ParallelFetcher { > + pub max_connections: usize, > + pub max_connections_per_remote: usize, > + pub context: C, > +} > + > +pub struct FetchResults { > + pub remote_results: HashMap, Error>>, > +} > + > +impl Default for FetchResults { > + fn default() -> Self { > + Self { > + remote_results: Default::default(), > + } > + } > +} > + > +#[derive(Debug)] > +pub struct RemoteResult { > + pub node_results: HashMap, Error>>, > +} > + > +impl Default for RemoteResult { > + fn default() -> Self { > + Self { > + node_results: Default::default(), > + } > + } > +} > + > +#[derive(Debug)] > +pub struct NodeResults { > + pub data: T, > + pub api_response_time: Duration, > +} > + > +impl ParallelFetcher { > + pub fn new(context: C) -> Self { > + Self { > + max_connections: 20, > + max_connections_per_remote: 5, you probably thought of it 
already, but we probably want to make this settable from outside in a separate constructor, or at least put the values in constants for now > + context, > + } > + } > + > + pub async fn do_for_all_remote_nodes(self, remotes: A, func: F) -> FetchResults > + where > + A: Iterator, > + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, > + Ft: Future> + Send + 'static, > + T: Send + Debug + 'static, > + { > + let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections)); > + > + let mut remote_join_set = JoinSet::new(); > + > + for remote in remotes { > + let semaphore = Arc::clone(&total_connections_semaphore); > + > + let f = func.clone(); > + > + remote_join_set.spawn(Self::fetch_remote( > + remote, > + self.context.clone(), > + semaphore, > + f, > + self.max_connections_per_remote, > + )); not sure if it would work properly, but couldn't we put the semaphores into self, and clone that instead? then `fetch_remote` could become a normal method on self and we could implement a helper for getting permits and we wouldn't have to pass the semaphores around all the time. (maybe that does not work as i think though..) 
> + } > + > + let mut results = FetchResults::default(); > + > + while let Some(a) = remote_join_set.join_next().await { > + match a { > + Ok((remote_name, remote_result)) => { > + results.remote_results.insert(remote_name, remote_result); > + } > + Err(err) => { > + log::error!("join error when waiting for future: {err}") > + } > + } > + } > + > + results > + } > + > + async fn fetch_remote( > + remote: Remote, > + context: C, > + semaphore: Arc, > + func: F, > + max_connections_per_remote: usize, > + ) -> (String, Result, Error>) > + where > + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, > + Ft: Future> + Send + 'static, > + T: Send + Debug + 'static, > + { > + let mut per_remote_results = RemoteResult::default(); > + > + let mut permit = Some(Arc::clone(&semaphore).acquire_owned().await.unwrap()); > + let per_remote_semaphore = Arc::new(Semaphore::new(max_connections_per_remote)); > + > + match remote.ty { > + RemoteType::Pve => { > + let remote_clone = remote.clone(); > + > + let nodes = match async move { > + let client = connection::make_pve_client(&remote_clone)?; > + let nodes = client.list_nodes().await?; > + > + Ok::, Error>(nodes) > + } > + .await > + { > + Ok(nodes) => nodes, > + Err(err) => return (remote.id.clone(), Err(err)), > + }; > + > + let mut nodes_join_set = JoinSet::new(); > + > + for node in nodes { > + let permit = if let Some(permit) = permit.take() { > + permit > + } else { > + Arc::clone(&semaphore).acquire_owned().await.unwrap() > + }; > + > + let per_remote_connections_permit = Arc::clone(&per_remote_semaphore) > + .acquire_owned() > + .await > + .unwrap(); > + > + let func_clone = func.clone(); > + let remote_clone = remote.clone(); > + let node_name = node.node.clone(); > + let context_clone = context.clone(); > + > + nodes_join_set.spawn(Self::fetch_pve_node( > + func_clone, > + context_clone, > + remote_clone, > + node_name, > + permit, > + per_remote_connections_permit, > + )); > + } > + > + while let Some(join_result) 
= nodes_join_set.join_next().await { > + match join_result { > + Ok((node_name, per_node_result)) => { > + per_remote_results > + .node_results > + .insert(node_name, per_node_result); > + } > + Err(e) => { > + log::error!("join error when waiting for future: {e}") > + } > + } > + } > + } > + RemoteType::Pbs => { > + let node = "localhost".to_string(); > + > + let now = Instant::now(); > + let result = func(context, remote.clone(), node.clone()).await; > + let api_response_time = now.elapsed(); > + > + match result { > + Ok(data) => { > + per_remote_results.node_results.insert( > + node, > + Ok(NodeResults { > + data, > + api_response_time, > + }), > + ); > + } > + Err(err) => { > + per_remote_results.node_results.insert(node, Err(err)); > + } > + } > + } > + } > + > + (remote.id, Ok(per_remote_results)) > + } probably a `fn do_for_all_remotes` that only does one thing per remote would also be nice? (i think stefan said he'll need something like this) > + > + async fn fetch_pve_node( > + func: F, > + context: C, > + remote: Remote, > + node: String, > + _permit: OwnedSemaphorePermit, > + _per_remote_connections_permit: OwnedSemaphorePermit, > + ) -> (String, Result, Error>) > + where > + F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static, > + Ft: Future> + Send + 'static, > + T: Send + Debug + 'static, > + { > + let now = Instant::now(); > + let result = func(context, remote.clone(), node.clone()).await; > + let api_response_time = now.elapsed(); > + > + match result { > + Ok(data) => ( > + node, > + Ok(NodeResults { > + data, > + api_response_time, > + }), > + ), > + Err(err) => (node, Err(err)), > + } > + } > +} _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel