From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 3/3] remote task fetching: use ParallelFetcher helper
Date: Fri, 29 Aug 2025 16:10:28 +0200
Message-ID: <20250829141028.309835-4-l.wagner@proxmox.com>
In-Reply-To: <20250829141028.309835-1-l.wagner@proxmox.com>

This allows us to simplify the fetching logic quite a bit.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Changes since the RFC:
- drop the map_err/format_err, since the remote/node info is
handled elsewhere now
- Use a single filter_map instead of map *and* filter_map
- Chain the filter_map onto the get_task_list() call, which might
  save allocations
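
As a reading aid, the chained version fuses the task mapping into
collecting the API response; a minimal sketch (the error-logging
match from the actual hunk is shortened to .ok() here):

    let task_list: Vec<TaskCacheItem> = client
        .get_task_list(&node, params)
        .await?
        .into_iter()
        .filter_map(|task| map_pve_task(task, &remote.id).ok())
        .collect();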
.../tasks/remote_tasks.rs | 238 ++++++------------
1 file changed, 75 insertions(+), 163 deletions(-)
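
For reviewers, the new control flow in fetch_remotes() boils down to
the following sketch (simplified from the hunks below, per-remote and
per-node error logging elided); the ParallelFetcher helper takes over
the semaphore and JoinSet juggling that fetch_tasks() previously did
by hand:

    let fetcher = ParallelFetcher {
        max_connections: MAX_CONNECTIONS,
        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
        context: cache_state,
    };

    // per-node callback, roughly:
    // async fn(Arc<State>, Remote, String) -> Result<Vec<TaskCacheItem>, Error>
    let results = fetcher
        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
        .await;

    for (remote_name, remote_result) in results.remote_results {
        // per-node Ok(NodeResults { data, .. }) extends the task list and
        // marks the node in the NodeFetchSuccessMap; Err(..)s are logged
    }
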
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 04c51dac..e97f2a08 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -4,7 +4,7 @@ use std::{
time::{Duration, Instant},
};
-use anyhow::{format_err, Error};
+use anyhow::Error;
use nix::sys::stat::Mode;
use tokio::{sync::Semaphore, task::JoinSet};
@@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
use server::{
api::pve,
+ parallel_fetcher::{NodeResults, ParallelFetcher},
remote_tasks::{
self,
task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
@@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
get_remotes_with_finished_tasks(&remote_config, &poll_results)
};
- let (all_tasks, update_state_for_remote) = fetch_remotes(
- remotes,
- Arc::new(cache_state),
- Arc::clone(&total_connections_semaphore),
- )
- .await;
+ let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
if !all_tasks.is_empty() {
update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
@@ -221,42 +217,87 @@ async fn init_cache() -> Result<(), Error> {
async fn fetch_remotes(
remotes: Vec<Remote>,
cache_state: Arc<State>,
- total_connections_semaphore: Arc<Semaphore>,
) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
- let mut join_set = JoinSet::new();
+ let fetcher = ParallelFetcher {
+ max_connections: MAX_CONNECTIONS,
+ max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
+ context: cache_state,
+ };
- for remote in remotes {
- let semaphore = Arc::clone(&total_connections_semaphore);
- let state_clone = Arc::clone(&cache_state);
-
- join_set.spawn(async move {
- log::debug!("fetching remote tasks for '{}'", remote.id);
- fetch_tasks(&remote, state_clone, semaphore)
- .await
- .map_err(|err| {
- format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
- })
- });
- }
+ let fetch_results = fetcher
+ .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
+ .await;
let mut all_tasks = Vec::new();
- let mut update_state_for_remote = NodeFetchSuccessMap::default();
+ let mut node_success_map = NodeFetchSuccessMap::default();
- while let Some(res) = join_set.join_next().await {
- match res {
- Ok(Ok(FetchedTasks {
- tasks,
- node_results,
- })) => {
- all_tasks.extend(tasks);
- update_state_for_remote.merge(node_results);
+ for (remote_name, result) in fetch_results.remote_results {
+ match result {
+ Ok(remote_result) => {
+ for (node_name, node_result) in remote_result.node_results {
+ match node_result {
+ Ok(NodeResults { data, .. }) => {
+ all_tasks.extend(data);
+ node_success_map.set_node_success(remote_name.clone(), node_name);
+ }
+ Err(err) => {
+ log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+ }
+ }
+ }
+ }
+ Err(err) => {
+ log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
}
- Ok(Err(err)) => log::error!("{err:#}"),
- Err(err) => log::error!("could not join task fetching future: {err:#}"),
}
}
- (all_tasks, update_state_for_remote)
+ (all_tasks, node_success_map)
+}
+
+async fn fetch_tasks_from_single_node(
+ context: Arc<State>,
+ remote: Remote,
+ node: String,
+) -> Result<Vec<TaskCacheItem>, Error> {
+ match remote.ty {
+ RemoteType::Pve => {
+ let since = context
+ .cutoff_timestamp(&remote.id, &node)
+ .unwrap_or_else(|| {
+ proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
+ });
+
+ let params = ListTasks {
+ source: Some(ListTasksSource::Archive),
+ since: Some(since),
+ // If `limit` is not provided, we only receive 50 tasks
+ limit: Some(MAX_TASKS_TO_FETCH),
+ ..Default::default()
+ };
+
+ let client = pve::connect(&remote)?;
+
+ let task_list = client
+ .get_task_list(&node, params)
+ .await?
+ .into_iter()
+ .filter_map(|task| match map_pve_task(task, &remote.id) {
+ Ok(task) => Some(task),
+ Err(err) => {
+ log::error!("could not map PVE task: {err:#}");
+ None
+ }
+ })
+ .collect();
+
+ Ok(task_list)
+ }
+ RemoteType::Pbs => {
+ // TODO: Support PBS.
+ Ok(vec![])
+ }
+ }
}
/// Return all remotes from the given config.
@@ -301,135 +342,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
}
-/// Fetched tasks from a single remote.
-struct FetchedTasks {
- /// List of tasks.
- tasks: Vec<TaskCacheItem>,
- /// Contains whether a cluster node was fetched successfully.
- node_results: NodeFetchSuccessMap,
-}
-
-/// Fetch tasks (active and finished) from a remote.
-async fn fetch_tasks(
- remote: &Remote,
- state: Arc<State>,
- total_connections_semaphore: Arc<Semaphore>,
-) -> Result<FetchedTasks, Error> {
- let mut tasks = Vec::new();
-
- let mut node_results = NodeFetchSuccessMap::default();
-
- match remote.ty {
- RemoteType::Pve => {
- let client = pve::connect(remote)?;
-
- let nodes = {
- // This permit *must* be dropped before we acquire the permits for the
- // per-node connections - otherwise we risk a deadlock.
- let _permit = total_connections_semaphore.acquire().await.unwrap();
- client.list_nodes().await?
- };
-
- // This second semaphore is used to limit the number of concurrent connections
- // *per remote*, not in total.
- let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
- let mut join_set = JoinSet::new();
-
- for node in nodes {
- let node_name = node.node.to_string();
-
- let since = state
- .cutoff_timestamp(&remote.id, &node_name)
- .unwrap_or_else(|| {
- proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
- });
-
- let params = ListTasks {
- source: Some(ListTasksSource::Archive),
- since: Some(since),
- // If `limit` is not provided, we only receive 50 tasks
- limit: Some(MAX_TASKS_TO_FETCH),
- ..Default::default()
- };
-
- let per_remote_permit = Arc::clone(&per_remote_semaphore)
- .acquire_owned()
- .await
- .unwrap();
-
- let total_connections_permit = Arc::clone(&total_connections_semaphore)
- .acquire_owned()
- .await
- .unwrap();
-
- let remote_clone = remote.clone();
-
- join_set.spawn(async move {
- let res = async {
- let client = pve::connect(&remote_clone)?;
- let task_list =
- client
- .get_task_list(&node.node, params)
- .await
- .map_err(|err| {
- format_err!(
- "remote '{}', node '{}': {err}",
- remote_clone.id,
- node.node
- )
- })?;
- Ok::<Vec<_>, Error>(task_list)
- }
- .await;
-
- drop(total_connections_permit);
- drop(per_remote_permit);
-
- (node_name, res)
- });
- }
-
- while let Some(result) = join_set.join_next().await {
- match result {
- Ok((node_name, result)) => match result {
- Ok(task_list) => {
- let mapped =
- task_list.into_iter().filter_map(|task| {
- match map_pve_task(task, &remote.id) {
- Ok(task) => Some(task),
- Err(err) => {
- log::error!(
- "could not map task data, skipping: {err:#}"
- );
- None
- }
- }
- });
-
- tasks.extend(mapped);
- node_results.set_node_success(remote.id.clone(), node_name);
- }
- Err(error) => {
- log::error!("could not fetch tasks: {error:#}");
- }
- },
- Err(error) => {
- log::error!("could not join task fetching task: {error:#}");
- }
- }
- }
- }
- RemoteType::Pbs => {
- // TODO: Add code for PBS
- }
- }
-
- Ok(FetchedTasks {
- tasks,
- node_results,
- })
-}
-
#[derive(PartialEq, Debug)]
/// Outcome from polling a tracked task.
enum PollResult {
--
2.47.2