From: Dominik Csapak <d.csapak@proxmox.com>
To: Proxmox Datacenter Manager development discussion
<pdm-devel@lists.proxmox.com>,
Lukas Wagner <l.wagner@proxmox.com>
Subject: Re: [pdm-devel] [RFC proxmox-datacenter-manager 2/2] remote task fetching: use ParallelFetcher helper
Date: Fri, 29 Aug 2025 11:15:46 +0200
Message-ID: <ee519561-696b-49c9-ba80-633635e444cf@proxmox.com>
In-Reply-To: <20250828144247.298536-3-l.wagner@proxmox.com>
Some minor comments inline, rest LGTM.

On 8/28/25 4:42 PM, Lukas Wagner wrote:
> This allows us to simplify the fetching logic quite a bit.
>
> Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
> ---
> .../tasks/remote_tasks.rs | 239 ++++++------------
> 1 file changed, 77 insertions(+), 162 deletions(-)
>
> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> index 04c51dac..967e633c 100644
> --- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> @@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
>
> use server::{
> api::pve,
> + parallel_fetcher::{NodeResults, ParallelFetcher},
> remote_tasks::{
> self,
> task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
> @@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
> get_remotes_with_finished_tasks(&remote_config, &poll_results)
> };
>
> - let (all_tasks, update_state_for_remote) = fetch_remotes(
> - remotes,
> - Arc::new(cache_state),
> - Arc::clone(&total_connections_semaphore),
> - )
> - .await;
> + let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
>
> if !all_tasks.is_empty() {
> update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
> @@ -221,42 +217,90 @@ async fn init_cache() -> Result<(), Error> {
> async fn fetch_remotes(
> remotes: Vec<Remote>,
> cache_state: Arc<State>,
> - total_connections_semaphore: Arc<Semaphore>,
> ) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
> - let mut join_set = JoinSet::new();
> + let fetcher = ParallelFetcher {
> + max_connections: MAX_CONNECTIONS,
> + max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
> + context: cache_state,
> + };
>
> - for remote in remotes {
> - let semaphore = Arc::clone(&total_connections_semaphore);
> - let state_clone = Arc::clone(&cache_state);
> -
> - join_set.spawn(async move {
> - log::debug!("fetching remote tasks for '{}'", remote.id);
> - fetch_tasks(&remote, state_clone, semaphore)
> - .await
> - .map_err(|err| {
> - format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
> - })
> - });
> - }
> + let fetch_results = fetcher
> + .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
> + .await;
>
> let mut all_tasks = Vec::new();
> - let mut update_state_for_remote = NodeFetchSuccessMap::default();
> + let mut node_success_map = NodeFetchSuccessMap::default();
>
> - while let Some(res) = join_set.join_next().await {
> - match res {
> - Ok(Ok(FetchedTasks {
> - tasks,
> - node_results,
> - })) => {
> - all_tasks.extend(tasks);
> - update_state_for_remote.merge(node_results);
> + for (remote_name, result) in fetch_results.remote_results {
> + match result {
> + Ok(remote_result) => {
> + for (node_name, node_result) in remote_result.node_results {
> + match node_result {
> + Ok(NodeResults { data, .. }) => {
> + all_tasks.extend(data);
> + node_success_map.set_node_success(remote_name.clone(), node_name);
> + }
> + Err(err) => {
> + log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
> + }
> + }
> + }
> + }
> + Err(err) => {
> + log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
> }
> - Ok(Err(err)) => log::error!("{err:#}"),
> - Err(err) => log::error!("could not join task fetching future: {err:#}"),
> }
> }
>
> - (all_tasks, update_state_for_remote)
> + (all_tasks, node_success_map)
> +}
> +
> +async fn fetch_tasks_from_single_node(
> + context: Arc<State>,
> + remote: Remote,
> + node: String,
> +) -> Result<Vec<TaskCacheItem>, Error> {
> + match remote.ty {
> + RemoteType::Pve => {
> + let since = context
> + .cutoff_timestamp(&remote.id, &node)
> + .unwrap_or_else(|| {
> + proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
> + });
> +
> + let params = ListTasks {
> + source: Some(ListTasksSource::Archive),
> + since: Some(since),
> + // If `limit` is not provided, we only receive 50 tasks
> + limit: Some(MAX_TASKS_TO_FETCH),
> + ..Default::default()
> + };
> +
> + let remote_clone = remote.clone();
> + let client = pve::connect(&remote_clone)?;
This clone can simply be omitted: `remote` is passed by value here, so you
can use it directly.
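
i.e. something like this (untested):

    let client = pve::connect(&remote)?;
    let task_list = client.get_task_list(&node, params).await.map_err(|err| {
        format_err!("remote '{}', node '{}': {err}", remote.id, node)
    })?;

`pve::connect` only needs a reference, and the later uses of `remote.id`
are borrows as well, so nothing forces the extra owned copy.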
> + let task_list = client.get_task_list(&node, params).await.map_err(|err| {
> + format_err!("remote '{}', node '{}': {err}", remote_clone.id, node)
> + })?;
> +
> + let task_list = task_list
> + .into_iter()
> + .map(|task| map_pve_task(task, &remote.id))
> + .filter_map(|task| match task {
> + Ok(task) => Some(task),
> + Err(err) => {
> + log::error!("could not map PVE task: {err:#}");
> + None
> + }
> + })
> + .collect();
Two things here:

* you could use just a single `filter_map` and call `map_pve_task` inside
  it, instead of a `map` followed by a `filter_map`
* you can chain `.into_iter().filter_map(..)` directly onto the original
  `get_task_list` call, so there is no need for the extra
  `let task_list = task_list....` rebinding
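
Combining both, something like this (untested, just to illustrate):

    let task_list: Vec<TaskCacheItem> = client
        .get_task_list(&node, params)
        .await
        .map_err(|err| format_err!("remote '{}', node '{}': {err}", remote.id, node))?
        .into_iter()
        // map and filter in one pass, dropping tasks we cannot convert
        .filter_map(|task| match map_pve_task(task, &remote.id) {
            Ok(task) => Some(task),
            Err(err) => {
                log::error!("could not map PVE task: {err:#}");
                None
            }
        })
        .collect();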
> +
> + Ok(task_list)
> + }
> + RemoteType::Pbs => {
> + // TODO: Support PBS.
> + Ok(vec![])
> + }
> + }
> }
>
> /// Return all remotes from the given config.
> @@ -301,135 +345,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
> tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
> }
>
> -/// Fetched tasks from a single remote.
> -struct FetchedTasks {
> - /// List of tasks.
> - tasks: Vec<TaskCacheItem>,
> - /// Contains whether a cluster node was fetched successfully.
> - node_results: NodeFetchSuccessMap,
> -}
> -
> -/// Fetch tasks (active and finished) from a remote.
> -async fn fetch_tasks(
> - remote: &Remote,
> - state: Arc<State>,
> - total_connections_semaphore: Arc<Semaphore>,
> -) -> Result<FetchedTasks, Error> {
> - let mut tasks = Vec::new();
> -
> - let mut node_results = NodeFetchSuccessMap::default();
> -
> - match remote.ty {
> - RemoteType::Pve => {
> - let client = pve::connect(remote)?;
> -
> - let nodes = {
> - // This permit *must* be dropped before we acquire the permits for the
> - // per-node connections - otherwise we risk a deadlock.
> - let _permit = total_connections_semaphore.acquire().await.unwrap();
> - client.list_nodes().await?
> - };
> -
> - // This second semaphore is used to limit the number of concurrent connections
> - // *per remote*, not in total.
> - let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
> - let mut join_set = JoinSet::new();
> -
> - for node in nodes {
> - let node_name = node.node.to_string();
> -
> - let since = state
> - .cutoff_timestamp(&remote.id, &node_name)
> - .unwrap_or_else(|| {
> - proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
> - });
> -
> - let params = ListTasks {
> - source: Some(ListTasksSource::Archive),
> - since: Some(since),
> - // If `limit` is not provided, we only receive 50 tasks
> - limit: Some(MAX_TASKS_TO_FETCH),
> - ..Default::default()
> - };
> -
> - let per_remote_permit = Arc::clone(&per_remote_semaphore)
> - .acquire_owned()
> - .await
> - .unwrap();
> -
> - let total_connections_permit = Arc::clone(&total_connections_semaphore)
> - .acquire_owned()
> - .await
> - .unwrap();
> -
> - let remote_clone = remote.clone();
> -
> - join_set.spawn(async move {
> - let res = async {
> - let client = pve::connect(&remote_clone)?;
> - let task_list =
> - client
> - .get_task_list(&node.node, params)
> - .await
> - .map_err(|err| {
> - format_err!(
> - "remote '{}', node '{}': {err}",
> - remote_clone.id,
> - node.node
> - )
> - })?;
> - Ok::<Vec<_>, Error>(task_list)
> - }
> - .await;
> -
> - drop(total_connections_permit);
> - drop(per_remote_permit);
> -
> - (node_name, res)
> - });
> - }
> -
> - while let Some(result) = join_set.join_next().await {
> - match result {
> - Ok((node_name, result)) => match result {
> - Ok(task_list) => {
> - let mapped =
> - task_list.into_iter().filter_map(|task| {
> - match map_pve_task(task, &remote.id) {
> - Ok(task) => Some(task),
> - Err(err) => {
> - log::error!(
> - "could not map task data, skipping: {err:#}"
> - );
> - None
> - }
> - }
> - });
> -
> - tasks.extend(mapped);
> - node_results.set_node_success(remote.id.clone(), node_name);
> - }
> - Err(error) => {
> - log::error!("could not fetch tasks: {error:#}");
> - }
> - },
> - Err(error) => {
> - log::error!("could not join task fetching task: {error:#}");
> - }
> - }
> - }
> - }
> - RemoteType::Pbs => {
> - // TODO: Add code for PBS
> - }
> - }
> -
> - Ok(FetchedTasks {
> - tasks,
> - node_results,
> - })
> -}
> -
> #[derive(PartialEq, Debug)]
> /// Outcome from polling a tracked task.
> enum PollResult {