From: Dominik Csapak <d.csapak@proxmox.com>
To: Lukas Wagner <l.wagner@proxmox.com>, pdm-devel@lists.proxmox.com
Subject: Re: [PATCH datacenter-manager 5/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp
Date: Tue, 2 Jun 2026 16:01:50 +0200
Message-ID: <5f7e48a5-bf64-4b60-b0bb-1b7e7cb2fddf@proxmox.com>
In-Reply-To: <20260529133951.326103-6-l.wagner@proxmox.com>

tiny nit here:

this patch would read slightly better if the UPID node refactoring
(NativeUpid::node()) were already in place, e.g. by folding those
changes into the first patch

On 5/29/26 3:39 PM, Lukas Wagner wrote:
> At the moment, PDM only polls finished tasks from remote nodes.
> 
> Considering two tasks:
>     A: starts at 100, ends at 300
>     B: starts at 150, ends at 200
> 
> If PDM polls the node at t = 250, it would see that B has finished and
> update the cutoff timestamp for the next poll to 150. However, since A
> has a starttime of 100, and the cutoff determines the minimum starttime
> of tasks to retrieve from the remote via the API, PDM would never fetch
> task A, leading to gaps in the task archive.
> 
> The solution is to fetch all tasks, running and finished, and then
> update the cutoff with the oldest running task in mind.
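
To make the A/B scenario above concrete, here is a minimal standalone
sketch. All names in it (`Task`, `observe`, `old_cutoff`, `new_cutoff`)
are hypothetical and not part of PDM; it assumes there is no previously
stored cutoff and only models which start time each rule would pick
after a poll at t = 250.

```rust
/// Simplified task record (hypothetical); `end` is `None` while running.
#[derive(Debug, Clone, Copy)]
struct Task {
    start: i64,
    end: Option<i64>,
}

/// What a poll at time `now` can observe: tasks that have started, with
/// end times still in the future masked out (the task is running at `now`).
fn observe(tasks: &[Task], now: i64) -> Vec<Task> {
    tasks
        .iter()
        .filter(|t| t.start <= now)
        .map(|t| Task {
            start: t.start,
            end: t.end.filter(|&e| e <= now),
        })
        .collect()
}

/// Old rule: newest start time among finished tasks only.
fn old_cutoff(seen: &[Task]) -> Option<i64> {
    seen.iter().filter(|t| t.end.is_some()).map(|t| t.start).max()
}

/// New rule: like the old rule, but never later than the start time of
/// the oldest task that is still running.
fn new_cutoff(seen: &[Task]) -> Option<i64> {
    let oldest_running = seen
        .iter()
        .filter(|t| t.end.is_none())
        .map(|t| t.start)
        .min();
    match (old_cutoff(seen), oldest_running) {
        (Some(finished), Some(running)) => Some(finished.min(running)),
        (finished, running) => finished.or(running),
    }
}

fn main() {
    let a = Task { start: 100, end: Some(300) };
    let b = Task { start: 150, end: Some(200) };
    let seen = observe(&[a, b], 250);

    // Old rule: cutoff 150, so A (start 100) falls below it and is lost.
    assert_eq!(old_cutoff(&seen), Some(150));
    // New rule: cutoff 100, so A is still covered by the next poll.
    assert_eq!(new_cutoff(&seen), Some(100));
    println!("old: {:?}, new: {:?}", old_cutoff(&seen), new_cutoff(&seen));
}
```

Once A has finished as well (e.g. a poll at t = 300), both rules agree
on 150 again, since no running task holds the cutoff back anymore.
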
> 
> A side effect is that we now also return foreign (as in, not started by
> PDM) running tasks from the API, which makes them appear in the UI as
> well. This was always intended as a future extension anyway, so this
> should be okay.
> 
> Initially reported in the community forum [0].
> 
> [0]: https://forum.proxmox.com/threads/180317/
> 
> Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
> ---
>   server/src/remote_tasks/refresh_task.rs |  11 +--
>   server/src/remote_tasks/task_cache.rs   | 122 ++++++++++++++++++------
>   2 files changed, 94 insertions(+), 39 deletions(-)
> 
> diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs
> index 65572808..c3b005ce 100644
> --- a/server/src/remote_tasks/refresh_task.rs
> +++ b/server/src/remote_tasks/refresh_task.rs
> @@ -284,7 +284,7 @@ async fn fetch_tasks_from_single_node(
>       match remote.ty {
>           RemoteType::Pve => {
>               let params = pve_api_types::ListTasks {
> -                source: Some(pve_api_types::ListTasksSource::Archive),
> +                source: Some(pve_api_types::ListTasksSource::All),
>                   since: Some(since),
>                   // If `limit` is not provided, we only receive 50 tasks
>                   limit: Some(MAX_TASKS_TO_FETCH),
> @@ -315,14 +315,7 @@ async fn fetch_tasks_from_single_node(
>                   .get_task_list(params)
>                   .await?
>                   .into_iter()
> -                .filter_map(|task| {
> -                    if task.endtime.is_some() {
> -                        // We only care about finished tasks.
> -                        Some(map_pbs_task(task, remote.id.clone()))
> -                    } else {
> -                        None
> -                    }
> -                })
> +                .map(|task| map_pbs_task(task, remote.id.clone()))
>                   .collect();
>   
>               Ok(task_list)
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> index e83a351b..04e3d3ff 100644
> --- a/server/src/remote_tasks/task_cache.rs
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
>   
>   use proxmox_sys::fs::CreateOptions;
>   
> -use pdm_api_types::{NativeUpid, RemoteUpid};
> +use pdm_api_types::RemoteUpid;
>   
>   /// Filename for the file containing running tasks.
>   const ACTIVE_FILENAME: &str = "active";
> @@ -102,22 +102,34 @@ impl State {
>           self.tracked_tasks.remove(upid);
>       }
>   
> -    /// Update the per-node cutoff timestamp if it is higher than the current one.
> -    fn update_cutoff_timestamp(&mut self, remote_id: &str, node: &str, starttime: i64) {
> +    /// Update the per-node cutoff timestamp with a provided update function.
> +    fn update_cutoff_timestamp<F: Fn(Option<i64>) -> i64>(
> +        &mut self,
> +        remote_id: &str,
> +        node: &str,
> +        update_fn: F,
> +    ) {
>           match self.remote_state.get_mut(remote_id) {
>               Some(remote_state) => match remote_state.node_state.get_mut(node) {
>                   Some(node_state) => {
> -                    node_state.cutoff = node_state.cutoff.max(starttime);
> +                    node_state.cutoff = update_fn(Some(node_state.cutoff));
>                   }
>                   None => {
> -                    remote_state
> -                        .node_state
> -                        .insert(node.to_string(), NodeState { cutoff: starttime });
> +                    remote_state.node_state.insert(
> +                        node.to_string(),
> +                        NodeState {
> +                            cutoff: update_fn(None),
> +                        },
> +                    );
>                   }
>               },
>               None => {
> -                let node_state =
> -                    HashMap::from_iter([(node.to_string(), NodeState { cutoff: starttime })]);
> +                let node_state = HashMap::from_iter([(
> +                    node.to_string(),
> +                    NodeState {
> +                        cutoff: update_fn(None),
> +                    },
> +                )]);
>   
>                   self.remote_state
>                       .insert(remote_id.to_string(), RemoteState { node_state });
> @@ -389,7 +401,7 @@ impl WritableTaskCache {
>   
>       fn write_tasks_to_journal(
>           &self,
> -        tasks: Vec<TaskCacheItem>,
> +        finished_tasks: Vec<TaskCacheItem>,
>           active_tasks: &mut HashMap<RemoteUpid, TaskCacheItem>,
>           node_success_map: &NodeFetchSuccessMap,
>           state: &mut State,
> @@ -400,37 +412,53 @@ impl WritableTaskCache {
>               .create(true)
>               .open(filename)?;
>   
> -        for task in tasks {
> +        for task in finished_tasks {
>               // Remove this finished task from our set of active tasks.
>               active_tasks.remove(&task.upid);
>   
> -            match task.upid.native_upid() {
> -                Ok(NativeUpid::PveUpid(upid)) => {
> -                    let node = &upid.node;
> -                    let remote = task.upid.remote();
> -
> -                    if node_success_map.node_successful(remote, node) {
> -                        state.update_cutoff_timestamp(remote, node, task.starttime);
> -                    }
> -                }
> -                Ok(NativeUpid::PbsUpid(upid)) => {
> -                    let node = &upid.node;
> -                    let remote = task.upid.remote();
> -
> -                    if node_success_map.node_successful(remote, node) {
> -                        state.update_cutoff_timestamp(remote, node, task.starttime);
> -                    }
> -                }
> -                Err(error) => {
> -                    log::error!("could not parse PVE UPID - not saving to task cache: {error:#}");
> +            let native_upid = match task.upid.native_upid() {
> +                Ok(native_upid) => native_upid,
> +                Err(err) => {
> +                    log::error!("could not deserialize UPID: {err:#}");
>                       continue;
>                   }
> +            };
> +
> +            let node = native_upid.node();
> +            let remote = task.upid.remote();
> +
> +            if node_success_map.node_successful(remote, node) {
> +                state.update_cutoff_timestamp(remote, node, |existing| {
> +                    existing.unwrap_or(0).max(task.starttime)
> +                });
>               }
>   
>               serde_json::to_writer(&mut file, &task)?;
>               writeln!(&file)?;
>           }
>   
> +        // For all remaining active tasks, lower the cutoff timestamp to the
> +        // start time of the *oldest* running task, if that is earlier. This
> +        // avoids gaps in the task archive when task runtimes overlap.
> +        for task in active_tasks.values() {
> +            let native_upid = match task.upid.native_upid() {
> +                Ok(native_upid) => native_upid,
> +                Err(err) => {
> +                    log::error!("could not deserialize UPID: {err:#}");
> +                    continue;
> +                }
> +            };
> +
> +            let node = native_upid.node();
> +            let remote = task.upid.remote();
> +
> +            if node_success_map.node_successful(remote, node) {
> +                state.update_cutoff_timestamp(remote, node, |existing| {
> +                    existing.unwrap_or(i64::MAX).min(task.starttime)
> +                });
> +            }
> +        }
> +
>           file.sync_all()?;
>   
>           Ok(())
> @@ -1482,4 +1510,38 @@ mod tests {
>   
>           Ok(())
>       }
> +
> +    #[test]
> +    fn cutoff_is_oldest_running_task() {
> +        let (_tmp_dir, cache) = make_cache().unwrap();
> +        let cache = cache.write().unwrap();
> +
> +        cache.init(500).unwrap();
> +
> +        // Establish the baseline cutoff with a single, finished task
> +        add_tasks(&cache, vec![task(900, Some(910))]).unwrap();
> +        assert_eq!(get_cutoff(&cache), 900);
> +
> +        // Add two running tasks
> +        add_tasks(&cache, vec![task(1000, None), task(1100, None)]).unwrap();
> +        assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 2);
> +        assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 1);
> +
> +        // Two new *running* tasks, cutoff should remain the same
> +        assert_eq!(get_cutoff(&cache), 900);
> +
> +        add_tasks(&cache, vec![task(1000, None), task(1100, Some(1150))]).unwrap();
> +        assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 1);
> +        assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 2);
> +
> +        // The cutoff should stick to the *oldest* running task
> +        assert_eq!(get_cutoff(&cache), 1000);
> +
> +        add_tasks(&cache, vec![task(1000, Some(1200)), task(1100, Some(1150))]).unwrap();
> +        // If there is no running task anymore, the youngest finished task's starttime determines
> +        // the cutoff
> +        assert_eq!(get_cutoff(&cache), 1100);
> +        assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 0);
> +        assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 3);
> +    }
>   }
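
For readers following the two-pass update in write_tasks_to_journal, the
sketch below replays the sequence from the new test on a simplified
model. It is only an illustration under stated assumptions: `Cutoffs`,
`update`, `apply_poll` and `get` are hypothetical names, the nested
remote/node maps are flattened into one map, and the running start times
are passed in explicitly instead of coming from the persistent set of
active tasks.

```rust
use std::collections::HashMap;

/// Per-node cutoffs keyed by (remote, node); a flattened, hypothetical
/// stand-in for the nested remote/node state in the patch.
#[derive(Default)]
struct Cutoffs(HashMap<(String, String), i64>);

impl Cutoffs {
    /// Store whatever `update_fn` derives from the current value, if any.
    fn update(&mut self, remote: &str, node: &str, update_fn: impl Fn(Option<i64>) -> i64) {
        let key = (remote.to_string(), node.to_string());
        let new = update_fn(self.0.get(&key).copied());
        self.0.insert(key, new);
    }

    /// Apply one poll result: finished tasks first, then running ones.
    fn apply_poll(&mut self, remote: &str, node: &str, finished: &[i64], running: &[i64]) {
        // Finished tasks can only move the cutoff forward.
        for &start in finished {
            self.update(remote, node, |old| old.unwrap_or(0).max(start));
        }
        // Running tasks pull it back to the oldest start time if needed.
        for &start in running {
            self.update(remote, node, |old| old.unwrap_or(i64::MAX).min(start));
        }
    }

    fn get(&self, remote: &str, node: &str) -> Option<i64> {
        self.0.get(&(remote.to_string(), node.to_string())).copied()
    }
}

fn main() {
    let mut c = Cutoffs::default();

    // Baseline: a single finished task started at 900.
    c.apply_poll("remote", "node", &[900], &[]);
    assert_eq!(c.get("remote", "node"), Some(900));

    // Two running tasks that started later do not move the cutoff.
    c.apply_poll("remote", "node", &[], &[1000, 1100]);
    assert_eq!(c.get("remote", "node"), Some(900));

    // 1100 finished, 1000 still running: the running task wins.
    c.apply_poll("remote", "node", &[1100], &[1000]);
    assert_eq!(c.get("remote", "node"), Some(1000));

    // Nothing running anymore: the youngest finished start time wins.
    c.apply_poll("remote", "node", &[1000, 1100], &[]);
    assert_eq!(c.get("remote", "node"), Some(1100));
}
```

The order of the two loops matters in this model: running the `min`
pass last is what keeps the cutoff at or below the oldest running task.
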





Thread overview: 9+ messages
2026-05-29 13:39 [PATCH datacenter-manager 0/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 1/6] pdm-api-types: add NativeUpid::node() convenience getter Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 2/6] task cache: tests: allow to provide an explicit end time in 'task' helper Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 3/6] task cache: tests: add get_cutoff helper Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 4/6] task cache: tests: add 'make_cache' convenience helper Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 5/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp Lukas Wagner
2026-06-02 14:01   ` Dominik Csapak [this message]
2026-05-29 13:39 ` [PATCH datacenter-manager 6/6] task cache: poll known active tasks every 30 seconds Lukas Wagner
2026-06-02 14:01 ` [PATCH datacenter-manager 0/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp Dominik Csapak
