From: Dominik Csapak <d.csapak@proxmox.com>
To: Lukas Wagner <l.wagner@proxmox.com>, pdm-devel@lists.proxmox.com
Subject: Re: [PATCH datacenter-manager 5/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp
Date: Tue, 2 Jun 2026 16:01:50 +0200
Message-ID: <5f7e48a5-bf64-4b60-b0bb-1b7e7cb2fddf@proxmox.com>
In-Reply-To: <20260529133951.326103-6-l.wagner@proxmox.com>
tiny nit here:
the patch would read slightly better if the UPID node() refactoring
(collapsing the PVE/PBS match arms) were already in place, e.g. those
changes could be folded into the first patch.
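For context, patch 1/6 adds a `NativeUpid::node()` convenience getter, which is what allows the two identical `PveUpid`/`PbsUpid` match arms in the diff below to collapse into one code path. A minimal sketch of such a getter follows; the structs are simplified stand-ins that only carry the `node` field (the real types have more fields), and returning `&str` is an assumption based on how the result is passed to `&str` parameters in the diff.

```rust
// Simplified stand-ins (assumption): the real UPID types carry many more
// fields. Only `node` matters for this getter.
#[derive(Debug)]
struct PveUpid {
    node: String,
}

#[derive(Debug)]
struct PbsUpid {
    node: String,
}

#[derive(Debug)]
enum NativeUpid {
    PveUpid(PveUpid),
    PbsUpid(PbsUpid),
}

impl NativeUpid {
    /// Return the node name, regardless of the remote type.
    fn node(&self) -> &str {
        match self {
            NativeUpid::PveUpid(upid) => &upid.node,
            NativeUpid::PbsUpid(upid) => &upid.node,
        }
    }
}

fn main() {
    let upids = [
        NativeUpid::PveUpid(PveUpid { node: "pve1".into() }),
        NativeUpid::PbsUpid(PbsUpid { node: "pbs1".into() }),
    ];
    // One code path instead of one match arm per remote type.
    for upid in &upids {
        println!("{}", upid.node());
    }
}
```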
On 5/29/26 3:39 PM, Lukas Wagner wrote:
> At the moment, PDM only polls finished tasks from remote nodes.
>
> Considering two tasks:
> A: starts at 100, ends at 300
> B: starts at 150, ends at 200
>
> If PDM polls the node at t = 250, it would see that B has finished and
> update the cutoff timestamp for the next poll to 150. However, since A
> has a starttime of 100, and the cutoff determines the minimum starttime
> of tasks to retrieve from the remote via the API, PDM would never fetch
> task A, leading to gaps in the task archive.
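To make the failure mode concrete, here is a small standalone simulation of the old behaviour. It is a sketch, not PDM code: the `Task` type and `poll_finished_only` function are hypothetical, and it assumes the cutoff is inclusive (`start >= cutoff`) and that only finished tasks are returned. Polling at t = 250 and again at t = 400 never yields task A.

```rust
/// A task with a start time and, once finished, an end time.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Task {
    start: i64,
    end: Option<i64>,
}

impl Task {
    /// True if the task has already ended at time `now`.
    fn finished_at(&self, now: i64) -> bool {
        matches!(self.end, Some(end) if end <= now)
    }
}

/// Old behaviour (simplified): only *finished* tasks with `start >= cutoff`
/// are returned, and the cutoff moves forward to the newest start time seen.
fn poll_finished_only(tasks: &[Task], now: i64, cutoff: &mut i64) -> Vec<Task> {
    let since = *cutoff;
    let fetched: Vec<Task> = tasks
        .iter()
        .copied()
        .filter(|t| t.start >= since && t.finished_at(now))
        .collect();
    for t in &fetched {
        *cutoff = (*cutoff).max(t.start);
    }
    fetched
}

fn main() {
    let a = Task { start: 100, end: Some(300) };
    let b = Task { start: 150, end: Some(200) };
    let mut cutoff = 0;
    let mut archive: Vec<Task> = Vec::new();

    for now in [250, 400] {
        for t in poll_finished_only(&[a, b], now, &mut cutoff) {
            // Skip tasks that are already archived.
            if !archive.contains(&t) {
                archive.push(t);
            }
        }
        println!("t = {now}: cutoff = {cutoff}, A archived: {}", archive.contains(&a));
    }
}
```

Both polls print a cutoff of 150 and `A archived: false`: by the time A has finished, its start time is already below the cutoff.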
>
> The solution is to fetch all tasks, running and finished, and then
> update the cutoff so that it never moves past the start time of the
> oldest task that is still running.
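The same simulation with the fix applied, again as a simplified sketch with a hypothetical `poll_all` function: fetch running and finished tasks, archive the finished ones, and afterwards pull the cutoff back to the start time of the oldest task that is still running. Per-node state and the fetch-success check are left out; the max-then-min order follows `write_tasks_to_journal` in the diff below, and the inclusive `start >= cutoff` filter is still an assumption.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct Task {
    start: i64,
    end: Option<i64>,
}

/// New behaviour (simplified): fetch running *and* finished tasks with
/// `start >= cutoff`, archive the finished ones, then pull the cutoff back
/// to the start time of the oldest task that is still running.
fn poll_all(tasks: &[Task], now: i64, cutoff: &mut i64, archive: &mut Vec<Task>) {
    let since = *cutoff;
    let mut running = Vec::new();

    for t in tasks.iter().copied().filter(|t| t.start <= now && t.start >= since) {
        match t.end {
            Some(end) if end <= now => {
                if !archive.contains(&t) {
                    archive.push(t);
                }
                // First pass: move forward to the newest finished task.
                *cutoff = (*cutoff).max(t.start);
            }
            // Not finished yet at `now`: remember it as running.
            _ => running.push(t),
        }
    }

    // Second pass: never move past a task that is still running.
    for t in running {
        *cutoff = (*cutoff).min(t.start);
    }
}

fn main() {
    let a = Task { start: 100, end: Some(300) };
    let b = Task { start: 150, end: Some(200) };
    let mut cutoff = 0;
    let mut archive = Vec::new();

    for now in [250, 400] {
        poll_all(&[a, b], now, &mut cutoff, &mut archive);
        println!("t = {now}: cutoff = {cutoff}, archived = {}", archive.len());
    }
    assert!(archive.contains(&a));
}
```

After the first poll the cutoff drops back to 100 because A is still running, so the second poll picks A up and the cutoff then advances to 150.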
>
> A side effect is that we now also return foreign (i.e. not started by
> PDM) running tasks from the API, which makes them appear in the UI as
> well. This was always intended as a future extension anyway, so this
> should be okay.
>
> Initially reported in the community forum [0].
>
> [0]: https://forum.proxmox.com/threads/180317/
>
> Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
> ---
> server/src/remote_tasks/refresh_task.rs | 11 +--
> server/src/remote_tasks/task_cache.rs | 122 ++++++++++++++++++------
> 2 files changed, 94 insertions(+), 39 deletions(-)
>
> diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs
> index 65572808..c3b005ce 100644
> --- a/server/src/remote_tasks/refresh_task.rs
> +++ b/server/src/remote_tasks/refresh_task.rs
> @@ -284,7 +284,7 @@ async fn fetch_tasks_from_single_node(
> match remote.ty {
> RemoteType::Pve => {
> let params = pve_api_types::ListTasks {
> - source: Some(pve_api_types::ListTasksSource::Archive),
> + source: Some(pve_api_types::ListTasksSource::All),
> since: Some(since),
> // If `limit` is not provided, we only receive 50 tasks
> limit: Some(MAX_TASKS_TO_FETCH),
> @@ -315,14 +315,7 @@ async fn fetch_tasks_from_single_node(
> .get_task_list(params)
> .await?
> .into_iter()
> - .filter_map(|task| {
> - if task.endtime.is_some() {
> - // We only care about finished tasks.
> - Some(map_pbs_task(task, remote.id.clone()))
> - } else {
> - None
> - }
> - })
> + .map(|task| map_pbs_task(task, remote.id.clone()))
> .collect();
>
> Ok(task_list)
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> index e83a351b..04e3d3ff 100644
> --- a/server/src/remote_tasks/task_cache.rs
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
>
> use proxmox_sys::fs::CreateOptions;
>
> -use pdm_api_types::{NativeUpid, RemoteUpid};
> +use pdm_api_types::RemoteUpid;
>
> /// Filename for the file containing running tasks.
> const ACTIVE_FILENAME: &str = "active";
> @@ -102,22 +102,34 @@ impl State {
> self.tracked_tasks.remove(upid);
> }
>
> - /// Update the per-node cutoff timestamp if it is higher than the current one.
> - fn update_cutoff_timestamp(&mut self, remote_id: &str, node: &str, starttime: i64) {
> + /// Update the per-node cutoff timestamp with a provided update function.
> + fn update_cutoff_timestamp<F: Fn(Option<i64>) -> i64>(
> + &mut self,
> + remote_id: &str,
> + node: &str,
> + update_fn: F,
> + ) {
> match self.remote_state.get_mut(remote_id) {
> Some(remote_state) => match remote_state.node_state.get_mut(node) {
> Some(node_state) => {
> - node_state.cutoff = node_state.cutoff.max(starttime);
> + node_state.cutoff = update_fn(Some(node_state.cutoff));
> }
> None => {
> - remote_state
> - .node_state
> - .insert(node.to_string(), NodeState { cutoff: starttime });
> + remote_state.node_state.insert(
> + node.to_string(),
> + NodeState {
> + cutoff: update_fn(None),
> + },
> + );
> }
> },
> None => {
> - let node_state =
> - HashMap::from_iter([(node.to_string(), NodeState { cutoff: starttime })]);
> + let node_state = HashMap::from_iter([(
> + node.to_string(),
> + NodeState {
> + cutoff: update_fn(None),
> + },
> + )]);
>
> self.remote_state
> .insert(remote_id.to_string(), RemoteState { node_state });
> @@ -389,7 +401,7 @@ impl WritableTaskCache {
>
> fn write_tasks_to_journal(
> &self,
> - tasks: Vec<TaskCacheItem>,
> + finished_tasks: Vec<TaskCacheItem>,
> active_tasks: &mut HashMap<RemoteUpid, TaskCacheItem>,
> node_success_map: &NodeFetchSuccessMap,
> state: &mut State,
> @@ -400,37 +412,53 @@ impl WritableTaskCache {
> .create(true)
> .open(filename)?;
>
> - for task in tasks {
> + for task in finished_tasks {
> // Remove this finished task from our set of active tasks.
> active_tasks.remove(&task.upid);
>
> - match task.upid.native_upid() {
> - Ok(NativeUpid::PveUpid(upid)) => {
> - let node = &upid.node;
> - let remote = task.upid.remote();
> -
> - if node_success_map.node_successful(remote, node) {
> - state.update_cutoff_timestamp(remote, node, task.starttime);
> - }
> - }
> - Ok(NativeUpid::PbsUpid(upid)) => {
> - let node = &upid.node;
> - let remote = task.upid.remote();
> -
> - if node_success_map.node_successful(remote, node) {
> - state.update_cutoff_timestamp(remote, node, task.starttime);
> - }
> - }
> - Err(error) => {
> - log::error!("could not parse PVE UPID - not saving to task cache: {error:#}");
> + let native_upid = match task.upid.native_upid() {
> + Ok(native_upid) => native_upid,
> + Err(err) => {
> + log::error!("could not deserialize UPID: {err:#}");
> continue;
> }
> + };
> +
> + let node = native_upid.node();
> + let remote = task.upid.remote();
> +
> + if node_success_map.node_successful(remote, node) {
> + state.update_cutoff_timestamp(remote, node, |existing| {
> + existing.unwrap_or(0).max(task.starttime)
> + });
> }
>
> serde_json::to_writer(&mut file, &task)?;
> writeln!(&file)?;
> }
>
> + // For all remaining active tasks, set the cutoff timestamp to the
> + // start time of the *oldest* running task. This is to avoid
> + // gaps in the task archive when tasks overlap.
> + for task in active_tasks.values() {
> + let native_upid = match task.upid.native_upid() {
> + Ok(native_upid) => native_upid,
> + Err(err) => {
> + log::error!("could not deserialize UPID: {err:#}");
> + continue;
> + }
> + };
> +
> + let node = native_upid.node();
> + let remote = task.upid.remote();
> +
> + if node_success_map.node_successful(remote, node) {
> + state.update_cutoff_timestamp(remote, node, |existing| {
> + existing.unwrap_or(i64::MAX).min(task.starttime)
> + });
> + }
> + }
> +
> file.sync_all()?;
>
> Ok(())
> @@ -1482,4 +1510,38 @@ mod tests {
>
> Ok(())
> }
> +
> + #[test]
> + fn cutoff_is_oldest_running_task() {
> + let (_tmp_dir, cache) = make_cache().unwrap();
> + let cache = cache.write().unwrap();
> +
> + cache.init(500).unwrap();
> +
> + // Establish the baseline cutoff with a single, finished task
> + add_tasks(&cache, vec![task(900, Some(910))]).unwrap();
> + assert_eq!(get_cutoff(&cache), 900);
> +
> + // Add two running tasks
> + add_tasks(&cache, vec![task(1000, None), task(1100, None)]).unwrap();
> + assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 2);
> + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 1);
> +
> + // Two new *running* tasks, cutoff should remain the same
> + assert_eq!(get_cutoff(&cache), 900);
> +
> + add_tasks(&cache, vec![task(1000, None), task(1100, Some(1150))]).unwrap();
> + assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 1);
> + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 2);
> +
> + // The cutoff should stick to the *oldest* running task
> + assert_eq!(get_cutoff(&cache), 1000);
> +
> + add_tasks(&cache, vec![task(1000, Some(1200)), task(1100, Some(1150))]).unwrap();
> + // If there is no running task anymore, the youngest finished task's starttime determines
> + // the cutoff
> + assert_eq!(get_cutoff(&cache), 1100);
> + assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 0);
> + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 3);
> + }
> }
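A standalone replay of `cutoff_is_oldest_running_task`, reduced to the cutoff bookkeeping. `State`, `update_cutoff` and `apply_refresh` are hypothetical simplifications: the nested `RemoteState`/`NodeState` maps become a single `HashMap` keyed by `(remote, node)`, and the `node_success_map` check is left out. The closure contract matches the patch's `update_cutoff_timestamp` (`Fn(Option<i64>) -> i64`), with finished tasks folded in via `max` first and still-running tasks via `min` afterwards.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the patch's nested RemoteState/NodeState maps:
/// one cutoff per (remote, node) pair.
#[derive(Default)]
struct State {
    cutoffs: HashMap<(String, String), i64>,
}

impl State {
    /// Same contract as `update_cutoff_timestamp` in the patch: the closure
    /// gets the existing cutoff (None if the node is unknown) and returns
    /// the new one.
    fn update_cutoff<F: Fn(Option<i64>) -> i64>(&mut self, remote: &str, node: &str, update_fn: F) {
        let key = (remote.to_string(), node.to_string());
        let new_cutoff = update_fn(self.cutoffs.get(&key).copied());
        self.cutoffs.insert(key, new_cutoff);
    }

    fn cutoff(&self, remote: &str, node: &str) -> Option<i64> {
        self.cutoffs
            .get(&(remote.to_string(), node.to_string()))
            .copied()
    }
}

/// One refresh for a single node, given the start times of tasks that
/// finished in this round and of tasks that are still running afterwards.
fn apply_refresh(state: &mut State, finished: &[i64], running: &[i64]) {
    for &start in finished {
        // Finished tasks may only move the cutoff forward.
        state.update_cutoff("remote", "node", |existing| existing.unwrap_or(0).max(start));
    }
    for &start in running {
        // Never let the cutoff move past a task that is still running.
        state.update_cutoff("remote", "node", |existing| {
            existing.unwrap_or(i64::MAX).min(start)
        });
    }
}

fn main() {
    let mut state = State::default();
    // (finished, still running) start times, following the steps of
    // `cutoff_is_oldest_running_task`.
    let steps: Vec<(Vec<i64>, Vec<i64>)> = vec![
        (vec![900], vec![]),
        (vec![], vec![1000, 1100]),
        (vec![1100], vec![1000]),
        (vec![1000, 1100], vec![]),
    ];
    for (finished, running) in &steps {
        apply_refresh(&mut state, finished, running);
        println!("cutoff = {:?}", state.cutoff("remote", "node"));
    }
}
```

This prints `Some(900)`, `Some(900)`, `Some(1000)` and `Some(1100)`, matching the assertions in the test. Passing a closure keeps the map traversal in one place, while the caller decides whether the cutoff may only move forward (`max`) or must be held back by a running task (`min`).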
Thread overview: 9+ messages
2026-05-29 13:39 [PATCH datacenter-manager 0/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 1/6] pdm-api-types: add NativeUpid::node() convenience getter Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 2/6] task cache: tests: allow to provide an explicit end time in 'task' helper Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 3/6] task cache: tests: add get_cutoff helper Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 4/6] task cache: tests: add 'make_cache' convenience helper Lukas Wagner
2026-05-29 13:39 ` [PATCH datacenter-manager 5/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp Lukas Wagner
2026-06-02 14:01 ` Dominik Csapak [this message]
2026-05-29 13:39 ` [PATCH datacenter-manager 6/6] task cache: poll known active tasks every 30 seconds Lukas Wagner
2026-06-02 14:01 ` [PATCH datacenter-manager 0/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp Dominik Csapak