From: Dominik Csapak <d.csapak@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: Re: [pdm-devel] [PATCH proxmox-datacenter-manager v5 2/6] remote tasks: add background task for task polling, use new task cache
Date: Wed, 14 May 2025 17:27:37 +0200
Message-ID: <112a7d67-79c3-4d23-bee7-8b1d02140104@proxmox.com>
In-Reply-To: <20250512114144.118545-3-l.wagner@proxmox.com>
Some comments inline.
On 5/12/25 13:41, Lukas Wagner wrote:
> This commits changes the remote task module as follows:
>
> - Add a new background task for regular polling of task data
> Instead of triggering fetching of task data from the `get_tasks` function,
> which is usually called by an API handler, we move the fetching to a
> new background task. The task fetches the latest tasks from all remotes
> and stores them in the task cache in regular intervals (10 minutes).
> The `get_tasks` function itself only reads from the cache.
> The main rationale for this change is that for large setups, fetching
> tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
> each with a >100ms connection - adds up to minutes quickly).
> If we do this from within `get_tasks`, the API handler calling the
> function is also blocked for the entire time.
> The `get_tasks` API is called every couple of seconds by the UI to get
> a list of running remote tasks, so this *must* be quick.
>
> - Tracked tasks are also polled in the same background task, but with
> a short polling delay (10 seconds). Instead of polling the status of the specific
> tracked task UPID, we simply fetch *all* tasks since the tracked task started.
> While this increases the amount of transmitted data a bit for tracked tasks
> that run for a very long time, this new approach makes the whole
> task tracking functionality much more elegant; it integrates better with
> the 'regular' task fetching which happens in long intervals.
As I said in patch one, this could become a scaling problem for long-running tasks:
we would then query the same tasks over and over again just to check on, e.g.,
one long-running task.
This will also impact disk performance on the PVE/PBS side, since reading all tasks
since a cutoff time probably has to read multiple files, while querying a single
task only reads the end of one file.
In general I would assume that the ratio of running tasks to all tasks is rather
low, so we have e.g. 1-5 running tasks per remote vs. hundreds or thousands that
are finished.
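Rough sketch of what I mean (untested; it reuses the `get_task_status` helper that
the old implementation already used for tracked tasks, and assumes a hypothetical
`update_tracked_task` method on the cache plus the tracked UPIDs being available
per remote in `state.tracked_tasks`):

for upid in state.tracked_tasks.get(&remote.id).into_iter().flatten() {
    // poll only this UPID's status instead of fetching the whole task list since the cutoff
    match crate::api::pve::tasks::get_task_status(remote.id.clone(), upid.clone(), false).await {
        Ok(status) => cache.update_tracked_task(upid, status)?, // hypothetical helper
        Err(err) => log::error!("could not poll tracked task status: {err}"),
    }
}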
>
> - Tasks are now stored in the new improved task cache implementation.
> This should make retrieving tasks much quicker and avoids
> unneeded disk IO.
>
> Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
> ---
>
> Notes:
> Changes since v4:
> - Rebase onto latest master, adapting to changes in
> the section config type
>
> Changes since v2:
> - Adapt to new locking approach (only drops a `mut`)
>
> Changes since v1:
>
> - use const Duration instead of u64s for durations, using
> Duration::as_secs() where needed
> - Move the remote_task fetching task functions to
> src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> - remote_tasks::get_tasks: wrap function body in a
> tokio::task::spawn_blocking. using the TaskCache::get_tasks
> iterator does disk IO and could block the executor
> - Added some doc strings to make the purpose/workings of
> some functions clearer
> - Couple of variables have been renamed for more clarity
>
> server/src/api/pve/lxc.rs | 10 +-
> server/src/api/pve/mod.rs | 4 +-
> server/src/api/pve/qemu.rs | 6 +-
> server/src/api/remote_tasks.rs | 11 +-
> server/src/bin/proxmox-datacenter-api/main.rs | 1 +
> .../bin/proxmox-datacenter-api/tasks/mod.rs | 1 +
> .../tasks/remote_tasks.rs | 363 +++++++++++
> server/src/remote_tasks/mod.rs | 605 ++++--------------
> 8 files changed, 498 insertions(+), 503 deletions(-)
> create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>
> diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
> index f1c31425..83f9f4aa 100644
> --- a/server/src/api/pve/lxc.rs
> +++ b/server/src/api/pve/lxc.rs
> @@ -209,7 +209,7 @@ pub async fn lxc_start(
>
> let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
>
> - new_remote_upid(remote, upid)
> + new_remote_upid(remote, upid).await
> }
>
> #[api(
> @@ -242,7 +242,7 @@ pub async fn lxc_stop(
>
> let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
>
> - new_remote_upid(remote, upid)
> + new_remote_upid(remote, upid).await
> }
>
> #[api(
> @@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
> .shutdown_lxc_async(&node, vmid, Default::default())
> .await?;
>
> - new_remote_upid(remote, upid)
> + new_remote_upid(remote, upid).await
> }
>
> #[api(
> @@ -357,7 +357,7 @@ pub async fn lxc_migrate(
> };
> let upid = pve.migrate_lxc(&node, vmid, params).await?;
>
> - new_remote_upid(remote, upid)
> + new_remote_upid(remote, upid).await
> }
>
> #[api(
> @@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
> log::info!("migrating vm {vmid} of node {node:?}");
> let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
>
> - new_remote_upid(source, upid)
> + new_remote_upid(source, upid).await
> }
> diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
> index dd7cf382..d472cf58 100644
> --- a/server/src/api/pve/mod.rs
> +++ b/server/src/api/pve/mod.rs
> @@ -76,9 +76,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
> const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
>
> // converts a remote + PveUpid into a RemoteUpid and starts tracking it
> -fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
> +async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
> let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
> - remote_tasks::track_running_task(remote_upid.clone());
> + remote_tasks::track_running_task(remote_upid.clone()).await?;
> Ok(remote_upid)
> }
>
> diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
> index dea0550c..54e310d2 100644
> --- a/server/src/api/pve/qemu.rs
> +++ b/server/src/api/pve/qemu.rs
> @@ -216,7 +216,7 @@ pub async fn qemu_start(
> .start_qemu_async(&node, vmid, Default::default())
> .await?;
>
> - new_remote_upid(remote, upid)
> + new_remote_upid(remote, upid).await
> }
>
> #[api(
> @@ -376,7 +376,7 @@ pub async fn qemu_migrate(
> };
> let upid = pve.migrate_qemu(&node, vmid, params).await?;
>
> - new_remote_upid(remote, upid)
> + new_remote_upid(remote, upid).await
> }
>
> #[api(
> @@ -563,5 +563,5 @@ pub async fn qemu_remote_migrate(
> log::info!("migrating vm {vmid} of node {node:?}");
> let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
>
> - new_remote_upid(source, upid)
> + new_remote_upid(source, upid).await
> }
> diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
> index e629000c..05ce3666 100644
> --- a/server/src/api/remote_tasks.rs
> +++ b/server/src/api/remote_tasks.rs
> @@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
> },
> input: {
> properties: {
> - "max-age": {
> - type: Integer,
> - optional: true,
> - // TODO: sensible default max-age
> - default: 300,
> - description: "Maximum age of cached task data",
> - },
> filters: {
> type: TaskFilters,
> flatten: true,
> @@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
> },
> )]
> /// Get the list of tasks for all remotes.
> -async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> - let tasks = remote_tasks::get_tasks(max_age, filters).await?;
> +async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> + let tasks = remote_tasks::get_tasks(filters).await?;
>
> Ok(tasks)
> }
> diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
> index 49499980..70e489d0 100644
> --- a/server/src/bin/proxmox-datacenter-api/main.rs
> +++ b/server/src/bin/proxmox-datacenter-api/main.rs
> @@ -292,6 +292,7 @@ async fn run(debug: bool) -> Result<(), Error> {
> metric_collection::start_task();
> tasks::remote_node_mapping::start_task();
> resource_cache::start_task();
> + tasks::remote_tasks::start_task()?;
>
> server.await?;
> log::info!("server shutting down, waiting for active workers to complete");
> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
> index e6ead882..a6b1f439 100644
> --- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
> +++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
> @@ -1 +1,2 @@
> pub mod remote_node_mapping;
> +pub mod remote_tasks;
> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> new file mode 100644
> index 00000000..9b6da809
> --- /dev/null
> +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> @@ -0,0 +1,363 @@
> +use std::{collections::HashMap, sync::Arc, time::Duration};
> +
> +use anyhow::{format_err, Error};
> +use nix::sys::stat::Mode;
> +use tokio::{sync::Semaphore, task::JoinSet};
> +
> +use pdm_api_types::{
> + remotes::{Remote, RemoteType},
> + RemoteUpid,
> +};
> +use proxmox_sys::fs::CreateOptions;
> +use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
> +use server::{
> + api::pve,
> + remote_tasks::{
> + self,
> + task_cache::{AddTasks, State, TaskCache, TaskCacheItem},
> + REMOTE_TASKS_DIR,
> + },
> + task_utils,
> +};
> +
> +/// Tick interval for the remote task fetching task.
> +/// This is also the rate at which we check on tracked tasks.
> +const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
> +
> +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
> +/// task for this remote).
> +const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
IMHO these two are documented confusingly: if `REGULAR_REFRESH_INTERVAL` is the one
used for the normal fetch interval, why does the comment on `TASK_REFRESH_INTERVAL`
say it is also used for that?
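Maybe something along these lines would be clearer (just a suggestion):

/// Tick interval of the fetching task; this is the rate at which tracked tasks are polled.
const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);

/// Interval at which *all* tasks are fetched from a remote, even if it has no tracked tasks.
const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);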
> +/// Number of cycles until a regular refresh.
> +const REGULAR_REFRESH_CYCLES: u64 =
> + REGULAR_REFRESH_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
> +
> +/// Check if we want to rotate once every hour.
Commas are important; one can read this sentence in two ways:
Check if we want to rotate, once every hour
or
Check if we want to (rotate once every hour) (brackets for clarity)
IMHO a better way to write it is:
Check once every hour if we want to rotate.
> +const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
> +/// Number of cycles before we want to check if we should rotate the task archives.
> +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
> +
> +/// Rotate once the most recent archive file is at least 24 hour old.
> +const ROTATE_AFTER: Duration = Duration::from_secs(24 * 3600);
> +
> +/// Keep 7 days worth of tasks.
> +const KEEP_OLD_FILES: u64 = 7;
> +
> +/// Maximum number of concurrent connections per remote.
> +const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
> +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
> +/// consideration when accounting for the total number of connections.
> +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
> +/// to 4 PVE remotes in parallel.
> +const MAX_CONNECTIONS: usize = 20;
> +
> +/// Maximum number of tasks to fetch from a single remote in one API call.
> +const MAX_TASKS_TO_FETCH: u64 = 5000;
> +
> +/// Start the remote task fetching task
> +pub fn start_task() -> Result<(), Error> {
> + let api_uid = pdm_config::api_user()?.uid;
> + let api_gid = pdm_config::api_group()?.gid;
> + let file_options = CreateOptions::new()
> + .owner(api_uid)
> + .group(api_gid)
> + .perm(Mode::from_bits_truncate(0o0750));
> + proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
This should probably use the `proxmox_product_config` crate.
> +
> + tokio::spawn(async move {
> + let task_scheduler = std::pin::pin!(remote_task_fetching_task());
> + let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
> + futures::future::select(task_scheduler, abort_future).await;
> + });
> +
> + Ok(())
> +}
> +
> +/// Task which handles fetching remote tasks and task archive rotation.
> +/// This function never returns.
> +async fn remote_task_fetching_task() -> ! {
> + let mut cycle = 0u64;
> + let mut interval = tokio::time::interval(TASK_REFRESH_INTERVAL);
> + interval.reset_at(task_utils::next_aligned_instant(TASK_REFRESH_INTERVAL.as_secs()).into());
> +
> + // We don't really care about catching up to missed tick, we just want
> + // a steady tick rate.
> + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
> +
> + if let Err(err) = init_cache().await {
> + log::error!("error when initialized task cache: {err}");
> + }
> +
> + loop {
> + interval.tick().await;
> + if let Err(err) = do_tick(cycle).await {
> + log::error!("error when fetching remote tasks: {err}");
> + }
> +
> + // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
> + // better be safe and use .wrapping_add(1) :)
> + cycle = cycle.wrapping_add(1);
> + }
You do the cycle counting here manually, but this can be bad: if one cycle regularly
takes a long time (say 1 minute instead of 10 seconds), you fetch the remotes not
every 10 minutes (as the comment above would indicate) but only every hour.
I guess you wanted to be on the safe side and not be overly aggressive with the
polling, but making the fetch/rotation interval depend that much on the cycle
duration does not seem very good to me either.
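A rough, untested sketch of an alternative (here `do_tick` would take a flag instead
of the cycle counter; the rotation check could be handled the same way):

let mut last_full_refresh: Option<tokio::time::Instant> = None;
loop {
    interval.tick().await;

    // decide based on elapsed wall-clock time, not on the number of completed ticks
    let refresh_all = last_full_refresh
        .map(|t| t.elapsed() >= REGULAR_REFRESH_INTERVAL)
        .unwrap_or(true); // first iteration after startup: refresh everything

    if let Err(err) = do_tick(refresh_all).await {
        log::error!("error when fetching remote tasks: {err}");
    }

    if refresh_all {
        last_full_refresh = Some(tokio::time::Instant::now());
    }
}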
> +}
> +
> +/// Handle a single timer tick.
> +/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks.
> +async fn do_tick(cycle: u64) -> Result<(), Error> {
> + let cache = remote_tasks::get_cache()?;
> +
> + if should_check_for_cache_rotation(cycle) {
> + log::debug!("checking if remote task archive should be rotated");
> + if rotate_cache(cache.clone()).await? {
> + log::info!("rotated remote task archive");
> + }
> + }
> +
> + let state = cache.read_state();
> +
> + let mut all_tasks = HashMap::new();
> +
> + let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
> + let mut join_set = JoinSet::new();
> +
> + // Get a list of remotes that we should poll in this cycle.
> + let remotes = remotes_to_check(cycle, &state).await?;
> + for remote in remotes {
> + let since = get_cutoff_timestamp(&remote, &state);
> +
> + let permit = if remote.ty == RemoteType::Pve {
> + // Acquire multiple permits, for PVE remotes we want
> + // to multiple nodes in parallel.
> + //
> + // `.unwrap()` is safe, we never close the semaphore.
> + Arc::clone(&total_connections_semaphore)
> + .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
> + .await
> + .unwrap()
Would it be possible to acquire the connection semaphore permits dynamically inside
the `fetch_tasks` call, up to the maximum?
That way we could e.g. connect to 20 remotes with one node each in parallel instead
of always having a maximum of 4 (not sure about the tokio semaphore possibilities here).
I'd still limit it to CONNECTIONS_PER_PVE_REMOTE for each remote, but in case one
remote has fewer nodes, we could use the spare connections for more remotes, doing
more work in parallel.
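Something along these lines might work with tokio's owned permits (untested sketch;
it assumes the global semaphore is passed into `fetch_tasks` as `global_semaphore`):

// inside fetch_tasks(), before spawning the per-node requests:
let per_remote = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));

for node in client.list_nodes().await? {
    // one permit per node request from both limits, so unused per-remote
    // slots don't keep other remotes from making progress
    let global_permit = Arc::clone(&global_semaphore).acquire_owned().await.unwrap();
    let remote_permit = Arc::clone(&per_remote).acquire_owned().await.unwrap();

    join_set.spawn(async move {
        // hold both permits for the duration of the node request
        let _permits = (global_permit, remote_permit);
        // ... build params and fetch the task list for this node ...
    });
}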
> + } else {
> + // For PBS remotes we only have a single outgoing connection
> + //
> + // `.unwrap()` is safe, we never close the semaphore.
> + Arc::clone(&total_connections_semaphore)
> + .acquire_owned()
> + .await
> + .unwrap()
> + };
> +
> + join_set.spawn(async move {
> + log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
> + let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
> + format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
> + });
> +
> + drop(permit);
> + tasks
> + });
> + }
> +
> + while let Some(res) = join_set.join_next().await {
> + match res {
> + Ok(Ok((remote, request))) => {
> + all_tasks.insert(remote, request);
> + }
> + Ok(Err(err)) => log::error!("{err}"),
> + Err(err) => log::error!("could not join task fetching future: {err}"),
> + }
> + }
> +
> + if !all_tasks.is_empty() {
> + save_tasks(cache, all_tasks).await?;
> + }
> +
> + Ok(())
> +}
> +
> +/// Initialize the remote task cache with initial archive files, in case there are not
> +/// any archive files yet.
> +///
> +/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
> +/// spaced `ROTATE_AFTER_S` seconds apart.
> +/// This allows us to immediately backfill remote task history when setting up a new PDM instance
> +/// without any prior task archive rotation.
> +async fn init_cache() -> Result<(), Error> {
> + tokio::task::spawn_blocking(|| {
> + let cache = remote_tasks::get_cache()?;
> + cache.init(
> + proxmox_time::epoch_i64(),
> + KEEP_OLD_FILES,
> + ROTATE_AFTER.as_secs(),
> + )?;
> + Ok(())
> + })
> + .await?
> +}
> +
> +/// Return list of remotes that are to be polled in this cycle.
> +///
> +/// If `cycle` is a multiple of `REGULAR_REFRESH_CYCLES`, the function will
> +/// return all remotes from the remote config. This ensures that
> +/// all remotes are polled at regular intervals.
> +/// In any other case we only return remotes which currently have a tracked
> +/// task.
> +/// On daemon startup (when cycle is 0) we return all remotes to ensure
> +/// that we get an up-to-date task list from all remotes.
> +async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
> + let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
> +
> + let all = cycle % REGULAR_REFRESH_CYCLES == 0;
> +
> + if all {
> + Ok(config.into_iter().map(|(_, section)| section).collect())
> + } else {
> + Ok(config
> + .into_iter()
> + .filter_map(|(name, remote)| {
> + if let Some(tracked) = state.tracked_tasks.get(&name) {
> + if !tracked.is_empty() {
> + Some(remote)
> + } else {
> + None
> + }
> + } else {
> + None
> + }
> + })
I think this could be written more succinctly as:
state
.tracked_tasks
.get(&name)
.and_then(|tracked| (!tracked.is_empty()).then_some(remote))
> + .collect())
> + }
> +}
> +
> +/// Get the timestamp from which on we should fetch tasks for a given remote.
> +/// The returned timestamp is a UNIX timestamp (in seconds).
> +fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
> + let oldest_active = state.oldest_active_task.get(&remote.id).copied();
> + let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
> +
> + match (oldest_active, youngest_archived) {
> + (None, None) => 0,
> + (None, Some(youngest_archived)) => youngest_archived,
> + (Some(oldest_active), None) => oldest_active,
> + (Some(oldest_active), Some(youngest_active)) => oldest_active.min(youngest_active),
> + }
> +}
> +
> +/// Rotate the task cache if necessary.
> +///
> +/// Returns Ok(true) the cache's files were rotated.
> +async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
> + tokio::task::spawn_blocking(move || {
> + cache.rotate(
> + proxmox_time::epoch_i64(),
> + ROTATE_AFTER.as_secs(),
> + KEEP_OLD_FILES,
> + )
> + })
> + .await?
> +}
In PBS we start a worker task for the log rotation; maybe we want one here too?
> +
> +/// Fetch tasks (active and finished) from a remote
> +/// `since` is a UNIX timestamp (seconds).
> +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
> + let mut tasks = Vec::new();
> +
> + let mut all_successful = true;
> +
> + match remote.ty {
> + RemoteType::Pve => {
> + let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
> + let mut join_set = JoinSet::new();
> +
> + let client = pve::connect(remote)?;
> +
> + // N+1 requests - we could use /cluster/tasks, but that one
> + // only gives a limited task history
> + for node in client.list_nodes().await? {
> + let params = ListTasks {
> + // Include running tasks
> + source: Some(ListTasksSource::All),
> + since: Some(since),
> + // If `limit` is not provided, we only receive 50 tasks
> + limit: Some(MAX_TASKS_TO_FETCH),
> + ..Default::default()
> + };
> +
> + let permit = Arc::clone(&per_remote_semaphore)
> + .acquire_owned()
> + .await
> + .unwrap();
> +
> + let r = remote.clone();
> +
> + join_set.spawn(async move {
> + let client = pve::connect(&r)?;
> + let task_list =
> + client
> + .get_task_list(&node.node, params)
> + .await
> + .map_err(|err| {
> + format_err!("remote '{}', node '{}': {err}", r.id, node.node)
> + })?;
> +
> + drop(permit);
> +
> + Ok::<Vec<_>, Error>(task_list)
> + });
> + }
> +
> + while let Some(res) = join_set.join_next().await {
> + match res {
> + Ok(Ok(list)) => {
> + let mapped = list.into_iter().filter_map(|task| {
> + match map_pve_task(task, &remote.id) {
> + Ok(task) => Some(task),
> + Err(err) => {
> + log::error!("could not map task data, skipping: {err}");
> + None
> + }
> + }
> + });
> +
> + tasks.extend(mapped);
> + }
> + Ok(Err(err)) => {
> + all_successful = false;
> + log::error!("could not fetch tasks: {err:?}");
> + }
> + Err(err) => return Err(err.into()),
> + }
> + }
> + }
> + RemoteType::Pbs => {
> + // TODO: Add code for PBS
> + }
> + }
> +
> + let new_tasks = AddTasks {
> + update_most_recent_archive_timestamp: all_successful,
> + tasks,
> + };
> +
> + Ok((remote.id.clone(), new_tasks))
> +}
> +
> +/// Check if we are due for checking for cache rotation.
> +///
> +/// If `cycle` is 0, a cache rotation check is forced. This is
> +/// only relevant at daemon startup.
> +fn should_check_for_cache_rotation(cycle: u64) -> bool {
> + cycle % CHECK_ROTATE_CYCLES == 0
> +}
> +
> +/// Map a `ListTasksResponse` to `TaskCacheItem`
> +fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
> + let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
> +
> + Ok(TaskCacheItem {
> + upid: remote_upid,
> + starttime: task.starttime,
> + endtime: task.endtime,
> + status: task.status,
> + })
> +}
> +
> +/// Add newly fetched tasks to the cache.
> +async fn save_tasks(cache: TaskCache, tasks: HashMap<String, AddTasks>) -> Result<(), Error> {
> + tokio::task::spawn_blocking(move || cache.add_tasks(tasks)).await?
> +}
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 7c8e31ef..126c9ad3 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -1,515 +1,152 @@
> -use std::{
> - collections::{HashMap, HashSet},
> - fs::File,
> - path::{Path, PathBuf},
> - sync::{LazyLock, RwLock},
> - time::Duration,
> -};
> +use std::path::Path;
>
> use anyhow::Error;
> -use pdm_api_types::{
> - remotes::{Remote, RemoteType},
> - RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
> -};
> +use pdm_api_types::{RemoteUpid, TaskFilters, TaskListItem, TaskStateType};
> use proxmox_sys::fs::CreateOptions;
> -use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
> -use serde::{Deserialize, Serialize};
> -use tokio::task::JoinHandle;
> +use pve_api_types::PveUpid;
> +use task_cache::{GetTasks, TaskCache, TaskCacheItem};
>
> -use crate::{api::pve, task_utils};
> +pub mod task_cache;
>
> -mod task_cache;
> +pub const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
>
> /// Get tasks for all remotes
> // FIXME: filter for privileges
> -pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> - let (remotes, _) = pdm_config::remotes::config()?;
> +pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> + tokio::task::spawn_blocking(move || {
> + let cache = get_cache()?;
>
> - let mut all_tasks = Vec::new();
> + let mut returned_tasks = Vec::new();
>
> - let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
> - let mut cache = TaskCache::new(cache_path)?;
> -
> - // Force a refresh for all tasks of a remote if a task is finished.
> - // Not super nice, but saves us from persisting finished tasks. Also,
> - // the /nodes/<node>/tasks/<upid>/status endpoint does not return
> - // a task's endtime, which is only returned by
> - // /nodes/<node>/tasks...
> - // Room for improvements in the future.
> - invalidate_cache_for_finished_tasks(&mut cache);
> -
> - for (remote_name, remote) in remotes.iter() {
> - let now = proxmox_time::epoch_i64();
> -
> - if let Some(tasks) = cache.get_tasks(remote_name, now, max_age) {
> - // Data in cache is recent enough and has not been invalidated.
> - all_tasks.extend(tasks);
> + let which = if filters.running {
> + GetTasks::Active
> } else {
> - let tasks = match fetch_tasks(remote).await {
> - Ok(tasks) => tasks,
> + GetTasks::All
> + };
> +
> + for task in &mut cache
> + .get_tasks(which)?
> + .skip(filters.start as usize)
> + .take(filters.limit as usize)
> + {
> + let task = match task {
> + Ok(task) => task,
> Err(err) => {
> - // ignore errors for not reachable remotes
> + log::error!("could not read task from remote task cache, skipping: {err}");
> continue;
> }
> };
> - cache.set_tasks(remote_name, tasks.clone(), now);
>
> - all_tasks.extend(tasks);
> + // TODO: Handle PBS tasks
> + let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
> + match pve_upid {
> + Ok(pve_upid) => {
> + returned_tasks.push(TaskListItem {
> + upid: task.upid.to_string(),
> + node: pve_upid.node,
> + pid: pve_upid.pid as i64,
> + pstart: pve_upid.pstart,
> + starttime: pve_upid.starttime,
> + worker_type: pve_upid.worker_type,
> + worker_id: None,
> + user: pve_upid.auth_id,
> + endtime: task.endtime,
> + status: task.status,
> + });
> + }
> + Err(err) => {
> + log::error!("could not parse UPID: {err}");
> + }
> + }
> }
> - }
>
> - let mut returned_tasks = add_running_tasks(all_tasks)?;
> - returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
> - let returned_tasks = returned_tasks
> - .into_iter()
> - .filter(|item| {
> - if filters.running && item.endtime.is_some() {
> - return false;
> - }
> -
> - if let Some(until) = filters.until {
> - if item.starttime > until {
> + let returned_tasks = returned_tasks
> + .into_iter()
> + .filter(|item| {
> + if filters.running && item.endtime.is_some() {
> return false;
> }
> - }
>
> - if let Some(since) = filters.since {
> - if item.starttime < since {
> - return false;
> - }
> - }
> -
> - if let Some(needle) = &filters.userfilter {
> - if !item.user.contains(needle) {
> - return false;
> - }
> - }
> -
> - if let Some(typefilter) = &filters.typefilter {
> - if !item.worker_type.contains(typefilter) {
> - return false;
> - }
> - }
> -
> - let state = item.status.as_ref().map(|status| tasktype(status));
> -
> - match (state, &filters.statusfilter) {
> - (Some(TaskStateType::OK), _) if filters.errors => return false,
> - (Some(state), Some(filters)) => {
> - if !filters.contains(&state) {
> + if let Some(until) = filters.until {
> + if item.starttime > until {
> return false;
> }
> }
> - (None, Some(_)) => return false,
> - _ => {}
> - }
>
> - true
> - })
> - .skip(filters.start as usize)
> - .take(filters.limit as usize)
> - .collect();
> -
> - // We don't need to wait for this task to finish
> - tokio::task::spawn_blocking(move || {
> - if let Err(e) = cache.save() {
> - log::error!("could not save task cache: {e}");
> - }
> - });
> -
> - Ok(returned_tasks)
> -}
> -
> -/// Fetch tasks (active and finished) from a remote
> -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> - let mut tasks = Vec::new();
> -
> - match remote.ty {
> - RemoteType::Pve => {
> - let client = pve::connect(remote)?;
> -
> - // N+1 requests - we could use /cluster/tasks, but that one
> - // only gives a limited task history
> - for node in client.list_nodes().await? {
> - let params = ListTasks {
> - // Include running tasks
> - source: Some(ListTasksSource::All),
> - // TODO: How much task history do we want? Right now we just hard-code it
> - // to 7 days.
> - since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
> - ..Default::default()
> - };
> -
> - let list = client.get_task_list(&node.node, params).await?;
> - let mapped = map_tasks(list, &remote.id)?;
> -
> - tasks.extend(mapped);
> - }
> - }
> - RemoteType::Pbs => {
> - // TODO: Add code for PBS
> - }
> - }
> -
> - Ok(tasks)
> -}
> -
> -/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
> -fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
> - let mut mapped = Vec::new();
> -
> - for task in tasks {
> - let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
> -
> - mapped.push(TaskListItem {
> - upid: remote_upid.to_string(),
> - node: task.node,
> - pid: task.pid,
> - pstart: task.pstart as u64,
> - starttime: task.starttime,
> - worker_type: task.ty,
> - worker_id: Some(task.id),
> - user: task.user,
> - endtime: task.endtime,
> - status: task.status,
> - })
> - }
> -
> - Ok(mapped)
> -}
> -
> -/// Drops the cached task list of a remote for all finished tasks.
> -///
> -/// We use this to force a refresh so that we get the full task
> -/// info (including `endtime`) in the next API call.
> -fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
> - let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
> -
> - // If a task is finished, we force a refresh for the remote - otherwise
> - // we don't get the 'endtime' for the task.
> - for task in finished.drain() {
> - cache.invalidate_cache_for_remote(task.remote());
> - }
> -}
> -
> -/// Supplement the list of tasks that we received from the remote with
> -/// the tasks that were started by PDM and are currently running.
> -fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
> - let mut returned_tasks = Vec::new();
> -
> - let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
> - for task in cached_tasks {
> - let remote_upid = task.upid.parse()?;
> -
> - if running_tasks.contains(&remote_upid) {
> - if task.endtime.is_some() {
> - // Task is finished but we still think it is running ->
> - // Drop it from RUNNING_FOREIGN_TASKS
> - running_tasks.remove(&remote_upid);
> -
> - // No need to put it in FINISHED_TASKS, since we already
> - // got its state recently enough (we know the status and endtime)
> - }
> - } else {
> - returned_tasks.push(task);
> - }
> - }
> -
> - for task in running_tasks.iter() {
> - let upid: PveUpid = task.upid.parse()?;
> - returned_tasks.push(TaskListItem {
> - upid: task.to_string(),
> - node: upid.node,
> - pid: upid.pid as i64,
> - pstart: upid.pstart,
> - starttime: upid.starttime,
> - worker_type: upid.worker_type,
> - worker_id: upid.worker_id,
> - user: upid.auth_id,
> - endtime: None,
> - status: None,
> - });
> - }
> -
> - Ok(returned_tasks)
> -}
> -
> -/// A cache for fetched remote tasks.
> -struct TaskCache {
> - /// Cache entries
> - content: TaskCacheContent,
> -
> - /// Entries that were added or updated - these will be persistet
> - /// when `save` is called.
> - new_or_updated: TaskCacheContent,
> -
> - /// Cache entries were changed/removed.
> - dirty: bool,
> -
> - /// File-location at which the cached tasks are stored.
> - cachefile_path: PathBuf,
> -}
> -
> -impl TaskCache {
> - /// Create a new tasks cache instance by loading
> - /// the cache from disk.
> - fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
> - Ok(Self {
> - content: Self::load_content()?,
> - new_or_updated: Default::default(),
> - dirty: false,
> - cachefile_path,
> - })
> - }
> -
> - /// Load the task cache contents from disk.
> - fn load_content() -> Result<TaskCacheContent, Error> {
> - let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
> - let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
> -
> - let content = if let Some(content) = content {
> - serde_json::from_str(&content)?
> - } else {
> - Default::default()
> - };
> -
> - Ok(content)
> - }
> -
> - /// Get path for the cache's lockfile.
> - fn lockfile_path(&self) -> PathBuf {
> - let mut path = self.cachefile_path.clone();
> - path.set_extension("lock");
> - path
> - }
> -
> - /// Persist the task cache
> - ///
> - /// This method requests an exclusive lock for the task cache lockfile.
> - fn save(&mut self) -> Result<(), Error> {
> - // if we have not updated anything, we don't have to update the cache file
> - if !self.dirty {
> - return Ok(());
> - }
> -
> - let _guard = self.lock(Duration::from_secs(5))?;
> -
> - // Read content again, in case somebody has changed it in the meanwhile
> - let mut content = Self::load_content()?;
> -
> - for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
> - if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
> - // Only update entry if nobody else has updated it in the meanwhile
> - if existing_entry.timestamp < entry.timestamp {
> - *existing_entry = entry;
> - }
> - } else {
> - content.remote_tasks.insert(remote_name, entry);
> - }
> - }
> -
> - let bytes = serde_json::to_vec_pretty(&content)?;
> -
> - let api_uid = pdm_config::api_user()?.uid;
> - let api_gid = pdm_config::api_group()?.gid;
> -
> - let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
> -
> - proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
> -
> - self.dirty = false;
> -
> - Ok(())
> - }
> -
> - // Update task data for a given remote.
> - fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
> - self.dirty = true;
> - self.new_or_updated
> - .remote_tasks
> - .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
> - }
> -
> - // Get task data for a given remote.
> - fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
> - if let Some(entry) = self.content.remote_tasks.get(remote) {
> - if (entry.timestamp + max_age) < now {
> - return None;
> - }
> -
> - Some(entry.tasks.clone())
> - } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
> - if (entry.timestamp + max_age) < now {
> - return None;
> - }
> - Some(entry.tasks.clone())
> - } else {
> - None
> - }
> - }
> -
> - // Invalidate cache for a given remote.
> - fn invalidate_cache_for_remote(&mut self, remote: &str) {
> - self.dirty = true;
> - self.content.remote_tasks.remove(remote);
> - }
> -
> - // Lock the cache for modification.
> - //
> - // While the cache is locked, other users can still read the cache
> - // without a lock, since the cache file is replaced atomically
> - // when updating.
> - fn lock(&self, duration: Duration) -> Result<File, Error> {
> - let api_uid = pdm_config::api_user()?.uid;
> - let api_gid = pdm_config::api_group()?.gid;
> -
> - let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
> - proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
> - }
> -}
> -
> -#[derive(Serialize, Deserialize)]
> -/// Per-remote entry in the task cache.
> -struct TaskCacheEntry {
> - timestamp: i64,
> - tasks: Vec<TaskListItem>,
> -}
> -
> -#[derive(Default, Serialize, Deserialize)]
> -/// Content of the task cache file.
> -struct TaskCacheContent {
> - remote_tasks: HashMap<String, TaskCacheEntry>,
> -}
> -
> -/// Interval at which tracked tasks are polled
> -const RUNNING_CHECK_INTERVAL_S: u64 = 10;
> -
> -/// Tasks which were started by PDM and are still running
> -static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
> -/// Tasks which were started by PDM and w
> -static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
> -
> -fn init() -> RwLock<HashSet<RemoteUpid>> {
> - RwLock::new(HashSet::new())
> -}
> -
> -/// Insert a remote UPID into the running list
> -///
> -/// If it is the first entry in the list, a background task is started to track its state
> -///
> -/// Returns the [`JoinHandle`] if a task was started.
> -///
> -/// panics on a poisoned mutex
> -pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
> - let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
> -
> - // the call inserting the first task in the list needs to start the checking task
> - let need_start_task = tasks.is_empty();
> - tasks.insert(task);
> -
> - if !need_start_task {
> - return None;
> - }
> - drop(tasks);
> -
> - Some(tokio::spawn(async move {
> - loop {
> - let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
> - tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
> -
> - let finished_tasks = get_finished_tasks().await;
> -
> - // skip iteration if we still have tasks, just not finished ones
> - if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
> - continue;
> - }
> -
> - let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
> - // we either have finished tasks, or the running task list was empty
> - let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
> -
> - for (upid, _status) in finished_tasks {
> - if set.remove(&upid) {
> - finished.insert(upid);
> - } else {
> - // someone else removed & persisted the task in the meantime
> - }
> - }
> -
> - // if no task remains, end the current task
> - // it will be restarted by the next caller that inserts one
> - if set.is_empty() {
> - return;
> - }
> - }
> - }))
> -}
> -
> -/// Get a list of running foreign tasks
> -///
> -/// panics on a poisoned mutex
> -pub fn get_running_tasks() -> Vec<RemoteUpid> {
> - RUNNING_FOREIGN_TASKS
> - .read()
> - .unwrap()
> - .iter()
> - .cloned()
> - .collect()
> -}
> -
> -/// Checks all current saved UPIDs if they're still running, and if not,
> -/// returns their upids + status
> -///
> -/// panics on a poisoned mutex
> -pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
> - let mut finished = Vec::new();
> - let config = match pdm_config::remotes::config() {
> - Ok((config, _)) => config,
> - Err(err) => {
> - log::error!("could not open remotes config: {err}");
> - return Vec::new();
> - }
> - };
> - for task in get_running_tasks() {
> - match config.get(task.remote()) {
> - Some(remote) => match remote.ty {
> - RemoteType::Pve => {
> - let status = match crate::api::pve::tasks::get_task_status(
> - remote.id.clone(),
> - task.clone(),
> - false,
> - )
> - .await
> - {
> - Ok(status) => status,
> - Err(err) => {
> - log::error!("could not get status from remote: {err}");
> - finished.push((task, "could not get status".to_string()));
> - continue;
> - }
> - };
> - if let Some(status) = status.exitstatus {
> - finished.push((task, status.to_string()));
> + if let Some(since) = filters.since {
> + if item.starttime < since {
> + return false;
> }
> }
> - RemoteType::Pbs => {
> - let _client = match crate::pbs_client::connect(remote) {
> - Ok(client) => client,
> - Err(err) => {
> - log::error!("could not get status from remote: {err}");
> - finished.push((task, "could not get status".to_string()));
> - continue;
> - }
> - };
> - // FIXME implement get task status
> - finished.push((task, "unknown state".to_string()));
> - }
> - },
> - None => finished.push((task, "unknown remote".to_string())),
> - }
> - }
>
> - finished
> + if let Some(needle) = &filters.userfilter {
> + if !item.user.contains(needle) {
> + return false;
> + }
> + }
> +
> + if let Some(typefilter) = &filters.typefilter {
> + if !item.worker_type.contains(typefilter) {
> + return false;
> + }
> + }
> +
> + let state = item.status.as_ref().map(|status| tasktype(status));
> +
> + match (state, &filters.statusfilter) {
> + (Some(TaskStateType::OK), _) if filters.errors => return false,
> + (Some(state), Some(filters)) => {
> + if !filters.contains(&state) {
> + return false;
> + }
> + }
> + (None, Some(_)) => return false,
> + _ => {}
> + }
> +
> + true
> + })
> + .collect();
> +
> + Ok(returned_tasks)
> + })
> + .await?
> +}
> +
> +/// Insert a newly created tasks into the list of tracked tasks.
> +///
> +/// Any remote with associated tracked tasks will polled with a short interval
> +/// until all tracked tasks have finished.
> +pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
> + tokio::task::spawn_blocking(move || {
> + let cache = get_cache()?;
> + // TODO:: Handle PBS tasks correctly.
> + let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
> + let task = TaskCacheItem {
> + upid: task.clone(),
> + starttime: pve_upid.starttime,
> + status: None,
> + endtime: None,
> + };
> + cache.add_tracked_task(task)
> + })
> + .await?
> +}
> +
> +/// Get a new [`TaskCache`] instance.
> +///
> +/// No heavy-weight operations are done here, it's fine to call this regularly as part of the
> +/// update loop.
> +pub fn get_cache() -> Result<TaskCache, Error> {
> + let api_uid = pdm_config::api_user()?.uid;
> + let api_gid = pdm_config::api_group()?.gid;
> +
> + let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
> +
> + let cache_path = Path::new(REMOTE_TASKS_DIR);
> + let cache = TaskCache::new(cache_path, file_options)?;
> +
> + Ok(cache)
> }
>
> /// Parses a task status string into a TaskStateType