Date: Fri, 29 Aug 2025 11:19:21 +0200
From: "Lukas Wagner"
To: "Dominik Csapak", "Proxmox Datacenter Manager development discussion"
Subject: Re: [pdm-devel] [RFC proxmox-datacenter-manager 2/2] remote task fetching: use ParallelFetcher helper
References: <20250828144247.298536-1-l.wagner@proxmox.com> <20250828144247.298536-3-l.wagner@proxmox.com>

On Fri Aug 29, 2025 at 11:15 AM CEST, Dominik Csapak wrote:
> some minor comments inline, rest LGTM
>
> On 8/28/25 4:42 PM, Lukas Wagner wrote:
>> This allows us to simplify the fetching logic quite a bit.
>>
>> Signed-off-by: Lukas Wagner
>> ---
>>  .../tasks/remote_tasks.rs | 239 ++++++------------
>>  1 file changed, 77 insertions(+), 162 deletions(-)
>>
>> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>> index 04c51dac..967e633c 100644
>> --- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>> +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>> @@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
>>
>>  use server::{
>>      api::pve,
>> +    parallel_fetcher::{NodeResults, ParallelFetcher},
>>      remote_tasks::{
>>          self,
>>          task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
>> @@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
>>          get_remotes_with_finished_tasks(&remote_config, &poll_results)
>>      };
>>
>> -    let (all_tasks, update_state_for_remote) = fetch_remotes(
>> -        remotes,
>> -        Arc::new(cache_state),
>> -        Arc::clone(&total_connections_semaphore),
>> -    )
>> -    .await;
>> +    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
>>
>>      if !all_tasks.is_empty() {
>>          update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
>> @@ -221,42 +217,90 @@ async fn init_cache() -> Result<(), Error> {
>>
>>  async fn fetch_remotes(
>>      remotes: Vec,
>>      cache_state: Arc,
>> -    total_connections_semaphore: Arc,
>>  ) -> (Vec, NodeFetchSuccessMap) {
>> -    let mut join_set = JoinSet::new();
>> +    let fetcher = ParallelFetcher {
>> +        max_connections: MAX_CONNECTIONS,
>> +        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
>> +        context: cache_state,
>> +    };
>>
>> -    for remote in remotes {
>> -        let semaphore = Arc::clone(&total_connections_semaphore);
>> -        let state_clone = Arc::clone(&cache_state);
>> -
>> -        join_set.spawn(async move {
>> -            log::debug!("fetching remote tasks for '{}'", remote.id);
>> -            fetch_tasks(&remote, state_clone, semaphore)
>> -                .await
>> -                .map_err(|err| {
>> -                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
>> -                })
>> -        });
>> -    }
>> +    let fetch_results = fetcher
>> +        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
>> +        .await;
>>
>>      let mut all_tasks = Vec::new();
>> -    let mut update_state_for_remote = NodeFetchSuccessMap::default();
>> +    let mut node_success_map = NodeFetchSuccessMap::default();
>>
>> -    while let Some(res) = join_set.join_next().await {
>> -        match res {
>> -            Ok(Ok(FetchedTasks {
>> -                tasks,
>> -                node_results,
>> -            })) => {
>> -                all_tasks.extend(tasks);
>> -                update_state_for_remote.merge(node_results);
>> +    for (remote_name, result) in fetch_results.remote_results {
>> +        match result {
>> +            Ok(remote_result) => {
>> +                for (node_name, node_result) in remote_result.node_results {
>> +                    match node_result {
>> +                        Ok(NodeResults { data, .. }) => {
>> +                            all_tasks.extend(data);
>> +                            node_success_map.set_node_success(remote_name.clone(), node_name);
>> +                        }
>> +                        Err(err) => {
>> +                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
>> +                        }
>> +                    }
>> +                }
>> +            }
>> +            Err(err) => {
>> +                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
>>              }
>> -            Ok(Err(err)) => log::error!("{err:#}"),
>> -            Err(err) => log::error!("could not join task fetching future: {err:#}"),
>>          }
>>      }
>>
>> -    (all_tasks, update_state_for_remote)
>> +    (all_tasks, node_success_map)
>> +}
>> +
>> +async fn fetch_tasks_from_single_node(
>> +    context: Arc,
>> +    remote: Remote,
>> +    node: String,
>> +) -> Result, Error> {
>> +    match remote.ty {
>> +        RemoteType::Pve => {
>> +            let since = context
>> +                .cutoff_timestamp(&remote.id, &node)
>> +                .unwrap_or_else(|| {
>> +                    proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
>> +                });
>> +
>> +            let params = ListTasks {
>> +                source: Some(ListTasksSource::Archive),
>> +                since: Some(since),
>> +                // If `limit` is not provided, we only receive 50 tasks
>> +                limit: Some(MAX_TASKS_TO_FETCH),
>> +                ..Default::default()
>> +            };
>> +
>> +            let remote_clone = remote.clone();
>> +            let client = pve::connect(&remote_clone)?;
>
> this clone can simply be omitted by using remote directly
>

True, thanks!

>> +            let task_list = client.get_task_list(&node, params).await.map_err(|err| {
>> +                format_err!("remote '{}', node '{}': {err}", remote_clone.id, node)
>> +            })?;
>> +
>> +            let task_list = task_list
>> +                .into_iter()
>> +                .map(|task| map_pve_task(task, &remote.id))
>> +                .filter_map(|task| match task {
>> +                    Ok(task) => Some(task),
>> +                    Err(err) => {
>> +                        log::error!("could not map PVE task: {err:#}");
>> +                        None
>> +                    }
>> +                })
>> +                .collect();
>
> two things here:
>
> you could do just one filter_map calling map_pve_task inside
>
> and you can simply append `into_iter().filter_map(..)` on the original
> get_task_list call. no need for the extra `let task_list = task_list....`
>

Good point! Will make the changes you requested. (Rough sketch of the combined change at the end of this mail.)

>> +
>> +            Ok(task_list)
>> +        }
>> +        RemoteType::Pbs => {
>> +            // TODO: Support PBS.
>> +            Ok(vec![])
>> +        }
>> +    }
>>  }
>>
>>  /// Return all remotes from the given config.
>> @@ -301,135 +345,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
>>      tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
>>  }
>>
>> -/// Fetched tasks from a single remote.
>> -struct FetchedTasks {
>> -    /// List of tasks.
>> -    tasks: Vec,
>> -    /// Contains whether a cluster node was fetched successfully.
>> -    node_results: NodeFetchSuccessMap,
>> -}
>> -
>> -/// Fetch tasks (active and finished) from a remote.
>> -async fn fetch_tasks(
>> -    remote: &Remote,
>> -    state: Arc,
>> -    total_connections_semaphore: Arc,
>> -) -> Result {
>> -    let mut tasks = Vec::new();
>> -
>> -    let mut node_results = NodeFetchSuccessMap::default();
>> -
>> -    match remote.ty {
>> -        RemoteType::Pve => {
>> -            let client = pve::connect(remote)?;
>> -
>> -            let nodes = {
>> -                // This permit *must* be dropped before we acquire the permits for the
>> -                // per-node connections - otherwise we risk a deadlock.
>> -                let _permit = total_connections_semaphore.acquire().await.unwrap();
>> -                client.list_nodes().await?
>> -            };
>> -
>> -            // This second semaphore is used to limit the number of concurrent connections
>> -            // *per remote*, not in total.
>> -            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
>> -            let mut join_set = JoinSet::new();
>> -
>> -            for node in nodes {
>> -                let node_name = node.node.to_string();
>> -
>> -                let since = state
>> -                    .cutoff_timestamp(&remote.id, &node_name)
>> -                    .unwrap_or_else(|| {
>> -                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
>> -                    });
>> -
>> -                let params = ListTasks {
>> -                    source: Some(ListTasksSource::Archive),
>> -                    since: Some(since),
>> -                    // If `limit` is not provided, we only receive 50 tasks
>> -                    limit: Some(MAX_TASKS_TO_FETCH),
>> -                    ..Default::default()
>> -                };
>> -
>> -                let per_remote_permit = Arc::clone(&per_remote_semaphore)
>> -                    .acquire_owned()
>> -                    .await
>> -                    .unwrap();
>> -
>> -                let total_connections_permit = Arc::clone(&total_connections_semaphore)
>> -                    .acquire_owned()
>> -                    .await
>> -                    .unwrap();
>> -
>> -                let remote_clone = remote.clone();
>> -
>> -                join_set.spawn(async move {
>> -                    let res = async {
>> -                        let client = pve::connect(&remote_clone)?;
>> -                        let task_list =
>> -                            client
>> -                                .get_task_list(&node.node, params)
>> -                                .await
>> -                                .map_err(|err| {
>> -                                    format_err!(
>> -                                        "remote '{}', node '{}': {err}",
>> -                                        remote_clone.id,
>> -                                        node.node
>> -                                    )
>> -                                })?;
>> -                        Ok::, Error>(task_list)
>> -                    }
>> -                    .await;
>> -
>> -                    drop(total_connections_permit);
>> -                    drop(per_remote_permit);
>> -
>> -                    (node_name, res)
>> -                });
>> -            }
>> -
>> -            while let Some(result) = join_set.join_next().await {
>> -                match result {
>> -                    Ok((node_name, result)) => match result {
>> -                        Ok(task_list) => {
>> -                            let mapped =
>> -                                task_list.into_iter().filter_map(|task| {
>> -                                    match map_pve_task(task, &remote.id) {
>> -                                        Ok(task) => Some(task),
>> -                                        Err(err) => {
>> -                                            log::error!(
>> -                                                "could not map task data, skipping: {err:#}"
>> -                                            );
>> -                                            None
>> -                                        }
>> -                                    }
>> -                                });
>> -
>> -                            tasks.extend(mapped);
>> -                            node_results.set_node_success(remote.id.clone(), node_name);
>> -                        }
>> -                        Err(error) => {
>> -                            log::error!("could not fetch tasks: {error:#}");
>> -                        }
>> -                    },
>> -                    Err(error) => {
>> -                        log::error!("could not join task fetching task: {error:#}");
>> -                    }
>> -                }
>> -            }
>> -        }
>> -        RemoteType::Pbs => {
>> -            // TODO: Add code for PBS
>> -        }
>> -    }
>> -
>> -    Ok(FetchedTasks {
>> -        tasks,
>> -        node_results,
>> -    })
>> -}
>> -
>>  #[derive(PartialEq, Debug)]
>>  /// Outcome from polling a tracked task.
>>  enum PollResult {
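For completeness, here is roughly what the PVE branch would look like with both
suggestions folded in (untested sketch, names as in the patch above; borrows and
type inference might still need a small nudge):

// borrow `remote` directly, no clone needed
let client = pve::connect(&remote)?;

// chain the iterator directly onto the get_task_list() call and do the
// mapping and filtering in a single filter_map with map_pve_task inside
let task_list = client
    .get_task_list(&node, params)
    .await
    .map_err(|err| format_err!("remote '{}', node '{}': {err}", remote.id, node))?
    .into_iter()
    .filter_map(|task| match map_pve_task(task, &remote.id) {
        Ok(task) => Some(task),
        Err(err) => {
            log::error!("could not map PVE task: {err:#}");
            None
        }
    })
    .collect();

Ok(task_list)

That also gets rid of the extra `let task_list = task_list...` rebinding you pointed out.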