* [pdm-devel] [PATCH datacenter-manager 1/5] remote tasks: move implementation to server crate
2025-12-16 13:15 [pdm-devel] [PATCH-SERIES 0/6] manual refresh button for remote task cache Lukas Wagner
2025-12-16 13:15 ` [pdm-devel] [PATCH proxmox-yew-comp 1/1] task list: add support for starting refresh tasks Lukas Wagner
@ 2025-12-16 13:15 ` Lukas Wagner
2025-12-16 13:15 ` [pdm-devel] [PATCH datacenter-manager 2/5] remote tasks: implement `refresh_task_cache` for manual task fetching Lukas Wagner
` (4 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Lukas Wagner @ 2025-12-16 13:15 UTC (permalink / raw)
To: pdm-devel
Most of the functions from the refresh task are needed for any kind of
manual refresh, so they have to be moved to the shared server crate.
No functional changes.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
.../tasks/remote_tasks.rs | 535 +-----------------
server/src/remote_tasks/mod.rs | 1 +
server/src/remote_tasks/refresh_task.rs | 530 +++++++++++++++++
3 files changed, 536 insertions(+), 530 deletions(-)
create mode 100644 server/src/remote_tasks/refresh_task.rs
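Note (not part of the commit message): with these functions in the shared crate, any
consumer of the `server` crate can drive a refresh cycle directly. A minimal sketch of a
hypothetical one-shot caller, assuming only the public items visible in this diff
(`refresh_task::init_cache`, `refresh_task::TaskState::new` and
`refresh_task::handle_timer_tick`):

    use server::remote_tasks::refresh_task;

    async fn refresh_once() -> Result<(), anyhow::Error> {
        // Make sure the task archive files exist before the first refresh.
        refresh_task::init_cache().await?;

        // A freshly created TaskState has all intervals due, so a single tick
        // performs the rotation check, journal apply and task fetching.
        let mut state = refresh_task::TaskState::new();
        refresh_task::handle_timer_tick(&mut state).await
    }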
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index c71a0894..fd8823cb 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -1,28 +1,10 @@
-use std::{
- collections::{HashMap, HashSet},
- sync::Arc,
- time::{Duration, Instant},
-};
+use std::time::Duration;
use anyhow::Error;
use nix::sys::stat::Mode;
-use tokio::{sync::Semaphore, task::JoinSet};
-
-use pdm_api_types::{
- remotes::{Remote, RemoteType},
- RemoteUpid,
-};
-use proxmox_section_config::typed::SectionConfigData;
use server::{
- connection,
- parallel_fetcher::{NodeResults, ParallelFetcher},
- pbs_client,
- remote_tasks::{
- self,
- task_cache::{GetTasks, NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
- KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER,
- },
+ remote_tasks::{self, refresh_task, REMOTE_TASKS_DIR},
task_utils,
};
@@ -30,105 +12,6 @@ use server::{
/// This is also the rate at which we check on tracked tasks.
const POLL_INTERVAL: Duration = Duration::from_secs(10);
-/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
-/// task for this remote).
-const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);
-
-/// Interval in seconds at which we poll active tasks. This only really affects 'foreign' (as in,
-/// not started by PDM) tasks. Tasks which were started by PDM are always 'tracked' and therefore
-/// polled at the interval set in [`POLL_INTERVAL`].
-// NOTE: Since we at the moment never query active tasks from remotes, this is merely a safeguard
-// to clear stuck active tasks from a previous bug. If we at some point query active tasks, we
-// might lower this interval.
-const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600);
-
-/// Interval at which to check for task cache rotation.
-const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
-
-/// Interval at which the task cache journal should be applied.
-///
-/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes.
-/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing
-/// it will be slower than the task archive itself, as the entire journal must be loaded into
-/// memory and then sorted by task starttime. Applying the journal more often might
-/// lead to more writes, but should yield better performance.
-const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600);
-
-/// Maximum number of concurrent connections per remote.
-const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
-
-/// Maximum number of total concurrent connections.
-const MAX_CONNECTIONS: usize = 20;
-
-/// Maximum number of tasks to fetch from a single remote in one API call.
-const MAX_TASKS_TO_FETCH: u64 = 5000;
-
-/// (Ephemeral) Remote task fetching task state.
-struct TaskState {
- /// Time at which we last checked for archive rotation.
- last_rotate_check: Instant,
- /// Time at which we fetch tasks the last time.
- last_fetch: Instant,
- /// Time at which we last applied the journal.
- last_journal_apply: Instant,
- /// Time at which we polled active tasks. This is done to ensure that
- /// active tasks are never stuck in the 'active' state
- last_active_poll: Instant,
-}
-
-impl TaskState {
- fn new() -> Self {
- let now = Instant::now();
-
- Self {
- last_rotate_check: now - CHECK_ROTATE_INTERVAL,
- last_fetch: now - TASK_FETCH_INTERVAL,
- last_journal_apply: now - APPLY_JOURNAL_INTERVAL,
- last_active_poll: now - POLL_ACTIVE_INTERVAL,
- }
- }
-
- /// Reset the task archive rotation timestamp.
- fn reset_rotate_check(&mut self) {
- self.last_rotate_check = Instant::now();
- }
-
- /// Reset the task fetch timestamp.
- fn reset_fetch(&mut self) {
- self.last_fetch = Instant::now();
- }
-
- /// Reset the journal apply timestamp.
- fn reset_journal_apply(&mut self) {
- self.last_journal_apply = Instant::now();
- }
-
- /// Reset the journal apply timestamp.
- fn reset_active_poll(&mut self) {
- self.last_active_poll = Instant::now();
- }
-
- /// Should we check for archive rotation?
- fn is_due_for_rotate_check(&self) -> bool {
- Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL
- }
-
- /// Should we fetch tasks?
- fn is_due_for_fetch(&self) -> bool {
- Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL
- }
-
- /// Should we apply the task archive's journal?
- fn is_due_for_journal_apply(&self) -> bool {
- Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL
- }
-
- /// Should we poll active tasks?
- fn is_due_for_active_poll(&self) -> bool {
- Instant::now().duration_since(self.last_active_poll) > POLL_ACTIVE_INTERVAL
- }
-}
-
/// Start the remote task fetching task
pub fn start_task() -> Result<(), Error> {
let dir_options =
@@ -148,7 +31,7 @@ pub fn start_task() -> Result<(), Error> {
/// Task which handles fetching remote tasks and task archive rotation.
/// This function never returns.
async fn remote_task_fetching_task() -> ! {
- let mut task_state = TaskState::new();
+ let mut task_state = refresh_task::TaskState::new();
let mut interval = tokio::time::interval(POLL_INTERVAL);
interval.reset_at(task_utils::next_aligned_instant(POLL_INTERVAL.as_secs()).into());
@@ -157,422 +40,14 @@ async fn remote_task_fetching_task() -> ! {
// a steady tick rate.
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
- if let Err(err) = init_cache().await {
+ if let Err(err) = refresh_task::init_cache().await {
log::error!("error when initialized task cache: {err:#}");
}
loop {
interval.tick().await;
- if let Err(err) = do_tick(&mut task_state).await {
+ if let Err(err) = remote_tasks::refresh_task::handle_timer_tick(&mut task_state).await {
log::error!("error when fetching remote tasks: {err:#}");
}
}
}
-
-/// Handle a single timer tick.
-/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks.
-async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
- let cache = remote_tasks::get_cache()?;
-
- if task_state.is_due_for_rotate_check() {
- log::debug!("checking if remote task archive should be rotated");
- if rotate_cache(cache.clone()).await? {
- log::info!("rotated remote task archive");
- }
-
- task_state.reset_rotate_check();
- }
-
- if task_state.is_due_for_journal_apply() {
- apply_journal(cache.clone()).await?;
- task_state.reset_journal_apply();
- }
-
- let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
-
- let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
-
- let cache_state = cache.read_state();
-
- let poll_results = if task_state.is_due_for_active_poll() {
- let mut tasks_to_poll: HashSet<RemoteUpid> =
- HashSet::from_iter(cache_state.tracked_tasks().cloned());
-
- let active_tasks = get_active_tasks(cache.clone()).await?;
- tasks_to_poll.extend(active_tasks.into_iter());
-
- let poll_results = poll_tracked_tasks(
- &remote_config,
- tasks_to_poll.iter(),
- Arc::clone(&total_connections_semaphore),
- )
- .await?;
-
- task_state.reset_active_poll();
-
- poll_results
- } else {
- poll_tracked_tasks(
- &remote_config,
- cache_state.tracked_tasks(),
- Arc::clone(&total_connections_semaphore),
- )
- .await?
- };
-
- // Get a list of remotes that we should poll in this cycle.
- let remotes = if task_state.is_due_for_fetch() {
- task_state.reset_fetch();
- get_all_remotes(&remote_config)
- } else {
- get_remotes_with_finished_tasks(&remote_config, &poll_results)
- };
-
- let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
-
- if !all_tasks.is_empty()
- || poll_results
- .iter()
- .any(|(_, result)| matches!(result, PollResult::RemoteGone | PollResult::RequestError))
- {
- update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
- }
-
- Ok(())
-}
-
-/// Initialize the remote task cache with initial archive files, in case there are not
-/// any archive files yet.
-///
-/// This allows us to immediately backfill remote task history when setting up a new PDM instance
-/// without any prior task archive rotation.
-async fn init_cache() -> Result<(), Error> {
- tokio::task::spawn_blocking(|| {
- let cache = remote_tasks::get_cache()?;
- cache.write()?.init(proxmox_time::epoch_i64())?;
- Ok(())
- })
- .await?
-}
-
-/// Fetch tasks from a list of remotes.
-///
-/// Returns a list of tasks and a map that shows whether we want to update the
-/// cutoff timestamp in the statefile. We don't want to update the cutoff if
-/// the connection to one remote failed or if we could not reach all remotes in a cluster.
-async fn fetch_remotes(
- remotes: Vec<Remote>,
- cache_state: Arc<State>,
-) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
- let fetcher = ParallelFetcher {
- max_connections: MAX_CONNECTIONS,
- max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
- context: cache_state,
- };
-
- let fetch_results = fetcher
- .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
- .await;
-
- let mut all_tasks = Vec::new();
- let mut node_success_map = NodeFetchSuccessMap::default();
-
- for (remote_name, result) in fetch_results.remote_results {
- match result {
- Ok(remote_result) => {
- for (node_name, node_result) in remote_result.node_results {
- match node_result {
- Ok(NodeResults { data, .. }) => {
- all_tasks.extend(data);
- node_success_map.set_node_success(remote_name.clone(), node_name);
- }
- Err(err) => {
- log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
- }
- }
- }
- }
- Err(err) => {
- log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
- }
- }
- }
-
- (all_tasks, node_success_map)
-}
-
-async fn fetch_tasks_from_single_node(
- context: Arc<State>,
- remote: Remote,
- node: String,
-) -> Result<Vec<TaskCacheItem>, Error> {
- let since = context
- .cutoff_timestamp(&remote.id, &node)
- .unwrap_or_else(|| {
- proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
- });
-
- match remote.ty {
- RemoteType::Pve => {
- let params = pve_api_types::ListTasks {
- source: Some(pve_api_types::ListTasksSource::Archive),
- since: Some(since),
- // If `limit` is not provided, we only receive 50 tasks
- limit: Some(MAX_TASKS_TO_FETCH),
- ..Default::default()
- };
-
- let client = connection::make_pve_client(&remote)?;
-
- let task_list = client
- .get_task_list(&node, params)
- .await?
- .into_iter()
- .map(|task| map_pve_task(task, remote.id.clone()))
- .collect();
-
- Ok(task_list)
- }
- RemoteType::Pbs => {
- let params = pbs_client::ListTasks {
- since: Some(since),
- // If `limit` is not provided, we only receive 50 tasks
- limit: Some(MAX_TASKS_TO_FETCH),
- };
-
- let client = connection::make_pbs_client(&remote)?;
-
- let task_list = client
- .get_task_list(params)
- .await?
- .into_iter()
- .filter_map(|task| {
- if task.endtime.is_some() {
- // We only care about finished tasks.
- Some(map_pbs_task(task, remote.id.clone()))
- } else {
- None
- }
- })
- .collect();
-
- Ok(task_list)
- }
- }
-}
-
-/// Return all remotes from the given config.
-fn get_all_remotes(remote_config: &SectionConfigData<Remote>) -> Vec<Remote> {
- remote_config
- .into_iter()
- .map(|(_, section)| section)
- .cloned()
- .collect()
-}
-
-/// Return all remotes that correspond to a list of finished tasks.
-fn get_remotes_with_finished_tasks(
- remote_config: &SectionConfigData<Remote>,
- poll_results: &HashMap<RemoteUpid, PollResult>,
-) -> Vec<Remote> {
- let remotes_with_finished_tasks: HashSet<&str> = poll_results
- .iter()
- .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote()))
- .collect();
-
- remote_config
- .into_iter()
- .filter_map(|(name, remote)| {
- remotes_with_finished_tasks
- .contains(&name)
- .then_some(remote)
- })
- .cloned()
- .collect()
-}
-
-/// Rotate the task cache if necessary.
-///
-/// Returns Ok(true) the cache's files were rotated.
-async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
- tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await?
-}
-
-/// Apply the task cache journal.
-async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
- tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
-}
-
-/// Get a list of active tasks.
-async fn get_active_tasks(cache: TaskCache) -> Result<Vec<RemoteUpid>, Error> {
- Ok(tokio::task::spawn_blocking(move || {
- let tasks: Vec<RemoteUpid> = cache
- .read()?
- .get_tasks(GetTasks::Active)?
- .map(|t| t.upid)
- .collect();
-
- Ok::<Vec<RemoteUpid>, Error>(tasks)
- })
- .await??)
-}
-
-#[derive(PartialEq, Debug)]
-/// Outcome from polling a tracked task.
-enum PollResult {
- /// Tasks is still running.
- Running,
- /// Task is finished, poll remote tasks to get final status/endtime.
- Finished,
- /// Should be dropped from the active file.
- RequestError,
- /// Remote does not exist any more -> remove immediately from tracked task list.
- RemoteGone,
-}
-
-/// Poll all tracked tasks.
-async fn poll_tracked_tasks(
- remote_config: &SectionConfigData<Remote>,
- tracked_tasks: impl Iterator<Item = &RemoteUpid>,
- total_connections_semaphore: Arc<Semaphore>,
-) -> Result<HashMap<RemoteUpid, PollResult>, Error> {
- let mut join_set = JoinSet::new();
-
- for task in tracked_tasks.cloned() {
- let permit = Arc::clone(&total_connections_semaphore)
- .acquire_owned()
- .await
- .unwrap();
-
- let remote = remote_config.get(task.remote()).cloned();
-
- join_set.spawn(async move {
- // Move permit into this async block.
- let _permit = permit;
-
- match remote {
- Some(remote) => poll_single_tracked_task(remote, task).await,
- None => {
- log::info!(
- "remote {} does not exist any more, dropping tracked task",
- task.remote()
- );
- (task, PollResult::RemoteGone)
- }
- }
- });
- }
-
- let mut results = HashMap::new();
- while let Some(task_result) = join_set.join_next().await {
- let (upid, result) = task_result?;
- results.insert(upid, result);
- }
-
- Ok(results)
-}
-
-/// Poll a single tracked task.
-async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) {
- match remote.ty {
- RemoteType::Pve => {
- log::debug!("polling tracked task {}", task);
-
- let status = match server::api::pve::tasks::get_task_status(
- remote.id.clone(),
- task.clone(),
- false,
- )
- .await
- {
- Ok(status) => status,
- Err(err) => {
- log::error!("could not get status from remote: {err:#}");
- return (task, PollResult::RequestError);
- }
- };
-
- let result = if status.exitstatus.is_some() {
- PollResult::Finished
- } else {
- PollResult::Running
- };
-
- (task, result)
- }
- RemoteType::Pbs => {
- let status = match server::api::pbs::tasks::get_task_status(
- remote.id.clone(),
- task.clone(),
- false,
- )
- .await
- {
- Ok(status) => status,
- Err(err) => {
- log::error!("could not get status from remote: {err:#}");
- return (task, PollResult::RequestError);
- }
- };
-
- let result = if status.exitstatus.is_some() {
- PollResult::Finished
- } else {
- PollResult::Running
- };
-
- (task, result)
- }
- }
-}
-
-/// Map a `pve_api_types::ListTasksResponse` to `TaskCacheItem`
-fn map_pve_task(task: pve_api_types::ListTasksResponse, remote: String) -> TaskCacheItem {
- let remote_upid = RemoteUpid::new(remote, RemoteType::Pve, task.upid);
-
- TaskCacheItem {
- upid: remote_upid,
- starttime: task.starttime,
- endtime: task.endtime,
- status: task.status,
- }
-}
-
-/// Map a `pbs_api_types::TaskListItem` to `TaskCacheItem`
-fn map_pbs_task(task: pbs_api_types::TaskListItem, remote: String) -> TaskCacheItem {
- let remote_upid = RemoteUpid::new(remote, RemoteType::Pbs, task.upid);
-
- TaskCacheItem {
- upid: remote_upid,
- starttime: task.starttime,
- endtime: task.endtime,
- status: task.status,
- }
-}
-
-/// Update task cache with results from tracked task polling & regular task fetching.
-async fn update_task_cache(
- cache: TaskCache,
- new_tasks: Vec<TaskCacheItem>,
- update_state_for_remote: NodeFetchSuccessMap,
- poll_results: HashMap<RemoteUpid, PollResult>,
-) -> Result<(), Error> {
- tokio::task::spawn_blocking(move || {
- let drop_tracked = poll_results
- .into_iter()
- .filter_map(|(upid, result)| match result {
- PollResult::Running => None,
- PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => {
- Some(upid)
- }
- })
- .collect();
-
- cache
- .write()?
- .update(new_tasks, &update_state_for_remote, drop_tracked)?;
-
- Ok(())
- })
- .await?
-}
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index b080811f..50ac6708 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -5,6 +5,7 @@ use anyhow::Error;
use pdm_api_types::{NativeUpid, RemoteUpid, TaskFilters, TaskListItem, TaskStateType};
use pve_api_types::PveUpid;
+pub mod refresh_task;
pub mod task_cache;
use task_cache::{GetTasks, TaskCache, TaskCacheItem};
diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs
new file mode 100644
index 00000000..0e8ed345
--- /dev/null
+++ b/server/src/remote_tasks/refresh_task.rs
@@ -0,0 +1,530 @@
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+ time::{Duration, Instant},
+};
+
+use anyhow::Error;
+use pdm_api_types::{
+ remotes::{Remote, RemoteType},
+ RemoteUpid,
+};
+use proxmox_section_config::typed::SectionConfigData;
+use tokio::{sync::Semaphore, task::JoinSet};
+
+use crate::{
+ api, connection,
+ parallel_fetcher::{NodeResults, ParallelFetcher},
+ pbs_client,
+ remote_tasks::{
+ task_cache::{GetTasks, NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
+ KEEP_OLD_FILES, ROTATE_AFTER,
+ },
+};
+
+/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);
+
+/// Interval in seconds at which we poll active tasks. This only really affects 'foreign' (as in,
+/// not started by PDM) tasks. Tasks which were started by PDM are always 'tracked' and therefore
+/// polled at the interval set in [`POLL_INTERVAL`].
+// NOTE: Since we at the moment never query active tasks from remotes, this is merely a safeguard
+// to clear stuck active tasks from a previous bug. If we at some point query active tasks, we
+// might lower this interval.
+const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600);
+
+/// Interval at which to check for task cache rotation.
+const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
+
+/// Interval at which the task cache journal should be applied.
+///
+/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes.
+/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing
+/// it will be slower than the task archive itself, as the entire journal must be loaded into
+/// memory and then sorted by task starttime. Applying the journal more often might
+/// lead to more writes, but should yield better performance.
+const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600);
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+
+/// Maximum number of total concurrent connections.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
+/// (Ephemeral) Remote task fetching task state.
+pub struct TaskState {
+ /// Time at which we last checked for archive rotation.
+ last_rotate_check: Instant,
+ /// Time at which we last fetched tasks.
+ last_fetch: Instant,
+ /// Time at which we last applied the journal.
+ last_journal_apply: Instant,
+ /// Time at which we last polled active tasks. This is done to ensure that
+ /// active tasks are never stuck in the 'active' state.
+ last_active_poll: Instant,
+}
+
+impl TaskState {
+ pub fn new() -> Self {
+ let now = Instant::now();
+
+ Self {
+ last_rotate_check: now - CHECK_ROTATE_INTERVAL,
+ last_fetch: now - TASK_FETCH_INTERVAL,
+ last_journal_apply: now - APPLY_JOURNAL_INTERVAL,
+ last_active_poll: now - POLL_ACTIVE_INTERVAL,
+ }
+ }
+
+ /// Reset the task archive rotation timestamp.
+ fn reset_rotate_check(&mut self) {
+ self.last_rotate_check = Instant::now();
+ }
+
+ /// Reset the task fetch timestamp.
+ fn reset_fetch(&mut self) {
+ self.last_fetch = Instant::now();
+ }
+
+ /// Reset the journal apply timestamp.
+ fn reset_journal_apply(&mut self) {
+ self.last_journal_apply = Instant::now();
+ }
+
+ /// Reset the active task poll timestamp.
+ fn reset_active_poll(&mut self) {
+ self.last_active_poll = Instant::now();
+ }
+
+ /// Should we check for archive rotation?
+ fn is_due_for_rotate_check(&self) -> bool {
+ Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL
+ }
+
+ /// Should we fetch tasks?
+ fn is_due_for_fetch(&self) -> bool {
+ Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL
+ }
+
+ /// Should we apply the task archive's journal?
+ fn is_due_for_journal_apply(&self) -> bool {
+ Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL
+ }
+
+ /// Should we poll active tasks?
+ fn is_due_for_active_poll(&self) -> bool {
+ Instant::now().duration_since(self.last_active_poll) > POLL_ACTIVE_INTERVAL
+ }
+}
+
+/// Handle a single timer tick.
+/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
+pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> {
+ let cache = super::get_cache()?;
+
+ if task_state.is_due_for_rotate_check() {
+ log::debug!("checking if remote task archive should be rotated");
+ if rotate_cache(cache.clone()).await? {
+ log::info!("rotated remote task archive");
+ }
+
+ task_state.reset_rotate_check();
+ }
+
+ if task_state.is_due_for_journal_apply() {
+ apply_journal(cache.clone()).await?;
+ task_state.reset_journal_apply();
+ }
+
+ let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+ let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+
+ let cache_state = cache.read_state();
+
+ let poll_results = if task_state.is_due_for_active_poll() {
+ let mut tasks_to_poll: HashSet<RemoteUpid> =
+ HashSet::from_iter(cache_state.tracked_tasks().cloned());
+
+ let active_tasks = get_active_tasks(cache.clone()).await?;
+ tasks_to_poll.extend(active_tasks.into_iter());
+
+ let poll_results = poll_tracked_tasks(
+ &remote_config,
+ tasks_to_poll.iter(),
+ Arc::clone(&total_connections_semaphore),
+ )
+ .await?;
+
+ task_state.reset_active_poll();
+
+ poll_results
+ } else {
+ poll_tracked_tasks(
+ &remote_config,
+ cache_state.tracked_tasks(),
+ Arc::clone(&total_connections_semaphore),
+ )
+ .await?
+ };
+
+ // Get a list of remotes that we should poll in this cycle.
+ let remotes = if task_state.is_due_for_fetch() {
+ task_state.reset_fetch();
+ get_all_remotes(&remote_config)
+ } else {
+ get_remotes_with_finished_tasks(&remote_config, &poll_results)
+ };
+
+ let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
+
+ if !all_tasks.is_empty()
+ || poll_results
+ .iter()
+ .any(|(_, result)| matches!(result, PollResult::RemoteGone | PollResult::RequestError))
+ {
+ update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
+ }
+
+ Ok(())
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are not
+/// any archive files yet.
+///
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+pub async fn init_cache() -> Result<(), Error> {
+ tokio::task::spawn_blocking(|| {
+ let cache = super::get_cache()?;
+ cache.write()?.init(proxmox_time::epoch_i64())?;
+ Ok(())
+ })
+ .await?
+}
+
+/// Fetch tasks from a list of remotes.
+///
+/// Returns a list of tasks and a map that shows whether we want to update the
+/// cutoff timestamp in the statefile. We don't want to update the cutoff if
+/// the connection to one remote failed or if we could not reach all remotes in a cluster.
+async fn fetch_remotes(
+ remotes: Vec<Remote>,
+ cache_state: Arc<State>,
+) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
+ let fetcher = ParallelFetcher {
+ max_connections: MAX_CONNECTIONS,
+ max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
+ context: cache_state,
+ };
+
+ let fetch_results = fetcher
+ .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
+ .await;
+
+ let mut all_tasks = Vec::new();
+ let mut node_success_map = NodeFetchSuccessMap::default();
+
+ for (remote_name, result) in fetch_results.remote_results {
+ match result {
+ Ok(remote_result) => {
+ for (node_name, node_result) in remote_result.node_results {
+ match node_result {
+ Ok(NodeResults { data, .. }) => {
+ all_tasks.extend(data);
+ node_success_map.set_node_success(remote_name.clone(), node_name);
+ }
+ Err(err) => {
+ log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+ }
+ }
+ }
+ }
+ Err(err) => {
+ log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+ }
+ }
+ }
+
+ (all_tasks, node_success_map)
+}
+
+async fn fetch_tasks_from_single_node(
+ context: Arc<State>,
+ remote: Remote,
+ node: String,
+) -> Result<Vec<TaskCacheItem>, Error> {
+ let since = context
+ .cutoff_timestamp(&remote.id, &node)
+ .unwrap_or_else(|| {
+ proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
+ });
+
+ match remote.ty {
+ RemoteType::Pve => {
+ let params = pve_api_types::ListTasks {
+ source: Some(pve_api_types::ListTasksSource::Archive),
+ since: Some(since),
+ // If `limit` is not provided, we only receive 50 tasks
+ limit: Some(MAX_TASKS_TO_FETCH),
+ ..Default::default()
+ };
+
+ let client = connection::make_pve_client(&remote)?;
+
+ let task_list = client
+ .get_task_list(&node, params)
+ .await?
+ .into_iter()
+ .map(|task| map_pve_task(task, remote.id.clone()))
+ .collect();
+
+ Ok(task_list)
+ }
+ RemoteType::Pbs => {
+ let params = pbs_client::ListTasks {
+ since: Some(since),
+ // If `limit` is not provided, we only receive 50 tasks
+ limit: Some(MAX_TASKS_TO_FETCH),
+ };
+
+ let client = connection::make_pbs_client(&remote)?;
+
+ let task_list = client
+ .get_task_list(params)
+ .await?
+ .into_iter()
+ .filter_map(|task| {
+ if task.endtime.is_some() {
+ // We only care about finished tasks.
+ Some(map_pbs_task(task, remote.id.clone()))
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ Ok(task_list)
+ }
+ }
+}
+
+/// Return all remotes from the given config.
+fn get_all_remotes(remote_config: &SectionConfigData<Remote>) -> Vec<Remote> {
+ remote_config
+ .into_iter()
+ .map(|(_, section)| section)
+ .cloned()
+ .collect()
+}
+
+/// Return all remotes that correspond to a list of finished tasks.
+fn get_remotes_with_finished_tasks(
+ remote_config: &SectionConfigData<Remote>,
+ poll_results: &HashMap<RemoteUpid, PollResult>,
+) -> Vec<Remote> {
+ let remotes_with_finished_tasks: HashSet<&str> = poll_results
+ .iter()
+ .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote()))
+ .collect();
+
+ remote_config
+ .into_iter()
+ .filter_map(|(name, remote)| {
+ remotes_with_finished_tasks
+ .contains(&name)
+ .then_some(remote)
+ })
+ .cloned()
+ .collect()
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+ tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await?
+}
+
+/// Apply the task cache journal.
+async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
+}
+
+/// Get a list of active tasks.
+async fn get_active_tasks(cache: TaskCache) -> Result<Vec<RemoteUpid>, Error> {
+ Ok(tokio::task::spawn_blocking(move || {
+ let tasks: Vec<RemoteUpid> = cache
+ .read()?
+ .get_tasks(GetTasks::Active)?
+ .map(|t| t.upid)
+ .collect();
+
+ Ok::<Vec<RemoteUpid>, Error>(tasks)
+ })
+ .await??)
+}
+
+#[derive(PartialEq, Debug)]
+/// Outcome from polling a tracked task.
+enum PollResult {
+ /// Task is still running.
+ Running,
+ /// Task is finished, poll remote tasks to get final status/endtime.
+ Finished,
+ /// Should be dropped from the active file.
+ RequestError,
+ /// Remote does not exist any more -> remove immediately from tracked task list.
+ RemoteGone,
+}
+
+/// Poll all tracked tasks.
+async fn poll_tracked_tasks(
+ remote_config: &SectionConfigData<Remote>,
+ tracked_tasks: impl Iterator<Item = &RemoteUpid>,
+ total_connections_semaphore: Arc<Semaphore>,
+) -> Result<HashMap<RemoteUpid, PollResult>, Error> {
+ let mut join_set = JoinSet::new();
+
+ for task in tracked_tasks.cloned() {
+ let permit = Arc::clone(&total_connections_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap();
+
+ let remote = remote_config.get(task.remote()).cloned();
+
+ join_set.spawn(async move {
+ // Move permit into this async block.
+ let _permit = permit;
+
+ match remote {
+ Some(remote) => poll_single_tracked_task(remote, task).await,
+ None => {
+ log::info!(
+ "remote {} does not exist any more, dropping tracked task",
+ task.remote()
+ );
+ (task, PollResult::RemoteGone)
+ }
+ }
+ });
+ }
+
+ let mut results = HashMap::new();
+ while let Some(task_result) = join_set.join_next().await {
+ let (upid, result) = task_result?;
+ results.insert(upid, result);
+ }
+
+ Ok(results)
+}
+
+/// Poll a single tracked task.
+async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) {
+ match remote.ty {
+ RemoteType::Pve => {
+ log::debug!("polling tracked task {}", task);
+
+ let status = match api::pve::tasks::get_task_status(
+ remote.id.clone(),
+ task.clone(),
+ false,
+ )
+ .await
+ {
+ Ok(status) => status,
+ Err(err) => {
+ log::error!("could not get status from remote: {err:#}");
+ return (task, PollResult::RequestError);
+ }
+ };
+
+ let result = if status.exitstatus.is_some() {
+ PollResult::Finished
+ } else {
+ PollResult::Running
+ };
+
+ (task, result)
+ }
+ RemoteType::Pbs => {
+ let status = match api::pbs::tasks::get_task_status(
+ remote.id.clone(),
+ task.clone(),
+ false,
+ )
+ .await
+ {
+ Ok(status) => status,
+ Err(err) => {
+ log::error!("could not get status from remote: {err:#}");
+ return (task, PollResult::RequestError);
+ }
+ };
+
+ let result = if status.exitstatus.is_some() {
+ PollResult::Finished
+ } else {
+ PollResult::Running
+ };
+
+ (task, result)
+ }
+ }
+}
+
+/// Map a `pve_api_types::ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: pve_api_types::ListTasksResponse, remote: String) -> TaskCacheItem {
+ let remote_upid = RemoteUpid::new(remote, RemoteType::Pve, task.upid);
+
+ TaskCacheItem {
+ upid: remote_upid,
+ starttime: task.starttime,
+ endtime: task.endtime,
+ status: task.status,
+ }
+}
+
+/// Map a `pbs_api_types::TaskListItem` to `TaskCacheItem`
+fn map_pbs_task(task: pbs_api_types::TaskListItem, remote: String) -> TaskCacheItem {
+ let remote_upid = RemoteUpid::new(remote, RemoteType::Pbs, task.upid);
+
+ TaskCacheItem {
+ upid: remote_upid,
+ starttime: task.starttime,
+ endtime: task.endtime,
+ status: task.status,
+ }
+}
+
+/// Update task cache with results from tracked task polling & regular task fetching.
+async fn update_task_cache(
+ cache: TaskCache,
+ new_tasks: Vec<TaskCacheItem>,
+ update_state_for_remote: NodeFetchSuccessMap,
+ poll_results: HashMap<RemoteUpid, PollResult>,
+) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || {
+ let drop_tracked = poll_results
+ .into_iter()
+ .filter_map(|(upid, result)| match result {
+ PollResult::Running => None,
+ PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => {
+ Some(upid)
+ }
+ })
+ .collect();
+
+ cache
+ .write()?
+ .update(new_tasks, &update_state_for_remote, drop_tracked)?;
+
+ Ok(())
+ })
+ .await?
+}
--
2.47.3
* [pdm-devel] [PATCH datacenter-manager 3/5] api: add /remotes/tasks/refresh
2025-12-16 13:15 [pdm-devel] [PATCH-SERIES 0/6] manual refresh button for remote task cache Lukas Wagner
` (2 preceding siblings ...)
2025-12-16 13:15 ` [pdm-devel] [PATCH datacenter-manager 2/5] remote tasks: implement `refresh_task_cache` for manual task fetching Lukas Wagner
@ 2025-12-16 13:15 ` Lukas Wagner
2025-12-16 13:15 ` [pdm-devel] [PATCH datacenter-manager 4/5] ui: remote task view: set refresh_task_url property for task viewer Lukas Wagner
` (2 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Lukas Wagner @ 2025-12-16 13:15 UTC (permalink / raw)
To: pdm-devel
This endpoint allows us to trigger task fetching for a single remote or for all remotes.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/api/remote_tasks.rs | 90 ++++++++++++++++++++++++++++++++--
1 file changed, 87 insertions(+), 3 deletions(-)
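Note (not part of the commit message): the endpoint only wraps the cache refresh in a
worker task and filters the remotes by privilege. For illustration, a hypothetical
caller could reuse the same entry point; `refresh_taskcache` is introduced in patch 2/5
of this series, and its signature is only assumed here from its use in this handler:

    use pdm_api_types::remotes::Remote;
    use server::remote_tasks::refresh_task;

    async fn refresh_all_remotes() -> Result<(), anyhow::Error> {
        // Refresh the task cache for every configured remote, as the endpoint
        // does when no `remote` parameter is given (minus the privilege checks).
        let (config, _digest) = pdm_config::remotes::config()?;
        let remotes: Vec<Remote> = config.into_iter().map(|(_, remote)| remote).collect();
        refresh_task::refresh_taskcache(remotes).await?;
        Ok(())
    }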
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index c7359166..cec4d09e 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -3,11 +3,14 @@ use std::collections::HashMap;
use anyhow::Error;
use pdm_api_types::{
- remotes::REMOTE_ID_SCHEMA, RemoteUpid, TaskCount, TaskFilters, TaskListItem, TaskStateType,
- TaskStatistics, PRIV_RESOURCE_AUDIT, VIEW_ID_SCHEMA,
+ remotes::REMOTE_ID_SCHEMA, Authid, RemoteUpid, TaskCount, TaskFilters, TaskListItem,
+ TaskStateType, TaskStatistics, PRIV_RESOURCE_AUDIT, PRIV_RESOURCE_MODIFY, UPID, VIEW_ID_SCHEMA,
};
use proxmox_access_control::CachedUserInfo;
-use proxmox_router::{list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap};
+use proxmox_rest_server::WorkerTask;
+use proxmox_router::{
+ http_bail, http_err, list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap,
+};
use proxmox_schema::api;
use proxmox_sortable_macro::sortable;
@@ -24,6 +27,10 @@ const SUBDIRS: SubdirMap = &sorted!([
"statistics",
&Router::new().get(&API_METHOD_TASK_STATISTICS)
),
+ (
+ "refresh",
+ &Router::new().post(&API_METHOD_REFRESH_REMOTE_TASKS)
+ )
]);
#[api(
@@ -168,3 +175,80 @@ async fn task_statistics(
Ok(TaskStatistics { by_type, by_remote })
}
+
+#[api(
+ input: {
+ properties: {
+ remote: {
+ schema: REMOTE_ID_SCHEMA,
+ optional: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Anybody,
+ description: "Resource.Modify privileges are needed on /resource/{remote}",
+ },
+ returns: { type: UPID }
+)]
+/// Refresh the remote task cache for a single remote or for all remotes.
+pub fn refresh_remote_tasks(
+ remote: Option<String>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<UPID, Error> {
+ let (config, _digest) = pdm_config::remotes::config()?;
+
+ let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let remotes = if let Some(remote) = remote {
+ if user_info
+ .check_privs(
+ &auth_id,
+ &["resource", &remote],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )
+ .is_err()
+ {
+ http_bail!(FORBIDDEN, "user has no access to this remote");
+ }
+
+ let remote = config
+ .get(&remote)
+ .ok_or_else(|| http_err!(NOT_FOUND, "remote does not exist"))?;
+ vec![remote.clone()]
+ } else {
+ if !user_info.any_privs_below(&auth_id, &["resource"], PRIV_RESOURCE_MODIFY)? {
+ http_bail!(FORBIDDEN, "user has no access to resources");
+ }
+
+ config
+ .into_iter()
+ .filter_map(|(remote_name, remote)| {
+ user_info
+ .check_privs(
+ &auth_id,
+ &["resource", &remote_name],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )
+ .is_ok()
+ .then_some(remote)
+ })
+ .collect()
+ };
+
+ let upid_str = WorkerTask::spawn(
+ "refresh-remote-tasks",
+ None,
+ auth_id.to_string(),
+ true,
+ |_worker| async {
+ remote_tasks::refresh_task::refresh_taskcache(remotes).await?;
+ Ok(())
+ },
+ )?;
+
+ upid_str.parse()
+}
--
2.47.3