From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 5/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp
Date: Fri, 29 May 2026 15:39:50 +0200
Message-ID: <20260529133951.326103-6-l.wagner@proxmox.com>
In-Reply-To: <20260529133951.326103-1-l.wagner@proxmox.com>
At the moment, PDM only polls finished tasks from remote nodes.
Consider two tasks:
A: starts at 100, ends at 300
B: starts at 150, ends at 200
If PDM polls the node at t = 250, it would see that B has finished and
update the cutoff timestamp for the next poll to 150. However, since A
has a starttime of 100, and the cutoff determines the minimum starttime
of tasks to retrieve from the remote via the API, PDM would never fetch
task A, leading to gaps in the task archive.
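The gap can be reproduced with a small, self-contained model. This is only a sketch: `Task`, `poll_finished` and `old_cutoff` are hypothetical names that do not exist in PDM, and treating `since` as an inclusive lower bound is an assumption made for illustration.

```rust
/// Hypothetical, simplified task record (not PDM's `TaskCacheItem`).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Task {
    starttime: i64,
    endtime: Option<i64>,
}

/// Models the old polling behavior at time `now`: only tasks that have
/// already finished and whose start time is at or after `since` are returned.
/// (Assumption: `since` is an inclusive lower bound on the start time.)
fn poll_finished(tasks: &[Task], since: i64, now: i64) -> Vec<Task> {
    tasks
        .iter()
        .copied()
        .filter(|t| t.starttime >= since && t.endtime.map_or(false, |end| end <= now))
        .collect()
}

/// Old cutoff rule: the highest start time among the finished tasks seen,
/// never lower than the current cutoff.
fn old_cutoff(current: i64, finished: &[Task]) -> i64 {
    finished.iter().map(|t| t.starttime).fold(current, i64::max)
}
```

Polling tasks A and B at t = 250 returns only B and moves the cutoff to 150. A later poll at t = 350 with `since = 150` still skips A, because its start time of 100 is below the cutoff.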
The solution is to fetch all tasks, running and finished, and then
update the cutoff so that it never moves past the start time of the
oldest running task.
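The cutoff rule can be sketched in isolation as follows. `new_cutoff` is a hypothetical helper, not a function in PDM. It mirrors the two update closures used in this patch (`max` over finished tasks, then `min` over running tasks); the final fallback to 0 when nothing is known is an assumption.

```rust
/// Sketch of the cutoff rule: finished tasks may raise the cutoff, and the
/// remaining running tasks then pull it back down to the oldest running
/// start time, so that no still-running task is skipped by the next poll.
fn new_cutoff(current: Option<i64>, finished_starts: &[i64], running_starts: &[i64]) -> i64 {
    let mut cutoff = current;

    // Finished tasks: keep the highest start time seen so far.
    for &start in finished_starts {
        cutoff = Some(cutoff.unwrap_or(0).max(start));
    }

    // Running tasks: never move the cutoff past the oldest running task.
    for &start in running_starts {
        cutoff = Some(cutoff.unwrap_or(i64::MAX).min(start));
    }

    // Assumption: with no prior cutoff and no tasks at all, fall back to 0.
    cutoff.unwrap_or(0)
}
```

For the example above, a poll at t = 250 sees B finished (start 150) and A running (start 100), so the cutoff becomes 100 and A is fetched again on the next poll. Once A has finished too, the cutoff advances to 150.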
A side effect is that we now also return foreign (as in, not started by
PDM) running tasks from the API, which makes them appear in the UI as
well. This was always intended as a future extension anyway, so this
should be okay.
Initially reported in the community forum [0].
[0]: https://forum.proxmox.com/threads/180317/
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/remote_tasks/refresh_task.rs | 11 +--
server/src/remote_tasks/task_cache.rs | 122 ++++++++++++++++++------
2 files changed, 94 insertions(+), 39 deletions(-)
diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs
index 65572808..c3b005ce 100644
--- a/server/src/remote_tasks/refresh_task.rs
+++ b/server/src/remote_tasks/refresh_task.rs
@@ -284,7 +284,7 @@ async fn fetch_tasks_from_single_node(
match remote.ty {
RemoteType::Pve => {
let params = pve_api_types::ListTasks {
- source: Some(pve_api_types::ListTasksSource::Archive),
+ source: Some(pve_api_types::ListTasksSource::All),
since: Some(since),
// If `limit` is not provided, we only receive 50 tasks
limit: Some(MAX_TASKS_TO_FETCH),
@@ -315,14 +315,7 @@ async fn fetch_tasks_from_single_node(
.get_task_list(params)
.await?
.into_iter()
- .filter_map(|task| {
- if task.endtime.is_some() {
- // We only care about finished tasks.
- Some(map_pbs_task(task, remote.id.clone()))
- } else {
- None
- }
- })
+ .map(|task| map_pbs_task(task, remote.id.clone()))
.collect();
Ok(task_list)
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index e83a351b..04e3d3ff 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
use proxmox_sys::fs::CreateOptions;
-use pdm_api_types::{NativeUpid, RemoteUpid};
+use pdm_api_types::RemoteUpid;
/// Filename for the file containing running tasks.
const ACTIVE_FILENAME: &str = "active";
@@ -102,22 +102,34 @@ impl State {
self.tracked_tasks.remove(upid);
}
- /// Update the per-node cutoff timestamp if it is higher than the current one.
- fn update_cutoff_timestamp(&mut self, remote_id: &str, node: &str, starttime: i64) {
+ /// Update the per-node cutoff timestamp with a provided update function.
+ fn update_cutoff_timestamp<F: Fn(Option<i64>) -> i64>(
+ &mut self,
+ remote_id: &str,
+ node: &str,
+ update_fn: F,
+ ) {
match self.remote_state.get_mut(remote_id) {
Some(remote_state) => match remote_state.node_state.get_mut(node) {
Some(node_state) => {
- node_state.cutoff = node_state.cutoff.max(starttime);
+ node_state.cutoff = update_fn(Some(node_state.cutoff));
}
None => {
- remote_state
- .node_state
- .insert(node.to_string(), NodeState { cutoff: starttime });
+ remote_state.node_state.insert(
+ node.to_string(),
+ NodeState {
+ cutoff: update_fn(None),
+ },
+ );
}
},
None => {
- let node_state =
- HashMap::from_iter([(node.to_string(), NodeState { cutoff: starttime })]);
+ let node_state = HashMap::from_iter([(
+ node.to_string(),
+ NodeState {
+ cutoff: update_fn(None),
+ },
+ )]);
self.remote_state
.insert(remote_id.to_string(), RemoteState { node_state });
@@ -389,7 +401,7 @@ impl WritableTaskCache {
fn write_tasks_to_journal(
&self,
- tasks: Vec<TaskCacheItem>,
+ finished_tasks: Vec<TaskCacheItem>,
active_tasks: &mut HashMap<RemoteUpid, TaskCacheItem>,
node_success_map: &NodeFetchSuccessMap,
state: &mut State,
@@ -400,37 +412,53 @@ impl WritableTaskCache {
.create(true)
.open(filename)?;
- for task in tasks {
+ for task in finished_tasks {
// Remove this finished task from our set of active tasks.
active_tasks.remove(&task.upid);
- match task.upid.native_upid() {
- Ok(NativeUpid::PveUpid(upid)) => {
- let node = &upid.node;
- let remote = task.upid.remote();
-
- if node_success_map.node_successful(remote, node) {
- state.update_cutoff_timestamp(remote, node, task.starttime);
- }
- }
- Ok(NativeUpid::PbsUpid(upid)) => {
- let node = &upid.node;
- let remote = task.upid.remote();
-
- if node_success_map.node_successful(remote, node) {
- state.update_cutoff_timestamp(remote, node, task.starttime);
- }
- }
- Err(error) => {
- log::error!("could not parse PVE UPID - not saving to task cache: {error:#}");
+ let native_upid = match task.upid.native_upid() {
+ Ok(native_upid) => native_upid,
+ Err(err) => {
+ log::error!("could not deserialize UPID: {err:#}");
continue;
}
+ };
+
+ let node = native_upid.node();
+ let remote = task.upid.remote();
+
+ if node_success_map.node_successful(remote, node) {
+ state.update_cutoff_timestamp(remote, node, |existing| {
+ existing.unwrap_or(0).max(task.starttime)
+ });
}
serde_json::to_writer(&mut file, &task)?;
writeln!(&file)?;
}
+ // For all remaining active tasks, set the cutoff timestamp to the
+ // start time of the *oldest* running task. This is to avoid
+ // gaps in the task archive if tasks overlap in time.
+ for task in active_tasks.values() {
+ let native_upid = match task.upid.native_upid() {
+ Ok(native_upid) => native_upid,
+ Err(err) => {
+ log::error!("could not deserialize UPID: {err:#}");
+ continue;
+ }
+ };
+
+ let node = native_upid.node();
+ let remote = task.upid.remote();
+
+ if node_success_map.node_successful(remote, node) {
+ state.update_cutoff_timestamp(remote, node, |existing| {
+ existing.unwrap_or(i64::MAX).min(task.starttime)
+ });
+ }
+ }
+
file.sync_all()?;
Ok(())
@@ -1482,4 +1510,38 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn cutoff_is_oldest_running_task() {
+ let (_tmp_dir, cache) = make_cache().unwrap();
+ let cache = cache.write().unwrap();
+
+ cache.init(500).unwrap();
+
+ // Establish the baseline cutoff with a single, finished task
+ add_tasks(&cache, vec![task(900, Some(910))]).unwrap();
+ assert_eq!(get_cutoff(&cache), 900);
+
+ // Add two running tasks
+ add_tasks(&cache, vec![task(1000, None), task(1100, None)]).unwrap();
+ assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 2);
+ assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 1);
+
+ // Two new *running* tasks, cutoff should remain the same
+ assert_eq!(get_cutoff(&cache), 900);
+
+ add_tasks(&cache, vec![task(1000, None), task(1100, Some(1150))]).unwrap();
+ assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 1);
+ assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 2);
+
+ // The cutoff should stick to the *oldest* running task
+ assert_eq!(get_cutoff(&cache), 1000);
+
+ add_tasks(&cache, vec![task(1000, Some(1200)), task(1100, Some(1150))]).unwrap();
+ // If there is no running task anymore, the youngest finished task's starttime determines
+ // the cutoff
+ assert_eq!(get_cutoff(&cache), 1100);
+ assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 0);
+ assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 3);
+ }
}
--
2.47.3