From: Dominik Csapak
To: Proxmox Datacenter Manager development discussion <pdm-devel@lists.proxmox.com>, Lukas Wagner
Date: Fri, 29 Aug 2025 11:15:46 +0200
Subject: Re: [pdm-devel] [RFC proxmox-datacenter-manager 2/2] remote task fetching: use ParallelFetcher helper

some minor comments inline, rest LGTM

On 8/28/25 4:42 PM, Lukas Wagner wrote:
> This allows us to simplify the fetching logic quite a bit.
> 
> Signed-off-by: Lukas Wagner
> ---
>  .../tasks/remote_tasks.rs | 239 ++++++------------
>  1 file changed, 77 insertions(+), 162 deletions(-)
> 
> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> index 04c51dac..967e633c 100644
> --- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> @@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
> 
>  use server::{
>      api::pve,
> +    parallel_fetcher::{NodeResults, ParallelFetcher},
>      remote_tasks::{
>          self,
>          task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
> @@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
>          get_remotes_with_finished_tasks(&remote_config, &poll_results)
>      };
> 
> -    let (all_tasks, update_state_for_remote) = fetch_remotes(
> -        remotes,
> -        Arc::new(cache_state),
> -        Arc::clone(&total_connections_semaphore),
> -    )
> -    .await;
> +    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
> 
>      if !all_tasks.is_empty() {
>          update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
> @@ -221,42 +217,90 @@ async fn init_cache() -> Result<(), Error> {
>  async fn fetch_remotes(
>      remotes: Vec<Remote>,
>      cache_state: Arc<State>,
> -    total_connections_semaphore: Arc<Semaphore>,
>  ) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
> -    let mut join_set = JoinSet::new();
> +    let fetcher = ParallelFetcher {
> +        max_connections: MAX_CONNECTIONS,
> +        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
> +        context: cache_state,
> +    };
> 
> -    for remote in remotes {
> -        let semaphore = Arc::clone(&total_connections_semaphore);
> -        let state_clone = Arc::clone(&cache_state);
> -
> -        join_set.spawn(async move {
> -            log::debug!("fetching remote tasks for '{}'", remote.id);
> -            fetch_tasks(&remote, state_clone, semaphore)
> -                .await
> -                .map_err(|err| {
> -                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
> -                })
> -        });
> -    }
> +    let fetch_results = fetcher
> +        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
> +        .await;
> 
>      let mut all_tasks = Vec::new();
> -    let mut update_state_for_remote = NodeFetchSuccessMap::default();
> +    let mut node_success_map = NodeFetchSuccessMap::default();
> 
> -    while let Some(res) = join_set.join_next().await {
> -        match res {
> -            Ok(Ok(FetchedTasks {
> -                tasks,
> -                node_results,
> -            })) => {
> -                all_tasks.extend(tasks);
> -                update_state_for_remote.merge(node_results);
> +    for (remote_name, result) in fetch_results.remote_results {
> +        match result {
> +            Ok(remote_result) => {
> +                for (node_name, node_result) in remote_result.node_results {
> +                    match node_result {
> +                        Ok(NodeResults { data, .. }) => {
> +                            all_tasks.extend(data);
> +                            node_success_map.set_node_success(remote_name.clone(), node_name);
> +                        }
> +                        Err(err) => {
> +                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
> +                        }
> +                    }
> +                }
> +            }
> +            Err(err) => {
> +                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
>              }
> -            Ok(Err(err)) => log::error!("{err:#}"),
> -            Err(err) => log::error!("could not join task fetching future: {err:#}"),
>          }
>      }
> 
> -    (all_tasks, update_state_for_remote)
> +    (all_tasks, node_success_map)
> +}
> +
> +async fn fetch_tasks_from_single_node(
> +    context: Arc<State>,
> +    remote: Remote,
> +    node: String,
> +) -> Result<Vec<TaskCacheItem>, Error> {
> +    match remote.ty {
> +        RemoteType::Pve => {
> +            let since = context
> +                .cutoff_timestamp(&remote.id, &node)
> +                .unwrap_or_else(|| {
> +                    proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
> +                });
> +
> +            let params = ListTasks {
> +                source: Some(ListTasksSource::Archive),
> +                since: Some(since),
> +                // If `limit` is not provided, we only receive 50 tasks
> +                limit: Some(MAX_TASKS_TO_FETCH),
> +                ..Default::default()
> +            };
> +
> +            let remote_clone = remote.clone();
> +            let client = pve::connect(&remote_clone)?;

this clone can simply be omitted by using `remote` directly (untested sketch at the end of this mail)

> +            let task_list = client.get_task_list(&node, params).await.map_err(|err| {
> +                format_err!("remote '{}', node '{}': {err}", remote_clone.id, node)
> +            })?;
> +
> +            let task_list = task_list
> +                .into_iter()
> +                .map(|task| map_pve_task(task, &remote.id))
> +                .filter_map(|task| match task {
> +                    Ok(task) => Some(task),
> +                    Err(err) => {
> +                        log::error!("could not map PVE task: {err:#}");
> +                        None
> +                    }
> +                })
> +                .collect();

two things here: you could do just one filter_map that calls map_pve_task inside, and you can simply append `.into_iter().filter_map(..)` to the original get_task_list call, so there is no need for the extra `let task_list = task_list....` rebinding (see the second sketch at the end of this mail)

> +
> +            Ok(task_list)
> +        }
> +        RemoteType::Pbs => {
> +            // TODO: Support PBS.
> +            Ok(vec![])
> +        }
> +    }
>  }
> 
>  /// Return all remotes from the given config.
> @@ -301,135 +345,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
>      tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
>  }
> 
> -/// Fetched tasks from a single remote.
> -struct FetchedTasks {
> -    /// List of tasks.
> -    tasks: Vec<TaskCacheItem>,
> -    /// Contains whether a cluster node was fetched successfully.
> -    node_results: NodeFetchSuccessMap,
> -}
> -
> -/// Fetch tasks (active and finished) from a remote.
> -async fn fetch_tasks(
> -    remote: &Remote,
> -    state: Arc<State>,
> -    total_connections_semaphore: Arc<Semaphore>,
> -) -> Result<FetchedTasks, Error> {
> -    let mut tasks = Vec::new();
> -
> -    let mut node_results = NodeFetchSuccessMap::default();
> -
> -    match remote.ty {
> -        RemoteType::Pve => {
> -            let client = pve::connect(remote)?;
> -
> -            let nodes = {
> -                // This permit *must* be dropped before we acquire the permits for the
> -                // per-node connections - otherwise we risk a deadlock.
> -                let _permit = total_connections_semaphore.acquire().await.unwrap();
> -                client.list_nodes().await?
> -            };
> -
> -            // This second semaphore is used to limit the number of concurrent connections
> -            // *per remote*, not in total.
> -            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
> -            let mut join_set = JoinSet::new();
> -
> -            for node in nodes {
> -                let node_name = node.node.to_string();
> -
> -                let since = state
> -                    .cutoff_timestamp(&remote.id, &node_name)
> -                    .unwrap_or_else(|| {
> -                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
> -                    });
> -
> -                let params = ListTasks {
> -                    source: Some(ListTasksSource::Archive),
> -                    since: Some(since),
> -                    // If `limit` is not provided, we only receive 50 tasks
> -                    limit: Some(MAX_TASKS_TO_FETCH),
> -                    ..Default::default()
> -                };
> -
> -                let per_remote_permit = Arc::clone(&per_remote_semaphore)
> -                    .acquire_owned()
> -                    .await
> -                    .unwrap();
> -
> -                let total_connections_permit = Arc::clone(&total_connections_semaphore)
> -                    .acquire_owned()
> -                    .await
> -                    .unwrap();
> -
> -                let remote_clone = remote.clone();
> -
> -                join_set.spawn(async move {
> -                    let res = async {
> -                        let client = pve::connect(&remote_clone)?;
> -                        let task_list =
> -                            client
> -                                .get_task_list(&node.node, params)
> -                                .await
> -                                .map_err(|err| {
> -                                    format_err!(
> -                                        "remote '{}', node '{}': {err}",
> -                                        remote_clone.id,
> -                                        node.node
> -                                    )
> -                                })?;
> -                        Ok::<Vec<ListTasksResponse>, Error>(task_list)
> -                    }
> -                    .await;
> -
> -                    drop(total_connections_permit);
> -                    drop(per_remote_permit);
> -
> -                    (node_name, res)
> -                });
> -            }
> -
> -            while let Some(result) = join_set.join_next().await {
> -                match result {
> -                    Ok((node_name, result)) => match result {
> -                        Ok(task_list) => {
> -                            let mapped =
> -                                task_list.into_iter().filter_map(|task| {
> -                                    match map_pve_task(task, &remote.id) {
> -                                        Ok(task) => Some(task),
> -                                        Err(err) => {
> -                                            log::error!(
> -                                                "could not map task data, skipping: {err:#}"
> -                                            );
> -                                            None
> -                                        }
> -                                    }
> -                                });
> -
> -                            tasks.extend(mapped);
> -                            node_results.set_node_success(remote.id.clone(), node_name);
> -                        }
> -                        Err(error) => {
> -                            log::error!("could not fetch tasks: {error:#}");
> -                        }
> -                    },
> -                    Err(error) => {
> -                        log::error!("could not join task fetching task: {error:#}");
> -                    }
> -                }
> -            }
> -        }
> -        RemoteType::Pbs => {
> -            // TODO: Add code for PBS
> -        }
> -    }
> -
> -    Ok(FetchedTasks {
> -        tasks,
> -        node_results,
> -    })
> -}
> -
>  #[derive(PartialEq, Debug)]
>  /// Outcome from polling a tracked task.
>  enum PollResult {
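
Regarding the clone above, a rough (untested) sketch of what I had in mind - `remote` is owned by this function, so it can be used for connecting and for the error message directly:

    // no remote_clone needed, remote is owned by fetch_tasks_from_single_node
    let client = pve::connect(&remote)?;
    let task_list = client
        .get_task_list(&node, params)
        .await
        .map_err(|err| format_err!("remote '{}', node '{}': {err}", remote.id, node))?;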
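
And for the mapping part, roughly (again untested, and assuming the clone from the previous comment is dropped so `remote.id` can be used directly):

    let task_list = client
        .get_task_list(&node, params)
        .await
        .map_err(|err| format_err!("remote '{}', node '{}': {err}", remote.id, node))?
        .into_iter()
        // single filter_map: map the PVE task and log + skip entries that fail to map
        .filter_map(|task| match map_pve_task(task, &remote.id) {
            Ok(task) => Some(task),
            Err(err) => {
                log::error!("could not map PVE task: {err:#}");
                None
            }
        })
        .collect();

    Ok(task_list)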