From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 2/4] remote tasks: add background task for task polling, use new task cache
Date: Fri, 11 Apr 2025 13:01:15 +0200
Message-ID: <20250411110117.199543-3-l.wagner@proxmox.com>
In-Reply-To: <20250411110117.199543-1-l.wagner@proxmox.com>
This commit changes the remote task module as follows:
- Add a new background task for regular polling of task data
Instead of triggering fetching of task data from the `get_tasks` function,
which is usually called by an API handler, we move the fetching to a
new background task. The task fetches the latest tasks from all remotes
and stores them in the task cache at regular intervals (10 minutes).
The `get_tasks` function itself only reads from the cache.
The main rationale for this change is that for large setups, fetching
tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
each with a >100ms connection - adds up to minutes quickly).
If we do this from within `get_tasks`, the API handler calling the
function is also blocked for the entire time.
The `get_tasks` API is called every couple of seconds by the UI to get
a list of running remote tasks, so this *must* be quick. A rough sketch
of the intended data flow follows after this list.
- Tracked tasks are also polled in the same background task, but with
a short polling delay (10 seconds). Instead of polling the status of a specific
tracked task UPID, we simply fetch *all* tasks since the tracked task started.
While this increases the amount of transmitted data a bit for tracked tasks
that run for a very long time, this new approach makes the whole
task tracking functionality much more elegant; it integrates better with
the 'regular' task fetching, which happens at long intervals.
- Tasks are now stored in the new improved task cache implementation.
This should make retrieving tasks much quicker and avoid
unneeded disk IO.
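To make the intended data flow easier to picture, here is a minimal,
self-contained sketch of the pattern (not actual PDM code): a background
task refreshes a shared cache at a fixed interval, while readers only
ever touch the cache and therefore return immediately. The in-memory
RwLock and all names below are illustrative; the real implementation
uses the new on-disk task cache and the constants introduced in this
patch.

    use std::{sync::Arc, time::Duration};
    use tokio::sync::RwLock;

    #[derive(Default, Clone)]
    struct TaskSnapshot {
        tasks: Vec<String>, // stand-in for the real task list items
    }

    // Stand-in for the expensive part: contacting every remote.
    async fn fetch_from_remotes() -> Vec<String> {
        tokio::time::sleep(Duration::from_millis(100)).await;
        vec!["UPID:node1:...".to_string()]
    }

    #[tokio::main]
    async fn main() {
        let cache = Arc::new(RwLock::new(TaskSnapshot::default()));

        // Background refresh, roughly what the new start_task() sets up.
        let writer = Arc::clone(&cache);
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(10));
            loop {
                interval.tick().await;
                let tasks = fetch_from_remotes().await;
                writer.write().await.tasks = tasks;
            }
        });

        // API side: only read the cached snapshot, so the handler stays
        // fast no matter how slow or numerous the remotes are.
        let snapshot = cache.read().await.clone();
        println!("{} cached tasks", snapshot.tasks.len());
    }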
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Changes since v1:
- use const Duration instead of u64s for durations, using
Duration::as_secs() where needed
- Move the remote_task fetching task functions to
src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
- remote_tasks::get_tasks: wrap the function body in
tokio::task::spawn_blocking, since using the TaskCache::get_tasks
iterator does disk IO and could block the executor (see the sketch
after these notes)
- Added some doc strings to make the purpose/workings of
some functions clearer
- A couple of variables have been renamed for clarity
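The spawn_blocking change mentioned above follows the usual tokio
pattern for moving blocking disk IO off the async executor. A minimal
sketch, assuming tokio and anyhow as dependencies; the helper names are
made up for illustration:

    use anyhow::Error;

    // Stand-in for iterating the on-disk task archive.
    fn read_tasks_from_disk() -> Result<Vec<String>, Error> {
        let raw = std::fs::read_to_string("/tmp/example-task-archive").unwrap_or_default();
        Ok(raw.lines().map(str::to_owned).collect())
    }

    async fn get_tasks() -> Result<Vec<String>, Error> {
        // The closure runs on tokio's blocking thread pool; `?` converts a
        // JoinError into anyhow::Error, the inner Result is returned as-is.
        tokio::task::spawn_blocking(read_tasks_from_disk).await?
    }

    #[tokio::main]
    async fn main() -> Result<(), Error> {
        println!("{} tasks", get_tasks().await?.len());
        Ok(())
    }

Returning the inner Result directly keeps the async signature identical
to the previous non-blocking one, which is why the change stays
transparent to the API handlers calling get_tasks.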
server/src/api/pve/lxc.rs | 10 +-
server/src/api/pve/mod.rs | 4 +-
server/src/api/pve/qemu.rs | 6 +-
server/src/api/remote_tasks.rs | 11 +-
server/src/bin/proxmox-datacenter-api/main.rs | 1 +
.../bin/proxmox-datacenter-api/tasks/mod.rs | 1 +
.../tasks/remote_tasks.rs | 364 +++++++++++
server/src/remote_tasks/mod.rs | 605 ++++--------------
8 files changed, 499 insertions(+), 503 deletions(-)
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
index f1c31425..83f9f4aa 100644
--- a/server/src/api/pve/lxc.rs
+++ b/server/src/api/pve/lxc.rs
@@ -209,7 +209,7 @@ pub async fn lxc_start(
let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -242,7 +242,7 @@ pub async fn lxc_stop(
let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
.shutdown_lxc_async(&node, vmid, Default::default())
.await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -357,7 +357,7 @@ pub async fn lxc_migrate(
};
let upid = pve.migrate_lxc(&node, vmid, params).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
log::info!("migrating vm {vmid} of node {node:?}");
let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
- new_remote_upid(source, upid)
+ new_remote_upid(source, upid).await
}
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index dd7cf382..d472cf58 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -76,9 +76,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
// converts a remote + PveUpid into a RemoteUpid and starts tracking it
-fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
+async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
- remote_tasks::track_running_task(remote_upid.clone());
+ remote_tasks::track_running_task(remote_upid.clone()).await?;
Ok(remote_upid)
}
diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
index dea0550c..54e310d2 100644
--- a/server/src/api/pve/qemu.rs
+++ b/server/src/api/pve/qemu.rs
@@ -216,7 +216,7 @@ pub async fn qemu_start(
.start_qemu_async(&node, vmid, Default::default())
.await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -376,7 +376,7 @@ pub async fn qemu_migrate(
};
let upid = pve.migrate_qemu(&node, vmid, params).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -563,5 +563,5 @@ pub async fn qemu_remote_migrate(
log::info!("migrating vm {vmid} of node {node:?}");
let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
- new_remote_upid(source, upid)
+ new_remote_upid(source, upid).await
}
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index e629000c..05ce3666 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
},
input: {
properties: {
- "max-age": {
- type: Integer,
- optional: true,
- // TODO: sensible default max-age
- default: 300,
- description: "Maximum age of cached task data",
- },
filters: {
type: TaskFilters,
flatten: true,
@@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
},
)]
/// Get the list of tasks for all remotes.
-async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
- let tasks = remote_tasks::get_tasks(max_age, filters).await?;
+async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+ let tasks = remote_tasks::get_tasks(filters).await?;
Ok(tasks)
}
diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
index 25852c8c..d1cd9e57 100644
--- a/server/src/bin/proxmox-datacenter-api/main.rs
+++ b/server/src/bin/proxmox-datacenter-api/main.rs
@@ -291,6 +291,7 @@ async fn run(debug: bool) -> Result<(), Error> {
metric_collection::start_task();
tasks::remote_node_mapping::start_task();
resource_cache::start_task();
+ tasks::remote_tasks::start_task()?;
server.await?;
log::info!("server shutting down, waiting for active workers to complete");
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
index e6ead882..a6b1f439 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
@@ -1 +1,2 @@
pub mod remote_node_mapping;
+pub mod remote_tasks;
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
new file mode 100644
index 00000000..dc590871
--- /dev/null
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -0,0 +1,364 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use anyhow::{format_err, Error};
+use nix::sys::stat::Mode;
+use tokio::{sync::Semaphore, task::JoinSet};
+
+use pdm_api_types::{
+ remotes::{Remote, RemoteType},
+ RemoteUpid,
+};
+use proxmox_sys::fs::CreateOptions;
+use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
+use server::{
+ api::pve,
+ remote_tasks::{
+ self,
+ task_cache::{AddTasks, State, TaskCache, TaskCacheItem},
+ REMOTE_TASKS_DIR,
+ },
+ task_utils,
+};
+
+/// Tick interval for the remote task fetching task.
+/// This is also the rate at which we check on tracked tasks.
+const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
+
+/// Interval at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
+/// Number of cycles until a regular refresh.
+const REGULAR_REFRESH_CYCLES: u64 =
+ REGULAR_REFRESH_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
+
+/// Check once every hour whether the task archive should be rotated.
+const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
+/// Number of cycles before we want to check if we should rotate the task archives.
+const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
+
+/// Rotate once the most recent archive file is at least 24 hours old.
+const ROTATE_AFTER: Duration = Duration::from_secs(24 * 3600);
+
+/// Keep 7 days worth of tasks.
+const KEEP_OLD_FILES: u64 = 7;
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
+/// consideration when accounting for the total number of connections.
+/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
+/// to 4 PVE remotes in parallel.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
+/// Start the remote task fetching task
+pub fn start_task() -> Result<(), Error> {
+ let api_uid = pdm_config::api_user()?.uid;
+ let api_gid = pdm_config::api_group()?.gid;
+ let file_options = CreateOptions::new()
+ .owner(api_uid)
+ .group(api_gid)
+ .perm(Mode::from_bits_truncate(0o0750));
+ proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
+
+ tokio::spawn(async move {
+ let task_scheduler = std::pin::pin!(remote_task_fetching_task());
+ let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(task_scheduler, abort_future).await;
+ });
+
+ Ok(())
+}
+
+/// Task which handles fetching remote tasks and task archive rotation.
+/// This function never returns.
+async fn remote_task_fetching_task() -> ! {
+ let mut cycle = 0u64;
+ let mut interval = tokio::time::interval(TASK_REFRESH_INTERVAL);
+ interval.reset_at(task_utils::next_aligned_instant(TASK_REFRESH_INTERVAL.as_secs()).into());
+
+ // We don't really care about catching up to missed ticks, we just want
+ // a steady tick rate.
+ interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+ if let Err(err) = init_cache().await {
+ log::error!("error when initializing task cache: {err}");
+ }
+
+ loop {
+ interval.tick().await;
+ if let Err(err) = do_tick(cycle).await {
+ log::error!("error when fetching remote tasks: {err}");
+ }
+
+ // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
+ // better be safe and use .wrapping_add(1) :)
+ cycle = cycle.wrapping_add(1);
+ }
+}
+
+/// Handle a single timer tick.
+/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
+async fn do_tick(cycle: u64) -> Result<(), Error> {
+ let cache = remote_tasks::get_cache()?;
+
+ if should_check_for_cache_rotation(cycle) {
+ log::debug!("checking if remote task archive should be rotated");
+ if rotate_cache(cache.clone()).await? {
+ log::info!("rotated remote task archive");
+ }
+ }
+
+ let state = cache.read_state();
+
+ let mut all_tasks = HashMap::new();
+
+ let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+ let mut join_set = JoinSet::new();
+
+ // Get a list of remotes that we should poll in this cycle.
+ let remotes = remotes_to_check(cycle, &state).await?;
+ for remote in remotes {
+ let since = get_cutoff_timestamp(&remote, &state);
+
+ let permit = if remote.ty == RemoteType::Pve {
+ // Acquire multiple permits, for PVE remotes we want
+ // to query multiple nodes in parallel.
+ //
+ // `.unwrap()` is safe, we never close the semaphore.
+ Arc::clone(&total_connections_semaphore)
+ .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
+ .await
+ .unwrap()
+ } else {
+ // For PBS remotes we only have a single outgoing connection
+ //
+ // `.unwrap()` is safe, we never close the semaphore.
+ Arc::clone(&total_connections_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap()
+ };
+
+ join_set.spawn(async move {
+ log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
+ let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
+ format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
+ });
+
+ drop(permit);
+ tasks
+ });
+ }
+
+ while let Some(res) = join_set.join_next().await {
+ match res {
+ Ok(Ok((remote, request))) => {
+ all_tasks.insert(remote, request);
+ }
+ Ok(Err(err)) => log::error!("{err}"),
+ Err(err) => log::error!("could not join task fetching future: {err}"),
+ }
+ }
+
+ if !all_tasks.is_empty() {
+ save_tasks(cache, all_tasks).await?;
+ }
+
+ Ok(())
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are not
+/// any archive files yet.
+///
+/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
+/// spaced `ROTATE_AFTER` apart.
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+async fn init_cache() -> Result<(), Error> {
+ tokio::task::spawn_blocking(|| {
+ let cache = remote_tasks::get_cache()?;
+ cache.init(
+ proxmox_time::epoch_i64(),
+ KEEP_OLD_FILES,
+ ROTATE_AFTER.as_secs(),
+ )?;
+ Ok(())
+ })
+ .await?
+}
+
+/// Return list of remotes that are to be polled in this cycle.
+///
+/// If `cycle` is a multiple of `REGULAR_REFRESH_CYCLES`, the function will
+/// return all remotes from the remote config. This ensures that
+/// all remotes are polled at regular intervals.
+/// In any other case we only return remotes which currently have a tracked
+/// task.
+/// On daemon startup (when cycle is 0) we return all remotes to ensure
+/// that we get an up-to-date task list from all remotes.
+async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
+ let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+ let all = cycle % REGULAR_REFRESH_CYCLES == 0;
+
+ if all {
+ Ok(config.sections.into_values().collect())
+ } else {
+ Ok(config
+ .sections
+ .into_iter()
+ .filter_map(|(name, remote)| {
+ if let Some(tracked) = state.tracked_tasks.get(&name) {
+ if !tracked.is_empty() {
+ Some(remote)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ })
+ .collect())
+ }
+}
+
+/// Get the timestamp from which on we should fetch tasks for a given remote.
+/// The returned timestamp is a UNIX timestamp (in seconds).
+fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
+ let oldest_active = state.oldest_active_task.get(&remote.id).copied();
+ let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
+
+ match (oldest_active, youngest_archived) {
+ (None, None) => 0,
+ (None, Some(youngest_archived)) => youngest_archived,
+ (Some(oldest_active), None) => oldest_active,
+ (Some(oldest_active), Some(youngest_archived)) => oldest_active.min(youngest_archived),
+ }
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+ tokio::task::spawn_blocking(move || {
+ cache.rotate(
+ proxmox_time::epoch_i64(),
+ ROTATE_AFTER.as_secs(),
+ KEEP_OLD_FILES,
+ )
+ })
+ .await?
+}
+
+/// Fetch tasks (active and finished) from a remote
+/// `since` is a UNIX timestamp (seconds).
+async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
+ let mut tasks = Vec::new();
+
+ let mut all_successful = true;
+
+ match remote.ty {
+ RemoteType::Pve => {
+ let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
+ let mut join_set = JoinSet::new();
+
+ let client = pve::connect(remote)?;
+
+ // N+1 requests - we could use /cluster/tasks, but that one
+ // only gives a limited task history
+ for node in client.list_nodes().await? {
+ let params = ListTasks {
+ // Include running tasks
+ source: Some(ListTasksSource::All),
+ since: Some(since),
+ // If `limit` is not provided, we only receive 50 tasks
+ limit: Some(MAX_TASKS_TO_FETCH),
+ ..Default::default()
+ };
+
+ let permit = Arc::clone(&per_remote_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap();
+
+ let r = remote.clone();
+
+ join_set.spawn(async move {
+ let client = pve::connect(&r)?;
+ let task_list =
+ client
+ .get_task_list(&node.node, params)
+ .await
+ .map_err(|err| {
+ format_err!("remote '{}', node '{}': {err}", r.id, node.node)
+ })?;
+
+ drop(permit);
+
+ Ok::<Vec<_>, Error>(task_list)
+ });
+ }
+
+ while let Some(res) = join_set.join_next().await {
+ match res {
+ Ok(Ok(list)) => {
+ let mapped = list.into_iter().filter_map(|task| {
+ match map_pve_task(task, &remote.id) {
+ Ok(task) => Some(task),
+ Err(err) => {
+ log::error!("could not map task data, skipping: {err}");
+ None
+ }
+ }
+ });
+
+ tasks.extend(mapped);
+ }
+ Ok(Err(err)) => {
+ all_successful = false;
+ log::error!("could not fetch tasks: {err:?}");
+ }
+ Err(err) => return Err(err.into()),
+ }
+ }
+ }
+ RemoteType::Pbs => {
+ // TODO: Add code for PBS
+ }
+ }
+
+ let new_tasks = AddTasks {
+ update_most_recent_archive_timestamp: all_successful,
+ tasks,
+ };
+
+ Ok((remote.id.clone(), new_tasks))
+}
+
+/// Check whether we are due for a cache rotation check.
+///
+/// If `cycle` is 0, a cache rotation check is forced. This is
+/// only relevant at daemon startup.
+fn should_check_for_cache_rotation(cycle: u64) -> bool {
+ cycle % CHECK_ROTATE_CYCLES == 0
+}
+
+/// Map a `ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
+ let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
+
+ Ok(TaskCacheItem {
+ upid: remote_upid,
+ starttime: task.starttime,
+ endtime: task.endtime,
+ status: task.status,
+ })
+}
+
+/// Add newly fetched tasks to the cache.
+async fn save_tasks(cache: TaskCache, tasks: HashMap<String, AddTasks>) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || cache.add_tasks(tasks)).await?
+}
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 2062f2b7..126c9ad3 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,515 +1,152 @@
-use std::{
- collections::{HashMap, HashSet},
- fs::File,
- path::{Path, PathBuf},
- sync::{LazyLock, RwLock},
- time::Duration,
-};
+use std::path::Path;
use anyhow::Error;
-use pdm_api_types::{
- remotes::{Remote, RemoteType},
- RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
-};
+use pdm_api_types::{RemoteUpid, TaskFilters, TaskListItem, TaskStateType};
use proxmox_sys::fs::CreateOptions;
-use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
-use tokio::task::JoinHandle;
+use pve_api_types::PveUpid;
+use task_cache::{GetTasks, TaskCache, TaskCacheItem};
-use crate::{api::pve, task_utils};
+pub mod task_cache;
-mod task_cache;
+pub const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
/// Get tasks for all remotes
// FIXME: filter for privileges
-pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
- let (remotes, _) = pdm_config::remotes::config()?;
+pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+ tokio::task::spawn_blocking(move || {
+ let cache = get_cache()?;
- let mut all_tasks = Vec::new();
+ let mut returned_tasks = Vec::new();
- let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let mut cache = TaskCache::new(cache_path)?;
-
- // Force a refresh for all tasks of a remote if a task is finished.
- // Not super nice, but saves us from persisting finished tasks. Also,
- // the /nodes/<node>/tasks/<upid>/status endpoint does not return
- // a task's endtime, which is only returned by
- // /nodes/<node>/tasks...
- // Room for improvements in the future.
- invalidate_cache_for_finished_tasks(&mut cache);
-
- for (remote_name, remote) in &remotes.sections {
- let now = proxmox_time::epoch_i64();
-
- if let Some(tasks) = cache.get_tasks(remote_name.as_str(), now, max_age) {
- // Data in cache is recent enough and has not been invalidated.
- all_tasks.extend(tasks);
+ let which = if filters.running {
+ GetTasks::Active
} else {
- let tasks = match fetch_tasks(remote).await {
- Ok(tasks) => tasks,
+ GetTasks::All
+ };
+
+ for task in &mut cache
+ .get_tasks(which)?
+ .skip(filters.start as usize)
+ .take(filters.limit as usize)
+ {
+ let task = match task {
+ Ok(task) => task,
Err(err) => {
- // ignore errors for not reachable remotes
+ log::error!("could not read task from remote task cache, skipping: {err}");
continue;
}
};
- cache.set_tasks(remote_name.as_str(), tasks.clone(), now);
- all_tasks.extend(tasks);
+ // TODO: Handle PBS tasks
+ let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
+ match pve_upid {
+ Ok(pve_upid) => {
+ returned_tasks.push(TaskListItem {
+ upid: task.upid.to_string(),
+ node: pve_upid.node,
+ pid: pve_upid.pid as i64,
+ pstart: pve_upid.pstart,
+ starttime: pve_upid.starttime,
+ worker_type: pve_upid.worker_type,
+ worker_id: None,
+ user: pve_upid.auth_id,
+ endtime: task.endtime,
+ status: task.status,
+ });
+ }
+ Err(err) => {
+ log::error!("could not parse UPID: {err}");
+ }
+ }
}
- }
- let mut returned_tasks = add_running_tasks(all_tasks)?;
- returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
- let returned_tasks = returned_tasks
- .into_iter()
- .filter(|item| {
- if filters.running && item.endtime.is_some() {
- return false;
- }
-
- if let Some(until) = filters.until {
- if item.starttime > until {
+ let returned_tasks = returned_tasks
+ .into_iter()
+ .filter(|item| {
+ if filters.running && item.endtime.is_some() {
return false;
}
- }
- if let Some(since) = filters.since {
- if item.starttime < since {
- return false;
- }
- }
-
- if let Some(needle) = &filters.userfilter {
- if !item.user.contains(needle) {
- return false;
- }
- }
-
- if let Some(typefilter) = &filters.typefilter {
- if !item.worker_type.contains(typefilter) {
- return false;
- }
- }
-
- let state = item.status.as_ref().map(|status| tasktype(status));
-
- match (state, &filters.statusfilter) {
- (Some(TaskStateType::OK), _) if filters.errors => return false,
- (Some(state), Some(filters)) => {
- if !filters.contains(&state) {
+ if let Some(until) = filters.until {
+ if item.starttime > until {
return false;
}
}
- (None, Some(_)) => return false,
- _ => {}
- }
- true
- })
- .skip(filters.start as usize)
- .take(filters.limit as usize)
- .collect();
-
- // We don't need to wait for this task to finish
- tokio::task::spawn_blocking(move || {
- if let Err(e) = cache.save() {
- log::error!("could not save task cache: {e}");
- }
- });
-
- Ok(returned_tasks)
-}
-
-/// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
- let mut tasks = Vec::new();
-
- match remote.ty {
- RemoteType::Pve => {
- let client = pve::connect(remote)?;
-
- // N+1 requests - we could use /cluster/tasks, but that one
- // only gives a limited task history
- for node in client.list_nodes().await? {
- let params = ListTasks {
- // Include running tasks
- source: Some(ListTasksSource::All),
- // TODO: How much task history do we want? Right now we just hard-code it
- // to 7 days.
- since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
- ..Default::default()
- };
-
- let list = client.get_task_list(&node.node, params).await?;
- let mapped = map_tasks(list, &remote.id)?;
-
- tasks.extend(mapped);
- }
- }
- RemoteType::Pbs => {
- // TODO: Add code for PBS
- }
- }
-
- Ok(tasks)
-}
-
-/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
-fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
- let mut mapped = Vec::new();
-
- for task in tasks {
- let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
-
- mapped.push(TaskListItem {
- upid: remote_upid.to_string(),
- node: task.node,
- pid: task.pid,
- pstart: task.pstart as u64,
- starttime: task.starttime,
- worker_type: task.ty,
- worker_id: Some(task.id),
- user: task.user,
- endtime: task.endtime,
- status: task.status,
- })
- }
-
- Ok(mapped)
-}
-
-/// Drops the cached task list of a remote for all finished tasks.
-///
-/// We use this to force a refresh so that we get the full task
-/// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
- let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
-
- // If a task is finished, we force a refresh for the remote - otherwise
- // we don't get the 'endtime' for the task.
- for task in finished.drain() {
- cache.invalidate_cache_for_remote(task.remote());
- }
-}
-
-/// Supplement the list of tasks that we received from the remote with
-/// the tasks that were started by PDM and are currently running.
-fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
- let mut returned_tasks = Vec::new();
-
- let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
- for task in cached_tasks {
- let remote_upid = task.upid.parse()?;
-
- if running_tasks.contains(&remote_upid) {
- if task.endtime.is_some() {
- // Task is finished but we still think it is running ->
- // Drop it from RUNNING_FOREIGN_TASKS
- running_tasks.remove(&remote_upid);
-
- // No need to put it in FINISHED_TASKS, since we already
- // got its state recently enough (we know the status and endtime)
- }
- } else {
- returned_tasks.push(task);
- }
- }
-
- for task in running_tasks.iter() {
- let upid: PveUpid = task.upid.parse()?;
- returned_tasks.push(TaskListItem {
- upid: task.to_string(),
- node: upid.node,
- pid: upid.pid as i64,
- pstart: upid.pstart,
- starttime: upid.starttime,
- worker_type: upid.worker_type,
- worker_id: upid.worker_id,
- user: upid.auth_id,
- endtime: None,
- status: None,
- });
- }
-
- Ok(returned_tasks)
-}
-
-/// A cache for fetched remote tasks.
-struct TaskCache {
- /// Cache entries
- content: TaskCacheContent,
-
- /// Entries that were added or updated - these will be persistet
- /// when `save` is called.
- new_or_updated: TaskCacheContent,
-
- /// Cache entries were changed/removed.
- dirty: bool,
-
- /// File-location at which the cached tasks are stored.
- cachefile_path: PathBuf,
-}
-
-impl TaskCache {
- /// Create a new tasks cache instance by loading
- /// the cache from disk.
- fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
- Ok(Self {
- content: Self::load_content()?,
- new_or_updated: Default::default(),
- dirty: false,
- cachefile_path,
- })
- }
-
- /// Load the task cache contents from disk.
- fn load_content() -> Result<TaskCacheContent, Error> {
- let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
-
- let content = if let Some(content) = content {
- serde_json::from_str(&content)?
- } else {
- Default::default()
- };
-
- Ok(content)
- }
-
- /// Get path for the cache's lockfile.
- fn lockfile_path(&self) -> PathBuf {
- let mut path = self.cachefile_path.clone();
- path.set_extension("lock");
- path
- }
-
- /// Persist the task cache
- ///
- /// This method requests an exclusive lock for the task cache lockfile.
- fn save(&mut self) -> Result<(), Error> {
- // if we have not updated anything, we don't have to update the cache file
- if !self.dirty {
- return Ok(());
- }
-
- let _guard = self.lock(Duration::from_secs(5))?;
-
- // Read content again, in case somebody has changed it in the meanwhile
- let mut content = Self::load_content()?;
-
- for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
- if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
- // Only update entry if nobody else has updated it in the meanwhile
- if existing_entry.timestamp < entry.timestamp {
- *existing_entry = entry;
- }
- } else {
- content.remote_tasks.insert(remote_name, entry);
- }
- }
-
- let bytes = serde_json::to_vec_pretty(&content)?;
-
- let api_uid = pdm_config::api_user()?.uid;
- let api_gid = pdm_config::api_group()?.gid;
-
- let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
- proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
-
- self.dirty = false;
-
- Ok(())
- }
-
- // Update task data for a given remote.
- fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
- self.dirty = true;
- self.new_or_updated
- .remote_tasks
- .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
- }
-
- // Get task data for a given remote.
- fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
- if let Some(entry) = self.content.remote_tasks.get(remote) {
- if (entry.timestamp + max_age) < now {
- return None;
- }
-
- Some(entry.tasks.clone())
- } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
- if (entry.timestamp + max_age) < now {
- return None;
- }
- Some(entry.tasks.clone())
- } else {
- None
- }
- }
-
- // Invalidate cache for a given remote.
- fn invalidate_cache_for_remote(&mut self, remote: &str) {
- self.dirty = true;
- self.content.remote_tasks.remove(remote);
- }
-
- // Lock the cache for modification.
- //
- // While the cache is locked, other users can still read the cache
- // without a lock, since the cache file is replaced atomically
- // when updating.
- fn lock(&self, duration: Duration) -> Result<File, Error> {
- let api_uid = pdm_config::api_user()?.uid;
- let api_gid = pdm_config::api_group()?.gid;
-
- let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
- proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
- }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
- timestamp: i64,
- tasks: Vec<TaskListItem>,
-}
-
-#[derive(Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
- remote_tasks: HashMap<String, TaskCacheEntry>,
-}
-
-/// Interval at which tracked tasks are polled
-const RUNNING_CHECK_INTERVAL_S: u64 = 10;
-
-/// Tasks which were started by PDM and are still running
-static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-/// Tasks which were started by PDM and w
-static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-
-fn init() -> RwLock<HashSet<RemoteUpid>> {
- RwLock::new(HashSet::new())
-}
-
-/// Insert a remote UPID into the running list
-///
-/// If it is the first entry in the list, a background task is started to track its state
-///
-/// Returns the [`JoinHandle`] if a task was started.
-///
-/// panics on a poisoned mutex
-pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
- let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
-
- // the call inserting the first task in the list needs to start the checking task
- let need_start_task = tasks.is_empty();
- tasks.insert(task);
-
- if !need_start_task {
- return None;
- }
- drop(tasks);
-
- Some(tokio::spawn(async move {
- loop {
- let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
- let finished_tasks = get_finished_tasks().await;
-
- // skip iteration if we still have tasks, just not finished ones
- if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
- continue;
- }
-
- let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
- // we either have finished tasks, or the running task list was empty
- let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
-
- for (upid, _status) in finished_tasks {
- if set.remove(&upid) {
- finished.insert(upid);
- } else {
- // someone else removed & persisted the task in the meantime
- }
- }
-
- // if no task remains, end the current task
- // it will be restarted by the next caller that inserts one
- if set.is_empty() {
- return;
- }
- }
- }))
-}
-
-/// Get a list of running foreign tasks
-///
-/// panics on a poisoned mutex
-pub fn get_running_tasks() -> Vec<RemoteUpid> {
- RUNNING_FOREIGN_TASKS
- .read()
- .unwrap()
- .iter()
- .cloned()
- .collect()
-}
-
-/// Checks all current saved UPIDs if they're still running, and if not,
-/// returns their upids + status
-///
-/// panics on a poisoned mutex
-pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
- let mut finished = Vec::new();
- let config = match pdm_config::remotes::config() {
- Ok((config, _)) => config,
- Err(err) => {
- log::error!("could not open remotes config: {err}");
- return Vec::new();
- }
- };
- for task in get_running_tasks() {
- match config.get(task.remote()) {
- Some(remote) => match remote.ty {
- RemoteType::Pve => {
- let status = match crate::api::pve::tasks::get_task_status(
- remote.id.clone(),
- task.clone(),
- false,
- )
- .await
- {
- Ok(status) => status,
- Err(err) => {
- log::error!("could not get status from remote: {err}");
- finished.push((task, "could not get status".to_string()));
- continue;
- }
- };
- if let Some(status) = status.exitstatus {
- finished.push((task, status.to_string()));
+ if let Some(since) = filters.since {
+ if item.starttime < since {
+ return false;
}
}
- RemoteType::Pbs => {
- let _client = match crate::pbs_client::connect(remote) {
- Ok(client) => client,
- Err(err) => {
- log::error!("could not get status from remote: {err}");
- finished.push((task, "could not get status".to_string()));
- continue;
- }
- };
- // FIXME implement get task status
- finished.push((task, "unknown state".to_string()));
- }
- },
- None => finished.push((task, "unknown remote".to_string())),
- }
- }
- finished
+ if let Some(needle) = &filters.userfilter {
+ if !item.user.contains(needle) {
+ return false;
+ }
+ }
+
+ if let Some(typefilter) = &filters.typefilter {
+ if !item.worker_type.contains(typefilter) {
+ return false;
+ }
+ }
+
+ let state = item.status.as_ref().map(|status| tasktype(status));
+
+ match (state, &filters.statusfilter) {
+ (Some(TaskStateType::OK), _) if filters.errors => return false,
+ (Some(state), Some(filters)) => {
+ if !filters.contains(&state) {
+ return false;
+ }
+ }
+ (None, Some(_)) => return false,
+ _ => {}
+ }
+
+ true
+ })
+ .collect();
+
+ Ok(returned_tasks)
+ })
+ .await?
+}
+
+/// Insert a newly created task into the list of tracked tasks.
+///
+/// Any remote with associated tracked tasks will be polled at a short interval
+/// until all tracked tasks have finished.
+pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || {
+ let cache = get_cache()?;
+ // TODO: Handle PBS tasks correctly.
+ let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
+ let task = TaskCacheItem {
+ upid: task.clone(),
+ starttime: pve_upid.starttime,
+ status: None,
+ endtime: None,
+ };
+ cache.add_tracked_task(task)
+ })
+ .await?
+}
+
+/// Get a new [`TaskCache`] instance.
+///
+/// No heavy-weight operations are done here, it's fine to call this regularly as part of the
+/// update loop.
+pub fn get_cache() -> Result<TaskCache, Error> {
+ let api_uid = pdm_config::api_user()?.uid;
+ let api_gid = pdm_config::api_group()?.gid;
+
+ let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+ let cache_path = Path::new(REMOTE_TASKS_DIR);
+ let cache = TaskCache::new(cache_path, file_options)?;
+
+ Ok(cache)
}
/// Parses a task status string into a TaskStateType
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
Thread overview: 8+ messages
2025-04-11 11:01 [pdm-devel] [PATCH proxmox-datacenter-manager v2 0/4] remote task cache fetching task / better cache backend Lukas Wagner
2025-04-11 11:01 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 1/4] remote tasks: implement improved cache for remote tasks Lukas Wagner
2025-04-16 13:15 ` Wolfgang Bumiller
2025-04-11 11:01 ` Lukas Wagner [this message]
2025-04-11 11:01 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 3/4] pdm-api-types: remote tasks: implement From<&str> for TaskStateType Lukas Wagner
2025-04-16 14:28 ` Wolfgang Bumiller
2025-04-11 11:01 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 4/4] fake remote: make the fake_remote feature compile again Lukas Wagner
2025-04-17 13:25 ` [pdm-devel] superseded: [PATCH proxmox-datacenter-manager v2 0/4] remote task cache fetching task / better cache backend Lukas Wagner