From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache
Date: Fri, 14 Mar 2025 15:12:22 +0100
Message-ID: <20250314141225.240768-6-l.wagner@proxmox.com>
In-Reply-To: <20250314141225.240768-1-l.wagner@proxmox.com>
This commit changes the remote task module as follows:
- Add a new background task for regular polling of task data
Instead of triggering fetching of task data from the `get_tasks` function,
which is usually called by an API handler, we move the fetching to a
new background task. The task fetches the latest tasks from all remotes
and stores them in the task cache at regular intervals (10 minutes; a
condensed scheduling sketch follows after the trailers).
The `get_tasks` function itself only reads from the cache.
The main rationale for this change is that for large setups, fetching
tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
each with a >100ms connection - this quickly adds up to minutes).
If we do this from within `get_tasks`, the API handler calling the
function is also blocked for the entire time.
The `get_tasks` API is called every couple of seconds by the UI to get
a list of running remote tasks, so this *must* be quick.
- Tracked tasks are also polled in the same background task, but with
a short polling delay (10 seconds). Instead of polling the status of a specific
tracked task UPID, we simply fetch *all* tasks since the tracked task started.
While this increases the amount of transmitted data a bit for tracked tasks
that run for a very long time, this new approach makes the whole
task tracking functionality much more elegant; it integrates better with
the 'regular' task fetching, which happens at long intervals.
- Tasks are now stored in the new improved task cache implementation.
This should make retrieving tasks much quicker and avoid
unneeded disk IO.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
---
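Note for reviewers: the snippet below is a condensed, self-contained sketch of
the new scheduling model, for getting the big picture before reading the diff.
It is illustrative only - the constants mirror the patch, but
poll_all_remotes(), poll_tracked_remotes() and maybe_rotate_archive() are
placeholders for the actual fetching and rotation code further down.

use std::time::Duration;

const TICK_RATE_S: u64 = 10;
const REGULAR_REFRESH_CYCLES: u64 = (10 * 60) / TICK_RATE_S; // every 10 minutes
const CHECK_ROTATE_CYCLES: u64 = (60 * 60) / TICK_RATE_S; // once per hour

// Placeholder hooks - the real task fetches from the remotes and writes
// the results into the on-disk task cache instead.
async fn poll_all_remotes() { /* full refresh of all configured remotes */ }
async fn poll_tracked_remotes() { /* only remotes that have tracked tasks */ }
async fn maybe_rotate_archive() { /* rotate archive files if old enough */ }

#[tokio::main]
async fn main() {
    let mut cycle = 0u64;
    let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
    // Skip missed ticks instead of bursting to catch up; we only care
    // about a steady tick rate.
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        interval.tick().await;

        if cycle % CHECK_ROTATE_CYCLES == 0 {
            maybe_rotate_archive().await;
        }

        if cycle % REGULAR_REFRESH_CYCLES == 0 {
            // every 10 minutes: fetch from *all* remotes
            poll_all_remotes().await;
        } else {
            // every 10 seconds: only remotes that currently have tracked tasks
            poll_tracked_remotes().await;
        }

        cycle = cycle.wrapping_add(1);
    }
}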
server/src/api/pve/lxc.rs | 10 +-
server/src/api/pve/mod.rs | 4 +-
server/src/api/pve/qemu.rs | 6 +-
server/src/api/remote_tasks.rs | 11 +-
server/src/bin/proxmox-datacenter-api.rs | 3 +-
server/src/remote_tasks/mod.rs | 788 +++++++++++------------
6 files changed, 388 insertions(+), 434 deletions(-)
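A second sketch shows the bounded-concurrency fetching pattern used by the new
do_tick()/fetch_tasks() code. Again only a sketch: MAX_CONNECTIONS and
CONNECTIONS_PER_PVE_REMOTE match the constants introduced below, while
fetch_from() is a stand-in for the actual per-node API requests.

use std::sync::Arc;

use tokio::{sync::Semaphore, task::JoinSet};

const MAX_CONNECTIONS: usize = 20;
const CONNECTIONS_PER_PVE_REMOTE: u32 = 5;

// Stand-in for the real per-remote fetch; the patch issues one request per
// cluster node here, sharing the permits acquired for the remote.
async fn fetch_from(remote: &'static str) -> Result<Vec<String>, String> {
    Ok(vec![format!("some task from {remote}")])
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
    let mut join_set = JoinSet::new();

    for remote in ["pve-a", "pve-b", "pve-c"] {
        // Reserve all of a PVE remote's connections up front so that the
        // *total* number of open connections stays below MAX_CONNECTIONS.
        let permit = Arc::clone(&semaphore)
            .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE)
            .await
            .expect("semaphore is never closed");

        join_set.spawn(async move {
            let result = fetch_from(remote).await;
            drop(permit); // release the connections for the next remote
            result
        });
    }

    while let Some(res) = join_set.join_next().await {
        match res {
            Ok(Ok(tasks)) => println!("fetched {} task(s)", tasks.len()),
            Ok(Err(err)) => eprintln!("fetching failed: {err}"),
            Err(err) => eprintln!("could not join fetching task: {err}"),
        }
    }
}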
diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
index f1c31425..83f9f4aa 100644
--- a/server/src/api/pve/lxc.rs
+++ b/server/src/api/pve/lxc.rs
@@ -209,7 +209,7 @@ pub async fn lxc_start(
let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -242,7 +242,7 @@ pub async fn lxc_stop(
let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
.shutdown_lxc_async(&node, vmid, Default::default())
.await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -357,7 +357,7 @@ pub async fn lxc_migrate(
};
let upid = pve.migrate_lxc(&node, vmid, params).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
log::info!("migrating vm {vmid} of node {node:?}");
let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
- new_remote_upid(source, upid)
+ new_remote_upid(source, upid).await
}
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index c328675a..a351ad69 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -74,9 +74,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
// converts a remote + PveUpid into a RemoteUpid and starts tracking it
-fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
+async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
- remote_tasks::track_running_task(remote_upid.clone());
+ remote_tasks::track_running_task(remote_upid.clone()).await?;
Ok(remote_upid)
}
diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
index dea0550c..54e310d2 100644
--- a/server/src/api/pve/qemu.rs
+++ b/server/src/api/pve/qemu.rs
@@ -216,7 +216,7 @@ pub async fn qemu_start(
.start_qemu_async(&node, vmid, Default::default())
.await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -376,7 +376,7 @@ pub async fn qemu_migrate(
};
let upid = pve.migrate_qemu(&node, vmid, params).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -563,5 +563,5 @@ pub async fn qemu_remote_migrate(
log::info!("migrating vm {vmid} of node {node:?}");
let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
- new_remote_upid(source, upid)
+ new_remote_upid(source, upid).await
}
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index e629000c..05ce3666 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
},
input: {
properties: {
- "max-age": {
- type: Integer,
- optional: true,
- // TODO: sensible default max-age
- default: 300,
- description: "Maximum age of cached task data",
- },
filters: {
type: TaskFilters,
flatten: true,
@@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
},
)]
/// Get the list of tasks for all remotes.
-async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
- let tasks = remote_tasks::get_tasks(max_age, filters).await?;
+async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+ let tasks = remote_tasks::get_tasks(filters).await?;
Ok(tasks)
}
diff --git a/server/src/bin/proxmox-datacenter-api.rs b/server/src/bin/proxmox-datacenter-api.rs
index a79094d5..da39c85d 100644
--- a/server/src/bin/proxmox-datacenter-api.rs
+++ b/server/src/bin/proxmox-datacenter-api.rs
@@ -25,11 +25,11 @@ use pdm_buildcfg::configdir;
use pdm_api_types::Authid;
use proxmox_auth_api::api::assemble_csrf_prevention_token;
-use server::auth;
use server::auth::csrf::csrf_secret;
use server::metric_collection;
use server::resource_cache;
use server::task_utils;
+use server::{auth, remote_tasks};
pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 5 * 60;
@@ -288,6 +288,7 @@ async fn run(debug: bool) -> Result<(), Error> {
start_task_scheduler();
metric_collection::start_task();
resource_cache::start_task();
+ remote_tasks::start_task()?;
server.await?;
log::info!("server shutting down, waiting for active workers to complete");
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 2062f2b7..48d54694 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,65 +1,106 @@
-use std::{
- collections::{HashMap, HashSet},
- fs::File,
- path::{Path, PathBuf},
- sync::{LazyLock, RwLock},
- time::Duration,
-};
+use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
-use anyhow::Error;
+use anyhow::{format_err, Error};
+use nix::sys::stat::Mode;
use pdm_api_types::{
remotes::{Remote, RemoteType},
RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
};
use proxmox_sys::fs::CreateOptions;
use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
-use tokio::task::JoinHandle;
+use task_cache::{AddTasks, GetTasks, State, TaskCache, TaskCacheItem};
+use tokio::{sync::Semaphore, task::JoinSet};
use crate::{api::pve, task_utils};
mod task_cache;
+const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
+
+const SECONDS_PER_MINUTE: u64 = 60;
+const MINUTES_PER_HOUR: u64 = 60;
+
+/// Tick rate for the remote task fetching task.
+/// This is also the rate at which we check on tracked tasks.
+const TICK_RATE_S: u64 = 10;
+
+/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const REGULAR_REFRESH_S: u64 = 10 * SECONDS_PER_MINUTE;
+/// Number of cycles until a regular refresh.
+const REGULAR_REFRESH_CYCLES: u64 = REGULAR_REFRESH_S / TICK_RATE_S;
+
+/// Check once every hour whether we want to rotate.
+const CHECK_ROTATE_S: u64 = SECONDS_PER_MINUTE * MINUTES_PER_HOUR;
+/// Number of cycles before we want to check if we should rotate the task archives.
+const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_S / TICK_RATE_S;
+
+/// Rotate once the most recent archive file is at least 24 hours old.
+const ROTATE_AFTER_S: u64 = 24 * MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
+
+/// Keep 7 days' worth of tasks.
+const KEEP_OLD_FILES: u64 = 7;
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
+/// consideration when accounting for the total number of connections.
+/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
+/// to 4 PVE remotes in parallel.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
/// Get tasks for all remotes
// FIXME: filter for privileges
-pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
- let (remotes, _) = pdm_config::remotes::config()?;
+pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+ let cache = get_cache()?;
- let mut all_tasks = Vec::new();
+ let mut returned_tasks = Vec::new();
- let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let mut cache = TaskCache::new(cache_path)?;
+ let which = if filters.running {
+ GetTasks::Active
+ } else {
+ GetTasks::All
+ };
- // Force a refresh for all tasks of a remote if a task is finished.
- // Not super nice, but saves us from persisting finished tasks. Also,
- // the /nodes/<node>/tasks/<upid>/status endpoint does not return
- // a task's endtime, which is only returned by
- // /nodes/<node>/tasks...
- // Room for improvements in the future.
- invalidate_cache_for_finished_tasks(&mut cache);
+ for task in &mut cache
+ .get_tasks(which)?
+ .skip(filters.start as usize)
+ .take(filters.limit as usize)
+ {
+ let task = match task {
+ Ok(task) => task,
+ Err(err) => {
+ log::error!("could not read task from remote task cache, skipping: {err}");
+ continue;
+ }
+ };
- for (remote_name, remote) in &remotes.sections {
- let now = proxmox_time::epoch_i64();
-
- if let Some(tasks) = cache.get_tasks(remote_name.as_str(), now, max_age) {
- // Data in cache is recent enough and has not been invalidated.
- all_tasks.extend(tasks);
- } else {
- let tasks = match fetch_tasks(remote).await {
- Ok(tasks) => tasks,
- Err(err) => {
- // ignore errors for not reachable remotes
- continue;
- }
- };
- cache.set_tasks(remote_name.as_str(), tasks.clone(), now);
-
- all_tasks.extend(tasks);
+ // TODO: Handle PBS tasks
+ let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
+ match pve_upid {
+ Ok(pve_upid) => {
+ returned_tasks.push(TaskListItem {
+ upid: task.upid.to_string(),
+ node: pve_upid.node,
+ pid: pve_upid.pid as i64,
+ pstart: pve_upid.pstart,
+ starttime: pve_upid.starttime,
+ worker_type: pve_upid.worker_type,
+ worker_id: None,
+ user: pve_upid.auth_id,
+ endtime: task.endtime,
+ status: task.status,
+ });
+ }
+ Err(err) => {
+ log::error!("could not parse UPID: {err}");
+ }
}
}
- let mut returned_tasks = add_running_tasks(all_tasks)?;
- returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
let returned_tasks = returned_tasks
.into_iter()
.filter(|item| {
@@ -106,26 +147,228 @@ pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskLis
true
})
- .skip(filters.start as usize)
- .take(filters.limit as usize)
.collect();
- // We don't need to wait for this task to finish
- tokio::task::spawn_blocking(move || {
- if let Err(e) = cache.save() {
- log::error!("could not save task cache: {e}");
- }
- });
-
Ok(returned_tasks)
}
+/// Insert a newly created task into the list of tracked tasks.
+///
+/// Any remote with associated tracked tasks will be polled at a short interval
+/// until all tracked tasks have finished.
+pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || {
+ let cache = get_cache()?;
+ // TODO: Handle PBS tasks correctly.
+ let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
+ let task = TaskCacheItem {
+ upid: task.clone(),
+ starttime: pve_upid.starttime,
+ status: None,
+ endtime: None,
+ };
+ cache.add_tracked_task(task)
+ })
+ .await?
+}
+
+/// Start the remote task fetching task
+pub fn start_task() -> Result<(), Error> {
+ let api_uid = pdm_config::api_user()?.uid;
+ let api_gid = pdm_config::api_group()?.gid;
+ let file_options = CreateOptions::new()
+ .owner(api_uid)
+ .group(api_gid)
+ .perm(Mode::from_bits_truncate(0o0750));
+ proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
+
+ tokio::spawn(async move {
+ let task_scheduler = std::pin::pin!(remote_task_fetching_task());
+ let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(task_scheduler, abort_future).await;
+ });
+
+ Ok(())
+}
+
+/// Task which handles fetching remote tasks and task archive rotation.
+/// This function never returns.
+async fn remote_task_fetching_task() -> ! {
+ let mut cycle = 0u64;
+ let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
+ interval.reset_at(task_utils::next_aligned_instant(TICK_RATE_S).into());
+
+ // We don't really care about catching up to missed ticks, we just want
+ // a steady tick rate.
+ interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+ if let Err(err) = init_cache().await {
+ log::error!("error when initialized task cache: {err}");
+ }
+
+ loop {
+ interval.tick().await;
+ if let Err(err) = do_tick(cycle).await {
+ log::error!("error when fetching remote tasks: {err}");
+ }
+
+ // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
+ // better be safe and use .wrapping_add(1) :)
+ cycle = cycle.wrapping_add(1);
+ }
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are not
+/// any archive files yet.
+///
+/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
+/// spaced `ROTATE_AFTER_S` seconds apart.
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+async fn init_cache() -> Result<(), Error> {
+ tokio::task::spawn_blocking(|| {
+ let cache = get_cache()?;
+ cache.init(proxmox_time::epoch_i64(), KEEP_OLD_FILES, ROTATE_AFTER_S)?;
+ Ok(())
+ })
+ .await?
+}
+
+/// Handle a single timer tick.
+/// Handles archive file rotation, polling of tracked tasks and fetching of remote tasks.
+async fn do_tick(cycle: u64) -> Result<(), Error> {
+ let cache = get_cache()?;
+
+ if should_check_for_cache_rotation(cycle) {
+ log::debug!("checking if remote task archive should be rotated");
+ if rotate_cache(cache.clone()).await? {
+ log::info!("rotated remote task archive");
+ }
+ }
+
+ let state = cache.read_state();
+
+ let mut all_tasks = HashMap::new();
+
+ let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+ let mut join_set = JoinSet::new();
+
+ let remotes = remotes_to_check(cycle, &state).await?;
+ for remote in remotes {
+ let since = get_cutoff_timestamp(&remote, &state);
+
+ let permit = if remote.ty == RemoteType::Pve {
+ // Acquire multiple permits; for PVE remotes we want to connect
+ // to multiple nodes in parallel.
+ //
+ // `.unwrap()` is safe, we never close the semaphore.
+ Arc::clone(&semaphore)
+ .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
+ .await
+ .unwrap()
+ } else {
+ // For PBS remotes we only have a single outgoing connection
+ //
+ // `.unwrap()` is safe, we never close the semaphore.
+ Arc::clone(&semaphore).acquire_owned().await.unwrap()
+ };
+
+ join_set.spawn(async move {
+ log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
+ let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
+ format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
+ });
+
+ drop(permit);
+ tasks
+ });
+ }
+
+ while let Some(res) = join_set.join_next().await {
+ match res {
+ Ok(Ok((remote, request))) => {
+ all_tasks.insert(remote, request);
+ }
+ Ok(Err(err)) => log::error!("{err}"),
+ Err(err) => log::error!("could not join task fetching future: {err}"),
+ }
+ }
+
+ if !all_tasks.is_empty() {
+ save_tasks(cache, all_tasks).await?;
+ }
+
+ Ok(())
+}
+
+/// Return list of remotes that are to be polled in this cycle.
+async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
+ let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+ let all = cycle % REGULAR_REFRESH_CYCLES == 0;
+
+ if all {
+ Ok(config.sections.into_values().collect())
+ } else {
+ Ok(config
+ .sections
+ .into_iter()
+ .filter_map(|(name, remote)| {
+ if let Some(tracked) = state.tracked_tasks.get(&name) {
+ if !tracked.is_empty() {
+ Some(remote)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ })
+ .collect())
+ }
+}
+
+/// Get the timestamp starting from which we should fetch tasks for a given remote.
+/// The returned timestamp is a UNIX timestamp (in seconds).
+fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
+ let oldest_active = state.oldest_active_task.get(&remote.id).copied();
+ let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
+
+ match (oldest_active, youngest_archived) {
+ (None, None) => 0,
+ (None, Some(youngest_archived)) => youngest_archived,
+ (Some(oldest_active), None) => oldest_active,
+ (Some(oldest_active), Some(youngest_archived)) => oldest_active.min(youngest_archived),
+ }
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+ tokio::task::spawn_blocking(move || {
+ cache.rotate(proxmox_time::epoch_i64(), ROTATE_AFTER_S, KEEP_OLD_FILES)
+ })
+ .await?
+}
+
+/// Add newly fetched tasks to the cache.
+async fn save_tasks(cache: TaskCache, tasks: HashMap<String, AddTasks>) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || cache.add_tasks(tasks)).await?
+}
+
/// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
+/// `since` is a UNIX timestamp (seconds).
+async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
let mut tasks = Vec::new();
+ let mut all_successful = true;
+
match remote.ty {
RemoteType::Pve => {
+ let semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
+ let mut join_set = JoinSet::new();
+
let client = pve::connect(remote)?;
// N+1 requests - we could use /cluster/tasks, but that one
@@ -134,16 +377,53 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
let params = ListTasks {
// Include running tasks
source: Some(ListTasksSource::All),
- // TODO: How much task history do we want? Right now we just hard-code it
- // to 7 days.
- since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
+ since: Some(since),
+ // If `limit` is not provided, we only receive 50 tasks
+ limit: Some(MAX_TASKS_TO_FETCH),
..Default::default()
};
- let list = client.get_task_list(&node.node, params).await?;
- let mapped = map_tasks(list, &remote.id)?;
+ let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
- tasks.extend(mapped);
+ let r = remote.clone();
+
+ join_set.spawn(async move {
+ let client = pve::connect(&r)?;
+ let task_list =
+ client
+ .get_task_list(&node.node, params)
+ .await
+ .map_err(|err| {
+ format_err!("remote '{}', node '{}': {err}", r.id, node.node)
+ })?;
+
+ drop(permit);
+
+ Ok::<Vec<_>, Error>(task_list)
+ });
+ }
+
+ while let Some(res) = join_set.join_next().await {
+ match res {
+ Ok(Ok(list)) => {
+ let mapped = list.into_iter().filter_map(|task| {
+ match map_pve_task(task, &remote.id) {
+ Ok(task) => Some(task),
+ Err(err) => {
+ log::error!("could not map task data, skipping: {err}");
+ None
+ }
+ }
+ });
+
+ tasks.extend(mapped);
+ }
+ Ok(Err(err)) => {
+ all_successful = false;
+ log::error!("could not fetch tasks: {err:?}");
+ }
+ Err(err) => return Err(err.into()),
+ }
}
}
RemoteType::Pbs => {
@@ -151,365 +431,45 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
}
}
- Ok(tasks)
-}
-
-/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
-fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
- let mut mapped = Vec::new();
-
- for task in tasks {
- let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
-
- mapped.push(TaskListItem {
- upid: remote_upid.to_string(),
- node: task.node,
- pid: task.pid,
- pstart: task.pstart as u64,
- starttime: task.starttime,
- worker_type: task.ty,
- worker_id: Some(task.id),
- user: task.user,
- endtime: task.endtime,
- status: task.status,
- })
- }
-
- Ok(mapped)
-}
-
-/// Drops the cached task list of a remote for all finished tasks.
-///
-/// We use this to force a refresh so that we get the full task
-/// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
- let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
-
- // If a task is finished, we force a refresh for the remote - otherwise
- // we don't get the 'endtime' for the task.
- for task in finished.drain() {
- cache.invalidate_cache_for_remote(task.remote());
- }
-}
-
-/// Supplement the list of tasks that we received from the remote with
-/// the tasks that were started by PDM and are currently running.
-fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
- let mut returned_tasks = Vec::new();
-
- let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
- for task in cached_tasks {
- let remote_upid = task.upid.parse()?;
-
- if running_tasks.contains(&remote_upid) {
- if task.endtime.is_some() {
- // Task is finished but we still think it is running ->
- // Drop it from RUNNING_FOREIGN_TASKS
- running_tasks.remove(&remote_upid);
-
- // No need to put it in FINISHED_TASKS, since we already
- // got its state recently enough (we know the status and endtime)
- }
- } else {
- returned_tasks.push(task);
- }
- }
-
- for task in running_tasks.iter() {
- let upid: PveUpid = task.upid.parse()?;
- returned_tasks.push(TaskListItem {
- upid: task.to_string(),
- node: upid.node,
- pid: upid.pid as i64,
- pstart: upid.pstart,
- starttime: upid.starttime,
- worker_type: upid.worker_type,
- worker_id: upid.worker_id,
- user: upid.auth_id,
- endtime: None,
- status: None,
- });
- }
-
- Ok(returned_tasks)
-}
-
-/// A cache for fetched remote tasks.
-struct TaskCache {
- /// Cache entries
- content: TaskCacheContent,
-
- /// Entries that were added or updated - these will be persistet
- /// when `save` is called.
- new_or_updated: TaskCacheContent,
-
- /// Cache entries were changed/removed.
- dirty: bool,
-
- /// File-location at which the cached tasks are stored.
- cachefile_path: PathBuf,
-}
-
-impl TaskCache {
- /// Create a new tasks cache instance by loading
- /// the cache from disk.
- fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
- Ok(Self {
- content: Self::load_content()?,
- new_or_updated: Default::default(),
- dirty: false,
- cachefile_path,
- })
- }
-
- /// Load the task cache contents from disk.
- fn load_content() -> Result<TaskCacheContent, Error> {
- let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
-
- let content = if let Some(content) = content {
- serde_json::from_str(&content)?
- } else {
- Default::default()
- };
-
- Ok(content)
- }
-
- /// Get path for the cache's lockfile.
- fn lockfile_path(&self) -> PathBuf {
- let mut path = self.cachefile_path.clone();
- path.set_extension("lock");
- path
- }
-
- /// Persist the task cache
- ///
- /// This method requests an exclusive lock for the task cache lockfile.
- fn save(&mut self) -> Result<(), Error> {
- // if we have not updated anything, we don't have to update the cache file
- if !self.dirty {
- return Ok(());
- }
-
- let _guard = self.lock(Duration::from_secs(5))?;
-
- // Read content again, in case somebody has changed it in the meanwhile
- let mut content = Self::load_content()?;
-
- for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
- if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
- // Only update entry if nobody else has updated it in the meanwhile
- if existing_entry.timestamp < entry.timestamp {
- *existing_entry = entry;
- }
- } else {
- content.remote_tasks.insert(remote_name, entry);
- }
- }
-
- let bytes = serde_json::to_vec_pretty(&content)?;
-
- let api_uid = pdm_config::api_user()?.uid;
- let api_gid = pdm_config::api_group()?.gid;
-
- let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
- proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
-
- self.dirty = false;
-
- Ok(())
- }
-
- // Update task data for a given remote.
- fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
- self.dirty = true;
- self.new_or_updated
- .remote_tasks
- .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
- }
-
- // Get task data for a given remote.
- fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
- if let Some(entry) = self.content.remote_tasks.get(remote) {
- if (entry.timestamp + max_age) < now {
- return None;
- }
-
- Some(entry.tasks.clone())
- } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
- if (entry.timestamp + max_age) < now {
- return None;
- }
- Some(entry.tasks.clone())
- } else {
- None
- }
- }
-
- // Invalidate cache for a given remote.
- fn invalidate_cache_for_remote(&mut self, remote: &str) {
- self.dirty = true;
- self.content.remote_tasks.remove(remote);
- }
-
- // Lock the cache for modification.
- //
- // While the cache is locked, other users can still read the cache
- // without a lock, since the cache file is replaced atomically
- // when updating.
- fn lock(&self, duration: Duration) -> Result<File, Error> {
- let api_uid = pdm_config::api_user()?.uid;
- let api_gid = pdm_config::api_group()?.gid;
-
- let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
- proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
- }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
- timestamp: i64,
- tasks: Vec<TaskListItem>,
-}
-
-#[derive(Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
- remote_tasks: HashMap<String, TaskCacheEntry>,
-}
-
-/// Interval at which tracked tasks are polled
-const RUNNING_CHECK_INTERVAL_S: u64 = 10;
-
-/// Tasks which were started by PDM and are still running
-static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-/// Tasks which were started by PDM and w
-static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-
-fn init() -> RwLock<HashSet<RemoteUpid>> {
- RwLock::new(HashSet::new())
-}
-
-/// Insert a remote UPID into the running list
-///
-/// If it is the first entry in the list, a background task is started to track its state
-///
-/// Returns the [`JoinHandle`] if a task was started.
-///
-/// panics on a poisoned mutex
-pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
- let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
-
- // the call inserting the first task in the list needs to start the checking task
- let need_start_task = tasks.is_empty();
- tasks.insert(task);
-
- if !need_start_task {
- return None;
- }
- drop(tasks);
-
- Some(tokio::spawn(async move {
- loop {
- let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
- let finished_tasks = get_finished_tasks().await;
-
- // skip iteration if we still have tasks, just not finished ones
- if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
- continue;
- }
-
- let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
- // we either have finished tasks, or the running task list was empty
- let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
-
- for (upid, _status) in finished_tasks {
- if set.remove(&upid) {
- finished.insert(upid);
- } else {
- // someone else removed & persisted the task in the meantime
- }
- }
-
- // if no task remains, end the current task
- // it will be restarted by the next caller that inserts one
- if set.is_empty() {
- return;
- }
- }
- }))
-}
-
-/// Get a list of running foreign tasks
-///
-/// panics on a poisoned mutex
-pub fn get_running_tasks() -> Vec<RemoteUpid> {
- RUNNING_FOREIGN_TASKS
- .read()
- .unwrap()
- .iter()
- .cloned()
- .collect()
-}
-
-/// Checks all current saved UPIDs if they're still running, and if not,
-/// returns their upids + status
-///
-/// panics on a poisoned mutex
-pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
- let mut finished = Vec::new();
- let config = match pdm_config::remotes::config() {
- Ok((config, _)) => config,
- Err(err) => {
- log::error!("could not open remotes config: {err}");
- return Vec::new();
- }
+ let new_tasks = AddTasks {
+ update_most_recent_archive_timestamp: all_successful,
+ tasks,
};
- for task in get_running_tasks() {
- match config.get(task.remote()) {
- Some(remote) => match remote.ty {
- RemoteType::Pve => {
- let status = match crate::api::pve::tasks::get_task_status(
- remote.id.clone(),
- task.clone(),
- false,
- )
- .await
- {
- Ok(status) => status,
- Err(err) => {
- log::error!("could not get status from remote: {err}");
- finished.push((task, "could not get status".to_string()));
- continue;
- }
- };
- if let Some(status) = status.exitstatus {
- finished.push((task, status.to_string()));
- }
- }
- RemoteType::Pbs => {
- let _client = match crate::pbs_client::connect(remote) {
- Ok(client) => client,
- Err(err) => {
- log::error!("could not get status from remote: {err}");
- finished.push((task, "could not get status".to_string()));
- continue;
- }
- };
- // FIXME implement get task status
- finished.push((task, "unknown state".to_string()));
- }
- },
- None => finished.push((task, "unknown remote".to_string())),
- }
- }
- finished
+ Ok((remote.id.clone(), new_tasks))
+}
+
+/// Check whether we are due to check for cache rotation.
+fn should_check_for_cache_rotation(cycle: u64) -> bool {
+ cycle % CHECK_ROTATE_CYCLES == 0
+}
+
+/// Get a new [`TaskCache`] instance.
+///
+/// No heavy-weight operations are done here, it's fine to call this regularly as part of the
+/// update loop.
+fn get_cache() -> Result<TaskCache, Error> {
+ let api_uid = pdm_config::api_user()?.uid;
+ let api_gid = pdm_config::api_group()?.gid;
+
+ let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+ let cache_path = Path::new(REMOTE_TASKS_DIR);
+ let cache = TaskCache::new(cache_path, file_options)?;
+
+ Ok(cache)
+}
+
+/// Map a `ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
+ let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
+
+ Ok(TaskCacheItem {
+ upid: remote_upid,
+ starttime: task.starttime,
+ endtime: task.endtime,
+ status: task.status,
+ })
}
/// Parses a task status string into a TaskStateType
--
2.39.5