* [pdm-devel] [PATCH datacenter-manager 1/2] remote tasks: poll foreign, non-tracked active tasks to avoid them getting stuck
2025-11-28 14:05 [pdm-devel] [PATCH datacenter-manager 0/2] remote tasks: avoid stuck running tasks Lukas Wagner
@ 2025-11-28 14:05 ` Lukas Wagner
2025-11-28 14:05 ` [pdm-devel] [PATCH datacenter-manager 2/2] remote tasks: make sure to update the task cache if there were errors when polling Lukas Wagner
2025-11-30 1:17 ` [pdm-devel] applied: [PATCH datacenter-manager 0/2] remote tasks: avoid stuck running tasks Thomas Lamprecht
From: Lukas Wagner @ 2025-11-28 14:05 UTC
To: pdm-devel
If tasks that are currently active but not tracked (i.e. not started by PDM)
are added to the task cache, they can, under certain circumstances, get
stuck in the 'active' state. This was mostly caused by a bug that has
already been fixed [1]. As a safeguard, and to clear tasks that are
already stuck, we now poll active tasks at a long interval (10 minutes)
and check whether they have finished.
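The long-interval polling boils down to a timestamp check on every tick of the poll loop. The sketch below illustrates that gating in isolation; `ActivePollTimer`, `is_due_at` and `reset_at` are hypothetical names, and passing `now` explicitly is only there to keep it deterministic. Unlike the patch, which initializes the timestamp to `now - POLL_ACTIVE_INTERVAL`, this sketch uses `None` for "never polled" so it cannot underflow an `Instant` shortly after boot.

```rust
use std::time::{Duration, Instant};

/// Same 10-minute value as `POLL_ACTIVE_INTERVAL` in the patch.
const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600);

/// Hypothetical, trimmed-down stand-in for the `last_active_poll` part of `TaskState`.
struct ActivePollTimer {
    /// `None` means "never polled", which counts as due.
    last_active_poll: Option<Instant>,
}

impl ActivePollTimer {
    fn new() -> Self {
        Self { last_active_poll: None }
    }

    /// Due once strictly more than the interval has elapsed,
    /// mirroring the `>` comparison in `is_due_for_active_poll`.
    fn is_due_at(&self, now: Instant) -> bool {
        match self.last_active_poll {
            None => true,
            Some(last) => now.duration_since(last) > POLL_ACTIVE_INTERVAL,
        }
    }

    /// Record that active tasks were just polled (cf. `reset_active_poll`).
    fn reset_at(&mut self, now: Instant) {
        self.last_active_poll = Some(now);
    }
}
```

On a tick where the timer is not due, only tracked tasks are polled; on a due tick, the active tasks are added and the timer is reset.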
This polling is done as part of the regular poll loop and does not
result in additional API calls if there are no active, non-tracked
tasks.
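Why no extra API calls occur when every active task is already tracked can be seen from how the set of tasks to poll is built: the active tasks are read from the local task cache and merged into a set with the tracked ones, so a task present in both lists is polled only once. The sketch below assumes plain `String`s as a hypothetical stand-in for `RemoteUpid`, and `tasks_to_poll` is an illustrative helper, not a function from the patch.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for PDM's `RemoteUpid`.
type Upid = String;

/// Collect the tasks to poll in one tick. Each entry in the returned set
/// corresponds to one status request against a remote.
fn tasks_to_poll(tracked: &[Upid], active: &[Upid], active_poll_due: bool) -> HashSet<Upid> {
    let mut set: HashSet<Upid> = tracked.iter().cloned().collect();
    if active_poll_due {
        // Tracked tasks are usually also active; the set de-duplicates them,
        // so only foreign (non-tracked) active tasks add requests.
        set.extend(active.iter().cloned());
    }
    set
}
```

If all active tasks are also tracked, the set is identical to the tracked tasks alone, so the number of requests does not change.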
[1] fixed in 6247ff3c7
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
.../tasks/remote_tasks.rs | 70 +++++++++++++++++--
1 file changed, 63 insertions(+), 7 deletions(-)
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index d3c8395e..c8d50183 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -20,7 +20,7 @@ use server::{
pbs_client,
remote_tasks::{
self,
- task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
+ task_cache::{GetTasks, NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER,
},
task_utils,
@@ -34,6 +34,14 @@ const POLL_INTERVAL: Duration = Duration::from_secs(10);
/// task for this remote).
const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);
+/// Interval at which we poll active tasks. This only really affects 'foreign' (as in,
+/// not started by PDM) tasks. Tasks which were started by PDM are always 'tracked' and therefore
+/// polled at the interval set in [`POLL_INTERVAL`].
+// NOTE: Since we currently never query active tasks from remotes, this is merely a safeguard
+// to clear active tasks left stuck by a previous bug. If we start querying active tasks at some
+// point, we might lower this interval.
+const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600);
+
/// Interval at which to check for task cache rotation.
const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
@@ -63,6 +71,9 @@ struct TaskState {
last_fetch: Instant,
/// Time at which we last applied the journal.
last_journal_apply: Instant,
+ /// Time at which we last polled active tasks. This is done to ensure that
+ /// active tasks never get stuck in the 'active' state.
+ last_active_poll: Instant,
}
impl TaskState {
@@ -73,6 +84,7 @@ impl TaskState {
last_rotate_check: now - CHECK_ROTATE_INTERVAL,
last_fetch: now - TASK_FETCH_INTERVAL,
last_journal_apply: now - APPLY_JOURNAL_INTERVAL,
+ last_active_poll: now - POLL_ACTIVE_INTERVAL,
}
}
@@ -91,6 +103,11 @@ impl TaskState {
self.last_journal_apply = Instant::now();
}
+ /// Reset the active poll timestamp.
+ fn reset_active_poll(&mut self) {
+ self.last_active_poll = Instant::now();
+ }
+
/// Should we check for archive rotation?
fn is_due_for_rotate_check(&self) -> bool {
Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL
@@ -105,6 +122,11 @@ impl TaskState {
fn is_due_for_journal_apply(&self) -> bool {
Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL
}
+
+ /// Should we poll active tasks?
+ fn is_due_for_active_poll(&self) -> bool {
+ Instant::now().duration_since(self.last_active_poll) > POLL_ACTIVE_INTERVAL
+ }
}
/// Start the remote task fetching task
@@ -171,12 +193,32 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
let cache_state = cache.read_state();
- let poll_results = poll_tracked_tasks(
- &remote_config,
- cache_state.tracked_tasks(),
- Arc::clone(&total_connections_semaphore),
- )
- .await?;
+
+ let poll_results = if task_state.is_due_for_active_poll() {
+ let mut tasks_to_poll: HashSet<RemoteUpid> =
+ HashSet::from_iter(cache_state.tracked_tasks().cloned());
+
+ let active_tasks = get_active_tasks(cache.clone()).await?;
+ tasks_to_poll.extend(active_tasks.into_iter());
+
+ let poll_results = poll_tracked_tasks(
+ &remote_config,
+ tasks_to_poll.iter(),
+ Arc::clone(&total_connections_semaphore),
+ )
+ .await?;
+
+ task_state.reset_active_poll();
+
+ poll_results
+ } else {
+ poll_tracked_tasks(
+ &remote_config,
+ cache_state.tracked_tasks(),
+ Arc::clone(&total_connections_semaphore),
+ )
+ .await?
+ };
// Get a list of remotes that we should poll in this cycle.
let remotes = if task_state.is_due_for_fetch() {
@@ -357,6 +399,20 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
}
+/// Get a list of active tasks.
+async fn get_active_tasks(cache: TaskCache) -> Result<Vec<RemoteUpid>, Error> {
+ Ok(tokio::task::spawn_blocking(move || {
+ let tasks: Vec<RemoteUpid> = cache
+ .read()?
+ .get_tasks(GetTasks::Active)?
+ .map(|t| t.upid)
+ .collect();
+
+ Ok::<Vec<RemoteUpid>, Error>(tasks)
+ })
+ .await??)
+}
+
#[derive(PartialEq, Debug)]
/// Outcome from polling a tracked task.
enum PollResult {
--
2.47.3
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel