public inbox for pdm-devel@lists.proxmox.com
* [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend
@ 2025-03-14 14:12 Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 1/8] test support: add NamedTempFile helper Lukas Wagner
                   ` (7 more replies)
  0 siblings, 8 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

The aim of this patch series is to greatly improve the performance of the
remote task cache for big PDM setups.

The initial, 'dumb' cache implementation had the following problems:
  1.) the cache was populated as part of the `get_tasks` API, leading to
      hanging API calls while fetching task data from remotes
  2.) all tasks were stored in a single file, which was completely rewritten
      for any change to the cache's contents
  3.) the caching mechanism was pretty simple, using only a max-age mechanism
      and re-requesting all task data if max-age was exceeded

Now, these characteristics are not really problematic for *small* PDM setups
with only a couple of remotes. However, for big setups (e.g. 100 remotes,
each remote being a PVE cluster with 10 nodes), this completely falls apart:
  1.) fetching remote tasks takes a considerable amount of time, especially
      on connections with high latency. Since the data is requested
      from *within* the `get_tasks` function, which is called by the
      `remote-tasks/list` API handler, the API call is blocked until
      *all* task data has been requested.
  2.) the single-file approach leads to significant write load on the disk
  3.) it leads to unnecessary network I/O, as we re-request data that we
      already have locally.

To rectify the situation, this series performs the following changes:

  - `get_tasks` never does any fetching; it only reads the most recent
    data from the cache
  - There is a new background task which periodically fetches tasks
    from all remotes (every 10 minutes at the moment). Only the latest
    missing tasks are requested, not the full task history as before
  - The new background task also takes over the 'tracked task' polling
    duty, where we fetch the status of any task started by PDM on
    a remote (short polling interval, 10s at the moment)
  - The task cache storage implementation has been completely overhauled
    and is now optimized for the most common accesses to the cache.
    It is also more storage-efficient, occupying roughly 50% of the disk
    space for the same number of tasks (achieved by avoiding duplicate
    information in the files)
  - The size of the task cache is 'limited' by doing file rotation.
    We keep 7 days of task history (see the sketch below).
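
To illustrate the rotation, here is how it could be driven via the cache
API added in patch 4 (the concrete parameter values are illustrative, not
necessarily the ones the background task ends up using):

    // Rotate once the newest archive file is older than one day and keep
    // at most 7 archive files -> roughly 7 days of task history.
    let did_rotate = cache.rotate(proxmox_time::epoch_i64(), 24 * 3600, 7)?;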

For details on *how* the cache itself works, please refer to the full
commit message of
    remote tasks: implement improved cache for remote tasks

# Benchmarks

Finally, some concrete data to back up the claimed performance improvements.
The times were measured *inside* the `get_tasks` function and not at
the API level, so the times do not include JSON serialization and
data transfer.

Benchmarking was done using the 'fake-remote' feature. There were 100
remotes, 10 PVE nodes per remote. The task cache contained
about 1.5 million tasks.
                                               before        after
list of active tasks (*):                     ~1.3s          ~30µs
list of 500 tasks, offset 0 (**):             ~1.3s         ~500µs
list of 500 tasks, offset 1 million (***):    ~1.3s         ~200ms
Size on disk:                                 ~500MB        ~200MB

(*):  Requested by the UI every 3s
(**): Requested by the UI when visiting Remotes > Tasks
(***): E.g. when scrolling towards the bottom of 'Remotes > Tasks'

In the old implementation, the archive file was *always* fully deserialized
and loaded into RAM; this is the reason why the time needed is practically
identical for all scenarios.
The new implementation reads the archive files line by line,
keeping only 500 tasks in RAM at a time. The higher the offset,
the more archive lines/files we have to scan, which increases the
time needed to access the data. The tasks are sorted descending
by starttime; as a result, the requests get slower the further you
go back in history.

The 'before' times do NOT include the time needed for actually fetching
the task data.

This series was preceded by [1], however almost all of the code has changed,
which is why I am sending this as a new series.

Note: 
I asked Max for feedback on this while it was still only available on my staff
repo. He kindly pointed out some smaller issues which are already fixed in this
first version on the list. He was okay with me adding his 'R-b' tags
right away.

[1] https://lore.proxmox.com/pdm-devel/20250128122520.167796-1-l.wagner@proxmox.com/

proxmox-datacenter-manager:

Lukas Wagner (8):
  test support: add NamedTempFile helper
  test support: add NamedTempDir helper
  move task_cache.rs to remote_tasks/mod.rs
  remote tasks: implement improved cache for remote tasks
  remote tasks: add background task for task polling, use new task cache
  pdm-api-types: remote tasks: implement From<&str> for TaskStateType
  fake remote: add missing fields to make the debug feature compile
    again
  fake remote: generate fake task data

 lib/pdm-api-types/src/lib.rs             |  15 +
 server/src/api/pve/lxc.rs                |  10 +-
 server/src/api/pve/mod.rs                |   6 +-
 server/src/api/pve/qemu.rs               |   6 +-
 server/src/api/remote_tasks.rs           |  13 +-
 server/src/bin/proxmox-datacenter-api.rs |   3 +-
 server/src/lib.rs                        |   4 +-
 server/src/remote_tasks/mod.rs           | 473 +++++++++++
 server/src/remote_tasks/task_cache.rs    | 964 +++++++++++++++++++++++
 server/src/task_cache.rs                 | 524 ------------
 server/src/test_support/fake_remote.rs   |  87 +-
 server/src/test_support/mod.rs           |   4 +
 server/src/test_support/temp.rs          |  60 ++
 13 files changed, 1619 insertions(+), 550 deletions(-)
 create mode 100644 server/src/remote_tasks/mod.rs
 create mode 100644 server/src/remote_tasks/task_cache.rs
 delete mode 100644 server/src/task_cache.rs
 create mode 100644 server/src/test_support/temp.rs


Summary over all repositories:
  13 files changed, 1619 insertions(+), 550 deletions(-)

-- 
Generated by git-murpp 0.8.0



* [pdm-devel] [PATCH proxmox-datacenter-manager 1/8] test support: add NamedTempFile helper
  2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
@ 2025-03-14 14:12 ` Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 2/8] test support: add NamedTempDir helper Lukas Wagner
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

This one is useful when writing tests; it automatically removes the
temporary file when dropped. The name was chosen because of the similarly
named NamedTempFile struct in the popular tempfile crate.
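
A typical use in a test could look like this (illustrative only):

    let tmp = NamedTempFile::new(CreateOptions::new())?;
    std::fs::write(tmp.path(), b"test data")?;
    // the file is removed automatically once `tmp` goes out of scope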

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/lib.rs               |  2 +-
 server/src/test_support/mod.rs  |  4 ++++
 server/src/test_support/temp.rs | 33 +++++++++++++++++++++++++++++++++
 3 files changed, 38 insertions(+), 1 deletion(-)
 create mode 100644 server/src/test_support/temp.rs

diff --git a/server/src/lib.rs b/server/src/lib.rs
index 12dc912f..143ee32d 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -13,7 +13,7 @@ pub mod task_utils;
 pub mod connection;
 pub mod pbs_client;
 
-#[cfg(remote_config = "faked")]
+#[cfg(any(remote_config = "faked", test))]
 pub mod test_support;
 
 use anyhow::Error;
diff --git a/server/src/test_support/mod.rs b/server/src/test_support/mod.rs
index e54cd729..f026011c 100644
--- a/server/src/test_support/mod.rs
+++ b/server/src/test_support/mod.rs
@@ -1 +1,5 @@
+#[cfg(remote_config = "faked")]
 pub mod fake_remote;
+
+#[cfg(test)]
+pub mod temp;
diff --git a/server/src/test_support/temp.rs b/server/src/test_support/temp.rs
new file mode 100644
index 00000000..a3a6d59b
--- /dev/null
+++ b/server/src/test_support/temp.rs
@@ -0,0 +1,33 @@
+use std::path::{Path, PathBuf};
+
+use anyhow::Error;
+
+use proxmox_sys::fs::CreateOptions;
+
+/// Temporary file that is cleaned up when dropped.
+pub struct NamedTempFile {
+    path: PathBuf,
+}
+
+impl NamedTempFile {
+    /// Create a new temporary file.
+    ///
+    /// The file will be created with the passed [`CreateOptions`].
+    pub fn new(options: CreateOptions) -> Result<Self, Error> {
+        let base = std::env::temp_dir().join("test");
+        let (_, path) = proxmox_sys::fs::make_tmp_file(base, options)?;
+
+        Ok(Self { path })
+    }
+
+    /// Return the [`Path`] to the temporary file.
+    pub fn path(&self) -> &Path {
+        &self.path
+    }
+}
+
+impl Drop for NamedTempFile {
+    fn drop(&mut self) {
+        let _ = std::fs::remove_file(&self.path);
+    }
+}
-- 
2.39.5




* [pdm-devel] [PATCH proxmox-datacenter-manager 2/8] test support: add NamedTempDir helper
  2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 1/8] test support: add NamedTempFile helper Lukas Wagner
@ 2025-03-14 14:12 ` Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 3/8] move task_cache.rs to remote_tasks/mod.rs Lukas Wagner
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

This one is useful when writing tests; it automatically removes the
temporary directory when dropped.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/test_support/temp.rs | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/server/src/test_support/temp.rs b/server/src/test_support/temp.rs
index a3a6d59b..a93c914d 100644
--- a/server/src/test_support/temp.rs
+++ b/server/src/test_support/temp.rs
@@ -31,3 +31,30 @@ impl Drop for NamedTempFile {
         let _ = std::fs::remove_file(&self.path);
     }
 }
+
+/// Temporary directory that is cleaned up when dropped.
+pub struct NamedTempDir {
+    path: PathBuf,
+}
+
+impl NamedTempDir {
+    /// Create a new temporary directory.
+    ///
+    /// The directory will be created with `0o700` permissions.
+    pub fn new() -> Result<Self, Error> {
+        let path = proxmox_sys::fs::make_tmp_dir("/tmp", None)?;
+
+        Ok(Self { path })
+    }
+
+    /// Return the [`Path`] to the temporary directory.
+    pub fn path(&self) -> &Path {
+        &self.path
+    }
+}
+
+impl Drop for NamedTempDir {
+    fn drop(&mut self) {
+        let _ = std::fs::remove_dir_all(&self.path);
+    }
+}
-- 
2.39.5




* [pdm-devel] [PATCH proxmox-datacenter-manager 3/8] move task_cache.rs to remote_tasks/mod.rs
  2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 1/8] test support: add NamedTempFile helper Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 2/8] test support: add NamedTempDir helper Lukas Wagner
@ 2025-03-14 14:12 ` Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 4/8] remote tasks: implement improved cache for remote tasks Lukas Wagner
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/api/pve/mod.rs                         | 4 ++--
 server/src/api/remote_tasks.rs                    | 4 ++--
 server/src/lib.rs                                 | 2 +-
 server/src/{task_cache.rs => remote_tasks/mod.rs} | 0
 4 files changed, 5 insertions(+), 5 deletions(-)
 rename server/src/{task_cache.rs => remote_tasks/mod.rs} (100%)

diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index 2cefbb4b..c328675a 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -27,7 +27,7 @@ use pve_api_types::{
 
 use super::resources::{map_pve_lxc, map_pve_node, map_pve_qemu, map_pve_storage};
 
-use crate::{connection, task_cache};
+use crate::{connection, remote_tasks};
 
 mod lxc;
 mod node;
@@ -76,7 +76,7 @@ const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
 // converts a remote + PveUpid into a RemoteUpid and starts tracking it
 fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
     let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
-    task_cache::track_running_task(remote_upid.clone());
+    remote_tasks::track_running_task(remote_upid.clone());
     Ok(remote_upid)
 }
 
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index 57b59fdc..e629000c 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -4,7 +4,7 @@ use proxmox_router::{list_subdirs_api_method, Permission, Router, SubdirMap};
 use proxmox_schema::api;
 use proxmox_sortable_macro::sortable;
 
-use crate::task_cache;
+use crate::remote_tasks;
 
 pub const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
@@ -37,7 +37,7 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
 )]
 /// Get the list of tasks for all remotes.
 async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
-    let tasks = task_cache::get_tasks(max_age, filters).await?;
+    let tasks = remote_tasks::get_tasks(max_age, filters).await?;
 
     Ok(tasks)
 }
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 143ee32d..4320e46a 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -6,8 +6,8 @@ pub mod auth;
 pub mod context;
 pub mod env;
 pub mod metric_collection;
+pub mod remote_tasks;
 pub mod resource_cache;
-pub mod task_cache;
 pub mod task_utils;
 
 pub mod connection;
diff --git a/server/src/task_cache.rs b/server/src/remote_tasks/mod.rs
similarity index 100%
rename from server/src/task_cache.rs
rename to server/src/remote_tasks/mod.rs
-- 
2.39.5




* [pdm-devel] [PATCH proxmox-datacenter-manager 4/8] remote tasks: implement improved cache for remote tasks
  2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
                   ` (2 preceding siblings ...)
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 3/8] move task_cache.rs to remote_tasks/mod.rs Lukas Wagner
@ 2025-03-14 14:12 ` Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache Lukas Wagner
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

This commit adds a new implementation of a cache for remote tasks, one
that should improve performance characteristics for pretty much all
use cases.

In general, storage works pretty similarly to the task archive we already
have for (local) PDM tasks.

root@pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
total 40
-rw-r--r-- 1 www-data www-data     0 Mar 13 13:18 active
-rw-r--r-- 1 www-data www-data  1676 Mar 11 14:51 archive.1741355462
-rw-r--r-- 1 www-data www-data     0 Mar 11 14:51 archive.1741441862
-rw-r--r-- 1 www-data www-data  2538 Mar 11 14:51 archive.1741528262
-rw-r--r-- 1 www-data www-data  8428 Mar 11 15:07 archive.1741614662
-rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
-rw-r--r-- 1 www-data www-data  3364 Mar 13 13:18 archive.1741788270
-rw-r--r-- 1 www-data www-data   287 Mar 13 13:18 state

Tasks are stored in the 'active' and multiple 'archive' files.
Running tasks are placed into the 'active' file; tasks that are finished
are persisted into one of the archive files.
The archive files are suffixed with a UNIX timestamp which serves
as a lower bound for the start times of the tasks stored in this file.
Encoding this lower bound in the file name instead of using a more
traditional increasing file index (.1, .2, .3) gives us the benefit
that we can decide in which file a newly arrived task belongs from
a single readdir call, without even reading the file itself.
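
A minimal sketch of that lookup (illustrative only; the actual logic lives
in `merge_tasks_into_archive` below):

    // Cut-off timestamps parsed from the file names, sorted descending
    // (most recent first). A task belongs to the first file whose cut-off
    // is <= its starttime; tasks older than the oldest cut-off are
    // discarded.
    fn target_file(cutoffs_desc: &[i64], starttime: i64) -> Option<usize> {
        cutoffs_desc.iter().position(|&cutoff| starttime >= cutoff)
    }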

The size of the entire archive can be controlled by doing file
'rotation', where the archive file with the oldest timestamp is deleted
and a new file with the current timestamp is added.
If 'rotation' happens at fixed intervals (e.g. once daily), this gives
us a configurable number of days of task history.
There is no direct cap for the total number
of tasks, but this could be easily added by checking the size of the
most recent archive file and by rotating early if a threshold is
exceeded.

The format inside the files is also similar to the local task archive,
with one task corresponding to one line in the file.
One key difference is that here each line is a JSON object; this makes it
easier to add additional data later, if needed.
The JSON object contains the task's UPID, status, endtime and starttime
(the starttime is technically also encoded in the UPID, but having it as
a separate value simplified a couple of things).
Each file is sorted by the task's start time, the youngest task coming first.
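
For illustration, a line for a finished task could look like this (values
made up):

    {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","starttime":1739893201,"status":"OK","endtime":1739893211}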

One key difference between this task cache and the local task archive is
that we need to handle tasks coming in out of order, e.g. if remotes
were not reachable for a certain time. To maintain the ordering
in the file, we have to merge the newly arrived tasks into the existing
task file. This was implemented in a way that avoids reading the entire
file into memory at once, exploiting the fact that the contents of the
existing file are already sorted. This allows a
zipper/merge-sort-like approach (see MergeTaskIterator for
details). The existing file is only read line by line and finally
replaced atomically.
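
As a small worked example: merging new tasks with starttimes (9, 7, 3)
into an existing file containing (8, 7, 5) yields (9, 8, 7, 5, 3); if the
two entries with starttime 7 refer to the same UPID, the duplicate is
emitted only once, which is what makes re-adding the same task idempotent.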

The cache also has a separate state file containing additional
information, e.g. cut-off timestamps for the polling task.
Some of the data in the state file is technically also contained in the
archive files, but reading the state file is much faster.

This commit also adds an elaborate suite of unit tests for this new
cache. While they added some work up front, they paid off quite
quickly during development, since the overall approach for the
cache changed a couple of times. The test suite gave me confidence that
the design changes didn't screw anything up, catching a couple of bugs
in the process.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
---
 server/src/remote_tasks/mod.rs        |   2 +
 server/src/remote_tasks/task_cache.rs | 964 ++++++++++++++++++++++++++
 2 files changed, 966 insertions(+)
 create mode 100644 server/src/remote_tasks/task_cache.rs

diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 7ded5408..2062f2b7 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -18,6 +18,8 @@ use tokio::task::JoinHandle;
 
 use crate::{api::pve, task_utils};
 
+mod task_cache;
+
 /// Get tasks for all remotes
 // FIXME: filter for privileges
 pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
new file mode 100644
index 00000000..628a9a40
--- /dev/null
+++ b/server/src/remote_tasks/task_cache.rs
@@ -0,0 +1,964 @@
+//! Task cache implementation, based on rotating files.
+
+use std::{
+    cmp::Ordering,
+    collections::{HashMap, HashSet},
+    fs::File,
+    io::{BufRead, BufReader, BufWriter, Lines, Write},
+    iter::Peekable,
+    path::{Path, PathBuf},
+    time::Duration,
+};
+
+use anyhow::Error;
+use proxmox_sys::fs::CreateOptions;
+use serde::{Deserialize, Serialize};
+
+use pdm_api_types::RemoteUpid;
+
+/// Item which can be put into the task cache.
+#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
+pub struct TaskCacheItem {
+    /// The task's UPID
+    pub upid: RemoteUpid,
+    /// The time at which the task was started (seconds since the UNIX epoch).
+    /// Technically this is also contained within the UPID, duplicating it here
+    /// allows us to directly access it when sorting in new tasks, without having
+    /// to parse the UPID.
+    pub starttime: i64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// The task's status.
+    pub status: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// The task's endtime (seconds since the UNIX epoch).
+    pub endtime: Option<i64>,
+}
+
+/// State needed for task polling.
+#[derive(Serialize, Deserialize, Default)]
+#[serde(rename_all = "kebab-case")]
+pub struct State {
+    /// Map of remote -> most recent task starttime (UNIX epoch) in the archive.
+    /// This can be used as a cut-off when requesting new task data.
+    pub most_recent_archive_starttime: HashMap<String, i64>,
+    /// Oldest running task. Useful as another cut-off for fetching tasks.
+    /// The timestamp is based on seconds since the UNIX epoch.
+    pub oldest_active_task: HashMap<String, i64>,
+    /// Tracked tasks per remote.
+    pub tracked_tasks: HashMap<String, HashSet<RemoteUpid>>,
+}
+
+/// Cache for remote tasks.
+#[derive(Clone)]
+pub struct TaskCache {
+    /// Path where the cache's files should be placed.
+    base_path: PathBuf,
+    /// File permissions for the cache's files.
+    create_options: CreateOptions,
+}
+
+/// Tasks that should be added to the cache via [`TaskCache::add_tasks`].
+pub struct AddTasks {
+    /// Update most recent archived task in state file.
+    pub update_most_recent_archive_timestamp: bool,
+    /// The tasks to add.
+    pub tasks: Vec<TaskCacheItem>,
+}
+
+/// Lock for the cache.
+#[allow(dead_code)]
+pub struct TaskCacheLock(File);
+
+/// Which tasks to fetch from the archive.
+pub enum GetTasks {
+    /// Get all tasks, finished and running.
+    All,
+    /// Only get running (active) tasks.
+    Active,
+    /// Only get finished (archived) tasks.
+    #[allow(dead_code)]
+    Archived,
+}
+
+impl TaskCache {
+    /// Create a new task cache instance.
+    ///
+    /// Remember to call `init` or `new_file` to create initial archive files before
+    /// adding any tasks.
+    pub fn new<P: AsRef<Path>>(path: P, create_options: CreateOptions) -> Result<Self, Error> {
+        Ok(Self {
+            base_path: path.as_ref().into(),
+            create_options,
+        })
+    }
+
+    /// Create initial task archives that can be backfilled with the
+    /// recent task history from a remote.
+    ///
+    /// This function only has an effect if there are no archive files yet.
+    pub fn init(&self, now: i64, number_of_files: u64, period_per_file: u64) -> Result<(), Error> {
+        let _lock = self.lock(true)?;
+
+        if self.archive_files()?.is_empty() {
+            for i in 0..number_of_files {
+                self.new_file(now - (i * period_per_file) as i64)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Start a new archive file with a given timestamp.
+    /// `now` is supposed to be a UNIX timestamp (seconds).
+    fn new_file(&self, now: i64) -> Result<(), Error> {
+        let new_path = self.base_path.join(format!("archive.{now}"));
+        let mut file = File::create(&new_path)?;
+        self.create_options.apply_to(&mut file, &new_path)?;
+
+        Ok(())
+    }
+
+    /// Lock the cache.
+    fn lock(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
+        let lockfile = self.base_path.join(".lock");
+
+        let fd = proxmox_sys::fs::open_file_locked(
+            lockfile,
+            Duration::from_secs(15),
+            exclusive,
+            self.create_options.clone(),
+        )?;
+
+        Ok(TaskCacheLock(fd))
+    }
+
+    /// Rotate the task archive if the newest archive file is older than `rotate_after`.
+    ///
+    /// The oldest archive files are removed if the total number of archive files exceeds
+    /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
+    ///
+    /// This function requests an exclusive lock; don't call it if you already hold a lock.
+    pub fn rotate(&self, now: i64, rotate_after: u64, max_files: u64) -> Result<bool, Error> {
+        let _lock = self.lock(true)?;
+        let mut did_rotate = false;
+
+        let mut bounds = self.archive_files()?;
+
+        match bounds.first() {
+            Some(bound) => {
+                if now > bound.starttime && now - bound.starttime > rotate_after as i64 {
+                    self.new_file(now)?;
+                    did_rotate = true;
+                }
+            }
+            None => {
+                self.new_file(now)?;
+                did_rotate = true;
+            }
+        }
+
+        while bounds.len() >= max_files as usize {
+            // Unwrap is safe because of the length check above
+            let to_remove = bounds.pop().unwrap();
+            std::fs::remove_file(&to_remove.file)?;
+        }
+
+        Ok(did_rotate)
+    }
+
+    /// Add new tasks to the task archive.
+    ///
+    /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
+    /// task cache base directory. Finished tasks are sorted into `archive.<starttime>` archive
+    /// files, where `<starttime>` denotes the lowest permitted start time timestamp for a given
+    /// archive file. If a task which was added as running previously is added again, this time in
+    /// a finished state, it will be removed from the `active` file and also sorted into
+    /// one of the archive files.
+    /// Same goes for the list of tracked tasks; the entry in the state file will be removed.
+    ///
+    /// Crash consistency:
+    ///
+    /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the
+    /// end after all tasks have been added into the archive. Adding tasks is an idempotent
+    /// operation; adding the *same* task multiple times does not lead to duplicated entries in the
+    /// task archive. Individual archive files are updated atomically, but since
+    /// adding tasks can involve updating multiple archive files, the archive could end up
+    /// in a partially-updated, inconsistent state in case of a crash.
+    /// However, since the state file with the cut-off timestamps is updated last,
+    /// the consistency of the archive should be restored at the next update cycle of the archive.
+    pub fn add_tasks(&self, mut added: HashMap<String, AddTasks>) -> Result<(), Error> {
+        let lock = self.lock(true)?;
+
+        // Get a flat `Vec` of all new tasks
+        let tasks: Vec<_> = added
+            .iter_mut()
+            .flat_map(|(_, tasks)| std::mem::take(&mut tasks.tasks))
+            .collect();
+
+        let update_state_for_remote: HashMap<String, bool> = added
+            .into_iter()
+            .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
+            .collect();
+
+        let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
+
+        let mut active_tasks =
+            HashSet::from_iter(task_iter.flat_map(|active_task| match active_task {
+                Ok(task) => Some((task.upid, task.starttime)),
+                Err(err) => {
+                    log::error!("failed to read task cache entry from active file: {err}");
+                    None
+                }
+            }));
+
+        // Consume the iterator to get back the lock. The lock is held
+        // until _lock is finally dropped at the end of the function.
+        let _lock = task_iter.into_lock();
+
+        active_tasks.extend(
+            tasks
+                .iter()
+                .filter(|task| task.endtime.is_none())
+                .map(|a| (a.upid.clone(), a.starttime)),
+        );
+
+        let mut state = self.read_state();
+        let mut new_finished_tasks = tasks
+            .into_iter()
+            .filter(|task| task.endtime.is_some())
+            .collect::<Vec<_>>();
+
+        new_finished_tasks.sort_by(compare_tasks_reverse);
+        self.merge_tasks_into_archive(
+            new_finished_tasks,
+            &mut active_tasks,
+            update_state_for_remote,
+            &mut state,
+        )?;
+        self.update_oldest_active(&active_tasks, &mut state);
+
+        let mut active: Vec<TaskCacheItem> = active_tasks
+            .into_iter()
+            .map(|(upid, starttime)| TaskCacheItem {
+                upid,
+                starttime,
+                status: None,
+                endtime: None,
+            })
+            .collect();
+
+        active.sort_by(compare_tasks_reverse);
+        self.write_active_tasks(active.into_iter())?;
+        self.write_state(state)?;
+
+        Ok(())
+    }
+
+    /// Update the timestamp of the oldest running task in `state`.
+    fn update_oldest_active(&self, active_tasks: &HashSet<(RemoteUpid, i64)>, state: &mut State) {
+        // Update the state with the timestamp of the oldest running task.
+        // We also use this as a cut-off when fetching tasks, so that we
+        // know for sure when a task finishes.
+        let mut oldest_active_task_per_remote = HashMap::new();
+        for (upid, startime) in active_tasks.iter() {
+            oldest_active_task_per_remote
+                .entry(upid.remote().to_owned())
+                .and_modify(|time| *time = (*startime).min(*time))
+                .or_insert(*startime);
+        }
+        state.oldest_active_task = oldest_active_task_per_remote;
+    }
+
+    /// Merge a list of *finished* tasks into the remote task archive files.
+    /// The list of tasks in `tasks` *must* be sorted by their timestamp and UPID (descending by
+    /// timestamp, ascending by UPID).
+    ///
+    /// The task archive should be locked when calling this.
+    fn merge_tasks_into_archive(
+        &self,
+        tasks: Vec<TaskCacheItem>,
+        active_tasks: &mut HashSet<(RemoteUpid, i64)>,
+        update_state_for_remote: HashMap<String, bool>,
+        state: &mut State,
+    ) -> Result<(), Error> {
+        debug_assert!(tasks
+            .iter()
+            .is_sorted_by(|a, b| compare_tasks(a, b).is_ge()));
+
+        let files = self.archive_files()?;
+
+        let mut files = files.iter().peekable();
+
+        let mut current = files.next();
+        let mut next = files.peek();
+
+        let mut tasks_for_current_file = Vec::new();
+
+        // Tasks are sorted youngest to oldest (biggest starttime first)
+        for task in tasks {
+            active_tasks.remove(&(task.upid.clone(), task.starttime));
+
+            if let Some(tracked_tasks) = state.tracked_tasks.get_mut(task.upid.remote()) {
+                tracked_tasks.remove(&task.upid);
+            }
+
+            if let Some(true) = update_state_for_remote.get(task.upid.remote()) {
+                // Update the most recent startime per remote, the task polling logic uses it as a
+                // cut-off.
+                state
+                    .most_recent_archive_starttime
+                    .entry(task.upid.remote().to_owned())
+                    .and_modify(|time| *time = (task.starttime).max(*time))
+                    .or_insert(task.starttime);
+            }
+
+            // Skip ahead until we have found the correct file.
+            while next.is_some() {
+                if let Some(current) = current {
+                    if task.starttime >= current.starttime {
+                        break;
+                    }
+                    // The current file's cut-off is larger than the task's start time, which means
+                    // we want to finalize the current file by merging all tasks that
+                    // should be stored in it...
+                    self.merge_single_archive_file(
+                        std::mem::take(&mut tasks_for_current_file),
+                        &current.file,
+                    )?;
+                }
+
+                // ... and advance `current` to the next entry.
+                current = files.next();
+                next = files.peek();
+            }
+
+            if let Some(current) = current {
+                if task.starttime < current.starttime {
+                    continue;
+                }
+            }
+            tasks_for_current_file.push(task);
+        }
+
+        // Merge tasks for the last file.
+        if let Some(current) = current {
+            self.merge_single_archive_file(tasks_for_current_file, &current.file)?;
+        }
+
+        Ok(())
+    }
+
+    /// Add a new tracked task.
+    ///
+    /// This will insert the task in the list of tracked tasks in the state file,
+    /// as well as create an entry in the `active` file.
+    ///
+    /// This function will request an exclusive lock for the task cache;
+    /// do not call it if you are already holding a lock.
+    pub fn add_tracked_task(&self, task: TaskCacheItem) -> Result<(), Error> {
+        let lock = self.lock(true)?;
+
+        let mut tasks = Vec::new();
+        let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
+
+        for task in &mut task_iter {
+            match task {
+                Ok(task) => tasks.push(task),
+                Err(err) => {
+                    log::error!(
+                        "could not read existing task cache entry from 'active' file: {err}"
+                    );
+                }
+            }
+        }
+
+        tasks.push(task.clone());
+        tasks.sort_by(compare_tasks_reverse);
+
+        let _lock = task_iter.into_lock();
+
+        let mut state = self.read_state();
+
+        state
+            .oldest_active_task
+            .entry(task.upid.remote().to_owned())
+            .and_modify(|a| *a = (*a).min(task.starttime))
+            .or_insert(task.starttime);
+
+        let tracked_per_remote = state
+            .tracked_tasks
+            .entry(task.upid.remote().to_owned())
+            .or_default();
+        tracked_per_remote.insert(task.upid);
+
+        self.write_active_tasks(tasks.into_iter())?;
+        self.write_state(state)?;
+
+        Ok(())
+    }
+
+    /// Iterate over cached tasks.
+    ///
+    /// This function will request a non-exclusive read-lock; don't call it if
+    /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
+    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
+        let lock = self.lock(false)?;
+        self.get_tasks_with_lock(mode, lock)
+    }
+
+    /// Iterate over cached tasks.
+    ///
+    /// This function requires you to pass a lock. If you want to continue to hold the lock
+    /// after iterating, you can consume the iterator by calling
+    /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
+    pub fn get_tasks_with_lock(
+        &self,
+        mode: GetTasks,
+        lock: TaskCacheLock,
+    ) -> Result<TaskArchiveIterator, Error> {
+        match mode {
+            GetTasks::All => {
+                let mut files = vec![ArchiveFile {
+                    starttime: 0,
+                    file: self.base_path.join("active"),
+                }];
+
+                let archive_files = self.archive_files()?;
+                files.extend(archive_files);
+
+                Ok(TaskArchiveIterator::new(
+                    Box::new(files.into_iter().map(|pair| pair.file)),
+                    lock,
+                ))
+            }
+            GetTasks::Active => {
+                let files = vec![ArchiveFile {
+                    starttime: 0,
+                    file: self.base_path.join("active"),
+                }];
+                Ok(TaskArchiveIterator::new(
+                    Box::new(files.into_iter().map(|pair| pair.file)),
+                    lock,
+                ))
+            }
+            GetTasks::Archived => {
+                let files = self.archive_files()?;
+
+                Ok(TaskArchiveIterator::new(
+                    Box::new(files.into_iter().map(|pair| pair.file)),
+                    lock,
+                ))
+            }
+        }
+    }
+
+    /// Write the provided tasks to the 'active' file.
+    ///
+    /// The tasks are first written to a temporary file, which is then used
+    /// to atomically replace the original.
+    fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
+        let (fd, path) = proxmox_sys::fs::make_tmp_file(
+            self.base_path.join("active"),
+            self.create_options.clone(),
+        )?;
+        let mut fd = BufWriter::new(fd);
+
+        Self::write_tasks(&mut fd, tasks)?;
+
+        if let Err(err) = fd.flush() {
+            log::error!("could not flush 'active' file: {err}");
+        }
+        drop(fd);
+
+        std::fs::rename(path, self.base_path.join("active"))?;
+
+        Ok(())
+    }
+
+    /// Read the state file.
+    /// If the state file could not be read or does not exist, the default (empty) state
+    /// is returned.
+    /// A lock is only necessary if the returned state is modified and later passed
+    /// to [`Self::write_state`]. In case of a read-only access no lock is necessary, as
+    /// the state file is always replaced atomically.
+    pub fn read_state(&self) -> State {
+        fn do_read_state(path: &Path) -> Result<State, Error> {
+            let data = proxmox_sys::fs::file_read_optional_string(path)?;
+            match data {
+                Some(data) => Ok(serde_json::from_str(&data)?),
+                None => Ok(Default::default()),
+            }
+        }
+
+        let path = self.base_path.join("state");
+
+        do_read_state(&path).unwrap_or_else(|err| {
+            log::error!("could not read state file: {err}");
+            Default::default()
+        })
+    }
+
+    /// Write the state file.
+    /// The task archive should be locked for writing when calling this function.
+    pub fn write_state(&self, state: State) -> Result<(), Error> {
+        let path = self.base_path.join("state");
+
+        let data = serde_json::to_vec_pretty(&state)?;
+
+        proxmox_sys::fs::replace_file(path, &data, self.create_options.clone(), true)?;
+
+        Ok(())
+    }
+
+    /// Returns a list of existing archive files, together with their respective
+    /// cut-off timestamp. The result is sorted descending by cut-off timestamp (most recent one
+    /// first).
+    /// The task archive should be locked for reading when calling this function.
+    fn archive_files(&self) -> Result<Vec<ArchiveFile>, Error> {
+        let mut names = Vec::new();
+
+        for entry in std::fs::read_dir(&self.base_path)? {
+            let entry = entry?;
+
+            if let Some(endtime) = entry
+                .path()
+                .file_name()
+                .and_then(|s| s.to_str())
+                .and_then(|s| s.strip_prefix("archive."))
+            {
+                match endtime.parse() {
+                    Ok(starttime) => {
+                        names.push(ArchiveFile {
+                            starttime,
+                            file: entry.path(),
+                        });
+                    }
+                    Err(err) => log::error!("could not parse archive timestamp: {err}"),
+                }
+            }
+        }
+
+        names.sort_by_key(|e| -e.starttime);
+
+        Ok(names)
+    }
+
+    /// Merge `tasks` with an existing archive file.
+    /// This function assumes that `tasks` and the pre-existing contents of the archive
+    /// file are both sorted descending by starttime (most recent tasks come first).
+    /// The task archive must be locked when calling this function.
+    fn merge_single_archive_file(
+        &self,
+        tasks: Vec<TaskCacheItem>,
+        file: &Path,
+    ) -> Result<(), Error> {
+        if tasks.is_empty() {
+            return Ok(());
+        }
+
+        let (fd, new_path) = proxmox_sys::fs::make_tmp_file(file, self.create_options.clone())?;
+        let mut fd = BufWriter::new(fd);
+
+        if file.exists() {
+            let archive_reader = BufReader::new(File::open(file)?);
+            let archive_iter = ArchiveIterator::new(archive_reader)
+                .flat_map(|item| match item {
+                    Ok(item) => Some(item),
+                    Err(err) => {
+                        log::error!("could not read task cache item while merging: {err}");
+                        None
+                    }
+                })
+                .peekable();
+            let task_iter = tasks.into_iter().peekable();
+
+            Self::write_tasks(&mut fd, MergeTaskIterator::new(archive_iter, task_iter))?;
+        } else {
+            Self::write_tasks(&mut fd, tasks.into_iter())?;
+        }
+
+        if let Err(err) = fd.flush() {
+            log::error!("could not flush BufWriter for {file:?}: {err}");
+        }
+        drop(fd);
+
+        if let Err(err) = std::fs::rename(&new_path, file) {
+            log::error!("could not replace archive file {new_path:?}: {err}");
+        }
+
+        Ok(())
+    }
+
+    /// Write an iterator of [`TaskCacheItem`] to something that implements [`Write`].
+    /// The individual items are encoded as JSON followed by a newline.
+    /// The task archive should be locked when calling this function.
+    fn write_tasks(
+        writer: &mut impl Write,
+        tasks: impl Iterator<Item = TaskCacheItem>,
+    ) -> Result<(), Error> {
+        for task in tasks {
+            serde_json::to_writer(&mut *writer, &task)?;
+            writeln!(writer)?;
+        }
+
+        Ok(())
+    }
+}
+
+/// Comparison function for sorting tasks.
+/// The tasks are compared based on the task's start time, falling
+/// back to the task's UPID as a secondary criterion in case the
+/// start times are equal.
+pub fn compare_tasks(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
+    a.starttime
+        .cmp(&b.starttime)
+        .then_with(|| b.upid.to_string().cmp(&a.upid.to_string()))
+}
+
+/// Comparison function for sorting tasks, reversed
+/// The tasks are compared based on the task's start time, falling
+/// back to the task's UPID as a secondary criterion in case the
+/// start times are equal.
+pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
+    compare_tasks(a, b).reverse()
+}
+
+/// Iterator over the task archive.
+pub struct TaskArchiveIterator {
+    /// Archive files to read.
+    files: Box<dyn Iterator<Item = PathBuf>>,
+    /// Archive iterator we are currently using, if any
+    current: Option<ArchiveIterator<BufReader<File>>>,
+    /// Lock for this archive.
+    lock: TaskCacheLock,
+}
+
+impl TaskArchiveIterator {
+    /// Create a new task archive iterator.
+    pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
+        Self {
+            files,
+            current: None,
+            lock,
+        }
+    }
+
+    /// Return the task archive lock, consuming `self`.
+    pub fn into_lock(self) -> TaskCacheLock {
+        self.lock
+    }
+}
+
+impl Iterator for &mut TaskArchiveIterator {
+    type Item = Result<TaskCacheItem, Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            match &mut self.current {
+                Some(current) => {
+                    let next = current.next();
+                    if next.is_some() {
+                        return next;
+                    } else {
+                        self.current = None;
+                    }
+                }
+                None => 'inner: loop {
+                    // Returns `None` if no more files are available, stopping iteration.
+                    let next_file = self.files.next()?;
+
+                    match File::open(&next_file) {
+                        Ok(file) => {
+                            let archive_reader = BufReader::new(file);
+                            let archive_iter = ArchiveIterator::new(archive_reader);
+                            self.current = Some(archive_iter);
+                            break 'inner;
+                        }
+                        Err(err) => {
+                            log::error!("could not open {next_file:?} while iterating over task archive files, skipping: {err}")
+                        }
+                    }
+                },
+            }
+        }
+    }
+}
+
+/// Archive file.
+#[derive(Clone, Debug)]
+struct ArchiveFile {
+    /// The path to the archive file.
+    file: PathBuf,
+    /// The archive's lowest permitted starttime (seconds since UNIX epoch).
+    starttime: i64,
+}
+
+/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
+/// from both iterators sorted.
+/// The two iterators are expected to be sorted in descending order based on the task's starttime
+/// and in ascending order based on the task's UPID string representation. This can be
+/// achieved by using the [`compare_tasks_reverse`] function when sorting an array of tasks.
+pub struct MergeTaskIterator<T: Iterator<Item = TaskCacheItem>, U: Iterator<Item = TaskCacheItem>> {
+    left: Peekable<T>,
+    right: Peekable<U>,
+}
+
+impl<T, U> MergeTaskIterator<T, U>
+where
+    T: Iterator<Item = TaskCacheItem>,
+    U: Iterator<Item = TaskCacheItem>,
+{
+    /// Create a new merging iterator.
+    pub fn new(left: Peekable<T>, right: Peekable<U>) -> Self {
+        Self { left, right }
+    }
+}
+
+impl<T, U> Iterator for MergeTaskIterator<T, U>
+where
+    T: Iterator<Item = TaskCacheItem>,
+    U: Iterator<Item = TaskCacheItem>,
+{
+    type Item = T::Item;
+
+    fn next(&mut self) -> Option<T::Item> {
+        let order = match (self.left.peek(), self.right.peek()) {
+            (Some(l), Some(r)) => Some(compare_tasks(l, r)),
+            (Some(_), None) => Some(Ordering::Greater),
+            (None, Some(_)) => Some(Ordering::Less),
+            (None, None) => None,
+        };
+
+        match order {
+            Some(Ordering::Greater) => self.left.next(),
+            Some(Ordering::Less) => self.right.next(),
+            Some(Ordering::Equal) => {
+                // Dedup by consuming the other iterator as well
+                let _ = self.right.next();
+                self.left.next()
+            }
+            None => None,
+        }
+    }
+}
+
+/// Iterator for a single task archive file.
+///
+/// This iterator implements `Iterator<Item = Result<TaskCacheItem, Error>>`. When iterating,
+/// tasks are read line by line, without loading the entire archive file into memory.
+pub struct ArchiveIterator<T> {
+    iter: Lines<T>,
+}
+
+impl<T: BufRead> ArchiveIterator<T> {
+    /// Create a new iterator.
+    pub fn new(file: T) -> Self {
+        let reader = file.lines();
+
+        Self { iter: reader }
+    }
+}
+
+impl<T: BufRead> Iterator for ArchiveIterator<T> {
+    type Item = Result<TaskCacheItem, Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.iter.next().map(|result| {
+            result
+                .and_then(|line| Ok(serde_json::from_str(&line)?))
+                .map_err(Into::into)
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Cursor;
+
+    use crate::test_support::temp::NamedTempDir;
+
+    use super::*;
+
+    #[test]
+    fn archive_iterator() -> Result<(), Error> {
+        let file = r#"
+            {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","status":"OK","endtime":12345, "starttime": 1234}
+            {"upid":"pbs-remote!UPID:pbs:000002B2:00000158:00000000:674D828C:logrotate::root@pam:","status":"OK","endtime":12345, "starttime": 1234}
+            invalid"#
+            .trim_start();
+
+        let cursor = Cursor::new(file.as_bytes());
+        let mut iter = ArchiveIterator::new(cursor);
+
+        assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pve-remote");
+        assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pbs-remote");
+        assert!(iter.next().unwrap().is_err());
+        assert!(iter.next().is_none());
+
+        Ok(())
+    }
+
+    fn task(starttime: i64, ended: bool) -> TaskCacheItem {
+        let (status, endtime) = if ended {
+            (Some("OK".into()), Some(starttime + 10))
+        } else {
+            (None, None)
+        };
+
+        TaskCacheItem {
+            upid: format!(
+                "pve-remote!UPID:pve:00039E4D:002638B8:{starttime:08X}:stopall::root@pam:"
+            )
+            .parse()
+            .unwrap(),
+            starttime,
+            status,
+            endtime,
+        }
+    }
+
+    fn assert_starttimes(cache: &TaskCache, starttimes: &[i64]) {
+        let tasks: Vec<i64> = cache
+            .get_tasks(GetTasks::All)
+            .unwrap()
+            .map(|task| task.unwrap().starttime)
+            .collect();
+
+        assert_eq!(&tasks, starttimes);
+    }
+
+    fn add_tasks(cache: &TaskCache, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
+        let param = AddTasks {
+            update_most_recent_archive_timestamp: true,
+            tasks,
+        };
+        let mut a = HashMap::new();
+        a.insert("pve-remote".into(), param);
+
+        cache.add_tasks(a)
+    }
+
+    #[test]
+    fn test_add_tasks() -> Result<(), Error> {
+        let tmp_dir = NamedTempDir::new()?;
+        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+        cache.new_file(1000)?;
+        assert_eq!(cache.archive_files()?.len(), 1);
+
+        add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
+
+        cache.rotate(1500, 0, 3)?;
+        assert_eq!(cache.archive_files()?.len(), 2);
+
+        add_tasks(&cache, vec![task(1500, true), task(1501, true)])?;
+        add_tasks(&cache, vec![task(1200, true), task(1300, true)])?;
+
+        cache.rotate(2000, 0, 3)?;
+        assert_eq!(cache.archive_files()?.len(), 3);
+
+        add_tasks(&cache, vec![task(2000, true)])?;
+        add_tasks(&cache, vec![task(1502, true)])?;
+        add_tasks(&cache, vec![task(1002, true)])?;
+
+        // These are before the cut-off of 1000, so they will be discarded.
+        add_tasks(&cache, vec![task(800, true), task(900, true)])?;
+
+        // This one should be deduped
+        add_tasks(&cache, vec![task(1000, true)])?;
+
+        assert_starttimes(
+            &cache,
+            &[2000, 1502, 1501, 1500, 1300, 1200, 1002, 1001, 1000],
+        );
+
+        cache.rotate(2500, 0, 3)?;
+
+        assert_eq!(cache.archive_files()?.len(), 3);
+
+        assert_starttimes(&cache, &[2000, 1502, 1501, 1500]);
+
+        cache.rotate(3000, 0, 3)?;
+        assert_eq!(cache.archive_files()?.len(), 3);
+
+        assert_starttimes(&cache, &[2000]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_active_tasks_are_migrated_to_archive() -> Result<(), Error> {
+        let tmp_dir = NamedTempDir::new()?;
+        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+        cache.new_file(1000)?;
+        add_tasks(&cache, vec![task(1000, false), task(1001, false)])?;
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
+
+        let state = cache.read_state();
+        assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1000);
+
+        add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
+
+        assert_starttimes(&cache, &[1001, 1000]);
+
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_init() -> Result<(), Error> {
+        let tmp_dir = NamedTempDir::new()?;
+        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+        cache.init(1000, 3, 100)?;
+        assert_eq!(cache.archive_files()?.len(), 3);
+
+        add_tasks(
+            &cache,
+            vec![
+                task(1050, true),
+                task(950, true),
+                task(850, true),
+                task(750, true), // This one is discarded
+            ],
+        )?;
+
+        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 3);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_tracking_tasks() -> Result<(), Error> {
+        let tmp_dir = NamedTempDir::new()?;
+        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+        cache.init(1000, 3, 100)?;
+
+        cache.add_tracked_task(task(1050, false))?;
+
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
+        cache.add_tracked_task(task(1060, false))?;
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
+
+        let state = cache.read_state();
+        assert_eq!(state.tracked_tasks.get("pve-remote").unwrap().len(), 2);
+        assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1050);
+
+        // Mark first task as finished
+        add_tasks(&cache, vec![task(1050, true)])?;
+
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
+        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 1);
+
+        // Mark second task as finished
+        add_tasks(&cache, vec![task(1060, true)])?;
+
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
+        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 2);
+
+        Ok(())
+    }
+}
-- 
2.39.5




* [pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache
  2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
                   ` (3 preceding siblings ...)
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 4/8] remote tasks: implement improved cache for remote tasks Lukas Wagner
@ 2025-03-14 14:12 ` Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 6/8] pdm-api-types: remote tasks: implement From<&str> for TaskStateType Lukas Wagner
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

This commit changes the remote task module as follows:

- Add a new background task for regular polling of task data.
Instead of triggering the fetching of task data from the `get_tasks`
function, which is usually called by an API handler, we move the fetching
to a new background task. The task fetches the latest tasks from all
remotes and stores them in the task cache at regular intervals (10
minutes); see the sketch at the end of this message.
The `get_tasks` function itself only reads from the cache.
The main rationale for this change is that for large setups, fetching
tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
each with a >100ms connection - this adds up to minutes quickly).
If we do this from within `get_tasks`, the API handler calling the
function is also blocked for the entire time.
The `get_tasks` API is called every couple of seconds by the UI to get
a list of running remote tasks, so this *must* be quick.

- Tracked tasks are also polled in the same background task, but with
a short polling delay (10 seconds). Instead of polling the status of each
tracked task's UPID individually, we simply fetch *all* tasks since the
tracked task started. While this increases the amount of transmitted data
a bit for tracked tasks that run for a very long time, the new approach
makes the whole task tracking functionality much more elegant; it
integrates better with the 'regular' task fetching, which happens at
long intervals.

- Tasks are now stored in the new, improved task cache implementation.
This should make retrieving tasks much quicker and avoid
unneeded disk IO.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
---
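For illustration, the scheduling described above boils down to cycle
arithmetic on a single 10-second tick; a simplified sketch of the idea
(not the actual patch code, which additionally handles archive rotation,
shutdown and error logging):

    use std::time::Duration;

    const TICK_RATE_S: u64 = 10;
    // A regular refresh every 10 minutes, expressed in ticks.
    const REGULAR_REFRESH_CYCLES: u64 = (10 * 60) / TICK_RATE_S;

    async fn fetch_loop() -> ! {
        let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
        let mut cycle = 0u64;
        loop {
            interval.tick().await;
            if cycle % REGULAR_REFRESH_CYCLES == 0 {
                // every 10 minutes: fetch the newest tasks from *all* remotes
            } else {
                // every 10 seconds: only poll remotes with tracked tasks
            }
            cycle = cycle.wrapping_add(1);
        }
    }
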
 server/src/api/pve/lxc.rs                |  10 +-
 server/src/api/pve/mod.rs                |   4 +-
 server/src/api/pve/qemu.rs               |   6 +-
 server/src/api/remote_tasks.rs           |  11 +-
 server/src/bin/proxmox-datacenter-api.rs |   3 +-
 server/src/remote_tasks/mod.rs           | 788 +++++++++++------------
 6 files changed, 388 insertions(+), 434 deletions(-)

diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
index f1c31425..83f9f4aa 100644
--- a/server/src/api/pve/lxc.rs
+++ b/server/src/api/pve/lxc.rs
@@ -209,7 +209,7 @@ pub async fn lxc_start(
 
     let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -242,7 +242,7 @@ pub async fn lxc_stop(
 
     let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
         .shutdown_lxc_async(&node, vmid, Default::default())
         .await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -357,7 +357,7 @@ pub async fn lxc_migrate(
     };
     let upid = pve.migrate_lxc(&node, vmid, params).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
     log::info!("migrating vm {vmid} of node {node:?}");
     let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
 
-    new_remote_upid(source, upid)
+    new_remote_upid(source, upid).await
 }
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index c328675a..a351ad69 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -74,9 +74,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
 const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
 
 // converts a remote + PveUpid into a RemoteUpid and starts tracking it
-fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
+async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
     let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
-    remote_tasks::track_running_task(remote_upid.clone());
+    remote_tasks::track_running_task(remote_upid.clone()).await?;
     Ok(remote_upid)
 }
 
diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
index dea0550c..54e310d2 100644
--- a/server/src/api/pve/qemu.rs
+++ b/server/src/api/pve/qemu.rs
@@ -216,7 +216,7 @@ pub async fn qemu_start(
         .start_qemu_async(&node, vmid, Default::default())
         .await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -376,7 +376,7 @@ pub async fn qemu_migrate(
     };
     let upid = pve.migrate_qemu(&node, vmid, params).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -563,5 +563,5 @@ pub async fn qemu_remote_migrate(
     log::info!("migrating vm {vmid} of node {node:?}");
     let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
 
-    new_remote_upid(source, upid)
+    new_remote_upid(source, upid).await
 }
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index e629000c..05ce3666 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
     },
     input: {
         properties: {
-            "max-age": {
-                type: Integer,
-                optional: true,
-                // TODO: sensible default max-age
-                default: 300,
-                description: "Maximum age of cached task data",
-            },
             filters: {
                 type: TaskFilters,
                 flatten: true,
@@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
     },
 )]
 /// Get the list of tasks for all remotes.
-async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
-    let tasks = remote_tasks::get_tasks(max_age, filters).await?;
+async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+    let tasks = remote_tasks::get_tasks(filters).await?;
 
     Ok(tasks)
 }
diff --git a/server/src/bin/proxmox-datacenter-api.rs b/server/src/bin/proxmox-datacenter-api.rs
index a79094d5..da39c85d 100644
--- a/server/src/bin/proxmox-datacenter-api.rs
+++ b/server/src/bin/proxmox-datacenter-api.rs
@@ -25,11 +25,11 @@ use pdm_buildcfg::configdir;
 use pdm_api_types::Authid;
 use proxmox_auth_api::api::assemble_csrf_prevention_token;
 
-use server::auth;
 use server::auth::csrf::csrf_secret;
 use server::metric_collection;
 use server::resource_cache;
 use server::task_utils;
+use server::{auth, remote_tasks};
 
 pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 5 * 60;
 
@@ -288,6 +288,7 @@ async fn run(debug: bool) -> Result<(), Error> {
     start_task_scheduler();
     metric_collection::start_task();
     resource_cache::start_task();
+    remote_tasks::start_task()?;
 
     server.await?;
     log::info!("server shutting down, waiting for active workers to complete");
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 2062f2b7..48d54694 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,65 +1,106 @@
-use std::{
-    collections::{HashMap, HashSet},
-    fs::File,
-    path::{Path, PathBuf},
-    sync::{LazyLock, RwLock},
-    time::Duration,
-};
+use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
 
-use anyhow::Error;
+use anyhow::{format_err, Error};
+use nix::sys::stat::Mode;
 use pdm_api_types::{
     remotes::{Remote, RemoteType},
     RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
 };
 use proxmox_sys::fs::CreateOptions;
 use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
-use tokio::task::JoinHandle;
+use task_cache::{AddTasks, GetTasks, State, TaskCache, TaskCacheItem};
+use tokio::{sync::Semaphore, task::JoinSet};
 
 use crate::{api::pve, task_utils};
 
 mod task_cache;
 
+const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
+
+const SECONDS_PER_MINUTE: u64 = 60;
+const MINUTES_PER_HOUR: u64 = 60;
+
+/// Tick rate for the remote task fetching task.
+/// This is also the rate at which we check on tracked tasks.
+const TICK_RATE_S: u64 = 10;
+
+/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const REGULAR_REFRESH_S: u64 = 10 * SECONDS_PER_MINUTE;
+/// Number of cycles until a regular refresh.
+const REGULAR_REFRESH_CYCLES: u64 = REGULAR_REFRESH_S / TICK_RATE_S;
+
+/// Check if we want to rotate once every hour.
+const CHECK_ROTATE_S: u64 = SECONDS_PER_MINUTE * MINUTES_PER_HOUR;
+/// Number of cycles before we want to check if we should rotate the task archives.
+const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_S / TICK_RATE_S;
+
+/// Rotate once the most recent archive file is at least 24 hours old.
+const ROTATE_AFTER_S: u64 = 24 * MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
+
+/// Keep 7 days worth of tasks.
+const KEEP_OLD_FILES: u64 = 7;
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
+/// consideration when accounting for the total number of connections.
+/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
+/// to 4 PVE remotes in parallel.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
 /// Get tasks for all remotes
 // FIXME: filter for privileges
-pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
-    let (remotes, _) = pdm_config::remotes::config()?;
+pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+    let cache = get_cache()?;
 
-    let mut all_tasks = Vec::new();
+    let mut returned_tasks = Vec::new();
 
-    let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
-    let mut cache = TaskCache::new(cache_path)?;
+    let which = if filters.running {
+        GetTasks::Active
+    } else {
+        GetTasks::All
+    };
 
-    // Force a refresh for all tasks of a remote if a task is finished.
-    // Not super nice, but saves us from persisting finished tasks. Also,
-    // the /nodes/<node>/tasks/<upid>/status endpoint does not return
-    // a task's endtime, which is only returned by
-    // /nodes/<node>/tasks...
-    // Room for improvements in the future.
-    invalidate_cache_for_finished_tasks(&mut cache);
+    for task in &mut cache
+        .get_tasks(which)?
+        .skip(filters.start as usize)
+        .take(filters.limit as usize)
+    {
+        let task = match task {
+            Ok(task) => task,
+            Err(err) => {
+                log::error!("could not read task from remote task cache, skipping: {err}");
+                continue;
+            }
+        };
 
-    for (remote_name, remote) in &remotes.sections {
-        let now = proxmox_time::epoch_i64();
-
-        if let Some(tasks) = cache.get_tasks(remote_name.as_str(), now, max_age) {
-            // Data in cache is recent enough and has not been invalidated.
-            all_tasks.extend(tasks);
-        } else {
-            let tasks = match fetch_tasks(remote).await {
-                Ok(tasks) => tasks,
-                Err(err) => {
-                    // ignore errors for not reachable remotes
-                    continue;
-                }
-            };
-            cache.set_tasks(remote_name.as_str(), tasks.clone(), now);
-
-            all_tasks.extend(tasks);
+        // TODO: Handle PBS tasks
+        let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
+        match pve_upid {
+            Ok(pve_upid) => {
+                returned_tasks.push(TaskListItem {
+                    upid: task.upid.to_string(),
+                    node: pve_upid.node,
+                    pid: pve_upid.pid as i64,
+                    pstart: pve_upid.pstart,
+                    starttime: pve_upid.starttime,
+                    worker_type: pve_upid.worker_type,
+                    worker_id: None,
+                    user: pve_upid.auth_id,
+                    endtime: task.endtime,
+                    status: task.status,
+                });
+            }
+            Err(err) => {
+                log::error!("could not parse UPID: {err}");
+            }
         }
     }
 
-    let mut returned_tasks = add_running_tasks(all_tasks)?;
-    returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
     let returned_tasks = returned_tasks
         .into_iter()
         .filter(|item| {
@@ -106,26 +147,228 @@ pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskLis
 
             true
         })
-        .skip(filters.start as usize)
-        .take(filters.limit as usize)
         .collect();
 
-    // We don't need to wait for this task to finish
-    tokio::task::spawn_blocking(move || {
-        if let Err(e) = cache.save() {
-            log::error!("could not save task cache: {e}");
-        }
-    });
-
     Ok(returned_tasks)
 }
 
+/// Insert a newly created task into the list of tracked tasks.
+///
+/// Any remote with associated tracked tasks will be polled at a short interval
+/// until all tracked tasks have finished.
+pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || {
+        let cache = get_cache()?;
+        // TODO: Handle PBS tasks correctly.
+        let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
+        let task = TaskCacheItem {
+            upid: task.clone(),
+            starttime: pve_upid.starttime,
+            status: None,
+            endtime: None,
+        };
+        cache.add_tracked_task(task)
+    })
+    .await?
+}
+
+/// Start the remote task fetching task
+pub fn start_task() -> Result<(), Error> {
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
+    let file_options = CreateOptions::new()
+        .owner(api_uid)
+        .group(api_gid)
+        .perm(Mode::from_bits_truncate(0o0750));
+    proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
+
+    tokio::spawn(async move {
+        let task_scheduler = std::pin::pin!(remote_task_fetching_task());
+        let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(task_scheduler, abort_future).await;
+    });
+
+    Ok(())
+}
+
+/// Task which handles fetching remote tasks and task archive rotation.
+/// This function never returns.
+async fn remote_task_fetching_task() -> ! {
+    let mut cycle = 0u64;
+    let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
+    interval.reset_at(task_utils::next_aligned_instant(TICK_RATE_S).into());
+
+    // We don't really care about catching up on missed ticks, we just want
+    // a steady tick rate.
+    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+    if let Err(err) = init_cache().await {
+        log::error!("error when initialized task cache: {err}");
+    }
+
+    loop {
+        interval.tick().await;
+        if let Err(err) = do_tick(cycle).await {
+            log::error!("error when fetching remote tasks: {err}");
+        }
+
+        // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
+        // better be safe and use .wrapping_add(1) :)
+        cycle = cycle.wrapping_add(1);
+    }
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are not
+/// any archive files yet.
+///
+/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
+/// spaced `ROTATE_AFTER_S` seconds apart.
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+async fn init_cache() -> Result<(), Error> {
+    tokio::task::spawn_blocking(|| {
+        let cache = get_cache()?;
+        cache.init(proxmox_time::epoch_i64(), KEEP_OLD_FILES, ROTATE_AFTER_S)?;
+        Ok(())
+    })
+    .await?
+}
+
+/// Handle a single timer tick.
+/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
+async fn do_tick(cycle: u64) -> Result<(), Error> {
+    let cache = get_cache()?;
+
+    if should_check_for_cache_rotation(cycle) {
+        log::debug!("checking if remote task archive should be rotated");
+        if rotate_cache(cache.clone()).await? {
+            log::info!("rotated remote task archive");
+        }
+    }
+
+    let state = cache.read_state();
+
+    let mut all_tasks = HashMap::new();
+
+    let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+    let mut join_set = JoinSet::new();
+
+    let remotes = remotes_to_check(cycle, &state).await?;
+    for remote in remotes {
+        let since = get_cutoff_timestamp(&remote, &state);
+
+        let permit = if remote.ty == RemoteType::Pve {
+            // Acquire multiple permits; for PVE remotes we want
+            // to connect to multiple nodes in parallel.
+            //
+            // `.unwrap()` is safe, we never close the semaphore.
+            Arc::clone(&semaphore)
+                .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
+                .await
+                .unwrap()
+        } else {
+            // For PBS remotes we only have a single outgoing connection
+            //
+            // `.unwrap()` is safe, we never close the semaphore.
+            Arc::clone(&semaphore).acquire_owned().await.unwrap()
+        };
+
+        join_set.spawn(async move {
+            log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
+            let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
+                format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
+            });
+
+            drop(permit);
+            tasks
+        });
+    }
+
+    while let Some(res) = join_set.join_next().await {
+        match res {
+            Ok(Ok((remote, request))) => {
+                all_tasks.insert(remote, request);
+            }
+            Ok(Err(err)) => log::error!("{err}"),
+            Err(err) => log::error!("could not join task fetching future: {err}"),
+        }
+    }
+
+    if !all_tasks.is_empty() {
+        save_tasks(cache, all_tasks).await?;
+    }
+
+    Ok(())
+}
+
+/// Return list of remotes that are to be polled in this cycle.
+async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
+    let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+    let all = cycle % REGULAR_REFRESH_CYCLES == 0;
+
+    if all {
+        Ok(config.sections.into_values().collect())
+    } else {
+        Ok(config
+            .sections
+            .into_iter()
+            .filter_map(|(name, remote)| {
+                if let Some(tracked) = state.tracked_tasks.get(&name) {
+                    if !tracked.is_empty() {
+                        Some(remote)
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                }
+            })
+            .collect())
+    }
+}
+
+/// Get the timestamp from which on we should fetch tasks for a given remote.
+/// The returned timestamp is a UNIX timestamp (in seconds).
+fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
+    let oldest_active = state.oldest_active_task.get(&remote.id).copied();
+    let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
+
+    match (oldest_active, youngest_archived) {
+        (None, None) => 0,
+        (None, Some(youngest_archived)) => youngest_archived,
+        (Some(oldest_active), None) => oldest_active,
+        (Some(oldest_active), Some(youngest_archived)) => oldest_active.min(youngest_archived),
+    }
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+    tokio::task::spawn_blocking(move || {
+        cache.rotate(proxmox_time::epoch_i64(), ROTATE_AFTER_S, KEEP_OLD_FILES)
+    })
+    .await?
+}
+
+/// Add newly fetched tasks to the cache.
+async fn save_tasks(cache: TaskCache, tasks: HashMap<String, AddTasks>) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || cache.add_tasks(tasks)).await?
+}
+
 /// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
+/// `since` is a UNIX timestamp (seconds).
+async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
     let mut tasks = Vec::new();
 
+    let mut all_successful = true;
+
     match remote.ty {
         RemoteType::Pve => {
+            let semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
+            let mut join_set = JoinSet::new();
+
             let client = pve::connect(remote)?;
 
             // N+1 requests - we could use /cluster/tasks, but that one
@@ -134,16 +377,53 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
                 let params = ListTasks {
                     // Include running tasks
                     source: Some(ListTasksSource::All),
-                    // TODO: How much task history do we want? Right now we just hard-code it
-                    // to 7 days.
-                    since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
+                    since: Some(since),
+                    // If `limit` is not provided, we only receive 50 tasks
+                    limit: Some(MAX_TASKS_TO_FETCH),
                     ..Default::default()
                 };
 
-                let list = client.get_task_list(&node.node, params).await?;
-                let mapped = map_tasks(list, &remote.id)?;
+                let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
 
-                tasks.extend(mapped);
+                let r = remote.clone();
+
+                join_set.spawn(async move {
+                    let client = pve::connect(&r)?;
+                    let task_list =
+                        client
+                            .get_task_list(&node.node, params)
+                            .await
+                            .map_err(|err| {
+                                format_err!("remote '{}', node '{}': {err}", r.id, node.node)
+                            })?;
+
+                    drop(permit);
+
+                    Ok::<Vec<_>, Error>(task_list)
+                });
+            }
+
+            while let Some(res) = join_set.join_next().await {
+                match res {
+                    Ok(Ok(list)) => {
+                        let mapped = list.into_iter().filter_map(|task| {
+                            match map_pve_task(task, &remote.id) {
+                                Ok(task) => Some(task),
+                                Err(err) => {
+                                    log::error!("could not map task data, skipping: {err}");
+                                    None
+                                }
+                            }
+                        });
+
+                        tasks.extend(mapped);
+                    }
+                    Ok(Err(err)) => {
+                        all_successful = false;
+                        log::error!("could not fetch tasks: {err:?}");
+                    }
+                    Err(err) => return Err(err.into()),
+                }
             }
         }
         RemoteType::Pbs => {
@@ -151,365 +431,45 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
         }
     }
 
-    Ok(tasks)
-}
-
-/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
-fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
-    let mut mapped = Vec::new();
-
-    for task in tasks {
-        let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
-
-        mapped.push(TaskListItem {
-            upid: remote_upid.to_string(),
-            node: task.node,
-            pid: task.pid,
-            pstart: task.pstart as u64,
-            starttime: task.starttime,
-            worker_type: task.ty,
-            worker_id: Some(task.id),
-            user: task.user,
-            endtime: task.endtime,
-            status: task.status,
-        })
-    }
-
-    Ok(mapped)
-}
-
-/// Drops the cached task list of a remote for all finished tasks.
-///
-/// We use this to force a refresh so that we get the full task
-/// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
-    let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
-
-    // If a task is finished, we force a refresh for the remote - otherwise
-    // we don't get the 'endtime' for the task.
-    for task in finished.drain() {
-        cache.invalidate_cache_for_remote(task.remote());
-    }
-}
-
-/// Supplement the list of tasks that we received from the remote with
-/// the tasks that were started by PDM and are currently running.
-fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
-    let mut returned_tasks = Vec::new();
-
-    let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
-    for task in cached_tasks {
-        let remote_upid = task.upid.parse()?;
-
-        if running_tasks.contains(&remote_upid) {
-            if task.endtime.is_some() {
-                // Task is finished but we still think it is running ->
-                // Drop it from RUNNING_FOREIGN_TASKS
-                running_tasks.remove(&remote_upid);
-
-                // No need to put it in FINISHED_TASKS, since we already
-                // got its state recently enough (we know the status and endtime)
-            }
-        } else {
-            returned_tasks.push(task);
-        }
-    }
-
-    for task in running_tasks.iter() {
-        let upid: PveUpid = task.upid.parse()?;
-        returned_tasks.push(TaskListItem {
-            upid: task.to_string(),
-            node: upid.node,
-            pid: upid.pid as i64,
-            pstart: upid.pstart,
-            starttime: upid.starttime,
-            worker_type: upid.worker_type,
-            worker_id: upid.worker_id,
-            user: upid.auth_id,
-            endtime: None,
-            status: None,
-        });
-    }
-
-    Ok(returned_tasks)
-}
-
-/// A cache for fetched remote tasks.
-struct TaskCache {
-    /// Cache entries
-    content: TaskCacheContent,
-
-    /// Entries that were added or updated - these will be persistet
-    /// when `save` is called.
-    new_or_updated: TaskCacheContent,
-
-    /// Cache entries were changed/removed.
-    dirty: bool,
-
-    /// File-location at which the cached tasks are stored.
-    cachefile_path: PathBuf,
-}
-
-impl TaskCache {
-    /// Create a new tasks cache instance by loading
-    /// the cache from disk.
-    fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
-        Ok(Self {
-            content: Self::load_content()?,
-            new_or_updated: Default::default(),
-            dirty: false,
-            cachefile_path,
-        })
-    }
-
-    /// Load the task cache contents from disk.
-    fn load_content() -> Result<TaskCacheContent, Error> {
-        let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
-        let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
-
-        let content = if let Some(content) = content {
-            serde_json::from_str(&content)?
-        } else {
-            Default::default()
-        };
-
-        Ok(content)
-    }
-
-    /// Get path for the cache's lockfile.
-    fn lockfile_path(&self) -> PathBuf {
-        let mut path = self.cachefile_path.clone();
-        path.set_extension("lock");
-        path
-    }
-
-    /// Persist the task cache
-    ///
-    /// This method requests an exclusive lock for the task cache lockfile.
-    fn save(&mut self) -> Result<(), Error> {
-        // if we have not updated anything, we don't have to update the cache file
-        if !self.dirty {
-            return Ok(());
-        }
-
-        let _guard = self.lock(Duration::from_secs(5))?;
-
-        // Read content again, in case somebody has changed it in the meanwhile
-        let mut content = Self::load_content()?;
-
-        for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
-            if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
-                // Only update entry if nobody else has updated it in the meanwhile
-                if existing_entry.timestamp < entry.timestamp {
-                    *existing_entry = entry;
-                }
-            } else {
-                content.remote_tasks.insert(remote_name, entry);
-            }
-        }
-
-        let bytes = serde_json::to_vec_pretty(&content)?;
-
-        let api_uid = pdm_config::api_user()?.uid;
-        let api_gid = pdm_config::api_group()?.gid;
-
-        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
-        proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
-
-        self.dirty = false;
-
-        Ok(())
-    }
-
-    // Update task data for a given remote.
-    fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
-        self.dirty = true;
-        self.new_or_updated
-            .remote_tasks
-            .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
-    }
-
-    // Get task data for a given remote.
-    fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
-        if let Some(entry) = self.content.remote_tasks.get(remote) {
-            if (entry.timestamp + max_age) < now {
-                return None;
-            }
-
-            Some(entry.tasks.clone())
-        } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
-            if (entry.timestamp + max_age) < now {
-                return None;
-            }
-            Some(entry.tasks.clone())
-        } else {
-            None
-        }
-    }
-
-    // Invalidate cache for a given remote.
-    fn invalidate_cache_for_remote(&mut self, remote: &str) {
-        self.dirty = true;
-        self.content.remote_tasks.remove(remote);
-    }
-
-    // Lock the cache for modification.
-    //
-    // While the cache is locked, other users can still read the cache
-    // without a lock, since the cache file is replaced atomically
-    // when updating.
-    fn lock(&self, duration: Duration) -> Result<File, Error> {
-        let api_uid = pdm_config::api_user()?.uid;
-        let api_gid = pdm_config::api_group()?.gid;
-
-        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-        proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
-    }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
-    timestamp: i64,
-    tasks: Vec<TaskListItem>,
-}
-
-#[derive(Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
-    remote_tasks: HashMap<String, TaskCacheEntry>,
-}
-
-/// Interval at which tracked tasks are polled
-const RUNNING_CHECK_INTERVAL_S: u64 = 10;
-
-/// Tasks which were started by PDM and are still running
-static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-/// Tasks which were started by PDM and w
-static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-
-fn init() -> RwLock<HashSet<RemoteUpid>> {
-    RwLock::new(HashSet::new())
-}
-
-/// Insert a remote UPID into the running list
-///
-/// If it is the first entry in the list, a background task is started to track its state
-///
-/// Returns the [`JoinHandle`] if a task was started.
-///
-/// panics on a poisoned mutex
-pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
-    let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
-
-    // the call inserting the first task in the list needs to start the checking task
-    let need_start_task = tasks.is_empty();
-    tasks.insert(task);
-
-    if !need_start_task {
-        return None;
-    }
-    drop(tasks);
-
-    Some(tokio::spawn(async move {
-        loop {
-            let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
-            tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
-            let finished_tasks = get_finished_tasks().await;
-
-            // skip iteration if we still have tasks, just not finished ones
-            if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
-                continue;
-            }
-
-            let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
-            // we either have finished tasks, or the running task list was empty
-            let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
-
-            for (upid, _status) in finished_tasks {
-                if set.remove(&upid) {
-                    finished.insert(upid);
-                } else {
-                    // someone else removed & persisted the task in the meantime
-                }
-            }
-
-            // if no task remains, end the current task
-            // it will be restarted by the next caller that inserts one
-            if set.is_empty() {
-                return;
-            }
-        }
-    }))
-}
-
-/// Get a list of running foreign tasks
-///
-/// panics on a poisoned mutex
-pub fn get_running_tasks() -> Vec<RemoteUpid> {
-    RUNNING_FOREIGN_TASKS
-        .read()
-        .unwrap()
-        .iter()
-        .cloned()
-        .collect()
-}
-
-/// Checks all current saved UPIDs if they're still running, and if not,
-/// returns their upids + status
-///
-/// panics on a poisoned mutex
-pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
-    let mut finished = Vec::new();
-    let config = match pdm_config::remotes::config() {
-        Ok((config, _)) => config,
-        Err(err) => {
-            log::error!("could not open remotes config: {err}");
-            return Vec::new();
-        }
+    let new_tasks = AddTasks {
+        update_most_recent_archive_timestamp: all_successful,
+        tasks,
     };
-    for task in get_running_tasks() {
-        match config.get(task.remote()) {
-            Some(remote) => match remote.ty {
-                RemoteType::Pve => {
-                    let status = match crate::api::pve::tasks::get_task_status(
-                        remote.id.clone(),
-                        task.clone(),
-                        false,
-                    )
-                    .await
-                    {
-                        Ok(status) => status,
-                        Err(err) => {
-                            log::error!("could not get status from remote: {err}");
-                            finished.push((task, "could not get status".to_string()));
-                            continue;
-                        }
-                    };
-                    if let Some(status) = status.exitstatus {
-                        finished.push((task, status.to_string()));
-                    }
-                }
-                RemoteType::Pbs => {
-                    let _client = match crate::pbs_client::connect(remote) {
-                        Ok(client) => client,
-                        Err(err) => {
-                            log::error!("could not get status from remote: {err}");
-                            finished.push((task, "could not get status".to_string()));
-                            continue;
-                        }
-                    };
-                    // FIXME implement get task status
-                    finished.push((task, "unknown state".to_string()));
-                }
-            },
-            None => finished.push((task, "unknown remote".to_string())),
-        }
-    }
 
-    finished
+    Ok((remote.id.clone(), new_tasks))
+}
+
+/// Check if we are due to check for cache rotation.
+fn should_check_for_cache_rotation(cycle: u64) -> bool {
+    cycle % CHECK_ROTATE_CYCLES == 0
+}
+
+/// Get a new [`TaskCache`] instance.
+///
+/// No heavy-weight operations are done here; it's fine to call this regularly as part of the
+/// update loop.
+fn get_cache() -> Result<TaskCache, Error> {
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
+
+    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+    let cache_path = Path::new(REMOTE_TASKS_DIR);
+    let cache = TaskCache::new(cache_path, file_options)?;
+
+    Ok(cache)
+}
+
+/// Map a `ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
+    let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
+
+    Ok(TaskCacheItem {
+        upid: remote_upid,
+        starttime: task.starttime,
+        endtime: task.endtime,
+        status: task.status,
+    })
 }
 
 /// Parses a task status string into a TaskStateType
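
As a side note, the connection accounting used above (a global semaphore
with `MAX_CONNECTIONS` permits, `acquire_many_owned` for PVE remotes)
follows a common tokio pattern; a minimal, self-contained sketch with
made-up job numbers and budgets, independent of the PDM types:

    use std::sync::Arc;
    use tokio::{sync::Semaphore, task::JoinSet};

    async fn fan_out(jobs: Vec<u32>) {
        // Global budget of 20 permits; each job takes 5,
        // so at most 4 jobs run in parallel.
        let semaphore = Arc::new(Semaphore::new(20));
        let mut join_set = JoinSet::new();

        for job in jobs {
            // `.unwrap()` is safe, the semaphore is never closed.
            let permit = Arc::clone(&semaphore)
                .acquire_many_owned(5)
                .await
                .unwrap();
            join_set.spawn(async move {
                // ... talk to the remote here ...
                drop(permit); // return the budget once done
                job
            });
        }

        while let Some(res) = join_set.join_next().await {
            if let Err(err) = res {
                eprintln!("could not join future: {err}");
            }
        }
    }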
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pdm-devel] [PATCH proxmox-datacenter-manager 6/8] pdm-api-types: remote tasks: implement From<&str> for TaskStateType
  2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
                   ` (4 preceding siblings ...)
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache Lukas Wagner
@ 2025-03-14 14:12 ` Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 7/8] fake remote: add missing fields to make the debug feature compile again Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 8/8] fake remote: generate fake task data Lukas Wagner
  7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

This allows us to get rid of the `tasktype` helper in
server::remote_tasks.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
---
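A short usage sketch of the new conversion (hypothetical status values,
mirroring the call site changed below):

    use pdm_api_types::TaskStateType;

    // &str converts directly ...
    assert!(matches!(TaskStateType::from("OK"), TaskStateType::OK));

    // ... and an Option<String> status maps via as_deref().
    let status: Option<String> = Some("WARNINGS: 3".into());
    let state = status.as_deref().map(TaskStateType::from);
    assert!(matches!(state, Some(TaskStateType::Warning)));
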
 lib/pdm-api-types/src/lib.rs   | 15 +++++++++++++++
 server/src/remote_tasks/mod.rs | 15 +--------------
 2 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/lib/pdm-api-types/src/lib.rs b/lib/pdm-api-types/src/lib.rs
index 38449071..209155a2 100644
--- a/lib/pdm-api-types/src/lib.rs
+++ b/lib/pdm-api-types/src/lib.rs
@@ -232,6 +232,21 @@ pub enum TaskStateType {
     Unknown,
 }
 
+impl From<&str> for TaskStateType {
+    /// Parses a task status string into a TaskStateType
+    fn from(status: &str) -> Self {
+        if status == "unknown" || status.is_empty() {
+            TaskStateType::Unknown
+        } else if status == "OK" {
+            TaskStateType::OK
+        } else if status.starts_with("WARNINGS: ") {
+            TaskStateType::Warning
+        } else {
+            TaskStateType::Error
+        }
+    }
+}
+
 #[api(
     properties: {
         upid: { schema: UPID::API_SCHEMA },
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 48d54694..e4304a3a 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -132,7 +132,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
                 }
             }
 
-            let state = item.status.as_ref().map(|status| tasktype(status));
+            let state = item.status.as_deref().map(TaskStateType::from);
 
             match (state, &filters.statusfilter) {
                 (Some(TaskStateType::OK), _) if filters.errors => return false,
@@ -471,16 +471,3 @@ fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem,
         status: task.status,
     })
 }
-
-/// Parses a task status string into a TaskStateType
-pub fn tasktype(status: &str) -> TaskStateType {
-    if status == "unknown" || status.is_empty() {
-        TaskStateType::Unknown
-    } else if status == "OK" {
-        TaskStateType::OK
-    } else if status.starts_with("WARNINGS: ") {
-        TaskStateType::Warning
-    } else {
-        TaskStateType::Error
-    }
-}
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pdm-devel] [PATCH proxmox-datacenter-manager 7/8] fake remote: add missing fields to make the debug feature compile again
  2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
                   ` (5 preceding siblings ...)
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 6/8] pdm-api-types: remote tasks: implement From<&str> for TaskStateType Lukas Wagner
@ 2025-03-14 14:12 ` Lukas Wagner
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 8/8] fake remote: generate fake task data Lukas Wagner
  7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
---
 server/src/test_support/fake_remote.rs | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/server/src/test_support/fake_remote.rs b/server/src/test_support/fake_remote.rs
index 374dffe3..2419e690 100644
--- a/server/src/test_support/fake_remote.rs
+++ b/server/src/test_support/fake_remote.rs
@@ -40,6 +40,7 @@ impl RemoteConfig for FakeRemoteConfig {
                     nodes: Vec::new(),
                     authid: Authid::root_auth_id().clone(),
                     token: "".into(),
+                    web_url: None,
                 },
             );
 
@@ -85,6 +86,14 @@ impl ClientFactory for FakeClientFactory {
         }))
     }
 
+    fn make_pve_client_with_endpoint(
+        &self,
+        _remote: &Remote,
+        _target_endpoint: Option<&str>,
+    ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+        bail!("not implemented")
+    }
+
     fn make_pbs_client(&self, _remote: &Remote) -> Result<Box<PbsClient>, Error> {
         bail!("not implemented")
     }
@@ -148,6 +157,8 @@ impl PveClient for FakePveClient {
                 ty: ClusterResourceType::Qemu,
                 uptime: Some(1234),
                 vmid: Some(vmid),
+                lock: None,
+                tags: None,
             });
         }
 
@@ -179,6 +190,8 @@ impl PveClient for FakePveClient {
                 ty: ClusterResourceType::Lxc,
                 uptime: Some(1234),
                 vmid: Some(vmid),
+                lock: None,
+                tags: None,
             });
         }
 
@@ -209,6 +222,8 @@ impl PveClient for FakePveClient {
                 ty: ClusterResourceType::Node,
                 uptime: Some(1234),
                 vmid: Some(vmid),
+                lock: None,
+                tags: None,
             });
         }
 
@@ -239,6 +254,8 @@ impl PveClient for FakePveClient {
                 ty: ClusterResourceType::Storage,
                 uptime: None,
                 vmid: None,
+                lock: None,
+                tags: None,
             });
         }
 
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pdm-devel] [PATCH proxmox-datacenter-manager 8/8] fake remote: generate fake task data
  2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
                   ` (6 preceding siblings ...)
  2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 7/8] fake remote: add missing fields to make the debug feature compile again Lukas Wagner
@ 2025-03-14 14:12 ` Lukas Wagner
  7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-03-14 14:12 UTC (permalink / raw)
  To: pdm-devel

This helps us evaluate the performance characteristics of huge setups.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
---
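As a quick sanity check of the task-generation math below (timestamps
are made up): with one fake task generated every 5 minutes, an hour of
requested history yields 12 tasks, capped by `limit`:

    let now = 1_741_960_000i64; // hypothetical epoch timestamp
    let since = now - 3600;     // one hour of history requested
    let number_of_tasks = (now - since) / (5 * 60);
    assert_eq!(number_of_tasks, 12);
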
 server/src/test_support/fake_remote.rs | 70 +++++++++++++++++++++++++-
 1 file changed, 68 insertions(+), 2 deletions(-)

diff --git a/server/src/test_support/fake_remote.rs b/server/src/test_support/fake_remote.rs
index 2419e690..40f68807 100644
--- a/server/src/test_support/fake_remote.rs
+++ b/server/src/test_support/fake_remote.rs
@@ -6,8 +6,9 @@ use pdm_config::remotes::RemoteConfig;
 use proxmox_product_config::ApiLockGuard;
 use proxmox_section_config::typed::SectionConfigData;
 use pve_api_types::{
-    client::PveClient, ClusterMetrics, ClusterMetricsData, ClusterResource, ClusterResourceKind,
-    ClusterResourceType, StorageContent,
+    client::PveClient, ClusterMetrics, ClusterMetricsData, ClusterNodeIndexResponse,
+    ClusterNodeIndexResponseStatus, ClusterResource, ClusterResourceKind, ClusterResourceType,
+    ListTasks, ListTasksResponse, PveUpid, StorageContent,
 };
 use serde::Deserialize;
 
@@ -351,4 +352,69 @@ impl PveClient for FakePveClient {
 
         Ok(ClusterMetrics { data })
     }
+
+    async fn list_nodes(&self) -> Result<Vec<ClusterNodeIndexResponse>, proxmox_client::Error> {
+        tokio::time::sleep(Duration::from_millis(self.api_delay_ms as u64)).await;
+        Ok((0..self.nr_of_nodes)
+            .into_iter()
+            .map(|i| ClusterNodeIndexResponse {
+                node: format!("pve-{i}"),
+                cpu: None,
+                level: None,
+                maxcpu: None,
+                maxmem: None,
+                mem: None,
+                ssl_fingerprint: None,
+                status: ClusterNodeIndexResponseStatus::Online,
+                uptime: None,
+            })
+            .collect())
+    }
+
+    async fn get_task_list(
+        &self,
+        node: &str,
+        params: ListTasks,
+    ) -> Result<Vec<ListTasksResponse>, proxmox_client::Error> {
+        tokio::time::sleep(Duration::from_millis(self.api_delay_ms as u64)).await;
+        let make_task = |starttime| {
+            let endtime = Some(starttime + 4);
+
+            let upid_str =
+                format!("UPID:{node}:0000C530:001C9BEC:{starttime:08X}:stopall::root@pam:",);
+            let upid: PveUpid = upid_str.parse().unwrap();
+
+            ListTasksResponse {
+                node: node.to_string(),
+                endtime,
+                pid: upid.pid as i64,
+                pstart: upid.pstart as i64,
+                starttime,
+                status: Some("OK".to_string()),
+                ty: upid.worker_type,
+                user: upid.auth_id,
+                upid: upid_str,
+                id: upid.worker_id.unwrap_or_default(),
+            }
+        };
+
+        const DEFAULT_LIMIT: u64 = 1500;
+        const DEFAULT_SINCE: i64 = 0;
+        // Let's fake a new task every 5 minutes
+        const NEW_TASK_EVERY: i64 = 5;
+
+        let limit = params.limit.unwrap_or(DEFAULT_LIMIT);
+        let since = params.since.unwrap_or(DEFAULT_SINCE);
+
+        let now = proxmox_time::epoch_i64();
+
+        let number_of_tasks = (now - since) / (NEW_TASK_EVERY * 60);
+
+        let number_of_tasks = limit.min(number_of_tasks as u64);
+
+        Ok((0..number_of_tasks)
+            .into_iter()
+            .map(|i| make_task(now - i as i64 * NEW_TASK_EVERY * 60))
+            .collect())
+    }
 }
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel


^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2025-03-14 14:13 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-03-14 14:12 [pdm-devel] [PATCH proxmox-datacenter-manager 0/8] remote task cache fetching task / better cache backend Lukas Wagner
2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 1/8] test support: add NamedTempFile helper Lukas Wagner
2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 2/8] test support: add NamedTempDir helper Lukas Wagner
2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 3/8] move task_cache.rs to remote_tasks/mod.rs Lukas Wagner
2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 4/8] remote tasks: implement improved cache for remote tasks Lukas Wagner
2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache Lukas Wagner
2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 6/8] pdm-api-types: remote tasks: implement From<&str> for TaskStateType Lukas Wagner
2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 7/8] fake remote: add missing fields to make the debug feature compile again Lukas Wagner
2025-03-14 14:12 ` [pdm-devel] [PATCH proxmox-datacenter-manager 8/8] fake remote: generate fake task data Lukas Wagner
