From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: <pdm-devel-bounces@lists.proxmox.com>
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Date: Fri, 14 Mar 2025 15:12:22 +0100
Message-Id: <20250314141225.240768-6-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250314141225.240768-1-l.wagner@proxmox.com>
References: <20250314141225.240768-1-l.wagner@proxmox.com>
MIME-Version: 1.0
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache
X-BeenThere: pdm-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Datacenter Manager development discussion <pdm-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/>
List-Post: <mailto:pdm-devel@lists.proxmox.com>
List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Datacenter Manager development discussion <pdm-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pdm-devel-bounces@lists.proxmox.com
Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com>

This commit changes the remote task module as follows:

- Add a new background task for regular polling of task data.

  Instead of triggering the fetching of task data from within the
  `get_tasks` function, which is usually called by an API handler, we move
  the fetching to a new background task. The task fetches the latest tasks
  from all remotes and stores them in the task cache at regular intervals
  (10 minutes). The `get_tasks` function itself only reads from the cache.

  The main rationale for this change is that for large setups, fetching
  tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
  each with a >100ms connection - this adds up to minutes quickly). If we
  do this from within `get_tasks`, the API handler calling the function is
  also blocked for the entire time. The `get_tasks` API is called every
  couple of seconds by the UI to get a list of running remote tasks, so
  this *must* be quick.

- Tracked tasks are also polled in the same background task, but with a
  short polling delay (10 seconds). Instead of polling the status of each
  specific tracked task UPID, we simply fetch *all* tasks since the tracked
  task started.
  While this increases the amount of transmitted data a bit for tracked
  tasks that run for a very long time, the new approach makes the whole
  task tracking functionality much more elegant; it integrates better with
  the 'regular' task fetching, which happens at long intervals.

- Tasks are now stored in the new, improved task cache implementation.
  This should make retrieving tasks much quicker and avoid unneeded disk
  IO.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
---
 server/src/api/pve/lxc.rs                |  10 +-
 server/src/api/pve/mod.rs                |   4 +-
 server/src/api/pve/qemu.rs               |   6 +-
 server/src/api/remote_tasks.rs           |  11 +-
 server/src/bin/proxmox-datacenter-api.rs |   3 +-
 server/src/remote_tasks/mod.rs           | 788 +++++++++++------------
 6 files changed, 388 insertions(+), 434 deletions(-)

diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs index f1c31425..83f9f4aa 100644 --- a/server/src/api/pve/lxc.rs +++ b/server/src/api/pve/lxc.rs @@ -209,7 +209,7 @@ pub async fn lxc_start( let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?; - new_remote_upid(remote, upid) + new_remote_upid(remote, upid).await } #[api( @@ -242,7 +242,7 @@ pub async fn lxc_stop( let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?; - new_remote_upid(remote, upid) + new_remote_upid(remote, upid).await } #[api( @@ -277,7 +277,7 @@ pub async fn lxc_shutdown( .shutdown_lxc_async(&node, vmid, Default::default()) .await?; - new_remote_upid(remote, upid) + new_remote_upid(remote, upid).await } #[api( @@ -357,7 +357,7 @@ pub async fn lxc_migrate( }; let upid = pve.migrate_lxc(&node, vmid, params).await?; - new_remote_upid(remote, upid) + new_remote_upid(remote, upid).await } #[api( @@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate( log::info!("migrating vm {vmid} of node {node:?}"); let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?; - new_remote_upid(source, upid) + new_remote_upid(source, upid).await } diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs index c328675a..a351ad69 100644 --- a/server/src/api/pve/mod.rs +++ b/server/src/api/pve/mod.rs @@ -74,9 +74,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS); // converts a remote + PveUpid into a RemoteUpid and starts tracking it -fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> { +async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> { let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?; - remote_tasks::track_running_task(remote_upid.clone()); + remote_tasks::track_running_task(remote_upid.clone()).await?; Ok(remote_upid) } diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs index dea0550c..54e310d2 100644 --- a/server/src/api/pve/qemu.rs +++ b/server/src/api/pve/qemu.rs @@ -216,7 +216,7 @@ pub async fn qemu_start( .start_qemu_async(&node, vmid, Default::default()) .await?; - new_remote_upid(remote, upid) + new_remote_upid(remote, upid).await } #[api( @@ -376,7 +376,7 @@ pub async fn qemu_migrate( }; let upid = pve.migrate_qemu(&node, vmid, params).await?; - new_remote_upid(remote, upid) + new_remote_upid(remote, upid).await } #[api( @@ -563,5 +563,5 @@ pub async fn qemu_remote_migrate( log::info!("migrating vm {vmid} of node {node:?}"); let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?; - new_remote_upid(source, upid) + 
new_remote_upid(source, upid).await } diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs index e629000c..05ce3666 100644 --- a/server/src/api/remote_tasks.rs +++ b/server/src/api/remote_tasks.rs @@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS }, input: { properties: { - "max-age": { - type: Integer, - optional: true, - // TODO: sensible default max-age - default: 300, - description: "Maximum age of cached task data", - }, filters: { type: TaskFilters, flatten: true, @@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS }, )] /// Get the list of tasks for all remotes. -async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { - let tasks = remote_tasks::get_tasks(max_age, filters).await?; +async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { + let tasks = remote_tasks::get_tasks(filters).await?; Ok(tasks) } diff --git a/server/src/bin/proxmox-datacenter-api.rs b/server/src/bin/proxmox-datacenter-api.rs index a79094d5..da39c85d 100644 --- a/server/src/bin/proxmox-datacenter-api.rs +++ b/server/src/bin/proxmox-datacenter-api.rs @@ -25,11 +25,11 @@ use pdm_buildcfg::configdir; use pdm_api_types::Authid; use proxmox_auth_api::api::assemble_csrf_prevention_token; -use server::auth; use server::auth::csrf::csrf_secret; use server::metric_collection; use server::resource_cache; use server::task_utils; +use server::{auth, remote_tasks}; pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 5 * 60; @@ -288,6 +288,7 @@ async fn run(debug: bool) -> Result<(), Error> { start_task_scheduler(); metric_collection::start_task(); resource_cache::start_task(); + remote_tasks::start_task()?; server.await?; log::info!("server shutting down, waiting for active workers to complete"); diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs index 2062f2b7..48d54694 100644 --- a/server/src/remote_tasks/mod.rs +++ b/server/src/remote_tasks/mod.rs @@ -1,65 +1,106 @@ -use std::{ - collections::{HashMap, HashSet}, - fs::File, - path::{Path, PathBuf}, - sync::{LazyLock, RwLock}, - time::Duration, -}; +use std::{collections::HashMap, path::Path, sync::Arc, time::Duration}; -use anyhow::Error; +use anyhow::{format_err, Error}; +use nix::sys::stat::Mode; use pdm_api_types::{ remotes::{Remote, RemoteType}, RemoteUpid, TaskFilters, TaskListItem, TaskStateType, }; use proxmox_sys::fs::CreateOptions; use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid}; -use serde::{Deserialize, Serialize}; -use tokio::task::JoinHandle; +use task_cache::{AddTasks, GetTasks, State, TaskCache, TaskCacheItem}; +use tokio::{sync::Semaphore, task::JoinSet}; use crate::{api::pve, task_utils}; mod task_cache; +const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks"); + +const SECONDS_PER_MINUTE: u64 = 60; +const MINUTES_PER_HOUR: u64 = 60; + +/// Tick rate for the remote task fetching task. +/// This is also the rate at which we check on tracked tasks. +const TICK_RATE_S: u64 = 10; + +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked +/// task for this remote). +const REGULAR_REFRESH_S: u64 = 10 * SECONDS_PER_MINUTE; +/// Number of cycles until a regular refresh. +const REGULAR_REFRESH_CYCLES: u64 = REGULAR_REFRESH_S / TICK_RATE_S; + +/// Check if we want to rotate once every hour. 
+const CHECK_ROTATE_S: u64 = SECONDS_PER_MINUTE * MINUTES_PER_HOUR; +/// Number of cycles before we want to check if we should rotate the task archives. +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_S / TICK_RATE_S; + +/// Rotate once the most recent archive file is at least 24 hour old. +const ROTATE_AFTER_S: u64 = 24 * MINUTES_PER_HOUR * SECONDS_PER_MINUTE; + +/// Keep 7 days worth of tasks. +const KEEP_OLD_FILES: u64 = 7; + +/// Maximum number of concurrent connections per remote. +const CONNECTIONS_PER_PVE_REMOTE: usize = 5; +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_REMOTE` is taken into +/// consideration when accounting for the total number of connections. +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_REMOTE` is 5, we can connect +/// to 4 PVE remotes in parallel. +const MAX_CONNECTIONS: usize = 20; + +/// Maximum number of tasks to fetch from a single remote in one API call. +const MAX_TASKS_TO_FETCH: u64 = 5000; + /// Get tasks for all remotes // FIXME: filter for privileges -pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { - let (remotes, _) = pdm_config::remotes::config()?; +pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { + let cache = get_cache()?; - let mut all_tasks = Vec::new(); + let mut returned_tasks = Vec::new(); - let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json"); - let mut cache = TaskCache::new(cache_path)?; + let which = if filters.running { + GetTasks::Active + } else { + GetTasks::All + }; - // Force a refresh for all tasks of a remote if a task is finished. - // Not super nice, but saves us from persisting finished tasks. Also, - // the /nodes/<node>/tasks/<upid>/status endpoint does not return - // a task's endtime, which is only returned by - // /nodes/<node>/tasks... - // Room for improvements in the future. - invalidate_cache_for_finished_tasks(&mut cache); + for task in &mut cache + .get_tasks(which)? + .skip(filters.start as usize) + .take(filters.limit as usize) + { + let task = match task { + Ok(task) => task, + Err(err) => { + log::error!("could not read task from remote task cache, skipping: {err}"); + continue; + } + }; - for (remote_name, remote) in &remotes.sections { - let now = proxmox_time::epoch_i64(); - - if let Some(tasks) = cache.get_tasks(remote_name.as_str(), now, max_age) { - // Data in cache is recent enough and has not been invalidated. 
- all_tasks.extend(tasks); - } else { - let tasks = match fetch_tasks(remote).await { - Ok(tasks) => tasks, - Err(err) => { - // ignore errors for not reachable remotes - continue; - } - }; - cache.set_tasks(remote_name.as_str(), tasks.clone(), now); - - all_tasks.extend(tasks); + // TODO: Handle PBS tasks + let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse(); + match pve_upid { + Ok(pve_upid) => { + returned_tasks.push(TaskListItem { + upid: task.upid.to_string(), + node: pve_upid.node, + pid: pve_upid.pid as i64, + pstart: pve_upid.pstart, + starttime: pve_upid.starttime, + worker_type: pve_upid.worker_type, + worker_id: None, + user: pve_upid.auth_id, + endtime: task.endtime, + status: task.status, + }); + } + Err(err) => { + log::error!("could not parse UPID: {err}"); + } } } - let mut returned_tasks = add_running_tasks(all_tasks)?; - returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime)); let returned_tasks = returned_tasks .into_iter() .filter(|item| { @@ -106,26 +147,228 @@ pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskLis true }) - .skip(filters.start as usize) - .take(filters.limit as usize) .collect(); - // We don't need to wait for this task to finish - tokio::task::spawn_blocking(move || { - if let Err(e) = cache.save() { - log::error!("could not save task cache: {e}"); - } - }); - Ok(returned_tasks) } +/// Insert a newly created tasks into the list of tracked tasks. +/// +/// Any remote with associated tracked tasks will polled with a short interval +/// until all tracked tasks have finished. +pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> { + tokio::task::spawn_blocking(move || { + let cache = get_cache()?; + // TODO:: Handle PBS tasks correctly. + let pve_upid: pve_api_types::PveUpid = task.upid.parse()?; + let task = TaskCacheItem { + upid: task.clone(), + starttime: pve_upid.starttime, + status: None, + endtime: None, + }; + cache.add_tracked_task(task) + }) + .await? +} + +/// Start the remote task fetching task +pub fn start_task() -> Result<(), Error> { + let api_uid = pdm_config::api_user()?.uid; + let api_gid = pdm_config::api_group()?.gid; + let file_options = CreateOptions::new() + .owner(api_uid) + .group(api_gid) + .perm(Mode::from_bits_truncate(0o0750)); + proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?; + + tokio::spawn(async move { + let task_scheduler = std::pin::pin!(remote_task_fetching_task()); + let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future()); + futures::future::select(task_scheduler, abort_future).await; + }); + + Ok(()) +} + +/// Task which handles fetching remote tasks and task archive rotation. +/// This function never returns. +async fn remote_task_fetching_task() -> ! { + let mut cycle = 0u64; + let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S)); + interval.reset_at(task_utils::next_aligned_instant(TICK_RATE_S).into()); + + // We don't really care about catching up to missed tick, we just want + // a steady tick rate. 
+ interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + if let Err(err) = init_cache().await { + log::error!("error when initialized task cache: {err}"); + } + + loop { + interval.tick().await; + if let Err(err) = do_tick(cycle).await { + log::error!("error when fetching remote tasks: {err}"); + } + + // At a rate of one tick every 10s we wrap around in *only* 5 trillion years, + // better be safe and use .wrapping_add(1) :) + cycle = cycle.wrapping_add(1); + } +} + +/// Initialize the remote task cache with initial archive files, in case there are not +/// any archive files yet. +/// +/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time +/// spaced `ROTATE_AFTER_S` seconds apart. +/// This allows us to immediately backfill remote task history when setting up a new PDM instance +/// without any prior task archive rotation. +async fn init_cache() -> Result<(), Error> { + tokio::task::spawn_blocking(|| { + let cache = get_cache()?; + cache.init(proxmox_time::epoch_i64(), KEEP_OLD_FILES, ROTATE_AFTER_S)?; + Ok(()) + }) + .await? +} + +/// Handle a single timer tick. +/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks. +async fn do_tick(cycle: u64) -> Result<(), Error> { + let cache = get_cache()?; + + if should_check_for_cache_rotation(cycle) { + log::debug!("checking if remote task archive should be rotated"); + if rotate_cache(cache.clone()).await? { + log::info!("rotated remote task archive"); + } + } + + let state = cache.read_state(); + + let mut all_tasks = HashMap::new(); + + let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS)); + let mut join_set = JoinSet::new(); + + let remotes = remotes_to_check(cycle, &state).await?; + for remote in remotes { + let since = get_cutoff_timestamp(&remote, &state); + + let permit = if remote.ty == RemoteType::Pve { + // Acquire multiple permits, for PVE remotes we want + // to multiple nodes in parallel. + // + // `.unwrap()` is safe, we never close the semaphore. + Arc::clone(&semaphore) + .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32) + .await + .unwrap() + } else { + // For PBS remotes we only have a single outgoing connection + // + // `.unwrap()` is safe, we never close the semaphore. + Arc::clone(&semaphore).acquire_owned().await.unwrap() + }; + + join_set.spawn(async move { + log::debug!("fetching remote tasks for '{}' since {since}", remote.id); + let tasks = fetch_tasks(&remote, since).await.map_err(|err| { + format_err!("could not fetch tasks from remote '{}': {err}", remote.id) + }); + + drop(permit); + tasks + }); + } + + while let Some(res) = join_set.join_next().await { + match res { + Ok(Ok((remote, request))) => { + all_tasks.insert(remote, request); + } + Ok(Err(err)) => log::error!("{err}"), + Err(err) => log::error!("could not join task fetching future: {err}"), + } + } + + if !all_tasks.is_empty() { + save_tasks(cache, all_tasks).await?; + } + + Ok(()) +} + +/// Return list of remotes that are to be polled in this cycle. 
+async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> { + let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??; + + let all = cycle % REGULAR_REFRESH_CYCLES == 0; + + if all { + Ok(config.sections.into_values().collect()) + } else { + Ok(config + .sections + .into_iter() + .filter_map(|(name, remote)| { + if let Some(tracked) = state.tracked_tasks.get(&name) { + if !tracked.is_empty() { + Some(remote) + } else { + None + } + } else { + None + } + }) + .collect()) + } +} + +/// Get the timestamp from which on we should fetch tasks for a given remote. +/// The returned timestamp is a UNIX timestamp (in seconds). +fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 { + let oldest_active = state.oldest_active_task.get(&remote.id).copied(); + let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied(); + + match (oldest_active, youngest_archived) { + (None, None) => 0, + (None, Some(youngest_archived)) => youngest_archived, + (Some(oldest_active), None) => oldest_active, + (Some(oldest_active), Some(youngest_active)) => oldest_active.min(youngest_active), + } +} + +/// Rotate the task cache if necessary. +/// +/// Returns Ok(true) the cache's files were rotated. +async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> { + tokio::task::spawn_blocking(move || { + cache.rotate(proxmox_time::epoch_i64(), ROTATE_AFTER_S, KEEP_OLD_FILES) + }) + .await? +} + +/// Add newly fetched tasks to the cache. +async fn save_tasks(cache: TaskCache, tasks: HashMap<String, AddTasks>) -> Result<(), Error> { + tokio::task::spawn_blocking(move || cache.add_tasks(tasks)).await? +} + /// Fetch tasks (active and finished) from a remote -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> { +/// `since` is a UNIX timestamp (seconds). +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> { let mut tasks = Vec::new(); + let mut all_successful = true; + match remote.ty { RemoteType::Pve => { + let semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE)); + let mut join_set = JoinSet::new(); + let client = pve::connect(remote)?; // N+1 requests - we could use /cluster/tasks, but that one @@ -134,16 +377,53 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> { let params = ListTasks { // Include running tasks source: Some(ListTasksSource::All), - // TODO: How much task history do we want? Right now we just hard-code it - // to 7 days. 
- since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60), + since: Some(since), + // If `limit` is not provided, we only receive 50 tasks + limit: Some(MAX_TASKS_TO_FETCH), ..Default::default() }; - let list = client.get_task_list(&node.node, params).await?; - let mapped = map_tasks(list, &remote.id)?; + let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap(); - tasks.extend(mapped); + let r = remote.clone(); + + join_set.spawn(async move { + let client = pve::connect(&r)?; + let task_list = + client + .get_task_list(&node.node, params) + .await + .map_err(|err| { + format_err!("remote '{}', node '{}': {err}", r.id, node.node) + })?; + + drop(permit); + + Ok::<Vec<_>, Error>(task_list) + }); + } + + while let Some(res) = join_set.join_next().await { + match res { + Ok(Ok(list)) => { + let mapped = list.into_iter().filter_map(|task| { + match map_pve_task(task, &remote.id) { + Ok(task) => Some(task), + Err(err) => { + log::error!("could not map task data, skipping: {err}"); + None + } + } + }); + + tasks.extend(mapped); + } + Ok(Err(err)) => { + all_successful = false; + log::error!("could not fetch tasks: {err:?}"); + } + Err(err) => return Err(err.into()), + } } } RemoteType::Pbs => { @@ -151,365 +431,45 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> { } } - Ok(tasks) -} - -/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>` -fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> { - let mut mapped = Vec::new(); - - for task in tasks { - let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?; - - mapped.push(TaskListItem { - upid: remote_upid.to_string(), - node: task.node, - pid: task.pid, - pstart: task.pstart as u64, - starttime: task.starttime, - worker_type: task.ty, - worker_id: Some(task.id), - user: task.user, - endtime: task.endtime, - status: task.status, - }) - } - - Ok(mapped) -} - -/// Drops the cached task list of a remote for all finished tasks. -/// -/// We use this to force a refresh so that we get the full task -/// info (including `endtime`) in the next API call. -fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) { - let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned"); - - // If a task is finished, we force a refresh for the remote - otherwise - // we don't get the 'endtime' for the task. - for task in finished.drain() { - cache.invalidate_cache_for_remote(task.remote()); - } -} - -/// Supplement the list of tasks that we received from the remote with -/// the tasks that were started by PDM and are currently running. 
-fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> { - let mut returned_tasks = Vec::new(); - - let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned"); - for task in cached_tasks { - let remote_upid = task.upid.parse()?; - - if running_tasks.contains(&remote_upid) { - if task.endtime.is_some() { - // Task is finished but we still think it is running -> - // Drop it from RUNNING_FOREIGN_TASKS - running_tasks.remove(&remote_upid); - - // No need to put it in FINISHED_TASKS, since we already - // got its state recently enough (we know the status and endtime) - } - } else { - returned_tasks.push(task); - } - } - - for task in running_tasks.iter() { - let upid: PveUpid = task.upid.parse()?; - returned_tasks.push(TaskListItem { - upid: task.to_string(), - node: upid.node, - pid: upid.pid as i64, - pstart: upid.pstart, - starttime: upid.starttime, - worker_type: upid.worker_type, - worker_id: upid.worker_id, - user: upid.auth_id, - endtime: None, - status: None, - }); - } - - Ok(returned_tasks) -} - -/// A cache for fetched remote tasks. -struct TaskCache { - /// Cache entries - content: TaskCacheContent, - - /// Entries that were added or updated - these will be persistet - /// when `save` is called. - new_or_updated: TaskCacheContent, - - /// Cache entries were changed/removed. - dirty: bool, - - /// File-location at which the cached tasks are stored. - cachefile_path: PathBuf, -} - -impl TaskCache { - /// Create a new tasks cache instance by loading - /// the cache from disk. - fn new(cachefile_path: PathBuf) -> Result<Self, Error> { - Ok(Self { - content: Self::load_content()?, - new_or_updated: Default::default(), - dirty: false, - cachefile_path, - }) - } - - /// Load the task cache contents from disk. - fn load_content() -> Result<TaskCacheContent, Error> { - let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json"); - let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?; - - let content = if let Some(content) = content { - serde_json::from_str(&content)? - } else { - Default::default() - }; - - Ok(content) - } - - /// Get path for the cache's lockfile. - fn lockfile_path(&self) -> PathBuf { - let mut path = self.cachefile_path.clone(); - path.set_extension("lock"); - path - } - - /// Persist the task cache - /// - /// This method requests an exclusive lock for the task cache lockfile. - fn save(&mut self) -> Result<(), Error> { - // if we have not updated anything, we don't have to update the cache file - if !self.dirty { - return Ok(()); - } - - let _guard = self.lock(Duration::from_secs(5))?; - - // Read content again, in case somebody has changed it in the meanwhile - let mut content = Self::load_content()?; - - for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() { - if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) { - // Only update entry if nobody else has updated it in the meanwhile - if existing_entry.timestamp < entry.timestamp { - *existing_entry = entry; - } - } else { - content.remote_tasks.insert(remote_name, entry); - } - } - - let bytes = serde_json::to_vec_pretty(&content)?; - - let api_uid = pdm_config::api_user()?.uid; - let api_gid = pdm_config::api_group()?.gid; - - let file_options = CreateOptions::new().owner(api_uid).group(api_gid); - - proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?; - - self.dirty = false; - - Ok(()) - } - - // Update task data for a given remote. 
- fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) { - self.dirty = true; - self.new_or_updated - .remote_tasks - .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks }); - } - - // Get task data for a given remote. - fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> { - if let Some(entry) = self.content.remote_tasks.get(remote) { - if (entry.timestamp + max_age) < now { - return None; - } - - Some(entry.tasks.clone()) - } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) { - if (entry.timestamp + max_age) < now { - return None; - } - Some(entry.tasks.clone()) - } else { - None - } - } - - // Invalidate cache for a given remote. - fn invalidate_cache_for_remote(&mut self, remote: &str) { - self.dirty = true; - self.content.remote_tasks.remove(remote); - } - - // Lock the cache for modification. - // - // While the cache is locked, other users can still read the cache - // without a lock, since the cache file is replaced atomically - // when updating. - fn lock(&self, duration: Duration) -> Result<File, Error> { - let api_uid = pdm_config::api_user()?.uid; - let api_gid = pdm_config::api_group()?.gid; - - let file_options = CreateOptions::new().owner(api_uid).group(api_gid); - proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options) - } -} - -#[derive(Serialize, Deserialize)] -/// Per-remote entry in the task cache. -struct TaskCacheEntry { - timestamp: i64, - tasks: Vec<TaskListItem>, -} - -#[derive(Default, Serialize, Deserialize)] -/// Content of the task cache file. -struct TaskCacheContent { - remote_tasks: HashMap<String, TaskCacheEntry>, -} - -/// Interval at which tracked tasks are polled -const RUNNING_CHECK_INTERVAL_S: u64 = 10; - -/// Tasks which were started by PDM and are still running -static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init); -/// Tasks which were started by PDM and w -static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init); - -fn init() -> RwLock<HashSet<RemoteUpid>> { - RwLock::new(HashSet::new()) -} - -/// Insert a remote UPID into the running list -/// -/// If it is the first entry in the list, a background task is started to track its state -/// -/// Returns the [`JoinHandle`] if a task was started. 
-/// -/// panics on a poisoned mutex -pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> { - let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap(); - - // the call inserting the first task in the list needs to start the checking task - let need_start_task = tasks.is_empty(); - tasks.insert(task); - - if !need_start_task { - return None; - } - drop(tasks); - - Some(tokio::spawn(async move { - loop { - let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S); - tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await; - - let finished_tasks = get_finished_tasks().await; - - // skip iteration if we still have tasks, just not finished ones - if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() { - continue; - } - - let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap(); - // we either have finished tasks, or the running task list was empty - let mut set = RUNNING_FOREIGN_TASKS.write().unwrap(); - - for (upid, _status) in finished_tasks { - if set.remove(&upid) { - finished.insert(upid); - } else { - // someone else removed & persisted the task in the meantime - } - } - - // if no task remains, end the current task - // it will be restarted by the next caller that inserts one - if set.is_empty() { - return; - } - } - })) -} - -/// Get a list of running foreign tasks -/// -/// panics on a poisoned mutex -pub fn get_running_tasks() -> Vec<RemoteUpid> { - RUNNING_FOREIGN_TASKS - .read() - .unwrap() - .iter() - .cloned() - .collect() -} - -/// Checks all current saved UPIDs if they're still running, and if not, -/// returns their upids + status -/// -/// panics on a poisoned mutex -pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> { - let mut finished = Vec::new(); - let config = match pdm_config::remotes::config() { - Ok((config, _)) => config, - Err(err) => { - log::error!("could not open remotes config: {err}"); - return Vec::new(); - } + let new_tasks = AddTasks { + update_most_recent_archive_timestamp: all_successful, + tasks, }; - for task in get_running_tasks() { - match config.get(task.remote()) { - Some(remote) => match remote.ty { - RemoteType::Pve => { - let status = match crate::api::pve::tasks::get_task_status( - remote.id.clone(), - task.clone(), - false, - ) - .await - { - Ok(status) => status, - Err(err) => { - log::error!("could not get status from remote: {err}"); - finished.push((task, "could not get status".to_string())); - continue; - } - }; - if let Some(status) = status.exitstatus { - finished.push((task, status.to_string())); - } - } - RemoteType::Pbs => { - let _client = match crate::pbs_client::connect(remote) { - Ok(client) => client, - Err(err) => { - log::error!("could not get status from remote: {err}"); - finished.push((task, "could not get status".to_string())); - continue; - } - }; - // FIXME implement get task status - finished.push((task, "unknown state".to_string())); - } - }, - None => finished.push((task, "unknown remote".to_string())), - } - } - finished + Ok((remote.id.clone(), new_tasks)) +} + +/// Check if we are due for checking for cache rotation. +fn should_check_for_cache_rotation(cycle: u64) -> bool { + cycle % CHECK_ROTATE_CYCLES == 0 +} + +/// Get a new [`TaskCache`] instance. +/// +/// No heavy-weight operations are done here, it's fine to call this regularly as part of the +/// update loop. 
+fn get_cache() -> Result<TaskCache, Error> { + let api_uid = pdm_config::api_user()?.uid; + let api_gid = pdm_config::api_group()?.gid; + + let file_options = CreateOptions::new().owner(api_uid).group(api_gid); + + let cache_path = Path::new(REMOTE_TASKS_DIR); + let cache = TaskCache::new(cache_path, file_options)?; + + Ok(cache) +} + +/// Map a `ListTasksResponse` to `TaskCacheItem` +fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> { + let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?; + + Ok(TaskCacheItem { + upid: remote_upid, + starttime: task.starttime, + endtime: task.endtime, + status: task.status, + }) } /// Parses a task status string into a TaskStateType -- 2.39.5