From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 4FFD61FF15C for ; Fri, 28 Nov 2025 15:05:39 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 562521C0D1; Fri, 28 Nov 2025 15:05:59 +0100 (CET) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Date: Fri, 28 Nov 2025 15:05:21 +0100 Message-ID: <20251128140522.311838-2-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20251128140522.311838-1-l.wagner@proxmox.com> References: <20251128140522.311838-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1764338686137 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.032 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pdm-devel] [PATCH datacenter-manager 1/2] remote tasks: poll foreign, non-tracked active tasks to avoid them getting stuck X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" If tasks which are currently active but not tracked (not started by PDM) are added to the task cache, then under some special circumstances, they can get stuck in the 'active' state. This happened mostly due to a bug that was already fixed [1]. 
As a safeguard and to fix existing stuck tasks, we now poll active tasks with a long interval (10 min) and check if they are finished. This polling is done as part of the regular poll loop and does not result in additional API calls if there are no active, non-tracked tasks. [1] fixed in 6247ff3c7 Signed-off-by: Lukas Wagner --- .../tasks/remote_tasks.rs | 70 +++++++++++++++++-- 1 file changed, 63 insertions(+), 7 deletions(-) diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs index d3c8395e..c8d50183 100644 --- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs @@ -20,7 +20,7 @@ use server::{ pbs_client, remote_tasks::{ self, - task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem}, + task_cache::{GetTasks, NodeFetchSuccessMap, State, TaskCache, TaskCacheItem}, KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER, }, task_utils, @@ -34,6 +34,14 @@ const POLL_INTERVAL: Duration = Duration::from_secs(10); /// task for this remote). const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600); +/// Interval at which we poll active tasks. This only really affects 'foreign' (as in, +/// not started by PDM) tasks. Tasks which were started by PDM are always 'tracked' and therefore +/// polled at the interval set in [`POLL_INTERVAL`]. +// NOTE: Since we currently never query active tasks from remotes, this is merely a safeguard +// to clear stuck active tasks from a previous bug. If we start querying active tasks later, we +// might lower this interval. +const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600); + /// Interval at which to check for task cache rotation. const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600); @@ -63,6 +71,9 @@ struct TaskState { last_fetch: Instant, /// Time at which we last applied the journal.
last_journal_apply: Instant, + /// Time at which we last polled active tasks. This is done to ensure that + /// active tasks are never stuck in the 'active' state. + last_active_poll: Instant, } impl TaskState { @@ -73,6 +84,7 @@ impl TaskState { last_rotate_check: now - CHECK_ROTATE_INTERVAL, last_fetch: now - TASK_FETCH_INTERVAL, last_journal_apply: now - APPLY_JOURNAL_INTERVAL, + last_active_poll: now - POLL_ACTIVE_INTERVAL, } } @@ -91,6 +103,11 @@ impl TaskState { self.last_journal_apply = Instant::now(); } + /// Reset the active poll timestamp. + fn reset_active_poll(&mut self) { + self.last_active_poll = Instant::now(); + } + /// Should we check for archive rotation? fn is_due_for_rotate_check(&self) -> bool { Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL @@ -105,6 +122,11 @@ impl TaskState { fn is_due_for_journal_apply(&self) -> bool { Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL } + + /// Should we poll active tasks?
+ fn is_due_for_active_poll(&self) -> bool { + Instant::now().duration_since(self.last_active_poll) > POLL_ACTIVE_INTERVAL + } } /// Start the remote task fetching task @@ -171,12 +193,32 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> { let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS)); let cache_state = cache.read_state(); - let poll_results = poll_tracked_tasks( - &remote_config, - cache_state.tracked_tasks(), - Arc::clone(&total_connections_semaphore), - ) - .await?; + + let poll_results = if task_state.is_due_for_active_poll() { + let mut tasks_to_poll: HashSet = + HashSet::from_iter(cache_state.tracked_tasks().cloned()); + + let active_tasks = get_active_tasks(cache.clone()).await?; + tasks_to_poll.extend(active_tasks.into_iter()); + + let poll_results = poll_tracked_tasks( + &remote_config, + tasks_to_poll.iter(), + Arc::clone(&total_connections_semaphore), + ) + .await?; + + task_state.reset_active_poll(); + + poll_results + } else { + poll_tracked_tasks( + &remote_config, + cache_state.tracked_tasks(), + Arc::clone(&total_connections_semaphore), + ) + .await? + }; // Get a list of remotes that we should poll in this cycle. let remotes = if task_state.is_due_for_fetch() { @@ -357,6 +399,20 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> { tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await? } +/// Get a list of active tasks. +async fn get_active_tasks(cache: TaskCache) -> Result, Error> { + Ok(tokio::task::spawn_blocking(move || { + let tasks: Vec = cache + .read()? + .get_tasks(GetTasks::Active)? + .map(|t| t.upid) + .collect(); + + Ok::, Error>(tasks) + }) + .await??) +} + #[derive(PartialEq, Debug)] /// Outcome from polling a tracked task. enum PollResult { -- 2.47.3 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel