From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH datacenter-manager 1/5] remote tasks: move implementation to server crate
Date: Tue, 16 Dec 2025 14:15:14 +0100
Message-ID: <20251216131518.241022-3-l.wagner@proxmox.com>
In-Reply-To: <20251216131518.241022-1-l.wagner@proxmox.com>

Most of the functions from the refresh task are needed for any kind of
manual refresh, so they have to be moved to the shared server crate.

No functional changes.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
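Note (after the cut line, not part of the commit message): a rough sketch of
the kind of call site this move enables. The `manual_refresh` wrapper below is
hypothetical -- the real `refresh_task_cache` entry point only lands in patch
2/5 -- but it uses only the items this patch makes public
(`refresh_task::init_cache`, `TaskState::new`, `handle_timer_tick`):

    use anyhow::Error;
    use server::remote_tasks::refresh_task;

    // Hypothetical caller, for illustration only.
    async fn manual_refresh() -> Result<(), Error> {
        // Make sure initial archive files exist before writing.
        refresh_task::init_cache().await?;

        // A fresh TaskState back-dates all timestamps by one full
        // interval, so every sub-step (rotation check, journal apply,
        // task fetch, active-task poll) is due on the first tick.
        let mut state = refresh_task::TaskState::new();
        refresh_task::handle_timer_tick(&mut state).await
    }
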
 .../tasks/remote_tasks.rs                     | 535 +-----------------
 server/src/remote_tasks/mod.rs                |   1 +
 server/src/remote_tasks/refresh_task.rs       | 530 +++++++++++++++++
 3 files changed, 536 insertions(+), 530 deletions(-)
 create mode 100644 server/src/remote_tasks/refresh_task.rs

diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index c71a0894..fd8823cb 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -1,28 +1,10 @@
-use std::{
-    collections::{HashMap, HashSet},
-    sync::Arc,
-    time::{Duration, Instant},
-};
+use std::time::Duration;
 
 use anyhow::Error;
 use nix::sys::stat::Mode;
-use tokio::{sync::Semaphore, task::JoinSet};
-
-use pdm_api_types::{
-    remotes::{Remote, RemoteType},
-    RemoteUpid,
-};
-use proxmox_section_config::typed::SectionConfigData;
 
 use server::{
-    connection,
-    parallel_fetcher::{NodeResults, ParallelFetcher},
-    pbs_client,
-    remote_tasks::{
-        self,
-        task_cache::{GetTasks, NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
-        KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER,
-    },
+    remote_tasks::{self, refresh_task, REMOTE_TASKS_DIR},
     task_utils,
 };
 
@@ -30,105 +12,6 @@ use server::{
 /// This is also the rate at which we check on tracked tasks.
 const POLL_INTERVAL: Duration = Duration::from_secs(10);
 
-/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
-/// task for this remote).
-const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);
-
-/// Interval in seconds at which we poll active tasks. This only really affects 'foreign' (as in,
-/// not started by PDM) tasks. Tasks which were started by PDM are always 'tracked' and therefore
-/// polled at the interval set in [`POLL_INTERVAL`].
-// NOTE: Since we at the moment never query active tasks from remotes, this is merely a safeguard
-// to clear stuck active tasks from a previous bug. If we at some point query active tasks, we
-// might lower this interval.
-const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600);
-
-/// Interval at which to check for task cache rotation.
-const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
-
-/// Interval at which the task cache journal should be applied.
-///
-/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes.
-/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing
-/// it will be slower than the task archive itself, as the entire journal must be loaded into
-/// memory and then sorted by task starttime. Applying the journal more often might
-/// lead to more writes, but should yield better performance.
-const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600);
-
-/// Maximum number of concurrent connections per remote.
-const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
-
-/// Maximum number of total concurrent connections.
-const MAX_CONNECTIONS: usize = 20;
-
-/// Maximum number of tasks to fetch from a single remote in one API call.
-const MAX_TASKS_TO_FETCH: u64 = 5000;
-
-/// (Ephemeral) Remote task fetching task state.
-struct TaskState {
-    /// Time at which we last checked for archive rotation.
-    last_rotate_check: Instant,
-    /// Time at which we last fetched tasks.
-    last_fetch: Instant,
-    /// Time at which we last applied the journal.
-    last_journal_apply: Instant,
-    /// Time at which we last polled active tasks. This is done to ensure that
-    /// active tasks are never stuck in the 'active' state.
-    last_active_poll: Instant,
-}
-
-impl TaskState {
-    fn new() -> Self {
-        let now = Instant::now();
-
-        Self {
-            last_rotate_check: now - CHECK_ROTATE_INTERVAL,
-            last_fetch: now - TASK_FETCH_INTERVAL,
-            last_journal_apply: now - APPLY_JOURNAL_INTERVAL,
-            last_active_poll: now - POLL_ACTIVE_INTERVAL,
-        }
-    }
-
-    /// Reset the task archive rotation timestamp.
-    fn reset_rotate_check(&mut self) {
-        self.last_rotate_check = Instant::now();
-    }
-
-    /// Reset the task fetch timestamp.
-    fn reset_fetch(&mut self) {
-        self.last_fetch = Instant::now();
-    }
-
-    /// Reset the journal apply timestamp.
-    fn reset_journal_apply(&mut self) {
-        self.last_journal_apply = Instant::now();
-    }
-
-    /// Reset the active task poll timestamp.
-    fn reset_active_poll(&mut self) {
-        self.last_active_poll = Instant::now();
-    }
-
-    /// Should we check for archive rotation?
-    fn is_due_for_rotate_check(&self) -> bool {
-        Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL
-    }
-
-    /// Should we fetch tasks?
-    fn is_due_for_fetch(&self) -> bool {
-        Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL
-    }
-
-    /// Should we apply the task archive's journal?
-    fn is_due_for_journal_apply(&self) -> bool {
-        Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL
-    }
-
-    /// Should we poll active tasks?
-    fn is_due_for_active_poll(&self) -> bool {
-        Instant::now().duration_since(self.last_active_poll) > POLL_ACTIVE_INTERVAL
-    }
-}
-
 /// Start the remote task fetching task
 pub fn start_task() -> Result<(), Error> {
     let dir_options =
@@ -148,7 +31,7 @@ pub fn start_task() -> Result<(), Error> {
 /// Task which handles fetching remote tasks and task archive rotation.
 /// This function never returns.
 async fn remote_task_fetching_task() -> ! {
-    let mut task_state = TaskState::new();
+    let mut task_state = refresh_task::TaskState::new();
 
     let mut interval = tokio::time::interval(POLL_INTERVAL);
     interval.reset_at(task_utils::next_aligned_instant(POLL_INTERVAL.as_secs()).into());
@@ -157,422 +40,14 @@ async fn remote_task_fetching_task() -> ! {
     // a steady tick rate.
     interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 
-    if let Err(err) = init_cache().await {
+    if let Err(err) = refresh_task::init_cache().await {
         log::error!("error when initialized task cache: {err:#}");
     }
 
     loop {
         interval.tick().await;
-        if let Err(err) = do_tick(&mut task_state).await {
+        if let Err(err) = remote_tasks::refresh_task::handle_timer_tick(&mut task_state).await {
             log::error!("error when fetching remote tasks: {err:#}");
         }
     }
 }
-
-/// Handle a single timer tick.
-/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
-async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
-    let cache = remote_tasks::get_cache()?;
-
-    if task_state.is_due_for_rotate_check() {
-        log::debug!("checking if remote task archive should be rotated");
-        if rotate_cache(cache.clone()).await? {
-            log::info!("rotated remote task archive");
-        }
-
-        task_state.reset_rotate_check();
-    }
-
-    if task_state.is_due_for_journal_apply() {
-        apply_journal(cache.clone()).await?;
-        task_state.reset_journal_apply();
-    }
-
-    let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
-
-    let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
-
-    let cache_state = cache.read_state();
-
-    let poll_results = if task_state.is_due_for_active_poll() {
-        let mut tasks_to_poll: HashSet<RemoteUpid> =
-            HashSet::from_iter(cache_state.tracked_tasks().cloned());
-
-        let active_tasks = get_active_tasks(cache.clone()).await?;
-        tasks_to_poll.extend(active_tasks.into_iter());
-
-        let poll_results = poll_tracked_tasks(
-            &remote_config,
-            tasks_to_poll.iter(),
-            Arc::clone(&total_connections_semaphore),
-        )
-        .await?;
-
-        task_state.reset_active_poll();
-
-        poll_results
-    } else {
-        poll_tracked_tasks(
-            &remote_config,
-            cache_state.tracked_tasks(),
-            Arc::clone(&total_connections_semaphore),
-        )
-        .await?
-    };
-
-    // Get a list of remotes that we should poll in this cycle.
-    let remotes = if task_state.is_due_for_fetch() {
-        task_state.reset_fetch();
-        get_all_remotes(&remote_config)
-    } else {
-        get_remotes_with_finished_tasks(&remote_config, &poll_results)
-    };
-
-    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
-
-    if !all_tasks.is_empty()
-        || poll_results
-            .iter()
-            .any(|(_, result)| matches!(result, PollResult::RemoteGone | PollResult::RequestError))
-    {
-        update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
-    }
-
-    Ok(())
-}
-
-/// Initialize the remote task cache with initial archive files, in case there are not
-/// any archive files yet.
-///
-/// This allows us to immediately backfill remote task history when setting up a new PDM instance
-/// without any prior task archive rotation.
-async fn init_cache() -> Result<(), Error> {
-    tokio::task::spawn_blocking(|| {
-        let cache = remote_tasks::get_cache()?;
-        cache.write()?.init(proxmox_time::epoch_i64())?;
-        Ok(())
-    })
-    .await?
-}
-
-/// Fetch tasks from a list of remotes.
-///
-/// Returns a list of tasks and a map that shows whether we want to update the
-/// cutoff timestamp in the statefile. We don't want to update the cutoff if
-/// the connection to one remote failed or if we could not reach all remotes in a cluster.
-async fn fetch_remotes(
-    remotes: Vec<Remote>,
-    cache_state: Arc<State>,
-) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
-    let fetcher = ParallelFetcher {
-        max_connections: MAX_CONNECTIONS,
-        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
-        context: cache_state,
-    };
-
-    let fetch_results = fetcher
-        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
-        .await;
-
-    let mut all_tasks = Vec::new();
-    let mut node_success_map = NodeFetchSuccessMap::default();
-
-    for (remote_name, result) in fetch_results.remote_results {
-        match result {
-            Ok(remote_result) => {
-                for (node_name, node_result) in remote_result.node_results {
-                    match node_result {
-                        Ok(NodeResults { data, .. }) => {
-                            all_tasks.extend(data);
-                            node_success_map.set_node_success(remote_name.clone(), node_name);
-                        }
-                        Err(err) => {
-                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
-                        }
-                    }
-                }
-            }
-            Err(err) => {
-                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
-            }
-        }
-    }
-
-    (all_tasks, node_success_map)
-}
-
-async fn fetch_tasks_from_single_node(
-    context: Arc<State>,
-    remote: Remote,
-    node: String,
-) -> Result<Vec<TaskCacheItem>, Error> {
-    let since = context
-        .cutoff_timestamp(&remote.id, &node)
-        .unwrap_or_else(|| {
-            proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
-        });
-
-    match remote.ty {
-        RemoteType::Pve => {
-            let params = pve_api_types::ListTasks {
-                source: Some(pve_api_types::ListTasksSource::Archive),
-                since: Some(since),
-                // If `limit` is not provided, we only receive 50 tasks
-                limit: Some(MAX_TASKS_TO_FETCH),
-                ..Default::default()
-            };
-
-            let client = connection::make_pve_client(&remote)?;
-
-            let task_list = client
-                .get_task_list(&node, params)
-                .await?
-                .into_iter()
-                .map(|task| map_pve_task(task, remote.id.clone()))
-                .collect();
-
-            Ok(task_list)
-        }
-        RemoteType::Pbs => {
-            let params = pbs_client::ListTasks {
-                since: Some(since),
-                // If `limit` is not provided, we only receive 50 tasks
-                limit: Some(MAX_TASKS_TO_FETCH),
-            };
-
-            let client = connection::make_pbs_client(&remote)?;
-
-            let task_list = client
-                .get_task_list(params)
-                .await?
-                .into_iter()
-                .filter_map(|task| {
-                    if task.endtime.is_some() {
-                        // We only care about finished tasks.
-                        Some(map_pbs_task(task, remote.id.clone()))
-                    } else {
-                        None
-                    }
-                })
-                .collect();
-
-            Ok(task_list)
-        }
-    }
-}
-
-/// Return all remotes from the given config.
-fn get_all_remotes(remote_config: &SectionConfigData<Remote>) -> Vec<Remote> {
-    remote_config
-        .into_iter()
-        .map(|(_, section)| section)
-        .cloned()
-        .collect()
-}
-
-/// Return all remotes that correspond to a list of finished tasks.
-fn get_remotes_with_finished_tasks(
-    remote_config: &SectionConfigData<Remote>,
-    poll_results: &HashMap<RemoteUpid, PollResult>,
-) -> Vec<Remote> {
-    let remotes_with_finished_tasks: HashSet<&str> = poll_results
-        .iter()
-        .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote()))
-        .collect();
-
-    remote_config
-        .into_iter()
-        .filter_map(|(name, remote)| {
-            remotes_with_finished_tasks
-                .contains(&name)
-                .then_some(remote)
-        })
-        .cloned()
-        .collect()
-}
-
-/// Rotate the task cache if necessary.
-///
-/// Returns Ok(true) if the cache's files were rotated.
-async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
-    tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await?
-}
-
-/// Apply the task cache journal.
-async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
-    tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
-}
-
-/// Get a list of active tasks.
-async fn get_active_tasks(cache: TaskCache) -> Result<Vec<RemoteUpid>, Error> {
-    Ok(tokio::task::spawn_blocking(move || {
-        let tasks: Vec<RemoteUpid> = cache
-            .read()?
-            .get_tasks(GetTasks::Active)?
-            .map(|t| t.upid)
-            .collect();
-
-        Ok::<Vec<RemoteUpid>, Error>(tasks)
-    })
-    .await??)
-}
-
-#[derive(PartialEq, Debug)]
-/// Outcome from polling a tracked task.
-enum PollResult {
-    /// Task is still running.
-    Running,
-    /// Task is finished, poll remote tasks to get final status/endtime.
-    Finished,
-    /// Should be dropped from the active file.
-    RequestError,
-    /// Remote does not exist any more -> remove immediately from tracked task list.
-    RemoteGone,
-}
-
-/// Poll all tracked tasks.
-async fn poll_tracked_tasks(
-    remote_config: &SectionConfigData<Remote>,
-    tracked_tasks: impl Iterator<Item = &RemoteUpid>,
-    total_connections_semaphore: Arc<Semaphore>,
-) -> Result<HashMap<RemoteUpid, PollResult>, Error> {
-    let mut join_set = JoinSet::new();
-
-    for task in tracked_tasks.cloned() {
-        let permit = Arc::clone(&total_connections_semaphore)
-            .acquire_owned()
-            .await
-            .unwrap();
-
-        let remote = remote_config.get(task.remote()).cloned();
-
-        join_set.spawn(async move {
-            // Move permit into this async block.
-            let _permit = permit;
-
-            match remote {
-                Some(remote) => poll_single_tracked_task(remote, task).await,
-                None => {
-                    log::info!(
-                        "remote {} does not exist any more, dropping tracked task",
-                        task.remote()
-                    );
-                    (task, PollResult::RemoteGone)
-                }
-            }
-        });
-    }
-
-    let mut results = HashMap::new();
-    while let Some(task_result) = join_set.join_next().await {
-        let (upid, result) = task_result?;
-        results.insert(upid, result);
-    }
-
-    Ok(results)
-}
-
-/// Poll a single tracked task.
-async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) {
-    match remote.ty {
-        RemoteType::Pve => {
-            log::debug!("polling tracked task {}", task);
-
-            let status = match server::api::pve::tasks::get_task_status(
-                remote.id.clone(),
-                task.clone(),
-                false,
-            )
-            .await
-            {
-                Ok(status) => status,
-                Err(err) => {
-                    log::error!("could not get status from remote: {err:#}");
-                    return (task, PollResult::RequestError);
-                }
-            };
-
-            let result = if status.exitstatus.is_some() {
-                PollResult::Finished
-            } else {
-                PollResult::Running
-            };
-
-            (task, result)
-        }
-        RemoteType::Pbs => {
-            let status = match server::api::pbs::tasks::get_task_status(
-                remote.id.clone(),
-                task.clone(),
-                false,
-            )
-            .await
-            {
-                Ok(status) => status,
-                Err(err) => {
-                    log::error!("could not get status from remote: {err:#}");
-                    return (task, PollResult::RequestError);
-                }
-            };
-
-            let result = if status.exitstatus.is_some() {
-                PollResult::Finished
-            } else {
-                PollResult::Running
-            };
-
-            (task, result)
-        }
-    }
-}
-
-/// Map a `pve_api_types::ListTasksResponse` to `TaskCacheItem`
-fn map_pve_task(task: pve_api_types::ListTasksResponse, remote: String) -> TaskCacheItem {
-    let remote_upid = RemoteUpid::new(remote, RemoteType::Pve, task.upid);
-
-    TaskCacheItem {
-        upid: remote_upid,
-        starttime: task.starttime,
-        endtime: task.endtime,
-        status: task.status,
-    }
-}
-
-/// Map a `pbs_api_types::TaskListItem` to `TaskCacheItem`
-fn map_pbs_task(task: pbs_api_types::TaskListItem, remote: String) -> TaskCacheItem {
-    let remote_upid = RemoteUpid::new(remote, RemoteType::Pbs, task.upid);
-
-    TaskCacheItem {
-        upid: remote_upid,
-        starttime: task.starttime,
-        endtime: task.endtime,
-        status: task.status,
-    }
-}
-
-/// Update task cache with results from tracked task polling & regular task fetching.
-async fn update_task_cache(
-    cache: TaskCache,
-    new_tasks: Vec<TaskCacheItem>,
-    update_state_for_remote: NodeFetchSuccessMap,
-    poll_results: HashMap<RemoteUpid, PollResult>,
-) -> Result<(), Error> {
-    tokio::task::spawn_blocking(move || {
-        let drop_tracked = poll_results
-            .into_iter()
-            .filter_map(|(upid, result)| match result {
-                PollResult::Running => None,
-                PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => {
-                    Some(upid)
-                }
-            })
-            .collect();
-
-        cache
-            .write()?
-            .update(new_tasks, &update_state_for_remote, drop_tracked)?;
-
-        Ok(())
-    })
-    .await?
-}
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index b080811f..50ac6708 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -5,6 +5,7 @@ use anyhow::Error;
 use pdm_api_types::{NativeUpid, RemoteUpid, TaskFilters, TaskListItem, TaskStateType};
 use pve_api_types::PveUpid;
 
+pub mod refresh_task;
 pub mod task_cache;
 
 use task_cache::{GetTasks, TaskCache, TaskCacheItem};
diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs
new file mode 100644
index 00000000..0e8ed345
--- /dev/null
+++ b/server/src/remote_tasks/refresh_task.rs
@@ -0,0 +1,530 @@
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
+use anyhow::Error;
+use pdm_api_types::{
+    remotes::{Remote, RemoteType},
+    RemoteUpid,
+};
+use proxmox_section_config::typed::SectionConfigData;
+use tokio::{sync::Semaphore, task::JoinSet};
+
+use crate::{
+    api, connection,
+    parallel_fetcher::{NodeResults, ParallelFetcher},
+    pbs_client,
+    remote_tasks::{
+        task_cache::{GetTasks, NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
+        KEEP_OLD_FILES, ROTATE_AFTER,
+    },
+};
+
+/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);
+
+/// Interval in seconds at which we poll active tasks. This only really affects 'foreign' (as in,
+/// not started by PDM) tasks. Tasks which were started by PDM are always 'tracked' and therefore
+/// polled at the interval set in [`POLL_INTERVAL`].
+// NOTE: Since we at the moment never query active tasks from remotes, this is merely a safeguard
+// to clear stuck active tasks from a previous bug. If we at some point query active tasks, we
+// might lower this interval.
+const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600);
+
+/// Interval at which to check for task cache rotation.
+const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
+
+/// Interval at which the task cache journal should be applied.
+///
+/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes.
+/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing
+/// it will be slower than the task archive itself, as the entire journal must be loaded into
+/// memory and then sorted by task starttime. Applying the journal more often might
+/// lead to more writes, but should yield better performance.
+const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600);
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+
+/// Maximum number of total concurrent connections.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
+/// (Ephemeral) Remote task fetching task state.
+pub struct TaskState {
+    /// Time at which we last checked for archive rotation.
+    last_rotate_check: Instant,
+    /// Time at which we last fetched tasks.
+    last_fetch: Instant,
+    /// Time at which we last applied the journal.
+    last_journal_apply: Instant,
+    /// Time at which we last polled active tasks. This is done to ensure that
+    /// active tasks are never stuck in the 'active' state.
+    last_active_poll: Instant,
+}
+
+impl TaskState {
+    pub fn new() -> Self {
+        let now = Instant::now();
+
+        Self {
+            last_rotate_check: now - CHECK_ROTATE_INTERVAL,
+            last_fetch: now - TASK_FETCH_INTERVAL,
+            last_journal_apply: now - APPLY_JOURNAL_INTERVAL,
+            last_active_poll: now - POLL_ACTIVE_INTERVAL,
+        }
+    }
+
+    /// Reset the task archive rotation timestamp.
+    fn reset_rotate_check(&mut self) {
+        self.last_rotate_check = Instant::now();
+    }
+
+    /// Reset the task fetch timestamp.
+    fn reset_fetch(&mut self) {
+        self.last_fetch = Instant::now();
+    }
+
+    /// Reset the journal apply timestamp.
+    fn reset_journal_apply(&mut self) {
+        self.last_journal_apply = Instant::now();
+    }
+
+    /// Reset the active task poll timestamp.
+    fn reset_active_poll(&mut self) {
+        self.last_active_poll = Instant::now();
+    }
+
+    /// Should we check for archive rotation?
+    fn is_due_for_rotate_check(&self) -> bool {
+        Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL
+    }
+
+    /// Should we fetch tasks?
+    fn is_due_for_fetch(&self) -> bool {
+        Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL
+    }
+
+    /// Should we apply the task archive's journal?
+    fn is_due_for_journal_apply(&self) -> bool {
+        Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL
+    }
+
+    /// Should we poll active tasks?
+    fn is_due_for_active_poll(&self) -> bool {
+        Instant::now().duration_since(self.last_active_poll) > POLL_ACTIVE_INTERVAL
+    }
+}
+
+/// Handle a single timer tick.
+/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
+pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> {
+    let cache = super::get_cache()?;
+
+    if task_state.is_due_for_rotate_check() {
+        log::debug!("checking if remote task archive should be rotated");
+        if rotate_cache(cache.clone()).await? {
+            log::info!("rotated remote task archive");
+        }
+
+        task_state.reset_rotate_check();
+    }
+
+    if task_state.is_due_for_journal_apply() {
+        apply_journal(cache.clone()).await?;
+        task_state.reset_journal_apply();
+    }
+
+    let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+    let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+
+    let cache_state = cache.read_state();
+
+    let poll_results = if task_state.is_due_for_active_poll() {
+        let mut tasks_to_poll: HashSet<RemoteUpid> =
+            HashSet::from_iter(cache_state.tracked_tasks().cloned());
+
+        let active_tasks = get_active_tasks(cache.clone()).await?;
+        tasks_to_poll.extend(active_tasks.into_iter());
+
+        let poll_results = poll_tracked_tasks(
+            &remote_config,
+            tasks_to_poll.iter(),
+            Arc::clone(&total_connections_semaphore),
+        )
+        .await?;
+
+        task_state.reset_active_poll();
+
+        poll_results
+    } else {
+        poll_tracked_tasks(
+            &remote_config,
+            cache_state.tracked_tasks(),
+            Arc::clone(&total_connections_semaphore),
+        )
+        .await?
+    };
+
+    // Get a list of remotes that we should poll in this cycle.
+    let remotes = if task_state.is_due_for_fetch() {
+        task_state.reset_fetch();
+        get_all_remotes(&remote_config)
+    } else {
+        get_remotes_with_finished_tasks(&remote_config, &poll_results)
+    };
+
+    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
+
+    if !all_tasks.is_empty()
+        || poll_results
+            .iter()
+            .any(|(_, result)| matches!(result, PollResult::RemoteGone | PollResult::RequestError))
+    {
+        update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
+    }
+
+    Ok(())
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are not
+/// any archive files yet.
+///
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+pub async fn init_cache() -> Result<(), Error> {
+    tokio::task::spawn_blocking(|| {
+        let cache = super::get_cache()?;
+        cache.write()?.init(proxmox_time::epoch_i64())?;
+        Ok(())
+    })
+    .await?
+}
+
+/// Fetch tasks from a list of remotes.
+///
+/// Returns a list of tasks and a map that shows whether we want to update the
+/// cutoff timestamp in the statefile. We don't want to update the cutoff if
+/// the connection to one remote failed or if we could not reach all remotes in a cluster.
+async fn fetch_remotes(
+    remotes: Vec<Remote>,
+    cache_state: Arc<State>,
+) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
+    let fetcher = ParallelFetcher {
+        max_connections: MAX_CONNECTIONS,
+        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
+        context: cache_state,
+    };
+
+    let fetch_results = fetcher
+        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
+        .await;
+
+    let mut all_tasks = Vec::new();
+    let mut node_success_map = NodeFetchSuccessMap::default();
+
+    for (remote_name, result) in fetch_results.remote_results {
+        match result {
+            Ok(remote_result) => {
+                for (node_name, node_result) in remote_result.node_results {
+                    match node_result {
+                        Ok(NodeResults { data, .. }) => {
+                            all_tasks.extend(data);
+                            node_success_map.set_node_success(remote_name.clone(), node_name);
+                        }
+                        Err(err) => {
+                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
+            }
+        }
+    }
+
+    (all_tasks, node_success_map)
+}
+
+async fn fetch_tasks_from_single_node(
+    context: Arc<State>,
+    remote: Remote,
+    node: String,
+) -> Result<Vec<TaskCacheItem>, Error> {
+    let since = context
+        .cutoff_timestamp(&remote.id, &node)
+        .unwrap_or_else(|| {
+            proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
+        });
+
+    match remote.ty {
+        RemoteType::Pve => {
+            let params = pve_api_types::ListTasks {
+                source: Some(pve_api_types::ListTasksSource::Archive),
+                since: Some(since),
+                // If `limit` is not provided, we only receive 50 tasks
+                limit: Some(MAX_TASKS_TO_FETCH),
+                ..Default::default()
+            };
+
+            let client = connection::make_pve_client(&remote)?;
+
+            let task_list = client
+                .get_task_list(&node, params)
+                .await?
+                .into_iter()
+                .map(|task| map_pve_task(task, remote.id.clone()))
+                .collect();
+
+            Ok(task_list)
+        }
+        RemoteType::Pbs => {
+            let params = pbs_client::ListTasks {
+                since: Some(since),
+                // If `limit` is not provided, we only receive 50 tasks
+                limit: Some(MAX_TASKS_TO_FETCH),
+            };
+
+            let client = connection::make_pbs_client(&remote)?;
+
+            let task_list = client
+                .get_task_list(params)
+                .await?
+                .into_iter()
+                .filter_map(|task| {
+                    if task.endtime.is_some() {
+                        // We only care about finished tasks.
+                        Some(map_pbs_task(task, remote.id.clone()))
+                    } else {
+                        None
+                    }
+                })
+                .collect();
+
+            Ok(task_list)
+        }
+    }
+}
+
+/// Return all remotes from the given config.
+fn get_all_remotes(remote_config: &SectionConfigData<Remote>) -> Vec<Remote> {
+    remote_config
+        .into_iter()
+        .map(|(_, section)| section)
+        .cloned()
+        .collect()
+}
+
+/// Return all remotes that correspond to a list of finished tasks.
+fn get_remotes_with_finished_tasks(
+    remote_config: &SectionConfigData<Remote>,
+    poll_results: &HashMap<RemoteUpid, PollResult>,
+) -> Vec<Remote> {
+    let remotes_with_finished_tasks: HashSet<&str> = poll_results
+        .iter()
+        .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote()))
+        .collect();
+
+    remote_config
+        .into_iter()
+        .filter_map(|(name, remote)| {
+            remotes_with_finished_tasks
+                .contains(&name)
+                .then_some(remote)
+        })
+        .cloned()
+        .collect()
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+    tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await?
+}
+
+/// Apply the task cache journal.
+async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
+}
+
+/// Get a list of active tasks.
+async fn get_active_tasks(cache: TaskCache) -> Result<Vec<RemoteUpid>, Error> {
+    Ok(tokio::task::spawn_blocking(move || {
+        let tasks: Vec<RemoteUpid> = cache
+            .read()?
+            .get_tasks(GetTasks::Active)?
+            .map(|t| t.upid)
+            .collect();
+
+        Ok::<Vec<RemoteUpid>, Error>(tasks)
+    })
+    .await??)
+}
+
+#[derive(PartialEq, Debug)]
+/// Outcome from polling a tracked task.
+enum PollResult {
+    /// Task is still running.
+    Running,
+    /// Task is finished, poll remote tasks to get final status/endtime.
+    Finished,
+    /// Should be dropped from the active file.
+    RequestError,
+    /// Remote does not exist any more -> remove immediately from tracked task list.
+    RemoteGone,
+}
+
+/// Poll all tracked tasks.
+async fn poll_tracked_tasks(
+    remote_config: &SectionConfigData<Remote>,
+    tracked_tasks: impl Iterator<Item = &RemoteUpid>,
+    total_connections_semaphore: Arc<Semaphore>,
+) -> Result<HashMap<RemoteUpid, PollResult>, Error> {
+    let mut join_set = JoinSet::new();
+
+    for task in tracked_tasks.cloned() {
+        let permit = Arc::clone(&total_connections_semaphore)
+            .acquire_owned()
+            .await
+            .unwrap();
+
+        let remote = remote_config.get(task.remote()).cloned();
+
+        join_set.spawn(async move {
+            // Move permit into this async block.
+            let _permit = permit;
+
+            match remote {
+                Some(remote) => poll_single_tracked_task(remote, task).await,
+                None => {
+                    log::info!(
+                        "remote {} does not exist any more, dropping tracked task",
+                        task.remote()
+                    );
+                    (task, PollResult::RemoteGone)
+                }
+            }
+        });
+    }
+
+    let mut results = HashMap::new();
+    while let Some(task_result) = join_set.join_next().await {
+        let (upid, result) = task_result?;
+        results.insert(upid, result);
+    }
+
+    Ok(results)
+}
+
+/// Poll a single tracked task.
+async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) {
+    match remote.ty {
+        RemoteType::Pve => {
+            log::debug!("polling tracked task {}", task);
+
+            let status = match api::pve::tasks::get_task_status(
+                remote.id.clone(),
+                task.clone(),
+                false,
+            )
+            .await
+            {
+                Ok(status) => status,
+                Err(err) => {
+                    log::error!("could not get status from remote: {err:#}");
+                    return (task, PollResult::RequestError);
+                }
+            };
+
+            let result = if status.exitstatus.is_some() {
+                PollResult::Finished
+            } else {
+                PollResult::Running
+            };
+
+            (task, result)
+        }
+        RemoteType::Pbs => {
+            let status = match api::pbs::tasks::get_task_status(
+                remote.id.clone(),
+                task.clone(),
+                false,
+            )
+            .await
+            {
+                Ok(status) => status,
+                Err(err) => {
+                    log::error!("could not get status from remote: {err:#}");
+                    return (task, PollResult::RequestError);
+                }
+            };
+
+            let result = if status.exitstatus.is_some() {
+                PollResult::Finished
+            } else {
+                PollResult::Running
+            };
+
+            (task, result)
+        }
+    }
+}
+
+/// Map a `pve_api_types::ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: pve_api_types::ListTasksResponse, remote: String) -> TaskCacheItem {
+    let remote_upid = RemoteUpid::new(remote, RemoteType::Pve, task.upid);
+
+    TaskCacheItem {
+        upid: remote_upid,
+        starttime: task.starttime,
+        endtime: task.endtime,
+        status: task.status,
+    }
+}
+
+/// Map a `pbs_api_types::TaskListItem` to `TaskCacheItem`
+fn map_pbs_task(task: pbs_api_types::TaskListItem, remote: String) -> TaskCacheItem {
+    let remote_upid = RemoteUpid::new(remote, RemoteType::Pbs, task.upid);
+
+    TaskCacheItem {
+        upid: remote_upid,
+        starttime: task.starttime,
+        endtime: task.endtime,
+        status: task.status,
+    }
+}
+
+/// Update task cache with results from tracked task polling & regular task fetching.
+async fn update_task_cache(
+    cache: TaskCache,
+    new_tasks: Vec<TaskCacheItem>,
+    update_state_for_remote: NodeFetchSuccessMap,
+    poll_results: HashMap<RemoteUpid, PollResult>,
+) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || {
+        let drop_tracked = poll_results
+            .into_iter()
+            .filter_map(|(upid, result)| match result {
+                PollResult::Running => None,
+                PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => {
+                    Some(upid)
+                }
+            })
+            .collect();
+
+        cache
+            .write()?
+            .update(new_tasks, &update_state_for_remote, drop_tracked)?;
+
+        Ok(())
+    })
+    .await?
+}
-- 
2.47.3



Thread overview:
2025-12-16 13:15 [pdm-devel] [PATCH-SERIES 0/6] manual refresh button for remote task cache Lukas Wagner
2025-12-16 13:15 ` [pdm-devel] [PATCH proxmox-yew-comp 1/1] task list: add support for starting refresh tasks Lukas Wagner
2025-12-16 13:15 ` Lukas Wagner [this message]
2025-12-16 13:15 ` [pdm-devel] [PATCH datacenter-manager 2/5] remote tasks: implement `refresh_task_cache` for manual task fetching Lukas Wagner
2025-12-16 13:15 ` [pdm-devel] [PATCH datacenter-manager 3/5] api: add /remotes/tasks/refresh Lukas Wagner
2025-12-16 13:15 ` [pdm-devel] [PATCH datacenter-manager 4/5] ui: remote task view: set refresh_task_url property for task viewer Lukas Wagner
2025-12-16 13:15 ` [pdm-devel] [PATCH datacenter-manager 5/5] ui: register task descriptions for some native PDM tasks Lukas Wagner
2025-12-17 14:37 ` [pdm-devel] [PATCH-SERIES 0/6] manual refresh button for remote task cache Lukas Wagner
