From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Date: Thu, 28 Aug 2025 16:42:47 +0200
Message-ID: <20250828144247.298536-3-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.47.2
In-Reply-To: <20250828144247.298536-1-l.wagner@proxmox.com>
References: <20250828144247.298536-1-l.wagner@proxmox.com>
Subject: [pdm-devel] [RFC proxmox-datacenter-manager 2/2] remote task fetching: use ParallelFetcher helper

This allows us to simplify the fetching logic quite a bit.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
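Note for review without patch 1/2 at hand: the sketch below shows the rough
shape of the helper interface as this patch uses it. Only the three struct
fields and the NodeResults name are taken from the diff; the generic
parameters, the bounds, and the FetchResults/RemoteResult names are inferred
from the call sites and are illustrative only, not the actual definition.

    // Shape inferred from usage in this patch; names not visible in the
    // diff below are placeholders.
    use std::{collections::HashMap, future::Future, sync::Arc};

    use anyhow::Error;

    // Local stand-in for the configured remote type (the real one comes
    // from the PDM API types).
    pub struct Remote {
        pub id: String,
    }

    pub struct ParallelFetcher<C> {
        pub max_connections: usize,
        pub max_connections_per_remote: usize,
        pub context: Arc<C>,
    }

    pub struct NodeResults<T> {
        pub data: T,
        // ... further per-node metadata elided (the diff matches with `..`) ...
    }

    pub struct RemoteResult<T> {
        pub node_results: HashMap<String, Result<NodeResults<T>, Error>>,
    }

    pub struct FetchResults<T> {
        pub remote_results: HashMap<String, Result<RemoteResult<T>, Error>>,
    }

    impl<C> ParallelFetcher<C> {
        /// Calls `func(context, remote, node)` once per node of every remote,
        /// bounded by `max_connections` in total and
        /// `max_connections_per_remote` per remote, and collects the
        /// per-remote/per-node results.
        pub async fn do_for_all_remote_nodes<T, F, Fut>(
            self,
            _remotes: impl Iterator<Item = Remote>,
            _func: F,
        ) -> FetchResults<T>
        where
            F: Fn(Arc<C>, Remote, String) -> Fut,
            Fut: Future<Output = Result<T, Error>>,
        {
            todo!() // see patch 1/2 for the real implementation
        }
    }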
 .../tasks/remote_tasks.rs | 239 ++++++------
 1 file changed, 77 insertions(+), 162 deletions(-)

diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 04c51dac..967e633c 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
 
 use server::{
     api::pve,
+    parallel_fetcher::{NodeResults, ParallelFetcher},
     remote_tasks::{
         self,
         task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
@@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
         get_remotes_with_finished_tasks(&remote_config, &poll_results)
     };
 
-    let (all_tasks, update_state_for_remote) = fetch_remotes(
-        remotes,
-        Arc::new(cache_state),
-        Arc::clone(&total_connections_semaphore),
-    )
-    .await;
+    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
 
     if !all_tasks.is_empty() {
         update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
@@ -221,42 +217,90 @@ async fn init_cache() -> Result<(), Error> {
 async fn fetch_remotes(
     remotes: Vec<Remote>,
     cache_state: Arc<State>,
-    total_connections_semaphore: Arc<Semaphore>,
 ) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
-    let mut join_set = JoinSet::new();
+    let fetcher = ParallelFetcher {
+        max_connections: MAX_CONNECTIONS,
+        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
+        context: cache_state,
+    };
 
-    for remote in remotes {
-        let semaphore = Arc::clone(&total_connections_semaphore);
-        let state_clone = Arc::clone(&cache_state);
-
-        join_set.spawn(async move {
-            log::debug!("fetching remote tasks for '{}'", remote.id);
-            fetch_tasks(&remote, state_clone, semaphore)
-                .await
-                .map_err(|err| {
-                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
-                })
-        });
-    }
+    let fetch_results = fetcher
+        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
+        .await;
 
     let mut all_tasks = Vec::new();
-    let mut update_state_for_remote = NodeFetchSuccessMap::default();
+    let mut node_success_map = NodeFetchSuccessMap::default();
 
-    while let Some(res) = join_set.join_next().await {
-        match res {
-            Ok(Ok(FetchedTasks {
-                tasks,
-                node_results,
-            })) => {
-                all_tasks.extend(tasks);
-                update_state_for_remote.merge(node_results);
+    for (remote_name, result) in fetch_results.remote_results {
+        match result {
+            Ok(remote_result) => {
+                for (node_name, node_result) in remote_result.node_results {
+                    match node_result {
+                        Ok(NodeResults { data, .. }) => {
+                            all_tasks.extend(data);
+                            node_success_map.set_node_success(remote_name.clone(), node_name);
+                        }
+                        Err(err) => {
+                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
             }
-            Ok(Err(err)) => log::error!("{err:#}"),
-            Err(err) => log::error!("could not join task fetching future: {err:#}"),
         }
     }
 
-    (all_tasks, update_state_for_remote)
+    (all_tasks, node_success_map)
+}
+
+async fn fetch_tasks_from_single_node(
+    context: Arc<State>,
+    remote: Remote,
+    node: String,
+) -> Result<Vec<TaskCacheItem>, Error> {
+    match remote.ty {
+        RemoteType::Pve => {
+            let since = context
+                .cutoff_timestamp(&remote.id, &node)
+                .unwrap_or_else(|| {
+                    proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
+                });
+
+            let params = ListTasks {
+                source: Some(ListTasksSource::Archive),
+                since: Some(since),
+                // If `limit` is not provided, we only receive 50 tasks
+                limit: Some(MAX_TASKS_TO_FETCH),
+                ..Default::default()
+            };
+
+            let remote_clone = remote.clone();
+            let client = pve::connect(&remote_clone)?;
+            let task_list = client.get_task_list(&node, params).await.map_err(|err| {
+                format_err!("remote '{}', node '{}': {err}", remote_clone.id, node)
+            })?;
+
+            let task_list = task_list
+                .into_iter()
+                .map(|task| map_pve_task(task, &remote.id))
+                .filter_map(|task| match task {
+                    Ok(task) => Some(task),
+                    Err(err) => {
+                        log::error!("could not map PVE task: {err:#}");
+                        None
+                    }
+                })
+                .collect();
+
+            Ok(task_list)
+        }
+        RemoteType::Pbs => {
+            // TODO: Support PBS.
+            Ok(vec![])
+        }
+    }
 }
 
 /// Return all remotes from the given config.
@@ -301,135 +345,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
     tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
 }
 
-/// Fetched tasks from a single remote.
-struct FetchedTasks {
-    /// List of tasks.
-    tasks: Vec<TaskCacheItem>,
-    /// Contains whether a cluster node was fetched successfully.
-    node_results: NodeFetchSuccessMap,
-}
-
-/// Fetch tasks (active and finished) from a remote.
-async fn fetch_tasks(
-    remote: &Remote,
-    state: Arc<State>,
-    total_connections_semaphore: Arc<Semaphore>,
-) -> Result<FetchedTasks, Error> {
-    let mut tasks = Vec::new();
-
-    let mut node_results = NodeFetchSuccessMap::default();
-
-    match remote.ty {
-        RemoteType::Pve => {
-            let client = pve::connect(remote)?;
-
-            let nodes = {
-                // This permit *must* be dropped before we acquire the permits for the
-                // per-node connections - otherwise we risk a deadlock.
-                let _permit = total_connections_semaphore.acquire().await.unwrap();
-                client.list_nodes().await?
-            };
-
-            // This second semaphore is used to limit the number of concurrent connections
-            // *per remote*, not in total.
-            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
-            let mut join_set = JoinSet::new();
-
-            for node in nodes {
-                let node_name = node.node.to_string();
-
-                let since = state
-                    .cutoff_timestamp(&remote.id, &node_name)
-                    .unwrap_or_else(|| {
-                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
-                    });
-
-                let params = ListTasks {
-                    source: Some(ListTasksSource::Archive),
-                    since: Some(since),
-                    // If `limit` is not provided, we only receive 50 tasks
-                    limit: Some(MAX_TASKS_TO_FETCH),
-                    ..Default::default()
-                };
-
-                let per_remote_permit = Arc::clone(&per_remote_semaphore)
-                    .acquire_owned()
-                    .await
-                    .unwrap();
-
-                let total_connections_permit = Arc::clone(&total_connections_semaphore)
-                    .acquire_owned()
-                    .await
-                    .unwrap();
-
-                let remote_clone = remote.clone();
-
-                join_set.spawn(async move {
-                    let res = async {
-                        let client = pve::connect(&remote_clone)?;
-                        let task_list =
-                            client
-                                .get_task_list(&node.node, params)
-                                .await
-                                .map_err(|err| {
-                                    format_err!(
-                                        "remote '{}', node '{}': {err}",
-                                        remote_clone.id,
-                                        node.node
-                                    )
-                                })?;
-                        Ok::<Vec<ListTasksResponse>, Error>(task_list)
-                    }
-                    .await;
-
-                    drop(total_connections_permit);
-                    drop(per_remote_permit);
-
-                    (node_name, res)
-                });
-            }
-
-            while let Some(result) = join_set.join_next().await {
-                match result {
-                    Ok((node_name, result)) => match result {
-                        Ok(task_list) => {
-                            let mapped =
-                                task_list.into_iter().filter_map(|task| {
-                                    match map_pve_task(task, &remote.id) {
-                                        Ok(task) => Some(task),
-                                        Err(err) => {
-                                            log::error!(
-                                                "could not map task data, skipping: {err:#}"
-                                            );
-                                            None
-                                        }
-                                    }
-                                });
-
-                            tasks.extend(mapped);
-                            node_results.set_node_success(remote.id.clone(), node_name);
-                        }
-                        Err(error) => {
-                            log::error!("could not fetch tasks: {error:#}");
-                        }
-                    },
-                    Err(error) => {
-                        log::error!("could not join task fetching task: {error:#}");
-                    }
-                }
-            }
-        }
-        RemoteType::Pbs => {
-            // TODO: Add code for PBS
-        }
-    }
-
-    Ok(FetchedTasks {
-        tasks,
-        node_results,
-    })
-}
-
 #[derive(PartialEq, Debug)]
 /// Outcome from polling a tracked task.
 enum PollResult {
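A side note on the deleted fetch_tasks(): its removed comment warned that
the node-listing permit "*must* be dropped" before the per-node permits are
acquired. Below is a stand-alone sketch of that ordering rule (helper names
are made up for illustration), since the reasoning is easy to miss: if every
per-remote task held its listing permit while waiting for per-node permits
from the same pool, all permits could end up held by tasks that are each
waiting for one more, and nothing would ever proceed.

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    // Made-up stand-ins for the real node-listing and task-fetching calls.
    async fn list_nodes() -> Vec<String> {
        vec!["node1".to_string(), "node2".to_string()]
    }

    async fn fetch_node_tasks(_node: &str) {}

    async fn fetch_remote(total: Arc<Semaphore>) {
        let nodes = {
            // One permit for the node-listing request. It is dropped at the
            // end of this block, *before* the loop below asks for per-node
            // permits. If it were held across the loop, N concurrent
            // fetch_remote() calls could hold all N permits right here, each
            // waiting for a per-node permit that can never be granted: a
            // deadlock.
            let _permit = total.acquire().await.unwrap();
            list_nodes().await
        };

        for node in nodes {
            // One permit per node request, released after each iteration.
            let _permit = Arc::clone(&total).acquire_owned().await.unwrap();
            fetch_node_tasks(&node).await;
        }
    }

    #[tokio::main]
    async fn main() {
        // Tiny pool so the ordering actually matters.
        fetch_remote(Arc::new(Semaphore::new(2))).await;
    }

The ParallelFetcher helper now owns this bookkeeping in one place, which is
most of the simplification this patch claims.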
-- 
2.47.2