From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pdm-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
	by lore.proxmox.com (Postfix) with ESMTPS id 41D191FF164
	for <inbox@lore.proxmox.com>; Fri, 14 Mar 2025 15:13:53 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id DC4FF5686;
	Fri, 14 Mar 2025 15:13:44 +0100 (CET)
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Date: Fri, 14 Mar 2025 15:12:22 +0100
Message-Id: <20250314141225.240768-6-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250314141225.240768-1-l.wagner@proxmox.com>
References: <20250314141225.240768-1-l.wagner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL -0.140 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 POISEN_SPAM_PILL          0.1 Meta: its spam
 POISEN_SPAM_PILL_1        0.1 random spam to be learned in bayes
 POISEN_SPAM_PILL_3        0.1 random spam to be learned in bayes
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks:
 add background task for task polling, use new task cache
X-BeenThere: pdm-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Datacenter Manager development discussion
 <pdm-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, 
 <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/>
List-Post: <mailto:pdm-devel@lists.proxmox.com>
List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, 
 <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Datacenter Manager development discussion
 <pdm-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pdm-devel-bounces@lists.proxmox.com
Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com>

This commit changes the remote task module as follows:

- Add a new background task for regular polling of task data
Instead of triggering the fetching of task data from the `get_tasks`
function, which is usually called by an API handler, we move the fetching
to a new background task. The task fetches the latest tasks from all
remotes and stores them in the task cache at regular intervals (10
minutes); a simplified sketch of this polling loop is shown after this list.
The `get_tasks` function itself only reads from the cache.
The main rationale for this change is that for large setups, fetching
tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
each with >100ms of connection latency - this quickly adds up to minutes).
If we do this from within `get_tasks`, the API handler calling the
function is also blocked for the entire time.
The `get_tasks` API is called every couple of seconds by the UI to get
a list of running remote tasks, so this *must* be quick.

- Tracked tasks are also polled in the same background task, but with
a short polling delay (10 seconds). Instead of polling the status of a
specific tracked task's UPID, we simply fetch *all* tasks since the
tracked task started. While this increases the amount of transmitted data
a bit for tracked tasks that run for a very long time, this new approach
makes the whole task tracking functionality much more elegant; it
integrates better with the 'regular' task fetching which happens at
longer intervals.

- Tasks are now stored in the new, improved task cache implementation.
This should make retrieving tasks much quicker and avoid
unneeded disk IO.
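
A minimal sketch of the tick/cycle pattern described above (not the actual
PDM code; the `fetch_and_store` helper and its exact behavior are
hypothetical): one loop ticks every 10 seconds, remotes with tracked tasks
are polled on every tick, and a full refresh of all remotes happens every
10 minutes.

    use std::time::Duration;

    const TICK_RATE_S: u64 = 10;
    const REGULAR_REFRESH_CYCLES: u64 = (10 * 60) / TICK_RATE_S;

    // Hypothetical helper: fetch tasks from the selected remotes and write
    // them to the on-disk task cache.
    async fn fetch_and_store(all_remotes: bool) {
        let _ = all_remotes;
    }

    async fn polling_loop() {
        let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        let mut cycle: u64 = 0;
        loop {
            interval.tick().await;
            // Every tick: poll remotes with tracked (recently started) tasks.
            // Every REGULAR_REFRESH_CYCLES ticks: poll all remotes.
            fetch_and_store(cycle % REGULAR_REFRESH_CYCLES == 0).await;
            cycle = cycle.wrapping_add(1);
        }
    }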

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
---
 server/src/api/pve/lxc.rs                |  10 +-
 server/src/api/pve/mod.rs                |   4 +-
 server/src/api/pve/qemu.rs               |   6 +-
 server/src/api/remote_tasks.rs           |  11 +-
 server/src/bin/proxmox-datacenter-api.rs |   3 +-
 server/src/remote_tasks/mod.rs           | 788 +++++++++++------------
 6 files changed, 388 insertions(+), 434 deletions(-)

diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
index f1c31425..83f9f4aa 100644
--- a/server/src/api/pve/lxc.rs
+++ b/server/src/api/pve/lxc.rs
@@ -209,7 +209,7 @@ pub async fn lxc_start(
 
     let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -242,7 +242,7 @@ pub async fn lxc_stop(
 
     let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
         .shutdown_lxc_async(&node, vmid, Default::default())
         .await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -357,7 +357,7 @@ pub async fn lxc_migrate(
     };
     let upid = pve.migrate_lxc(&node, vmid, params).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
     log::info!("migrating vm {vmid} of node {node:?}");
     let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
 
-    new_remote_upid(source, upid)
+    new_remote_upid(source, upid).await
 }
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index c328675a..a351ad69 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -74,9 +74,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
 const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
 
 // converts a remote + PveUpid into a RemoteUpid and starts tracking it
-fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
+async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
     let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
-    remote_tasks::track_running_task(remote_upid.clone());
+    remote_tasks::track_running_task(remote_upid.clone()).await?;
     Ok(remote_upid)
 }
 
diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
index dea0550c..54e310d2 100644
--- a/server/src/api/pve/qemu.rs
+++ b/server/src/api/pve/qemu.rs
@@ -216,7 +216,7 @@ pub async fn qemu_start(
         .start_qemu_async(&node, vmid, Default::default())
         .await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -376,7 +376,7 @@ pub async fn qemu_migrate(
     };
     let upid = pve.migrate_qemu(&node, vmid, params).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -563,5 +563,5 @@ pub async fn qemu_remote_migrate(
     log::info!("migrating vm {vmid} of node {node:?}");
     let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
 
-    new_remote_upid(source, upid)
+    new_remote_upid(source, upid).await
 }
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index e629000c..05ce3666 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
     },
     input: {
         properties: {
-            "max-age": {
-                type: Integer,
-                optional: true,
-                // TODO: sensible default max-age
-                default: 300,
-                description: "Maximum age of cached task data",
-            },
             filters: {
                 type: TaskFilters,
                 flatten: true,
@@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
     },
 )]
 /// Get the list of tasks for all remotes.
-async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
-    let tasks = remote_tasks::get_tasks(max_age, filters).await?;
+async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+    let tasks = remote_tasks::get_tasks(filters).await?;
 
     Ok(tasks)
 }
diff --git a/server/src/bin/proxmox-datacenter-api.rs b/server/src/bin/proxmox-datacenter-api.rs
index a79094d5..da39c85d 100644
--- a/server/src/bin/proxmox-datacenter-api.rs
+++ b/server/src/bin/proxmox-datacenter-api.rs
@@ -25,11 +25,11 @@ use pdm_buildcfg::configdir;
 use pdm_api_types::Authid;
 use proxmox_auth_api::api::assemble_csrf_prevention_token;
 
-use server::auth;
 use server::auth::csrf::csrf_secret;
 use server::metric_collection;
 use server::resource_cache;
 use server::task_utils;
+use server::{auth, remote_tasks};
 
 pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 5 * 60;
 
@@ -288,6 +288,7 @@ async fn run(debug: bool) -> Result<(), Error> {
     start_task_scheduler();
     metric_collection::start_task();
     resource_cache::start_task();
+    remote_tasks::start_task()?;
 
     server.await?;
     log::info!("server shutting down, waiting for active workers to complete");
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 2062f2b7..48d54694 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,65 +1,106 @@
-use std::{
-    collections::{HashMap, HashSet},
-    fs::File,
-    path::{Path, PathBuf},
-    sync::{LazyLock, RwLock},
-    time::Duration,
-};
+use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
 
-use anyhow::Error;
+use anyhow::{format_err, Error};
+use nix::sys::stat::Mode;
 use pdm_api_types::{
     remotes::{Remote, RemoteType},
     RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
 };
 use proxmox_sys::fs::CreateOptions;
 use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
-use tokio::task::JoinHandle;
+use task_cache::{AddTasks, GetTasks, State, TaskCache, TaskCacheItem};
+use tokio::{sync::Semaphore, task::JoinSet};
 
 use crate::{api::pve, task_utils};
 
 mod task_cache;
 
+const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
+
+const SECONDS_PER_MINUTE: u64 = 60;
+const MINUTES_PER_HOUR: u64 = 60;
+
+/// Tick rate for the remote task fetching task.
+/// This is also the rate at which we check on tracked tasks.
+const TICK_RATE_S: u64 = 10;
+
+/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const REGULAR_REFRESH_S: u64 = 10 * SECONDS_PER_MINUTE;
+/// Number of cycles until a regular refresh.
+const REGULAR_REFRESH_CYCLES: u64 = REGULAR_REFRESH_S / TICK_RATE_S;
+
+/// Check if we want to rotate once every hour.
+const CHECK_ROTATE_S: u64 = SECONDS_PER_MINUTE * MINUTES_PER_HOUR;
+/// Number of cycles before we want to check if we should rotate the task archives.
+const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_S / TICK_RATE_S;
+
+/// Rotate once the most recent archive file is at least 24 hours old.
+const ROTATE_AFTER_S: u64 = 24 * MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
+
+/// Keep 7 days worth of tasks.
+const KEEP_OLD_FILES: u64 = 7;
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
+/// consideration when accounting for the total number of connections.
+/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
+/// to 4 PVE remotes in parallel.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
 /// Get tasks for all remotes
 // FIXME: filter for privileges
-pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
-    let (remotes, _) = pdm_config::remotes::config()?;
+pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+    let cache = get_cache()?;
 
-    let mut all_tasks = Vec::new();
+    let mut returned_tasks = Vec::new();
 
-    let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
-    let mut cache = TaskCache::new(cache_path)?;
+    let which = if filters.running {
+        GetTasks::Active
+    } else {
+        GetTasks::All
+    };
 
-    // Force a refresh for all tasks of a remote if a task is finished.
-    // Not super nice, but saves us from persisting finished tasks. Also,
-    // the /nodes/<node>/tasks/<upid>/status endpoint does not return
-    // a task's endtime, which is only returned by
-    // /nodes/<node>/tasks...
-    // Room for improvements in the future.
-    invalidate_cache_for_finished_tasks(&mut cache);
+    for task in &mut cache
+        .get_tasks(which)?
+        .skip(filters.start as usize)
+        .take(filters.limit as usize)
+    {
+        let task = match task {
+            Ok(task) => task,
+            Err(err) => {
+                log::error!("could not read task from remote task cache, skipping: {err}");
+                continue;
+            }
+        };
 
-    for (remote_name, remote) in &remotes.sections {
-        let now = proxmox_time::epoch_i64();
-
-        if let Some(tasks) = cache.get_tasks(remote_name.as_str(), now, max_age) {
-            // Data in cache is recent enough and has not been invalidated.
-            all_tasks.extend(tasks);
-        } else {
-            let tasks = match fetch_tasks(remote).await {
-                Ok(tasks) => tasks,
-                Err(err) => {
-                    // ignore errors for not reachable remotes
-                    continue;
-                }
-            };
-            cache.set_tasks(remote_name.as_str(), tasks.clone(), now);
-
-            all_tasks.extend(tasks);
+        // TODO: Handle PBS tasks
+        let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
+        match pve_upid {
+            Ok(pve_upid) => {
+                returned_tasks.push(TaskListItem {
+                    upid: task.upid.to_string(),
+                    node: pve_upid.node,
+                    pid: pve_upid.pid as i64,
+                    pstart: pve_upid.pstart,
+                    starttime: pve_upid.starttime,
+                    worker_type: pve_upid.worker_type,
+                    worker_id: None,
+                    user: pve_upid.auth_id,
+                    endtime: task.endtime,
+                    status: task.status,
+                });
+            }
+            Err(err) => {
+                log::error!("could not parse UPID: {err}");
+            }
         }
     }
 
-    let mut returned_tasks = add_running_tasks(all_tasks)?;
-    returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
     let returned_tasks = returned_tasks
         .into_iter()
         .filter(|item| {
@@ -106,26 +147,228 @@ pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskLis
 
             true
         })
-        .skip(filters.start as usize)
-        .take(filters.limit as usize)
         .collect();
 
-    // We don't need to wait for this task to finish
-    tokio::task::spawn_blocking(move || {
-        if let Err(e) = cache.save() {
-            log::error!("could not save task cache: {e}");
-        }
-    });
-
     Ok(returned_tasks)
 }
 
+/// Insert a newly created task into the list of tracked tasks.
+///
+/// Any remote with associated tracked tasks will be polled at a short interval
+/// until all tracked tasks have finished.
+pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || {
+        let cache = get_cache()?;
+        // TODO: Handle PBS tasks correctly.
+        let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
+        let task = TaskCacheItem {
+            upid: task.clone(),
+            starttime: pve_upid.starttime,
+            status: None,
+            endtime: None,
+        };
+        cache.add_tracked_task(task)
+    })
+    .await?
+}
+
+/// Start the remote task fetching task
+pub fn start_task() -> Result<(), Error> {
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
+    let file_options = CreateOptions::new()
+        .owner(api_uid)
+        .group(api_gid)
+        .perm(Mode::from_bits_truncate(0o0750));
+    proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
+
+    tokio::spawn(async move {
+        let task_scheduler = std::pin::pin!(remote_task_fetching_task());
+        let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(task_scheduler, abort_future).await;
+    });
+
+    Ok(())
+}
+
+/// Task which handles fetching remote tasks and task archive rotation.
+/// This function never returns.
+async fn remote_task_fetching_task() -> ! {
+    let mut cycle = 0u64;
+    let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
+    interval.reset_at(task_utils::next_aligned_instant(TICK_RATE_S).into());
+
+    // We don't really care about catching up on missed ticks, we just want
+    // a steady tick rate.
+    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+    if let Err(err) = init_cache().await {
+        log::error!("error when initialized task cache: {err}");
+    }
+
+    loop {
+        interval.tick().await;
+        if let Err(err) = do_tick(cycle).await {
+            log::error!("error when fetching remote tasks: {err}");
+        }
+
+        // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
+        // better be safe and use .wrapping_add(1) :)
+        cycle = cycle.wrapping_add(1);
+    }
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are not
+/// any archive files yet.
+///
+/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
+/// spaced `ROTATE_AFTER_S` seconds apart.
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+async fn init_cache() -> Result<(), Error> {
+    tokio::task::spawn_blocking(|| {
+        let cache = get_cache()?;
+        cache.init(proxmox_time::epoch_i64(), KEEP_OLD_FILES, ROTATE_AFTER_S)?;
+        Ok(())
+    })
+    .await?
+}
+
+/// Handle a single timer tick.
+/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
+async fn do_tick(cycle: u64) -> Result<(), Error> {
+    let cache = get_cache()?;
+
+    if should_check_for_cache_rotation(cycle) {
+        log::debug!("checking if remote task archive should be rotated");
+        if rotate_cache(cache.clone()).await? {
+            log::info!("rotated remote task archive");
+        }
+    }
+
+    let state = cache.read_state();
+
+    let mut all_tasks = HashMap::new();
+
+    let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+    let mut join_set = JoinSet::new();
+
+    let remotes = remotes_to_check(cycle, &state).await?;
+    for remote in remotes {
+        let since = get_cutoff_timestamp(&remote, &state);
+
+        let permit = if remote.ty == RemoteType::Pve {
+            // Acquire multiple permits; for PVE remotes we want
+            // to connect to multiple nodes in parallel.
+            //
+            // `.unwrap()` is safe, we never close the semaphore.
+            Arc::clone(&semaphore)
+                .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
+                .await
+                .unwrap()
+        } else {
+            // For PBS remotes we only have a single outgoing connection
+            //
+            // `.unwrap()` is safe, we never close the semaphore.
+            Arc::clone(&semaphore).acquire_owned().await.unwrap()
+        };
+
+        join_set.spawn(async move {
+            log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
+            let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
+                format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
+            });
+
+            drop(permit);
+            tasks
+        });
+    }
+
+    while let Some(res) = join_set.join_next().await {
+        match res {
+            Ok(Ok((remote, request))) => {
+                all_tasks.insert(remote, request);
+            }
+            Ok(Err(err)) => log::error!("{err}"),
+            Err(err) => log::error!("could not join task fetching future: {err}"),
+        }
+    }
+
+    if !all_tasks.is_empty() {
+        save_tasks(cache, all_tasks).await?;
+    }
+
+    Ok(())
+}
+
+/// Return list of remotes that are to be polled in this cycle.
+async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
+    let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+    let all = cycle % REGULAR_REFRESH_CYCLES == 0;
+
+    if all {
+        Ok(config.sections.into_values().collect())
+    } else {
+        Ok(config
+            .sections
+            .into_iter()
+            .filter_map(|(name, remote)| {
+                if let Some(tracked) = state.tracked_tasks.get(&name) {
+                    if !tracked.is_empty() {
+                        Some(remote)
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                }
+            })
+            .collect())
+    }
+}
+
+/// Get the timestamp from which on we should fetch tasks for a given remote.
+/// The returned timestamp is a UNIX timestamp (in seconds).
+fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
+    let oldest_active = state.oldest_active_task.get(&remote.id).copied();
+    let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
+
+    match (oldest_active, youngest_archived) {
+        (None, None) => 0,
+        (None, Some(youngest_archived)) => youngest_archived,
+        (Some(oldest_active), None) => oldest_active,
+        (Some(oldest_active), Some(youngest_archived)) => oldest_active.min(youngest_archived),
+    }
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+    tokio::task::spawn_blocking(move || {
+        cache.rotate(proxmox_time::epoch_i64(), ROTATE_AFTER_S, KEEP_OLD_FILES)
+    })
+    .await?
+}
+
+/// Add newly fetched tasks to the cache.
+async fn save_tasks(cache: TaskCache, tasks: HashMap<String, AddTasks>) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || cache.add_tasks(tasks)).await?
+}
+
 /// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
+/// `since` is a UNIX timestamp (seconds).
+async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
     let mut tasks = Vec::new();
 
+    let mut all_successful = true;
+
     match remote.ty {
         RemoteType::Pve => {
+            let semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
+            let mut join_set = JoinSet::new();
+
             let client = pve::connect(remote)?;
 
             // N+1 requests - we could use /cluster/tasks, but that one
@@ -134,16 +377,53 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
                 let params = ListTasks {
                     // Include running tasks
                     source: Some(ListTasksSource::All),
-                    // TODO: How much task history do we want? Right now we just hard-code it
-                    // to 7 days.
-                    since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
+                    since: Some(since),
+                    // If `limit` is not provided, we only receive 50 tasks
+                    limit: Some(MAX_TASKS_TO_FETCH),
                     ..Default::default()
                 };
 
-                let list = client.get_task_list(&node.node, params).await?;
-                let mapped = map_tasks(list, &remote.id)?;
+                let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
 
-                tasks.extend(mapped);
+                let r = remote.clone();
+
+                join_set.spawn(async move {
+                    let client = pve::connect(&r)?;
+                    let task_list =
+                        client
+                            .get_task_list(&node.node, params)
+                            .await
+                            .map_err(|err| {
+                                format_err!("remote '{}', node '{}': {err}", r.id, node.node)
+                            })?;
+
+                    drop(permit);
+
+                    Ok::<Vec<_>, Error>(task_list)
+                });
+            }
+
+            while let Some(res) = join_set.join_next().await {
+                match res {
+                    Ok(Ok(list)) => {
+                        let mapped = list.into_iter().filter_map(|task| {
+                            match map_pve_task(task, &remote.id) {
+                                Ok(task) => Some(task),
+                                Err(err) => {
+                                    log::error!("could not map task data, skipping: {err}");
+                                    None
+                                }
+                            }
+                        });
+
+                        tasks.extend(mapped);
+                    }
+                    Ok(Err(err)) => {
+                        all_successful = false;
+                        log::error!("could not fetch tasks: {err:?}");
+                    }
+                    Err(err) => return Err(err.into()),
+                }
             }
         }
         RemoteType::Pbs => {
@@ -151,365 +431,45 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
         }
     }
 
-    Ok(tasks)
-}
-
-/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
-fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
-    let mut mapped = Vec::new();
-
-    for task in tasks {
-        let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
-
-        mapped.push(TaskListItem {
-            upid: remote_upid.to_string(),
-            node: task.node,
-            pid: task.pid,
-            pstart: task.pstart as u64,
-            starttime: task.starttime,
-            worker_type: task.ty,
-            worker_id: Some(task.id),
-            user: task.user,
-            endtime: task.endtime,
-            status: task.status,
-        })
-    }
-
-    Ok(mapped)
-}
-
-/// Drops the cached task list of a remote for all finished tasks.
-///
-/// We use this to force a refresh so that we get the full task
-/// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
-    let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
-
-    // If a task is finished, we force a refresh for the remote - otherwise
-    // we don't get the 'endtime' for the task.
-    for task in finished.drain() {
-        cache.invalidate_cache_for_remote(task.remote());
-    }
-}
-
-/// Supplement the list of tasks that we received from the remote with
-/// the tasks that were started by PDM and are currently running.
-fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
-    let mut returned_tasks = Vec::new();
-
-    let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
-    for task in cached_tasks {
-        let remote_upid = task.upid.parse()?;
-
-        if running_tasks.contains(&remote_upid) {
-            if task.endtime.is_some() {
-                // Task is finished but we still think it is running ->
-                // Drop it from RUNNING_FOREIGN_TASKS
-                running_tasks.remove(&remote_upid);
-
-                // No need to put it in FINISHED_TASKS, since we already
-                // got its state recently enough (we know the status and endtime)
-            }
-        } else {
-            returned_tasks.push(task);
-        }
-    }
-
-    for task in running_tasks.iter() {
-        let upid: PveUpid = task.upid.parse()?;
-        returned_tasks.push(TaskListItem {
-            upid: task.to_string(),
-            node: upid.node,
-            pid: upid.pid as i64,
-            pstart: upid.pstart,
-            starttime: upid.starttime,
-            worker_type: upid.worker_type,
-            worker_id: upid.worker_id,
-            user: upid.auth_id,
-            endtime: None,
-            status: None,
-        });
-    }
-
-    Ok(returned_tasks)
-}
-
-/// A cache for fetched remote tasks.
-struct TaskCache {
-    /// Cache entries
-    content: TaskCacheContent,
-
-    /// Entries that were added or updated - these will be persistet
-    /// when `save` is called.
-    new_or_updated: TaskCacheContent,
-
-    /// Cache entries were changed/removed.
-    dirty: bool,
-
-    /// File-location at which the cached tasks are stored.
-    cachefile_path: PathBuf,
-}
-
-impl TaskCache {
-    /// Create a new tasks cache instance by loading
-    /// the cache from disk.
-    fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
-        Ok(Self {
-            content: Self::load_content()?,
-            new_or_updated: Default::default(),
-            dirty: false,
-            cachefile_path,
-        })
-    }
-
-    /// Load the task cache contents from disk.
-    fn load_content() -> Result<TaskCacheContent, Error> {
-        let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
-        let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
-
-        let content = if let Some(content) = content {
-            serde_json::from_str(&content)?
-        } else {
-            Default::default()
-        };
-
-        Ok(content)
-    }
-
-    /// Get path for the cache's lockfile.
-    fn lockfile_path(&self) -> PathBuf {
-        let mut path = self.cachefile_path.clone();
-        path.set_extension("lock");
-        path
-    }
-
-    /// Persist the task cache
-    ///
-    /// This method requests an exclusive lock for the task cache lockfile.
-    fn save(&mut self) -> Result<(), Error> {
-        // if we have not updated anything, we don't have to update the cache file
-        if !self.dirty {
-            return Ok(());
-        }
-
-        let _guard = self.lock(Duration::from_secs(5))?;
-
-        // Read content again, in case somebody has changed it in the meanwhile
-        let mut content = Self::load_content()?;
-
-        for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
-            if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
-                // Only update entry if nobody else has updated it in the meanwhile
-                if existing_entry.timestamp < entry.timestamp {
-                    *existing_entry = entry;
-                }
-            } else {
-                content.remote_tasks.insert(remote_name, entry);
-            }
-        }
-
-        let bytes = serde_json::to_vec_pretty(&content)?;
-
-        let api_uid = pdm_config::api_user()?.uid;
-        let api_gid = pdm_config::api_group()?.gid;
-
-        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
-        proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
-
-        self.dirty = false;
-
-        Ok(())
-    }
-
-    // Update task data for a given remote.
-    fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
-        self.dirty = true;
-        self.new_or_updated
-            .remote_tasks
-            .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
-    }
-
-    // Get task data for a given remote.
-    fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
-        if let Some(entry) = self.content.remote_tasks.get(remote) {
-            if (entry.timestamp + max_age) < now {
-                return None;
-            }
-
-            Some(entry.tasks.clone())
-        } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
-            if (entry.timestamp + max_age) < now {
-                return None;
-            }
-            Some(entry.tasks.clone())
-        } else {
-            None
-        }
-    }
-
-    // Invalidate cache for a given remote.
-    fn invalidate_cache_for_remote(&mut self, remote: &str) {
-        self.dirty = true;
-        self.content.remote_tasks.remove(remote);
-    }
-
-    // Lock the cache for modification.
-    //
-    // While the cache is locked, other users can still read the cache
-    // without a lock, since the cache file is replaced atomically
-    // when updating.
-    fn lock(&self, duration: Duration) -> Result<File, Error> {
-        let api_uid = pdm_config::api_user()?.uid;
-        let api_gid = pdm_config::api_group()?.gid;
-
-        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-        proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
-    }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
-    timestamp: i64,
-    tasks: Vec<TaskListItem>,
-}
-
-#[derive(Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
-    remote_tasks: HashMap<String, TaskCacheEntry>,
-}
-
-/// Interval at which tracked tasks are polled
-const RUNNING_CHECK_INTERVAL_S: u64 = 10;
-
-/// Tasks which were started by PDM and are still running
-static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-/// Tasks which were started by PDM and w
-static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-
-fn init() -> RwLock<HashSet<RemoteUpid>> {
-    RwLock::new(HashSet::new())
-}
-
-/// Insert a remote UPID into the running list
-///
-/// If it is the first entry in the list, a background task is started to track its state
-///
-/// Returns the [`JoinHandle`] if a task was started.
-///
-/// panics on a poisoned mutex
-pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
-    let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
-
-    // the call inserting the first task in the list needs to start the checking task
-    let need_start_task = tasks.is_empty();
-    tasks.insert(task);
-
-    if !need_start_task {
-        return None;
-    }
-    drop(tasks);
-
-    Some(tokio::spawn(async move {
-        loop {
-            let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
-            tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
-            let finished_tasks = get_finished_tasks().await;
-
-            // skip iteration if we still have tasks, just not finished ones
-            if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
-                continue;
-            }
-
-            let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
-            // we either have finished tasks, or the running task list was empty
-            let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
-
-            for (upid, _status) in finished_tasks {
-                if set.remove(&upid) {
-                    finished.insert(upid);
-                } else {
-                    // someone else removed & persisted the task in the meantime
-                }
-            }
-
-            // if no task remains, end the current task
-            // it will be restarted by the next caller that inserts one
-            if set.is_empty() {
-                return;
-            }
-        }
-    }))
-}
-
-/// Get a list of running foreign tasks
-///
-/// panics on a poisoned mutex
-pub fn get_running_tasks() -> Vec<RemoteUpid> {
-    RUNNING_FOREIGN_TASKS
-        .read()
-        .unwrap()
-        .iter()
-        .cloned()
-        .collect()
-}
-
-/// Checks all current saved UPIDs if they're still running, and if not,
-/// returns their upids + status
-///
-/// panics on a poisoned mutex
-pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
-    let mut finished = Vec::new();
-    let config = match pdm_config::remotes::config() {
-        Ok((config, _)) => config,
-        Err(err) => {
-            log::error!("could not open remotes config: {err}");
-            return Vec::new();
-        }
+    let new_tasks = AddTasks {
+        update_most_recent_archive_timestamp: all_successful,
+        tasks,
     };
-    for task in get_running_tasks() {
-        match config.get(task.remote()) {
-            Some(remote) => match remote.ty {
-                RemoteType::Pve => {
-                    let status = match crate::api::pve::tasks::get_task_status(
-                        remote.id.clone(),
-                        task.clone(),
-                        false,
-                    )
-                    .await
-                    {
-                        Ok(status) => status,
-                        Err(err) => {
-                            log::error!("could not get status from remote: {err}");
-                            finished.push((task, "could not get status".to_string()));
-                            continue;
-                        }
-                    };
-                    if let Some(status) = status.exitstatus {
-                        finished.push((task, status.to_string()));
-                    }
-                }
-                RemoteType::Pbs => {
-                    let _client = match crate::pbs_client::connect(remote) {
-                        Ok(client) => client,
-                        Err(err) => {
-                            log::error!("could not get status from remote: {err}");
-                            finished.push((task, "could not get status".to_string()));
-                            continue;
-                        }
-                    };
-                    // FIXME implement get task status
-                    finished.push((task, "unknown state".to_string()));
-                }
-            },
-            None => finished.push((task, "unknown remote".to_string())),
-        }
-    }
 
-    finished
+    Ok((remote.id.clone(), new_tasks))
+}
+
+/// Check if we are due to check for cache rotation.
+fn should_check_for_cache_rotation(cycle: u64) -> bool {
+    cycle % CHECK_ROTATE_CYCLES == 0
+}
+
+/// Get a new [`TaskCache`] instance.
+///
+/// No heavy-weight operations are done here; it's fine to call this regularly as part of the
+/// update loop.
+fn get_cache() -> Result<TaskCache, Error> {
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
+
+    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+    let cache_path = Path::new(REMOTE_TASKS_DIR);
+    let cache = TaskCache::new(cache_path, file_options)?;
+
+    Ok(cache)
+}
+
+/// Map a `ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
+    let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
+
+    Ok(TaskCacheItem {
+        upid: remote_upid,
+        starttime: task.starttime,
+        endtime: task.endtime,
+        status: task.status,
+    })
 }
 
 /// Parses a task status string into a TaskStateType
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel