all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [RFC proxmox-datacenter-manager 2/2] remote task fetching: use ParallelFetcher helper
Date: Thu, 28 Aug 2025 16:42:47 +0200	[thread overview]
Message-ID: <20250828144247.298536-3-l.wagner@proxmox.com> (raw)
In-Reply-To: <20250828144247.298536-1-l.wagner@proxmox.com>

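Replace the hand-rolled JoinSet/Semaphore based task fetching with the new ParallelFetcher helper, which limits both the total and the per-remote number of connections. This simplifies the fetching logic quite a bit.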

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 .../tasks/remote_tasks.rs                     | 239 ++++++------------
 1 file changed, 77 insertions(+), 162 deletions(-)

diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 04c51dac..967e633c 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
 
 use server::{
     api::pve,
+    parallel_fetcher::{NodeResults, ParallelFetcher},
     remote_tasks::{
         self,
         task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
@@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
         get_remotes_with_finished_tasks(&remote_config, &poll_results)
     };
 
-    let (all_tasks, update_state_for_remote) = fetch_remotes(
-        remotes,
-        Arc::new(cache_state),
-        Arc::clone(&total_connections_semaphore),
-    )
-    .await;
+    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
 
     if !all_tasks.is_empty() {
         update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
@@ -221,42 +217,90 @@ async fn init_cache() -> Result<(), Error> {
 async fn fetch_remotes(
     remotes: Vec<Remote>,
     cache_state: Arc<State>,
-    total_connections_semaphore: Arc<Semaphore>,
 ) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
-    let mut join_set = JoinSet::new();
+    let fetcher = ParallelFetcher {
+        max_connections: MAX_CONNECTIONS,
+        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
+        context: cache_state,
+    };

-    for remote in remotes {
-        let semaphore = Arc::clone(&total_connections_semaphore);
-        let state_clone = Arc::clone(&cache_state);
-
-        join_set.spawn(async move {
-            log::debug!("fetching remote tasks for '{}'", remote.id);
-            fetch_tasks(&remote, state_clone, semaphore)
-                .await
-                .map_err(|err| {
-                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
-                })
-        });
-    }
+    let fetch_results = fetcher
+        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
+        .await;

     let mut all_tasks = Vec::new();
-    let mut update_state_for_remote = NodeFetchSuccessMap::default();
+    let mut node_success_map = NodeFetchSuccessMap::default();

-    while let Some(res) = join_set.join_next().await {
-        match res {
-            Ok(Ok(FetchedTasks {
-                tasks,
-                node_results,
-            })) => {
-                all_tasks.extend(tasks);
-                update_state_for_remote.merge(node_results);
+    for (remote_name, result) in fetch_results.remote_results {
+        match result {
+            Ok(remote_result) => {
+                for (node_name, node_result) in remote_result.node_results {
+                    match node_result {
+                        Ok(NodeResults { data, .. }) => {
+                            all_tasks.extend(data);
+                            node_success_map.set_node_success(remote_name.clone(), node_name);
+                        }
+                        Err(err) => {
+                            log::error!("could not fetch tasks from remote '{remote_name}', node '{node_name}': {err:#}");
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
             }
-            Ok(Err(err)) => log::error!("{err:#}"),
-            Err(err) => log::error!("could not join task fetching future: {err:#}"),
         }
     }

-    (all_tasks, update_state_for_remote)
+    (all_tasks, node_success_map)
+}
+
+/// Fetch the task list (`ListTasksSource::Archive`) of one node of `remote` as cache items.
+/// Tasks that cannot be mapped are logged and skipped. Errors are returned without
+/// remote/node context because the caller adds it when logging.
+async fn fetch_tasks_from_single_node(
+    context: Arc<State>,
+    remote: Remote,
+    node: String,
+) -> Result<Vec<TaskCacheItem>, Error> {
+    match remote.ty {
+        RemoteType::Pve => {
+            let since = context
+                .cutoff_timestamp(&remote.id, &node)
+                .unwrap_or_else(|| {
+                    proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
+                });
+
+            let params = ListTasks {
+                source: Some(ListTasksSource::Archive),
+                since: Some(since),
+                // If `limit` is not provided, we only receive 50 tasks
+                limit: Some(MAX_TASKS_TO_FETCH),
+                ..Default::default()
+            };
+
+            let client = pve::connect(&remote)?;
+            let task_list = client.get_task_list(&node, params).await?;
+
+            let task_list = task_list
+                .into_iter()
+                .map(|task| map_pve_task(task, &remote.id))
+                .filter_map(|task| match task {
+                    Ok(task) => Some(task),
+                    Err(err) => {
+                        log::error!("could not map PVE task: {err:#}");
+                        None
+                    }
+                })
+                .collect();
+
+            Ok(task_list)
+        }
+        RemoteType::Pbs => {
+            // TODO: Support PBS. NOTE: the caller records an `Ok` result as a successful fetch.
+            Ok(vec![])
+        }
+    }
 }
 
 /// Return all remotes from the given config.
@@ -301,135 +345,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
     tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
 }
 
-/// Fetched tasks from a single remote.
-struct FetchedTasks {
-    /// List of tasks.
-    tasks: Vec<TaskCacheItem>,
-    /// Contains whether a cluster node was fetched successfully.
-    node_results: NodeFetchSuccessMap,
-}
-
-/// Fetch tasks (active and finished) from a remote.
-async fn fetch_tasks(
-    remote: &Remote,
-    state: Arc<State>,
-    total_connections_semaphore: Arc<Semaphore>,
-) -> Result<FetchedTasks, Error> {
-    let mut tasks = Vec::new();
-
-    let mut node_results = NodeFetchSuccessMap::default();
-
-    match remote.ty {
-        RemoteType::Pve => {
-            let client = pve::connect(remote)?;
-
-            let nodes = {
-                // This permit *must* be dropped before we acquire the permits for the
-                // per-node connections - otherwise we risk a deadlock.
-                let _permit = total_connections_semaphore.acquire().await.unwrap();
-                client.list_nodes().await?
-            };
-
-            // This second semaphore is used to limit the number of concurrent connections
-            // *per remote*, not in total.
-            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
-            let mut join_set = JoinSet::new();
-
-            for node in nodes {
-                let node_name = node.node.to_string();
-
-                let since = state
-                    .cutoff_timestamp(&remote.id, &node_name)
-                    .unwrap_or_else(|| {
-                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
-                    });
-
-                let params = ListTasks {
-                    source: Some(ListTasksSource::Archive),
-                    since: Some(since),
-                    // If `limit` is not provided, we only receive 50 tasks
-                    limit: Some(MAX_TASKS_TO_FETCH),
-                    ..Default::default()
-                };
-
-                let per_remote_permit = Arc::clone(&per_remote_semaphore)
-                    .acquire_owned()
-                    .await
-                    .unwrap();
-
-                let total_connections_permit = Arc::clone(&total_connections_semaphore)
-                    .acquire_owned()
-                    .await
-                    .unwrap();
-
-                let remote_clone = remote.clone();
-
-                join_set.spawn(async move {
-                    let res = async {
-                        let client = pve::connect(&remote_clone)?;
-                        let task_list =
-                            client
-                                .get_task_list(&node.node, params)
-                                .await
-                                .map_err(|err| {
-                                    format_err!(
-                                        "remote '{}', node '{}': {err}",
-                                        remote_clone.id,
-                                        node.node
-                                    )
-                                })?;
-                        Ok::<Vec<_>, Error>(task_list)
-                    }
-                    .await;
-
-                    drop(total_connections_permit);
-                    drop(per_remote_permit);
-
-                    (node_name, res)
-                });
-            }
-
-            while let Some(result) = join_set.join_next().await {
-                match result {
-                    Ok((node_name, result)) => match result {
-                        Ok(task_list) => {
-                            let mapped =
-                                task_list.into_iter().filter_map(|task| {
-                                    match map_pve_task(task, &remote.id) {
-                                        Ok(task) => Some(task),
-                                        Err(err) => {
-                                            log::error!(
-                                                "could not map task data, skipping: {err:#}"
-                                            );
-                                            None
-                                        }
-                                    }
-                                });
-
-                            tasks.extend(mapped);
-                            node_results.set_node_success(remote.id.clone(), node_name);
-                        }
-                        Err(error) => {
-                            log::error!("could not fetch tasks: {error:#}");
-                        }
-                    },
-                    Err(error) => {
-                        log::error!("could not join task fetching task: {error:#}");
-                    }
-                }
-            }
-        }
-        RemoteType::Pbs => {
-            // TODO: Add code for PBS
-        }
-    }
-
-    Ok(FetchedTasks {
-        tasks,
-        node_results,
-    })
-}
-
 #[derive(PartialEq, Debug)]
 /// Outcome from polling a tracked task.
 enum PollResult {
-- 
2.47.2



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel


  parent reply	other threads:[~2025-08-28 14:42 UTC|newest]

Thread overview: 10+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-08-28 14:42 [pdm-devel] [RFC proxmox-datacenter-manager 0/2] add helper to fetch from many remote nodes in parallel Lukas Wagner
2025-08-28 14:42 ` [pdm-devel] [RFC proxmox-datacenter-manager 1/2] server: add convenience helper to fetch results " Lukas Wagner
2025-08-29  8:57   ` Dominik Csapak
2025-08-29  9:02     ` Stefan Hanreich
2025-08-29  9:17       ` Lukas Wagner
2025-08-29 14:05     ` Lukas Wagner
2025-08-28 14:42 ` Lukas Wagner [this message]
2025-08-29  9:15   ` [pdm-devel] [RFC proxmox-datacenter-manager 2/2] remote task fetching: use ParallelFetcher helper Dominik Csapak
2025-08-29  9:19     ` Lukas Wagner
2025-08-29 14:11 ` [pdm-devel] superseded: [RFC proxmox-datacenter-manager 0/2] add helper to fetch from many remote nodes in parallel Lukas Wagner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20250828144247.298536-3-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal