From: "Lukas Wagner" <l.wagner@proxmox.com>
To: "Dominik Csapak" <d.csapak@proxmox.com>,
	"Proxmox Datacenter Manager development discussion"
	<pdm-devel@lists.proxmox.com>
Subject: Re: [pdm-devel] [RFC proxmox-datacenter-manager 2/2] remote task fetching: use ParallelFetcher helper
Date: Fri, 29 Aug 2025 11:19:21 +0200
Message-ID: <DCEST3GF6MXU.2IO8PXBZVKDPB@proxmox.com>
In-Reply-To: <ee519561-696b-49c9-ba80-633635e444cf@proxmox.com>

On Fri Aug 29, 2025 at 11:15 AM CEST, Dominik Csapak wrote:
> some minor comments inline, rest LGTM
>
> On 8/28/25 4:42 PM, Lukas Wagner wrote:
>> This allows us to simplify the fetching logic quite a bit.
>> 
>> Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
>> ---
>>   .../tasks/remote_tasks.rs                     | 239 ++++++------------
>>   1 file changed, 77 insertions(+), 162 deletions(-)
>> 
>> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>> index 04c51dac..967e633c 100644
>> --- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>> +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>> @@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
>>   
>>   use server::{
>>       api::pve,
>> +    parallel_fetcher::{NodeResults, ParallelFetcher},
>>       remote_tasks::{
>>           self,
>>           task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
>> @@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
>>           get_remotes_with_finished_tasks(&remote_config, &poll_results)
>>       };
>>   
>> -    let (all_tasks, update_state_for_remote) = fetch_remotes(
>> -        remotes,
>> -        Arc::new(cache_state),
>> -        Arc::clone(&total_connections_semaphore),
>> -    )
>> -    .await;
>> +    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
>>   
>>       if !all_tasks.is_empty() {
>>           update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
>> @@ -221,42 +217,90 @@ async fn init_cache() -> Result<(), Error> {
>>   async fn fetch_remotes(
>>       remotes: Vec<Remote>,
>>       cache_state: Arc<State>,
>> -    total_connections_semaphore: Arc<Semaphore>,
>>   ) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
>> -    let mut join_set = JoinSet::new();
>> +    let fetcher = ParallelFetcher {
>> +        max_connections: MAX_CONNECTIONS,
>> +        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
>> +        context: cache_state,
>> +    };
>>   
>> -    for remote in remotes {
>> -        let semaphore = Arc::clone(&total_connections_semaphore);
>> -        let state_clone = Arc::clone(&cache_state);
>> -
>> -        join_set.spawn(async move {
>> -            log::debug!("fetching remote tasks for '{}'", remote.id);
>> -            fetch_tasks(&remote, state_clone, semaphore)
>> -                .await
>> -                .map_err(|err| {
>> -                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
>> -                })
>> -        });
>> -    }
>> +    let fetch_results = fetcher
>> +        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
>> +        .await;
>>   
>>       let mut all_tasks = Vec::new();
>> -    let mut update_state_for_remote = NodeFetchSuccessMap::default();
>> +    let mut node_success_map = NodeFetchSuccessMap::default();
>>   
>> -    while let Some(res) = join_set.join_next().await {
>> -        match res {
>> -            Ok(Ok(FetchedTasks {
>> -                tasks,
>> -                node_results,
>> -            })) => {
>> -                all_tasks.extend(tasks);
>> -                update_state_for_remote.merge(node_results);
>> +    for (remote_name, result) in fetch_results.remote_results {
>> +        match result {
>> +            Ok(remote_result) => {
>> +                for (node_name, node_result) in remote_result.node_results {
>> +                    match node_result {
>> +                        Ok(NodeResults { data, .. }) => {
>> +                            all_tasks.extend(data);
>> +                            node_success_map.set_node_success(remote_name.clone(), node_name);
>> +                        }
>> +                        Err(err) => {
>> +                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
>> +                        }
>> +                    }
>> +                }
>> +            }
>> +            Err(err) => {
>> +                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
>>               }
>> -            Ok(Err(err)) => log::error!("{err:#}"),
>> -            Err(err) => log::error!("could not join task fetching future: {err:#}"),
>>           }
>>       }
>>   
>> -    (all_tasks, update_state_for_remote)
>> +    (all_tasks, node_success_map)
>> +}
>> +
>> +async fn fetch_tasks_from_single_node(
>> +    context: Arc<State>,
>> +    remote: Remote,
>> +    node: String,
>> +) -> Result<Vec<TaskCacheItem>, Error> {
>> +    match remote.ty {
>> +        RemoteType::Pve => {
>> +            let since = context
>> +                .cutoff_timestamp(&remote.id, &node)
>> +                .unwrap_or_else(|| {
>> +                    proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
>> +                });
>> +
>> +            let params = ListTasks {
>> +                source: Some(ListTasksSource::Archive),
>> +                since: Some(since),
>> +                // If `limit` is not provided, we only receive 50 tasks
>> +                limit: Some(MAX_TASKS_TO_FETCH),
>> +                ..Default::default()
>> +            };
>> +
>> +            let remote_clone = remote.clone();
>> +            let client = pve::connect(&remote_clone)?;
>
> This clone can simply be omitted by using `remote` directly.
>

True, thanks!
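
For reference, that spot would then look something like this (untested
sketch, simply borrowing the owned `remote` instead of cloning it):

    // `remote` is owned by fetch_tasks_from_single_node, so we can
    // borrow it directly; no clone needed.
    let client = pve::connect(&remote)?;

    let task_list = client
        .get_task_list(&node, params)
        .await
        .map_err(|err| {
            format_err!("remote '{}', node '{}': {err}", remote.id, node)
        })?;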

>> +            let task_list = client.get_task_list(&node, params).await.map_err(|err| {
>> +                format_err!("remote '{}', node '{}': {err}", remote_clone.id, node)
>> +            })?;
>> +
>> +            let task_list = task_list
>> +                .into_iter()
>> +                .map(|task| map_pve_task(task, &remote.id))
>> +                .filter_map(|task| match task {
>> +                    Ok(task) => Some(task),
>> +                    Err(err) => {
>> +                        log::error!("could not map PVE task: {err:#}");
>> +                        None
>> +                    }
>> +                })
>> +                .collect();
>
> Two things here:
>
> You could do just one filter_map, calling map_pve_task inside it.
>
> And you can simply chain `into_iter().filter_map(..)` onto the
> original get_task_list call, so there is no need for the extra
> `let task_list = task_list....`
>

Good point! Will make the changes you requested.
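
Together with the dropped clone from above, the tail of the PVE branch
should then collapse to something like this (untested sketch):

    let client = pve::connect(&remote)?;

    let task_list = client
        .get_task_list(&node, params)
        .await
        .map_err(|err| {
            format_err!("remote '{}', node '{}': {err}", remote.id, node)
        })?
        .into_iter()
        // Convert PVE task data to cache items in a single pass,
        // logging and skipping any task that fails to convert.
        .filter_map(|task| match map_pve_task(task, &remote.id) {
            Ok(task) => Some(task),
            Err(err) => {
                log::error!("could not map PVE task: {err:#}");
                None
            }
        })
        .collect();

    Ok(task_list)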

>> +
>> +            Ok(task_list)
>> +        }
>> +        RemoteType::Pbs => {
>> +            // TODO: Support PBS.
>> +            Ok(vec![])
>> +        }
>> +    }
>>   }
>>   
>>   /// Return all remotes from the given config.
>> @@ -301,135 +345,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
>>       tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
>>   }
>>   
>> -/// Fetched tasks from a single remote.
>> -struct FetchedTasks {
>> -    /// List of tasks.
>> -    tasks: Vec<TaskCacheItem>,
>> -    /// Contains whether a cluster node was fetched successfully.
>> -    node_results: NodeFetchSuccessMap,
>> -}
>> -
>> -/// Fetch tasks (active and finished) from a remote.
>> -async fn fetch_tasks(
>> -    remote: &Remote,
>> -    state: Arc<State>,
>> -    total_connections_semaphore: Arc<Semaphore>,
>> -) -> Result<FetchedTasks, Error> {
>> -    let mut tasks = Vec::new();
>> -
>> -    let mut node_results = NodeFetchSuccessMap::default();
>> -
>> -    match remote.ty {
>> -        RemoteType::Pve => {
>> -            let client = pve::connect(remote)?;
>> -
>> -            let nodes = {
>> -                // This permit *must* be dropped before we acquire the permits for the
>> -                // per-node connections - otherwise we risk a deadlock.
>> -                let _permit = total_connections_semaphore.acquire().await.unwrap();
>> -                client.list_nodes().await?
>> -            };
>> -
>> -            // This second semaphore is used to limit the number of concurrent connections
>> -            // *per remote*, not in total.
>> -            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
>> -            let mut join_set = JoinSet::new();
>> -
>> -            for node in nodes {
>> -                let node_name = node.node.to_string();
>> -
>> -                let since = state
>> -                    .cutoff_timestamp(&remote.id, &node_name)
>> -                    .unwrap_or_else(|| {
>> -                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
>> -                    });
>> -
>> -                let params = ListTasks {
>> -                    source: Some(ListTasksSource::Archive),
>> -                    since: Some(since),
>> -                    // If `limit` is not provided, we only receive 50 tasks
>> -                    limit: Some(MAX_TASKS_TO_FETCH),
>> -                    ..Default::default()
>> -                };
>> -
>> -                let per_remote_permit = Arc::clone(&per_remote_semaphore)
>> -                    .acquire_owned()
>> -                    .await
>> -                    .unwrap();
>> -
>> -                let total_connections_permit = Arc::clone(&total_connections_semaphore)
>> -                    .acquire_owned()
>> -                    .await
>> -                    .unwrap();
>> -
>> -                let remote_clone = remote.clone();
>> -
>> -                join_set.spawn(async move {
>> -                    let res = async {
>> -                        let client = pve::connect(&remote_clone)?;
>> -                        let task_list =
>> -                            client
>> -                                .get_task_list(&node.node, params)
>> -                                .await
>> -                                .map_err(|err| {
>> -                                    format_err!(
>> -                                        "remote '{}', node '{}': {err}",
>> -                                        remote_clone.id,
>> -                                        node.node
>> -                                    )
>> -                                })?;
>> -                        Ok::<Vec<_>, Error>(task_list)
>> -                    }
>> -                    .await;
>> -
>> -                    drop(total_connections_permit);
>> -                    drop(per_remote_permit);
>> -
>> -                    (node_name, res)
>> -                });
>> -            }
>> -
>> -            while let Some(result) = join_set.join_next().await {
>> -                match result {
>> -                    Ok((node_name, result)) => match result {
>> -                        Ok(task_list) => {
>> -                            let mapped =
>> -                                task_list.into_iter().filter_map(|task| {
>> -                                    match map_pve_task(task, &remote.id) {
>> -                                        Ok(task) => Some(task),
>> -                                        Err(err) => {
>> -                                            log::error!(
>> -                                                "could not map task data, skipping: {err:#}"
>> -                                            );
>> -                                            None
>> -                                        }
>> -                                    }
>> -                                });
>> -
>> -                            tasks.extend(mapped);
>> -                            node_results.set_node_success(remote.id.clone(), node_name);
>> -                        }
>> -                        Err(error) => {
>> -                            log::error!("could not fetch tasks: {error:#}");
>> -                        }
>> -                    },
>> -                    Err(error) => {
>> -                        log::error!("could not join task fetching task: {error:#}");
>> -                    }
>> -                }
>> -            }
>> -        }
>> -        RemoteType::Pbs => {
>> -            // TODO: Add code for PBS
>> -        }
>> -    }
>> -
>> -    Ok(FetchedTasks {
>> -        tasks,
>> -        node_results,
>> -    })
>> -}
>> -
>>   #[derive(PartialEq, Debug)]
>>   /// Outcome from polling a tracked task.
>>   enum PollResult {


