From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 151AE1FF141 for ; Tue, 02 Jun 2026 16:02:27 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id CA9C91608B; Tue, 2 Jun 2026 16:02:26 +0200 (CEST) Message-ID: <5f7e48a5-bf64-4b60-b0bb-1b7e7cb2fddf@proxmox.com> Date: Tue, 2 Jun 2026 16:01:50 +0200 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird Beta Subject: Re: [PATCH datacenter-manager 5/6] fix #7639: task cache: consider running tasks when updating the cutoff timestamp To: Lukas Wagner , pdm-devel@lists.proxmox.com References: <20260529133951.326103-1-l.wagner@proxmox.com> <20260529133951.326103-6-l.wagner@proxmox.com> Content-Language: en-US From: Dominik Csapak In-Reply-To: <20260529133951.326103-6-l.wagner@proxmox.com> Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: 7bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1780408875429 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.049 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 5HLW3X4GES42MCTZUEW6K3N4W6S5JKC5 X-Message-ID-Hash: 5HLW3X4GES42MCTZUEW6K3N4W6S5JKC5 X-MailFrom: d.csapak@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: 
List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Tiny nit: this patch would read slightly better if it already used the UPID node refactoring, e.g. those changes could be folded into the first patch. On 5/29/26 3:39 PM, Lukas Wagner wrote: > At the moment, PDM only polls finished tasks from remote nodes. > > Consider two tasks: > A: starts at 100, ends at 300 > B: starts at 150, ends at 200 > > If PDM polls the node at t = 250, it would see that B has finished and > update the cutoff timestamp for the next poll to 150. However, since A > has a starttime of 100, and the cutoff determines the minimum starttime > of tasks to retrieve from the remote via the API, PDM would never fetch > task A, leading to gaps in the task archive. > > The solution is to fetch all tasks, running and finished, and then > update the cutoff with the oldest running task in mind. > > A side effect is that we now also return foreign (as in, not started by > PDM) running tasks from the API, which makes them appear in the UI as > well. This was always intended as a future extension anyway, so this > should be okay. > > Initially reported in the community forum [0]. 
> > [0]: https://forum.proxmox.com/threads/180317/ > > Signed-off-by: Lukas Wagner > --- > server/src/remote_tasks/refresh_task.rs | 11 +-- > server/src/remote_tasks/task_cache.rs | 122 ++++++++++++++++++------ > 2 files changed, 94 insertions(+), 39 deletions(-) > > diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs > index 65572808..c3b005ce 100644 > --- a/server/src/remote_tasks/refresh_task.rs > +++ b/server/src/remote_tasks/refresh_task.rs > @@ -284,7 +284,7 @@ async fn fetch_tasks_from_single_node( > match remote.ty { > RemoteType::Pve => { > let params = pve_api_types::ListTasks { > - source: Some(pve_api_types::ListTasksSource::Archive), > + source: Some(pve_api_types::ListTasksSource::All), > since: Some(since), > // If `limit` is not provided, we only receive 50 tasks > limit: Some(MAX_TASKS_TO_FETCH), > @@ -315,14 +315,7 @@ async fn fetch_tasks_from_single_node( > .get_task_list(params) > .await? > .into_iter() > - .filter_map(|task| { > - if task.endtime.is_some() { > - // We only care about finished tasks. > - Some(map_pbs_task(task, remote.id.clone())) > - } else { > - None > - } > - }) > + .map(|task| map_pbs_task(task, remote.id.clone())) > .collect(); > > Ok(task_list) > diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs > index e83a351b..04e3d3ff 100644 > --- a/server/src/remote_tasks/task_cache.rs > +++ b/server/src/remote_tasks/task_cache.rs > @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize}; > > use proxmox_sys::fs::CreateOptions; > > -use pdm_api_types::{NativeUpid, RemoteUpid}; > +use pdm_api_types::RemoteUpid; > > /// Filename for the file containing running tasks. > const ACTIVE_FILENAME: &str = "active"; > @@ -102,22 +102,34 @@ impl State { > self.tracked_tasks.remove(upid); > } > > - /// Update the per-node cutoff timestamp if it is higher than the current one. 
> - fn update_cutoff_timestamp(&mut self, remote_id: &str, node: &str, starttime: i64) { > + /// Update the per-node cutoff timestamp with a provided update function. > + fn update_cutoff_timestamp) -> i64>( > + &mut self, > + remote_id: &str, > + node: &str, > + update_fn: F, > + ) { > match self.remote_state.get_mut(remote_id) { > Some(remote_state) => match remote_state.node_state.get_mut(node) { > Some(node_state) => { > - node_state.cutoff = node_state.cutoff.max(starttime); > + node_state.cutoff = update_fn(Some(node_state.cutoff)); > } > None => { > - remote_state > - .node_state > - .insert(node.to_string(), NodeState { cutoff: starttime }); > + remote_state.node_state.insert( > + node.to_string(), > + NodeState { > + cutoff: update_fn(None), > + }, > + ); > } > }, > None => { > - let node_state = > - HashMap::from_iter([(node.to_string(), NodeState { cutoff: starttime })]); > + let node_state = HashMap::from_iter([( > + node.to_string(), > + NodeState { > + cutoff: update_fn(None), > + }, > + )]); > > self.remote_state > .insert(remote_id.to_string(), RemoteState { node_state }); > @@ -389,7 +401,7 @@ impl WritableTaskCache { > > fn write_tasks_to_journal( > &self, > - tasks: Vec, > + finished_tasks: Vec, > active_tasks: &mut HashMap, > node_success_map: &NodeFetchSuccessMap, > state: &mut State, > @@ -400,37 +412,53 @@ impl WritableTaskCache { > .create(true) > .open(filename)?; > > - for task in tasks { > + for task in finished_tasks { > // Remove this finished task from our set of active tasks. 
> active_tasks.remove(&task.upid); > > - match task.upid.native_upid() { > - Ok(NativeUpid::PveUpid(upid)) => { > - let node = &upid.node; > - let remote = task.upid.remote(); > - > - if node_success_map.node_successful(remote, node) { > - state.update_cutoff_timestamp(remote, node, task.starttime); > - } > - } > - Ok(NativeUpid::PbsUpid(upid)) => { > - let node = &upid.node; > - let remote = task.upid.remote(); > - > - if node_success_map.node_successful(remote, node) { > - state.update_cutoff_timestamp(remote, node, task.starttime); > - } > - } > - Err(error) => { > - log::error!("could not parse PVE UPID - not saving to task cache: {error:#}"); > + let native_upid = match task.upid.native_upid() { > + Ok(native_upid) => native_upid, > + Err(err) => { > + log::error!("could not deserialize UPID: {err:#}"); > continue; > } > + }; > + > + let node = native_upid.node(); > + let remote = task.upid.remote(); > + > + if node_success_map.node_successful(remote, node) { > + state.update_cutoff_timestamp(remote, node, |existing| { > + existing.unwrap_or(0).max(task.starttime) > + }); > } > > serde_json::to_writer(&mut file, &task)?; > writeln!(&file)?; > } > > + // For all remaining active tasks, set the cutoff timestamp to the > + // start time of the *oldest* running task. 
This is to avoid > + // gaps in the task archive if tasks run overlappingly > + for task in active_tasks.values() { > + let native_upid = match task.upid.native_upid() { > + Ok(native_upid) => native_upid, > + Err(err) => { > + log::error!("could not deserialize UPID: {err:#}"); > + continue; > + } > + }; > + > + let node = native_upid.node(); > + let remote = task.upid.remote(); > + > + if node_success_map.node_successful(remote, node) { > + state.update_cutoff_timestamp(remote, node, |existing| { > + existing.unwrap_or(i64::MAX).min(task.starttime) > + }); > + } > + } > + > file.sync_all()?; > > Ok(()) > @@ -1482,4 +1510,38 @@ mod tests { > > Ok(()) > } > + > + #[test] > + fn cutoff_is_oldest_running_task() { > + let (_tmp_dir, cache) = make_cache().unwrap(); > + let cache = cache.write().unwrap(); > + > + cache.init(500).unwrap(); > + > + // Establish the baseline cutoff with a single, finished task > + add_tasks(&cache, vec![task(900, Some(910))]).unwrap(); > + assert_eq!(get_cutoff(&cache), 900); > + > + // Add two running tasks > + add_tasks(&cache, vec![task(1000, None), task(1100, None)]).unwrap(); > + assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 2); > + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 1); > + > + // Two new *running* tasks, cutoff should remain the same > + assert_eq!(get_cutoff(&cache), 900); > + > + add_tasks(&cache, vec![task(1000, None), task(1100, Some(1150))]).unwrap(); > + assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 1); > + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 2); > + > + // The cutoff should stick to the *oldest* running task > + assert_eq!(get_cutoff(&cache), 1000); > + > + add_tasks(&cache, vec![task(1000, Some(1200)), task(1100, Some(1150))]).unwrap(); > + // If there is no running task anymore, the youngest finished tasks' starttime determines > + // the cutoff > + assert_eq!(get_cutoff(&cache), 1100); > + 
assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 0); > + assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 3); > + } > }