From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 3/3] remote task fetching: use ParallelFetcher helper
Date: Fri, 29 Aug 2025 16:10:28 +0200
Message-ID: <20250829141028.309835-4-l.wagner@proxmox.com>
In-Reply-To: <20250829141028.309835-1-l.wagner@proxmox.com>

This allows us to simplify the fetching logic quite a bit.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---

Notes:
    Changes since the RFC:
      - drop the map_err/format_err, since the remote/node info is
        handled elsewhere now
      - Use a single filter_map instead of map *and* filter_map
      - Chain the filter_map to the get_task_list call, which might save
        allocations (see the usage sketch below)
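
A rough sketch of the fetch pattern this patch switches to, based on the
ParallelFetcher helper introduced earlier in this series (the callback and
context names below are placeholders, not the actual code):

    // Sketch only -- assumes the ParallelFetcher API from patches 1-2 of
    // this series; `fetch_from_node` and `shared_state` are placeholders.
    let fetcher = ParallelFetcher {
        max_connections: MAX_CONNECTIONS,
        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
        // Shared context handed to every callback invocation (an Arc<_>).
        context: shared_state,
    };

    // The helper handles node discovery and connection limiting; the
    // callback only fetches data from a single (remote, node) pair.
    let results = fetcher
        .do_for_all_remote_nodes(remotes.into_iter(), fetch_from_node)
        .await;

    // Results are grouped per remote and per node, so errors can be
    // logged at the level where they occurred without aborting the run.
    for (remote_name, remote_result) in results.remote_results {
        match remote_result {
            Ok(remote_result) => {
                for (node_name, node_result) in remote_result.node_results {
                    match node_result {
                        Ok(NodeResults { data, .. }) => { /* collect data */ }
                        Err(err) => log::error!("node '{node_name}': {err:#}"),
                    }
                }
            }
            Err(err) => log::error!("remote '{remote_name}': {err:#}"),
        }
    }

In the diff below, fetch_remotes() follows exactly this shape, with
fetch_tasks_from_single_node() as the per-node callback.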

 .../tasks/remote_tasks.rs                     | 238 ++++++------------
 1 file changed, 75 insertions(+), 163 deletions(-)

diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 04c51dac..e97f2a08 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -4,7 +4,7 @@ use std::{
     time::{Duration, Instant},
 };
 
-use anyhow::{format_err, Error};
+use anyhow::Error;
 use nix::sys::stat::Mode;
 use tokio::{sync::Semaphore, task::JoinSet};
 
@@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
 
 use server::{
     api::pve,
+    parallel_fetcher::{NodeResults, ParallelFetcher},
     remote_tasks::{
         self,
         task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
@@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
         get_remotes_with_finished_tasks(&remote_config, &poll_results)
     };
 
-    let (all_tasks, update_state_for_remote) = fetch_remotes(
-        remotes,
-        Arc::new(cache_state),
-        Arc::clone(&total_connections_semaphore),
-    )
-    .await;
+    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
 
     if !all_tasks.is_empty() {
         update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
@@ -221,42 +217,87 @@ async fn init_cache() -> Result<(), Error> {
 async fn fetch_remotes(
     remotes: Vec<Remote>,
     cache_state: Arc<State>,
-    total_connections_semaphore: Arc<Semaphore>,
 ) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
-    let mut join_set = JoinSet::new();
+    let fetcher = ParallelFetcher {
+        max_connections: MAX_CONNECTIONS,
+        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
+        context: cache_state,
+    };
 
-    for remote in remotes {
-        let semaphore = Arc::clone(&total_connections_semaphore);
-        let state_clone = Arc::clone(&cache_state);
-
-        join_set.spawn(async move {
-            log::debug!("fetching remote tasks for '{}'", remote.id);
-            fetch_tasks(&remote, state_clone, semaphore)
-                .await
-                .map_err(|err| {
-                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
-                })
-        });
-    }
+    let fetch_results = fetcher
+        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
+        .await;
 
     let mut all_tasks = Vec::new();
-    let mut update_state_for_remote = NodeFetchSuccessMap::default();
+    let mut node_success_map = NodeFetchSuccessMap::default();
 
-    while let Some(res) = join_set.join_next().await {
-        match res {
-            Ok(Ok(FetchedTasks {
-                tasks,
-                node_results,
-            })) => {
-                all_tasks.extend(tasks);
-                update_state_for_remote.merge(node_results);
+    for (remote_name, result) in fetch_results.remote_results {
+        match result {
+            Ok(remote_result) => {
+                for (node_name, node_result) in remote_result.node_results {
+                    match node_result {
+                        Ok(NodeResults { data, .. }) => {
+                            all_tasks.extend(data);
+                            node_success_map.set_node_success(remote_name.clone(), node_name);
+                        }
+                        Err(err) => {
+                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
             }
-            Ok(Err(err)) => log::error!("{err:#}"),
-            Err(err) => log::error!("could not join task fetching future: {err:#}"),
         }
     }
 
-    (all_tasks, update_state_for_remote)
+    (all_tasks, node_success_map)
+}
+
+async fn fetch_tasks_from_single_node(
+    context: Arc<State>,
+    remote: Remote,
+    node: String,
+) -> Result<Vec<TaskCacheItem>, Error> {
+    match remote.ty {
+        RemoteType::Pve => {
+            let since = context
+                .cutoff_timestamp(&remote.id, &node)
+                .unwrap_or_else(|| {
+                    proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
+                });
+
+            let params = ListTasks {
+                source: Some(ListTasksSource::Archive),
+                since: Some(since),
+                // If `limit` is not provided, we only receive 50 tasks
+                limit: Some(MAX_TASKS_TO_FETCH),
+                ..Default::default()
+            };
+
+            let client = pve::connect(&remote)?;
+
+            let task_list = client
+                .get_task_list(&node, params)
+                .await?
+                .into_iter()
+                .filter_map(|task| match map_pve_task(task, &remote.id) {
+                    Ok(task) => Some(task),
+                    Err(err) => {
+                        log::error!("could not map PVE task: {err:#}");
+                        None
+                    }
+                })
+                .collect();
+
+            Ok(task_list)
+        }
+        RemoteType::Pbs => {
+            // TODO: Support PBS.
+            Ok(vec![])
+        }
+    }
 }
 
 /// Return all remotes from the given config.
@@ -301,135 +342,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
     tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
 }
 
-/// Fetched tasks from a single remote.
-struct FetchedTasks {
-    /// List of tasks.
-    tasks: Vec<TaskCacheItem>,
-    /// Contains whether a cluster node was fetched successfully.
-    node_results: NodeFetchSuccessMap,
-}
-
-/// Fetch tasks (active and finished) from a remote.
-async fn fetch_tasks(
-    remote: &Remote,
-    state: Arc<State>,
-    total_connections_semaphore: Arc<Semaphore>,
-) -> Result<FetchedTasks, Error> {
-    let mut tasks = Vec::new();
-
-    let mut node_results = NodeFetchSuccessMap::default();
-
-    match remote.ty {
-        RemoteType::Pve => {
-            let client = pve::connect(remote)?;
-
-            let nodes = {
-                // This permit *must* be dropped before we acquire the permits for the
-                // per-node connections - otherwise we risk a deadlock.
-                let _permit = total_connections_semaphore.acquire().await.unwrap();
-                client.list_nodes().await?
-            };
-
-            // This second semaphore is used to limit the number of concurrent connections
-            // *per remote*, not in total.
-            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
-            let mut join_set = JoinSet::new();
-
-            for node in nodes {
-                let node_name = node.node.to_string();
-
-                let since = state
-                    .cutoff_timestamp(&remote.id, &node_name)
-                    .unwrap_or_else(|| {
-                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
-                    });
-
-                let params = ListTasks {
-                    source: Some(ListTasksSource::Archive),
-                    since: Some(since),
-                    // If `limit` is not provided, we only receive 50 tasks
-                    limit: Some(MAX_TASKS_TO_FETCH),
-                    ..Default::default()
-                };
-
-                let per_remote_permit = Arc::clone(&per_remote_semaphore)
-                    .acquire_owned()
-                    .await
-                    .unwrap();
-
-                let total_connections_permit = Arc::clone(&total_connections_semaphore)
-                    .acquire_owned()
-                    .await
-                    .unwrap();
-
-                let remote_clone = remote.clone();
-
-                join_set.spawn(async move {
-                    let res = async {
-                        let client = pve::connect(&remote_clone)?;
-                        let task_list =
-                            client
-                                .get_task_list(&node.node, params)
-                                .await
-                                .map_err(|err| {
-                                    format_err!(
-                                        "remote '{}', node '{}': {err}",
-                                        remote_clone.id,
-                                        node.node
-                                    )
-                                })?;
-                        Ok::<Vec<_>, Error>(task_list)
-                    }
-                    .await;
-
-                    drop(total_connections_permit);
-                    drop(per_remote_permit);
-
-                    (node_name, res)
-                });
-            }
-
-            while let Some(result) = join_set.join_next().await {
-                match result {
-                    Ok((node_name, result)) => match result {
-                        Ok(task_list) => {
-                            let mapped =
-                                task_list.into_iter().filter_map(|task| {
-                                    match map_pve_task(task, &remote.id) {
-                                        Ok(task) => Some(task),
-                                        Err(err) => {
-                                            log::error!(
-                                                "could not map task data, skipping: {err:#}"
-                                            );
-                                            None
-                                        }
-                                    }
-                                });
-
-                            tasks.extend(mapped);
-                            node_results.set_node_success(remote.id.clone(), node_name);
-                        }
-                        Err(error) => {
-                            log::error!("could not fetch tasks: {error:#}");
-                        }
-                    },
-                    Err(error) => {
-                        log::error!("could not join task fetching task: {error:#}");
-                    }
-                }
-            }
-        }
-        RemoteType::Pbs => {
-            // TODO: Add code for PBS
-        }
-    }
-
-    Ok(FetchedTasks {
-        tasks,
-        node_results,
-    })
-}
-
 #[derive(PartialEq, Debug)]
 /// Outcome from polling a tracked task.
 enum PollResult {
-- 
2.47.2


