From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: <pdm-devel-bounces@lists.proxmox.com> Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 902951FF17C for <inbox@lore.proxmox.com>; Wed, 14 May 2025 17:27:25 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 24D5B942F; Wed, 14 May 2025 17:27:48 +0200 (CEST) Message-ID: <112a7d67-79c3-4d23-bee7-8b1d02140104@proxmox.com> Date: Wed, 14 May 2025 17:27:37 +0200 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird Beta To: pdm-devel@lists.proxmox.com References: <20250512114144.118545-1-l.wagner@proxmox.com> <20250512114144.118545-3-l.wagner@proxmox.com> Content-Language: en-US From: Dominik Csapak <d.csapak@proxmox.com> In-Reply-To: <20250512114144.118545-3-l.wagner@proxmox.com> X-SPAM-LEVEL: Spam detection results: 0 AWL -0.129 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_1 0.1 random spam to be learned in bayes POISEN_SPAM_PILL_3 0.1 random spam to be learned in bayes RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pdm-devel] [PATCH proxmox-datacenter-manager v5 2/6] remote tasks: add background task for task polling, use new task cache X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion <pdm-devel.lists.proxmox.com> List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe> List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/> List-Post: <mailto:pdm-devel@lists.proxmox.com> List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help> List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe> Reply-To: Proxmox Datacenter Manager development discussion <pdm-devel@lists.proxmox.com> Content-Transfer-Encoding: 7bit Content-Type: text/plain; charset="us-ascii"; Format="flowed" Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com> some comments inline On 5/12/25 13:41, Lukas Wagner wrote: > This commits changes the remote task module as follows: > > - Add a new background task for regular polling of task data > Instead of triggering fetching of task data from the `get_tasks` function, > which is usually called by an API handler, we move the fetching to a > new background task. The task fetches the latest tasks from all remotes > and stores them in the task cache in regular intervals (10 minutes). 
> The `get_tasks` function itself only reads from the cache. > The main rationale for this change is that for large setups, fetching > tasks from all remotes can take a *long* time (e.g. hundreds of remotes, > each with a >100ms connection - adds up to minutes quickly). > If we do this from within `get_tasks`, the API handler calling the > function is also blocked for the entire time. > The `get_tasks` API is called every couple of seconds by the UI the get > a list of running remote tasks, so this *must* be quick. > > - Tracked tasks are also polled in the same background task, but with > a short polling delay (10 seconds). Instead of polling the status specific > tracked task UPID, we simply fetch *all* tasks since the tracked task started. > While this increased the amount of transmitted data a bit for tracked tasks > that run for a very long time, this new approach make the whole > task tracking functionality much more elegant; it integrates better with > the 'regular' task fetching which happens in long intervals. As I said in patch one, for long-running tasks this could be a scaling problem: we then often query the same tasks over and over again just to e.g. check one long-running task. This will also impact disk performance on the PVE/PBS side, since reading all tasks since a cutoff time most likely means reading multiple files, whereas querying a single task only reads the end of one file. In general I would assume that the ratio of running tasks to all tasks is rather low, so we have e.g. 1-5 running tasks per remote vs. hundreds or thousands that are finished. > > - Tasks are now stored in the new improved task cache implementation. > This should make retrieving tasks much quicker and avoids > unneeded disk IO. > > Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> > --- > > Notes: > Changes since v4: > - Rebase onto latest master, adapting to changes in > the section config type > > Changes since v2: > - Adapt to new locking approach (only drops a `mut`) > > Changes since v1: > > - use const Duration instead of u64s for durations, using > Duration::as_secs() where needed > - Move the remote_task fetching task functions to > src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > - remote_tasks::get_tasks: wrap function body in a > tokio::task::spawn_blocking.
using the TaskCache::get_tasks > iterator does disk IO and could block the executor > - Added some doc strings to make the purpose/workings of > some functions clearer > - Couple of variables have been renamed for more clarity > > server/src/api/pve/lxc.rs | 10 +- > server/src/api/pve/mod.rs | 4 +- > server/src/api/pve/qemu.rs | 6 +- > server/src/api/remote_tasks.rs | 11 +- > server/src/bin/proxmox-datacenter-api/main.rs | 1 + > .../bin/proxmox-datacenter-api/tasks/mod.rs | 1 + > .../tasks/remote_tasks.rs | 363 +++++++++++ > server/src/remote_tasks/mod.rs | 605 ++++-------------- > 8 files changed, 498 insertions(+), 503 deletions(-) > create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > > diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs > index f1c31425..83f9f4aa 100644 > --- a/server/src/api/pve/lxc.rs > +++ b/server/src/api/pve/lxc.rs > @@ -209,7 +209,7 @@ pub async fn lxc_start( > > let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -242,7 +242,7 @@ pub async fn lxc_stop( > > let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -277,7 +277,7 @@ pub async fn lxc_shutdown( > .shutdown_lxc_async(&node, vmid, Default::default()) > .await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -357,7 +357,7 @@ pub async fn lxc_migrate( > }; > let upid = pve.migrate_lxc(&node, vmid, params).await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate( > log::info!("migrating vm {vmid} of node {node:?}"); > let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?; > > - new_remote_upid(source, upid) > + new_remote_upid(source, upid).await > } > diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs > index dd7cf382..d472cf58 100644 > --- a/server/src/api/pve/mod.rs > +++ b/server/src/api/pve/mod.rs > @@ -76,9 +76,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES > const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS); > > // converts a remote + PveUpid into a RemoteUpid and starts tracking it > -fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> { > +async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> { > let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?; > - remote_tasks::track_running_task(remote_upid.clone()); > + remote_tasks::track_running_task(remote_upid.clone()).await?; > Ok(remote_upid) > } > > diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs > index dea0550c..54e310d2 100644 > --- a/server/src/api/pve/qemu.rs > +++ b/server/src/api/pve/qemu.rs > @@ -216,7 +216,7 @@ pub async fn qemu_start( > .start_qemu_async(&node, vmid, Default::default()) > .await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -376,7 +376,7 @@ pub async fn qemu_migrate( > }; > let upid = pve.migrate_qemu(&node, vmid, params).await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -563,5 +563,5 @@ pub async fn qemu_remote_migrate( > log::info!("migrating vm {vmid} of node {node:?}"); > let upid = source_conn.remote_migrate_qemu(&node, 
vmid, params).await?; > > - new_remote_upid(source, upid) > + new_remote_upid(source, upid).await > } > diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs > index e629000c..05ce3666 100644 > --- a/server/src/api/remote_tasks.rs > +++ b/server/src/api/remote_tasks.rs > @@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS > }, > input: { > properties: { > - "max-age": { > - type: Integer, > - optional: true, > - // TODO: sensible default max-age > - default: 300, > - description: "Maximum age of cached task data", > - }, > filters: { > type: TaskFilters, > flatten: true, > @@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS > }, > )] > /// Get the list of tasks for all remotes. > -async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { > - let tasks = remote_tasks::get_tasks(max_age, filters).await?; > +async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { > + let tasks = remote_tasks::get_tasks(filters).await?; > > Ok(tasks) > } > diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs > index 49499980..70e489d0 100644 > --- a/server/src/bin/proxmox-datacenter-api/main.rs > +++ b/server/src/bin/proxmox-datacenter-api/main.rs > @@ -292,6 +292,7 @@ async fn run(debug: bool) -> Result<(), Error> { > metric_collection::start_task(); > tasks::remote_node_mapping::start_task(); > resource_cache::start_task(); > + tasks::remote_tasks::start_task()?; > > server.await?; > log::info!("server shutting down, waiting for active workers to complete"); > diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs > index e6ead882..a6b1f439 100644 > --- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs > +++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs > @@ -1 +1,2 @@ > pub mod remote_node_mapping; > +pub mod remote_tasks; > diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > new file mode 100644 > index 00000000..9b6da809 > --- /dev/null > +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > @@ -0,0 +1,363 @@ > +use std::{collections::HashMap, sync::Arc, time::Duration}; > + > +use anyhow::{format_err, Error}; > +use nix::sys::stat::Mode; > +use tokio::{sync::Semaphore, task::JoinSet}; > + > +use pdm_api_types::{ > + remotes::{Remote, RemoteType}, > + RemoteUpid, > +}; > +use proxmox_sys::fs::CreateOptions; > +use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource}; > +use server::{ > + api::pve, > + remote_tasks::{ > + self, > + task_cache::{AddTasks, State, TaskCache, TaskCacheItem}, > + REMOTE_TASKS_DIR, > + }, > + task_utils, > +}; > + > +/// Tick interval for the remote task fetching task. > +/// This is also the rate at which we check on tracked tasks. > +const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10); > + > +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked > +/// task for this remote). > +const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600); imho those two are confusingly documented, if 'REGULAR_..' is used for the normal interval, why does 'TASK_REFRE..' say its used too for that? > +/// Number of cycles until a regular refresh. 
> +const REGULAR_REFRESH_CYCLES: u64 = > + REGULAR_REFRESH_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs(); > + > +/// Check if we want to rotate once every hour. commas are important here; one can read this sentence in two ways: "Check if we want to rotate, once every hour" or "Check if we want to (rotate once every hour)" (brackets for clarity). IMHO a better way to write that is: "Check once every hour if we want to rotate." > +const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600); > +/// Number of cycles before we want to check if we should rotate the task archives. > +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs(); > + > +/// Rotate once the most recent archive file is at least 24 hour old. > +const ROTATE_AFTER: Duration = Duration::from_secs(24 * 3600); > + > +/// Keep 7 days worth of tasks. > +const KEEP_OLD_FILES: u64 = 7; > + > +/// Maximum number of concurrent connections per remote. > +const CONNECTIONS_PER_PVE_REMOTE: usize = 5; > +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into > +/// consideration when accounting for the total number of connections. > +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect > +/// to 4 PVE remotes in parallel. > +const MAX_CONNECTIONS: usize = 20; > + > +/// Maximum number of tasks to fetch from a single remote in one API call. > +const MAX_TASKS_TO_FETCH: u64 = 5000; > + > +/// Start the remote task fetching task > +pub fn start_task() -> Result<(), Error> { > + let api_uid = pdm_config::api_user()?.uid; > + let api_gid = pdm_config::api_group()?.gid; > + let file_options = CreateOptions::new() > + .owner(api_uid) > + .group(api_gid) > + .perm(Mode::from_bits_truncate(0o0750)); > + proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?; this should probably use the proxmox_product_config crate > + > + tokio::spawn(async move { > + let task_scheduler = std::pin::pin!(remote_task_fetching_task()); > + let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future()); > + futures::future::select(task_scheduler, abort_future).await; > + }); > + > + Ok(()) > +} > + > +/// Task which handles fetching remote tasks and task archive rotation. > +/// This function never returns. > +async fn remote_task_fetching_task() -> ! { > + let mut cycle = 0u64; > + let mut interval = tokio::time::interval(TASK_REFRESH_INTERVAL); > + interval.reset_at(task_utils::next_aligned_instant(TASK_REFRESH_INTERVAL.as_secs()).into()); > + > + // We don't really care about catching up to missed tick, we just want > + // a steady tick rate. > + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); > + > + if let Err(err) = init_cache().await { > + log::error!("error when initialized task cache: {err}"); > + } > + > + loop { > + interval.tick().await; > + if let Err(err) = do_tick(cycle).await { > + log::error!("error when fetching remote tasks: {err}"); > + } > + > + // At a rate of one tick every 10s we wrap around in *only* 5 trillion years, > + // better be safe and use .wrapping_add(1) :) > + cycle = cycle.wrapping_add(1); > + } you do the cycle check here manually, but this can be bad, e.g.
if one cycle regularly takes a long time (say 1 minute instead of 10 seconds), you fetch the remotes not every 10 minutes (as the comment above would indicate) but only every hour. I guess you wanted to be on the safe side and not be too overly aggressive with the polling, but having the fetch/rotation interval depend that much on the cycle duration also doesn't seem very good to me. > +} > + > +/// Handle a single timer tick. > +/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks. > +async fn do_tick(cycle: u64) -> Result<(), Error> { > + let cache = remote_tasks::get_cache()?; > + > + if should_check_for_cache_rotation(cycle) { > + log::debug!("checking if remote task archive should be rotated"); > + if rotate_cache(cache.clone()).await? { > + log::info!("rotated remote task archive"); > + } > + } > + > + let state = cache.read_state(); > + > + let mut all_tasks = HashMap::new(); > + > + let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS)); > + let mut join_set = JoinSet::new(); > + > + // Get a list of remotes that we should poll in this cycle. > + let remotes = remotes_to_check(cycle, &state).await?; > + for remote in remotes { > + let since = get_cutoff_timestamp(&remote, &state); > + > + let permit = if remote.ty == RemoteType::Pve { > + // Acquire multiple permits, for PVE remotes we want > + // to multiple nodes in parallel. > + // > + // `.unwrap()` is safe, we never close the semaphore. > + Arc::clone(&total_connections_semaphore) > + .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32) > + .await > + .unwrap() would it be possible to acquire the connection semaphore permits dynamically inside the `fetch_tasks` call, up to the maximum? That way, we could e.g. connect to 20 remotes with one host each in parallel instead of always having a maximum of 4. (Not sure about the tokio semaphore possibilities here.) I'd still limit it to CONNECTIONS_PER_PVE_REMOTE for each remote, but in case one remote has fewer nodes, we could utilize the connection count for more remotes, doing more work in parallel. > + } else { > + // For PBS remotes we only have a single outgoing connection > + // > + // `.unwrap()` is safe, we never close the semaphore. > + Arc::clone(&total_connections_semaphore) > + .acquire_owned() > + .await > + .unwrap() > + }; > + > + join_set.spawn(async move { > + log::debug!("fetching remote tasks for '{}' since {since}", remote.id); > + let tasks = fetch_tasks(&remote, since).await.map_err(|err| { > + format_err!("could not fetch tasks from remote '{}': {err}", remote.id) > + }); > + > + drop(permit); > + tasks > + }); > + } > + > + while let Some(res) = join_set.join_next().await { > + match res { > + Ok(Ok((remote, request))) => { > + all_tasks.insert(remote, request); > + } > + Ok(Err(err)) => log::error!("{err}"), > + Err(err) => log::error!("could not join task fetching future: {err}"), > + } > + } > + > + if !all_tasks.is_empty() { > + save_tasks(cache, all_tasks).await?; > + } > + > + Ok(()) > +} > + > +/// Initialize the remote task cache with initial archive files, in case there are not > +/// any archive files yet. > +/// > +/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time > +/// spaced `ROTATE_AFTER_S` seconds apart. > +/// This allows us to immediately backfill remote task history when setting up a new PDM instance > +/// without any prior task archive rotation.
> +async fn init_cache() -> Result<(), Error> { > + tokio::task::spawn_blocking(|| { > + let cache = remote_tasks::get_cache()?; > + cache.init( > + proxmox_time::epoch_i64(), > + KEEP_OLD_FILES, > + ROTATE_AFTER.as_secs(), > + )?; > + Ok(()) > + }) > + .await? > +} > + > +/// Return list of remotes that are to be polled in this cycle. > +/// > +/// If `cycle` is a multiple of `REGULAR_REFRESH_CYCLES`, the function will > +/// return all remotes from the remote config. This ensures that > +/// all remotes are polled at regular intervals. > +/// In any other case we only return remotes which currently have a tracked > +/// task. > +/// On daemon startup (when cycle is 0) we return all remotes to ensure > +/// that we get an up-to-date task list from all remotes. > +async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> { > + let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??; > + > + let all = cycle % REGULAR_REFRESH_CYCLES == 0; > + > + if all { > + Ok(config.into_iter().map(|(_, section)| section).collect()) > + } else { > + Ok(config > + .into_iter() > + .filter_map(|(name, remote)| { > + if let Some(tracked) = state.tracked_tasks.get(&name) { > + if !tracked.is_empty() { > + Some(remote) > + } else { > + None > + } > + } else { > + None > + } > + }) I think this could be written more succinctly as: state .tracked_tasks .get(&name) .and_then(|tracked| (!tracked.is_empty()).then_some(remote)) > + .collect()) > + } > +} > + > +/// Get the timestamp from which on we should fetch tasks for a given remote. > +/// The returned timestamp is a UNIX timestamp (in seconds). > +fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 { > + let oldest_active = state.oldest_active_task.get(&remote.id).copied(); > + let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied(); > + > + match (oldest_active, youngest_archived) { > + (None, None) => 0, > + (None, Some(youngest_archived)) => youngest_archived, > + (Some(oldest_active), None) => oldest_active, > + (Some(oldest_active), Some(youngest_active)) => oldest_active.min(youngest_active), > + } > +} > + > +/// Rotate the task cache if necessary. > +/// > +/// Returns Ok(true) the cache's files were rotated. > +async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> { > + tokio::task::spawn_blocking(move || { > + cache.rotate( > + proxmox_time::epoch_i64(), > + ROTATE_AFTER.as_secs(), > + KEEP_OLD_FILES, > + ) > + }) > + .await? > +} in PBS, we start a worker task for the log rotation, maybe we want one here too? > + > +/// Fetch tasks (active and finished) from a remote > +/// `since` is a UNIX timestamp (seconds). > +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> { > + let mut tasks = Vec::new(); > + > + let mut all_successful = true; > + > + match remote.ty { > + RemoteType::Pve => { > + let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE)); > + let mut join_set = JoinSet::new(); > + > + let client = pve::connect(remote)?; > + > + // N+1 requests - we could use /cluster/tasks, but that one > + // only gives a limited task history > + for node in client.list_nodes().await?
{ > + let params = ListTasks { > + // Include running tasks > + source: Some(ListTasksSource::All), > + since: Some(since), > + // If `limit` is not provided, we only receive 50 tasks > + limit: Some(MAX_TASKS_TO_FETCH), > + ..Default::default() > + }; > + > + let permit = Arc::clone(&per_remote_semaphore) > + .acquire_owned() > + .await > + .unwrap(); > + > + let r = remote.clone(); > + > + join_set.spawn(async move { > + let client = pve::connect(&r)?; > + let task_list = > + client > + .get_task_list(&node.node, params) > + .await > + .map_err(|err| { > + format_err!("remote '{}', node '{}': {err}", r.id, node.node) > + })?; > + > + drop(permit); > + > + Ok::<Vec<_>, Error>(task_list) > + }); > + } > + > + while let Some(res) = join_set.join_next().await { > + match res { > + Ok(Ok(list)) => { > + let mapped = list.into_iter().filter_map(|task| { > + match map_pve_task(task, &remote.id) { > + Ok(task) => Some(task), > + Err(err) => { > + log::error!("could not map task data, skipping: {err}"); > + None > + } > + } > + }); > + > + tasks.extend(mapped); > + } > + Ok(Err(err)) => { > + all_successful = false; > + log::error!("could not fetch tasks: {err:?}"); > + } > + Err(err) => return Err(err.into()), > + } > + } > + } > + RemoteType::Pbs => { > + // TODO: Add code for PBS > + } > + } > + > + let new_tasks = AddTasks { > + update_most_recent_archive_timestamp: all_successful, > + tasks, > + }; > + > + Ok((remote.id.clone(), new_tasks)) > +} > + > +/// Check if we are due for checking for cache rotation. > +/// > +/// If `cycle` is 0, a cache rotation check is forced. This is > +/// only relevant at daemon startup. > +fn should_check_for_cache_rotation(cycle: u64) -> bool { > + cycle % CHECK_ROTATE_CYCLES == 0 > +} > + > +/// Map a `ListTasksResponse` to `TaskCacheItem` > +fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> { > + let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?; > + > + Ok(TaskCacheItem { > + upid: remote_upid, > + starttime: task.starttime, > + endtime: task.endtime, > + status: task.status, > + }) > +} > + > +/// Add newly fetched tasks to the cache. > +async fn save_tasks(cache: TaskCache, tasks: HashMap<String, AddTasks>) -> Result<(), Error> { > + tokio::task::spawn_blocking(move || cache.add_tasks(tasks)).await? 
> +} > diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs > index 7c8e31ef..126c9ad3 100644 > --- a/server/src/remote_tasks/mod.rs > +++ b/server/src/remote_tasks/mod.rs > @@ -1,515 +1,152 @@ > -use std::{ > - collections::{HashMap, HashSet}, > - fs::File, > - path::{Path, PathBuf}, > - sync::{LazyLock, RwLock}, > - time::Duration, > -}; > +use std::path::Path; > > use anyhow::Error; > -use pdm_api_types::{ > - remotes::{Remote, RemoteType}, > - RemoteUpid, TaskFilters, TaskListItem, TaskStateType, > -}; > +use pdm_api_types::{RemoteUpid, TaskFilters, TaskListItem, TaskStateType}; > use proxmox_sys::fs::CreateOptions; > -use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid}; > -use serde::{Deserialize, Serialize}; > -use tokio::task::JoinHandle; > +use pve_api_types::PveUpid; > +use task_cache::{GetTasks, TaskCache, TaskCacheItem}; > > -use crate::{api::pve, task_utils}; > +pub mod task_cache; > > -mod task_cache; > +pub const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks"); > > /// Get tasks for all remotes > // FIXME: filter for privileges > -pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { > - let (remotes, _) = pdm_config::remotes::config()?; > +pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { > + tokio::task::spawn_blocking(move || { > + let cache = get_cache()?; > > - let mut all_tasks = Vec::new(); > + let mut returned_tasks = Vec::new(); > > - let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json"); > - let mut cache = TaskCache::new(cache_path)?; > - > - // Force a refresh for all tasks of a remote if a task is finished. > - // Not super nice, but saves us from persisting finished tasks. Also, > - // the /nodes/<node>/tasks/<upid>/status endpoint does not return > - // a task's endtime, which is only returned by > - // /nodes/<node>/tasks... > - // Room for improvements in the future. > - invalidate_cache_for_finished_tasks(&mut cache); > - > - for (remote_name, remote) in remotes.iter() { > - let now = proxmox_time::epoch_i64(); > - > - if let Some(tasks) = cache.get_tasks(remote_name, now, max_age) { > - // Data in cache is recent enough and has not been invalidated. > - all_tasks.extend(tasks); > + let which = if filters.running { > + GetTasks::Active > } else { > - let tasks = match fetch_tasks(remote).await { > - Ok(tasks) => tasks, > + GetTasks::All > + }; > + > + for task in &mut cache > + .get_tasks(which)? 
> + .skip(filters.start as usize) > + .take(filters.limit as usize) > + { > + let task = match task { > + Ok(task) => task, > Err(err) => { > - // ignore errors for not reachable remotes > + log::error!("could not read task from remote task cache, skipping: {err}"); > continue; > } > }; > - cache.set_tasks(remote_name, tasks.clone(), now); > > - all_tasks.extend(tasks); > + // TODO: Handle PBS tasks > + let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse(); > + match pve_upid { > + Ok(pve_upid) => { > + returned_tasks.push(TaskListItem { > + upid: task.upid.to_string(), > + node: pve_upid.node, > + pid: pve_upid.pid as i64, > + pstart: pve_upid.pstart, > + starttime: pve_upid.starttime, > + worker_type: pve_upid.worker_type, > + worker_id: None, > + user: pve_upid.auth_id, > + endtime: task.endtime, > + status: task.status, > + }); > + } > + Err(err) => { > + log::error!("could not parse UPID: {err}"); > + } > + } > } > - } > > - let mut returned_tasks = add_running_tasks(all_tasks)?; > - returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime)); > - let returned_tasks = returned_tasks > - .into_iter() > - .filter(|item| { > - if filters.running && item.endtime.is_some() { > - return false; > - } > - > - if let Some(until) = filters.until { > - if item.starttime > until { > + let returned_tasks = returned_tasks > + .into_iter() > + .filter(|item| { > + if filters.running && item.endtime.is_some() { > return false; > } > - } > > - if let Some(since) = filters.since { > - if item.starttime < since { > - return false; > - } > - } > - > - if let Some(needle) = &filters.userfilter { > - if !item.user.contains(needle) { > - return false; > - } > - } > - > - if let Some(typefilter) = &filters.typefilter { > - if !item.worker_type.contains(typefilter) { > - return false; > - } > - } > - > - let state = item.status.as_ref().map(|status| tasktype(status)); > - > - match (state, &filters.statusfilter) { > - (Some(TaskStateType::OK), _) if filters.errors => return false, > - (Some(state), Some(filters)) => { > - if !filters.contains(&state) { > + if let Some(until) = filters.until { > + if item.starttime > until { > return false; > } > } > - (None, Some(_)) => return false, > - _ => {} > - } > > - true > - }) > - .skip(filters.start as usize) > - .take(filters.limit as usize) > - .collect(); > - > - // We don't need to wait for this task to finish > - tokio::task::spawn_blocking(move || { > - if let Err(e) = cache.save() { > - log::error!("could not save task cache: {e}"); > - } > - }); > - > - Ok(returned_tasks) > -} > - > -/// Fetch tasks (active and finished) from a remote > -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> { > - let mut tasks = Vec::new(); > - > - match remote.ty { > - RemoteType::Pve => { > - let client = pve::connect(remote)?; > - > - // N+1 requests - we could use /cluster/tasks, but that one > - // only gives a limited task history > - for node in client.list_nodes().await? { > - let params = ListTasks { > - // Include running tasks > - source: Some(ListTasksSource::All), > - // TODO: How much task history do we want? Right now we just hard-code it > - // to 7 days. 
> - since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60), > - ..Default::default() > - }; > - > - let list = client.get_task_list(&node.node, params).await?; > - let mapped = map_tasks(list, &remote.id)?; > - > - tasks.extend(mapped); > - } > - } > - RemoteType::Pbs => { > - // TODO: Add code for PBS > - } > - } > - > - Ok(tasks) > -} > - > -/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>` > -fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> { > - let mut mapped = Vec::new(); > - > - for task in tasks { > - let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?; > - > - mapped.push(TaskListItem { > - upid: remote_upid.to_string(), > - node: task.node, > - pid: task.pid, > - pstart: task.pstart as u64, > - starttime: task.starttime, > - worker_type: task.ty, > - worker_id: Some(task.id), > - user: task.user, > - endtime: task.endtime, > - status: task.status, > - }) > - } > - > - Ok(mapped) > -} > - > -/// Drops the cached task list of a remote for all finished tasks. > -/// > -/// We use this to force a refresh so that we get the full task > -/// info (including `endtime`) in the next API call. > -fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) { > - let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned"); > - > - // If a task is finished, we force a refresh for the remote - otherwise > - // we don't get the 'endtime' for the task. > - for task in finished.drain() { > - cache.invalidate_cache_for_remote(task.remote()); > - } > -} > - > -/// Supplement the list of tasks that we received from the remote with > -/// the tasks that were started by PDM and are currently running. > -fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> { > - let mut returned_tasks = Vec::new(); > - > - let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned"); > - for task in cached_tasks { > - let remote_upid = task.upid.parse()?; > - > - if running_tasks.contains(&remote_upid) { > - if task.endtime.is_some() { > - // Task is finished but we still think it is running -> > - // Drop it from RUNNING_FOREIGN_TASKS > - running_tasks.remove(&remote_upid); > - > - // No need to put it in FINISHED_TASKS, since we already > - // got its state recently enough (we know the status and endtime) > - } > - } else { > - returned_tasks.push(task); > - } > - } > - > - for task in running_tasks.iter() { > - let upid: PveUpid = task.upid.parse()?; > - returned_tasks.push(TaskListItem { > - upid: task.to_string(), > - node: upid.node, > - pid: upid.pid as i64, > - pstart: upid.pstart, > - starttime: upid.starttime, > - worker_type: upid.worker_type, > - worker_id: upid.worker_id, > - user: upid.auth_id, > - endtime: None, > - status: None, > - }); > - } > - > - Ok(returned_tasks) > -} > - > -/// A cache for fetched remote tasks. > -struct TaskCache { > - /// Cache entries > - content: TaskCacheContent, > - > - /// Entries that were added or updated - these will be persistet > - /// when `save` is called. > - new_or_updated: TaskCacheContent, > - > - /// Cache entries were changed/removed. > - dirty: bool, > - > - /// File-location at which the cached tasks are stored. > - cachefile_path: PathBuf, > -} > - > -impl TaskCache { > - /// Create a new tasks cache instance by loading > - /// the cache from disk. 
> - fn new(cachefile_path: PathBuf) -> Result<Self, Error> { > - Ok(Self { > - content: Self::load_content()?, > - new_or_updated: Default::default(), > - dirty: false, > - cachefile_path, > - }) > - } > - > - /// Load the task cache contents from disk. > - fn load_content() -> Result<TaskCacheContent, Error> { > - let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json"); > - let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?; > - > - let content = if let Some(content) = content { > - serde_json::from_str(&content)? > - } else { > - Default::default() > - }; > - > - Ok(content) > - } > - > - /// Get path for the cache's lockfile. > - fn lockfile_path(&self) -> PathBuf { > - let mut path = self.cachefile_path.clone(); > - path.set_extension("lock"); > - path > - } > - > - /// Persist the task cache > - /// > - /// This method requests an exclusive lock for the task cache lockfile. > - fn save(&mut self) -> Result<(), Error> { > - // if we have not updated anything, we don't have to update the cache file > - if !self.dirty { > - return Ok(()); > - } > - > - let _guard = self.lock(Duration::from_secs(5))?; > - > - // Read content again, in case somebody has changed it in the meanwhile > - let mut content = Self::load_content()?; > - > - for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() { > - if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) { > - // Only update entry if nobody else has updated it in the meanwhile > - if existing_entry.timestamp < entry.timestamp { > - *existing_entry = entry; > - } > - } else { > - content.remote_tasks.insert(remote_name, entry); > - } > - } > - > - let bytes = serde_json::to_vec_pretty(&content)?; > - > - let api_uid = pdm_config::api_user()?.uid; > - let api_gid = pdm_config::api_group()?.gid; > - > - let file_options = CreateOptions::new().owner(api_uid).group(api_gid); > - > - proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?; > - > - self.dirty = false; > - > - Ok(()) > - } > - > - // Update task data for a given remote. > - fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) { > - self.dirty = true; > - self.new_or_updated > - .remote_tasks > - .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks }); > - } > - > - // Get task data for a given remote. > - fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> { > - if let Some(entry) = self.content.remote_tasks.get(remote) { > - if (entry.timestamp + max_age) < now { > - return None; > - } > - > - Some(entry.tasks.clone()) > - } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) { > - if (entry.timestamp + max_age) < now { > - return None; > - } > - Some(entry.tasks.clone()) > - } else { > - None > - } > - } > - > - // Invalidate cache for a given remote. > - fn invalidate_cache_for_remote(&mut self, remote: &str) { > - self.dirty = true; > - self.content.remote_tasks.remove(remote); > - } > - > - // Lock the cache for modification. > - // > - // While the cache is locked, other users can still read the cache > - // without a lock, since the cache file is replaced atomically > - // when updating. 
> - fn lock(&self, duration: Duration) -> Result<File, Error> { > - let api_uid = pdm_config::api_user()?.uid; > - let api_gid = pdm_config::api_group()?.gid; > - > - let file_options = CreateOptions::new().owner(api_uid).group(api_gid); > - proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options) > - } > -} > - > -#[derive(Serialize, Deserialize)] > -/// Per-remote entry in the task cache. > -struct TaskCacheEntry { > - timestamp: i64, > - tasks: Vec<TaskListItem>, > -} > - > -#[derive(Default, Serialize, Deserialize)] > -/// Content of the task cache file. > -struct TaskCacheContent { > - remote_tasks: HashMap<String, TaskCacheEntry>, > -} > - > -/// Interval at which tracked tasks are polled > -const RUNNING_CHECK_INTERVAL_S: u64 = 10; > - > -/// Tasks which were started by PDM and are still running > -static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init); > -/// Tasks which were started by PDM and w > -static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init); > - > -fn init() -> RwLock<HashSet<RemoteUpid>> { > - RwLock::new(HashSet::new()) > -} > - > -/// Insert a remote UPID into the running list > -/// > -/// If it is the first entry in the list, a background task is started to track its state > -/// > -/// Returns the [`JoinHandle`] if a task was started. > -/// > -/// panics on a poisoned mutex > -pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> { > - let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap(); > - > - // the call inserting the first task in the list needs to start the checking task > - let need_start_task = tasks.is_empty(); > - tasks.insert(task); > - > - if !need_start_task { > - return None; > - } > - drop(tasks); > - > - Some(tokio::spawn(async move { > - loop { > - let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S); > - tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await; > - > - let finished_tasks = get_finished_tasks().await; > - > - // skip iteration if we still have tasks, just not finished ones > - if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() { > - continue; > - } > - > - let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap(); > - // we either have finished tasks, or the running task list was empty > - let mut set = RUNNING_FOREIGN_TASKS.write().unwrap(); > - > - for (upid, _status) in finished_tasks { > - if set.remove(&upid) { > - finished.insert(upid); > - } else { > - // someone else removed & persisted the task in the meantime > - } > - } > - > - // if no task remains, end the current task > - // it will be restarted by the next caller that inserts one > - if set.is_empty() { > - return; > - } > - } > - })) > -} > - > -/// Get a list of running foreign tasks > -/// > -/// panics on a poisoned mutex > -pub fn get_running_tasks() -> Vec<RemoteUpid> { > - RUNNING_FOREIGN_TASKS > - .read() > - .unwrap() > - .iter() > - .cloned() > - .collect() > -} > - > -/// Checks all current saved UPIDs if they're still running, and if not, > -/// returns their upids + status > -/// > -/// panics on a poisoned mutex > -pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> { > - let mut finished = Vec::new(); > - let config = match pdm_config::remotes::config() { > - Ok((config, _)) => config, > - Err(err) => { > - log::error!("could not open remotes config: {err}"); > - return Vec::new(); > - } > - }; > - for task in get_running_tasks() { > - match 
config.get(task.remote()) { > - Some(remote) => match remote.ty { > - RemoteType::Pve => { > - let status = match crate::api::pve::tasks::get_task_status( > - remote.id.clone(), > - task.clone(), > - false, > - ) > - .await > - { > - Ok(status) => status, > - Err(err) => { > - log::error!("could not get status from remote: {err}"); > - finished.push((task, "could not get status".to_string())); > - continue; > - } > - }; > - if let Some(status) = status.exitstatus { > - finished.push((task, status.to_string())); > + if let Some(since) = filters.since { > + if item.starttime < since { > + return false; > } > } > - RemoteType::Pbs => { > - let _client = match crate::pbs_client::connect(remote) { > - Ok(client) => client, > - Err(err) => { > - log::error!("could not get status from remote: {err}"); > - finished.push((task, "could not get status".to_string())); > - continue; > - } > - }; > - // FIXME implement get task status > - finished.push((task, "unknown state".to_string())); > - } > - }, > - None => finished.push((task, "unknown remote".to_string())), > - } > - } > > - finished > + if let Some(needle) = &filters.userfilter { > + if !item.user.contains(needle) { > + return false; > + } > + } > + > + if let Some(typefilter) = &filters.typefilter { > + if !item.worker_type.contains(typefilter) { > + return false; > + } > + } > + > + let state = item.status.as_ref().map(|status| tasktype(status)); > + > + match (state, &filters.statusfilter) { > + (Some(TaskStateType::OK), _) if filters.errors => return false, > + (Some(state), Some(filters)) => { > + if !filters.contains(&state) { > + return false; > + } > + } > + (None, Some(_)) => return false, > + _ => {} > + } > + > + true > + }) > + .collect(); > + > + Ok(returned_tasks) > + }) > + .await? > +} > + > +/// Insert a newly created tasks into the list of tracked tasks. > +/// > +/// Any remote with associated tracked tasks will polled with a short interval > +/// until all tracked tasks have finished. > +pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> { > + tokio::task::spawn_blocking(move || { > + let cache = get_cache()?; > + // TODO:: Handle PBS tasks correctly. > + let pve_upid: pve_api_types::PveUpid = task.upid.parse()?; > + let task = TaskCacheItem { > + upid: task.clone(), > + starttime: pve_upid.starttime, > + status: None, > + endtime: None, > + }; > + cache.add_tracked_task(task) > + }) > + .await? > +} > + > +/// Get a new [`TaskCache`] instance. > +/// > +/// No heavy-weight operations are done here, it's fine to call this regularly as part of the > +/// update loop. > +pub fn get_cache() -> Result<TaskCache, Error> { > + let api_uid = pdm_config::api_user()?.uid; > + let api_gid = pdm_config::api_group()?.gid; > + > + let file_options = CreateOptions::new().owner(api_uid).group(api_gid); > + > + let cache_path = Path::new(REMOTE_TASKS_DIR); > + let cache = TaskCache::new(cache_path, file_options)?; > + > + Ok(cache) > } > > /// Parses a task status string into a TaskStateType _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel