From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id C9C0A1FF14C for ; Fri, 29 May 2026 15:40:08 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 06AE0C56F; Fri, 29 May 2026 15:40:01 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 5/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp Date: Fri, 29 May 2026 15:39:50 +0200 Message-ID: <20260529133951.326103-6-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260529133951.326103-1-l.wagner@proxmox.com> References: <20260529133951.326103-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1780061964841 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.053 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: ZUMAAUGFYZFV5MTBRP3UPXX7R5PCPLND X-Message-ID-Hash: ZUMAAUGFYZFV5MTBRP3UPXX7R5PCPLND X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: At the moment, PDM only polls finished tasks from remote nodes. 
Considering two tasks: A: starts at 100, ends at 300 B: starts at 150, ends at 200 If PDM polls the node at t = 250, it would see that B has finished and update the cutoff timestamp for the next poll to 150, B's starttime. However, A was still running at t = 250 and thus was not fetched. Since A has a starttime of 100, and the cutoff determines the minimum starttime of tasks to retrieve from the remote via the API, PDM would never fetch task A, leading to gaps in the task archive. The solution is to fetch all tasks, running and finished, and then update the cutoff with the oldest still-running task in mind. A side effect is that we now also return foreign (as in, not started by PDM) running tasks from the API, which makes them appear in the UI as well. This was always intended as a future extension anyway, so this should be okay. Initially reported in the community forum [0]. [0]: https://forum.proxmox.com/threads/180317/ Signed-off-by: Lukas Wagner --- server/src/remote_tasks/refresh_task.rs | 11 +-- server/src/remote_tasks/task_cache.rs | 122 ++++++++++++++++++------ 2 files changed, 94 insertions(+), 39 deletions(-) diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs index 65572808..c3b005ce 100644 --- a/server/src/remote_tasks/refresh_task.rs +++ b/server/src/remote_tasks/refresh_task.rs @@ -284,7 +284,7 @@ async fn fetch_tasks_from_single_node( match remote.ty { RemoteType::Pve => { let params = pve_api_types::ListTasks { - source: Some(pve_api_types::ListTasksSource::Archive), + source: Some(pve_api_types::ListTasksSource::All), since: Some(since), // If `limit` is not provided, we only receive 50 tasks limit: Some(MAX_TASKS_TO_FETCH), @@ -315,14 +315,7 @@ async fn fetch_tasks_from_single_node( .get_task_list(params) .await? .into_iter() - .filter_map(|task| { - if task.endtime.is_some() { - // We only care about finished tasks.
- Some(map_pbs_task(task, remote.id.clone())) - } else { - None - } - }) + .map(|task| map_pbs_task(task, remote.id.clone())) .collect(); Ok(task_list) diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs index e83a351b..04e3d3ff 100644 --- a/server/src/remote_tasks/task_cache.rs +++ b/server/src/remote_tasks/task_cache.rs @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize}; use proxmox_sys::fs::CreateOptions; -use pdm_api_types::{NativeUpid, RemoteUpid}; +use pdm_api_types::RemoteUpid; /// Filename for the file containing running tasks. const ACTIVE_FILENAME: &str = "active"; @@ -102,22 +102,34 @@ impl State { self.tracked_tasks.remove(upid); } - /// Update the per-node cutoff timestamp if it is higher than the current one. - fn update_cutoff_timestamp(&mut self, remote_id: &str, node: &str, starttime: i64) { + /// Set the per-node cutoff to `update_fn(current)`, where `current` is `None` if unset. + fn update_cutoff_timestamp) -> i64>( + &mut self, + remote_id: &str, + node: &str, + update_fn: F, + ) { match self.remote_state.get_mut(remote_id) { Some(remote_state) => match remote_state.node_state.get_mut(node) { Some(node_state) => { - node_state.cutoff = node_state.cutoff.max(starttime); + node_state.cutoff = update_fn(Some(node_state.cutoff)); } None => { - remote_state - .node_state - .insert(node.to_string(), NodeState { cutoff: starttime }); + remote_state.node_state.insert( + node.to_string(), + NodeState { + cutoff: update_fn(None), + }, + ); } }, None => { - let node_state = - HashMap::from_iter([(node.to_string(), NodeState { cutoff: starttime })]); + let node_state = HashMap::from_iter([( + node.to_string(), + NodeState { + cutoff: update_fn(None), + }, + )]); self.remote_state .insert(remote_id.to_string(), RemoteState { node_state }); @@ -389,7 +401,7 @@ impl WritableTaskCache { fn write_tasks_to_journal( &self, - tasks: Vec, + finished_tasks: Vec, active_tasks: &mut HashMap, node_success_map: &NodeFetchSuccessMap, state:
&mut State, @@ -400,37 +412,53 @@ impl WritableTaskCache { .create(true) .open(filename)?; - for task in tasks { + for task in finished_tasks { // Remove this finished task from our set of active tasks. active_tasks.remove(&task.upid); - match task.upid.native_upid() { - Ok(NativeUpid::PveUpid(upid)) => { - let node = &upid.node; - let remote = task.upid.remote(); - - if node_success_map.node_successful(remote, node) { - state.update_cutoff_timestamp(remote, node, task.starttime); - } - } - Ok(NativeUpid::PbsUpid(upid)) => { - let node = &upid.node; - let remote = task.upid.remote(); - - if node_success_map.node_successful(remote, node) { - state.update_cutoff_timestamp(remote, node, task.starttime); - } - } - Err(error) => { - log::error!("could not parse PVE UPID - not saving to task cache: {error:#}"); + let native_upid = match task.upid.native_upid() { + Ok(native_upid) => native_upid, + Err(err) => { + log::error!("could not deserialize UPID: {err:#}"); continue; } + }; + + let node = native_upid.node(); + let remote = task.upid.remote(); + + if node_success_map.node_successful(remote, node) { + state.update_cutoff_timestamp(remote, node, |existing| { + existing.unwrap_or(0).max(task.starttime) + }); } serde_json::to_writer(&mut file, &task)?; writeln!(&file)?; } + // For all remaining active tasks, cap the cutoff timestamp at the + // start time of the *oldest* running task.
This is to avoid + // gaps in the task archive if tasks overlap in time. + for task in active_tasks.values() { + let native_upid = match task.upid.native_upid() { + Ok(native_upid) => native_upid, + Err(err) => { + log::error!("could not deserialize UPID: {err:#}"); + continue; + } + }; + + let node = native_upid.node(); + let remote = task.upid.remote(); + + if node_success_map.node_successful(remote, node) { + state.update_cutoff_timestamp(remote, node, |existing| { + existing.unwrap_or(i64::MAX).min(task.starttime) + }); + } + } + file.sync_all()?; Ok(()) @@ -1482,4 +1510,38 @@ mod tests { Ok(()) } + + #[test] + fn cutoff_is_oldest_running_task() { + let (_tmp_dir, cache) = make_cache().unwrap(); + let cache = cache.write().unwrap(); + + cache.init(500).unwrap(); + + // Establish the baseline cutoff with a single, finished task + add_tasks(&cache, vec![task(900, Some(910))]).unwrap(); + assert_eq!(get_cutoff(&cache), 900); + + // Add two running tasks + add_tasks(&cache, vec![task(1000, None), task(1100, None)]).unwrap(); + assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 2); + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 1); + + // Two new *running* tasks, cutoff should remain the same + assert_eq!(get_cutoff(&cache), 900); + + add_tasks(&cache, vec![task(1000, None), task(1100, Some(1150))]).unwrap(); + assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 1); + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 2); + + // The cutoff should stick to the *oldest* running task + assert_eq!(get_cutoff(&cache), 1000); + + add_tasks(&cache, vec![task(1000, Some(1200)), task(1100, Some(1150))]).unwrap(); + // If there is no running task anymore, the youngest finished task's starttime determines + // the cutoff + assert_eq!(get_cutoff(&cache), 1100); + assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 0); + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 3); + } } -- 2.47.3