* [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend
@ 2025-08-20 12:43 Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 1/7] remote tasks: implement improved cache for remote tasks Lukas Wagner
` (7 more replies)
0 siblings, 8 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-08-20 12:43 UTC (permalink / raw)
To: pdm-devel
The aim of this patch series is to greatly improve the performance of the
remote task cache for big PDM setups.
The initial cache implementation had the following problems:
1.) cache was populated as part of the `get_tasks` API, leading to hanging
API calls while fetching task data from remotes
2.) all tasks were stored in a single file, which was completely rewritten
for any change to the cache's contents
3.) The caching mechanism was pretty simple, using only a max-age mechanism,
re-requesting all task data if max-age was exceeded
Now, these characteristics are not really problematic for *small* PDM setups
with only a couple of remotes. However, for big setups (e.g. 100 remotes, each
remote being a PVE cluster with 10 nodes), this completely falls apart:
1.) fetching remote tasks takes a considerable amount of time, especially
on connections with a high latency. Since the data is requested
from *within* the `get_tasks` function, which is called by the
`remote-tasks/list` API handler, the API call is blocked until
*all* task data is requested.
2.) The single file approach leads to significant writes to the disk
3.) The max-age mechanism leads to unnecessary network IO, as we re-request data
that we already have locally.
To rectify the situation, this series performs the following changes:
- `get_tasks` never does any fetching; it only reads the most recent
data from the cache (see the rough usage sketch after this list)
- There is a new background task which periodically fetches tasks
from all remotes (every 10mins at the moment). Only the latest
missing tasks are requested, not the full task history as before
- The new background task also takes over the 'tracked task' polling
duty, where we fetch the status for any task started by PDM on
a remote (short polling interval, 10s at the moment).
- The task cache storage implementation has been completely overhauled
and is now optimized for the most common accesses to the cache.
It is also more storage efficient, occupying roughly 50% of the disk
space for the same number of tasks (achieved by avoiding duplicate
information in the files)
- The size of the task cache is 'limited' by doing file rotation.
We keep 7 days of task history.
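Roughly, consuming the new cache (from patch 1) looks like the following sketch.
The parameter values here are made up for illustration; only the type and
function names are taken from the patch, and the real setup is done by the new
background task:

    use proxmox_sys::fs::CreateOptions;

    fn dump_cached_tasks() -> Result<(), anyhow::Error> {
        let cache = TaskCache::new(
            "/var/cache/proxmox-datacenter-manager/remote-tasks",
            CreateOptions::new(),
            7,            // max_files: keep 7 archive files -> ~7 days of history
            2,            // number of uncompressed (most recent) archive files
            24 * 60 * 60, // rotate_after: rotate roughly once per day
            10_000,       // journal_max_size in bytes
        )?;

        // `get_tasks` on the API side only ever reads from the cache:
        for task in cache.read()?.get_tasks(GetTasks::All)? {
            println!("{:?} started at {}", task.upid, task.starttime);
        }
        Ok(())
    }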
For details on *how* the cache itself works, please refer to the full
commit message of
'remote tasks: implement improved cache for remote tasks'
# Benchmarks
Finally, some concrete data to back up the claimed performance improvements. The
times were measured *inside* the `get_tasks` function and not at the API level,
so the times do not include JSON serialization and data transfer.
Benchmarking was done using the 'fake-remote' feature. There were 100 remotes,
10 PVE nodes per remote. The task cache contained about 1.5 million tasks.
                                            before    v5         v6 (journal, zstd)
list of active tasks (*):                   ~1.3s     ~300µs     ~300µs
list of 500 tasks, offset 0 (**):           ~1.3s     ~1.45ms    ~1.5ms
list of 500 tasks, offset 1 million (***):  ~1.3s     ~175ms     ~200ms
list of 500 tasks, offset 0,
  2000 tasks in journal (****):                                  ~4.5ms
Size on disk:                               ~500MB    ~200MB     ~40MB
(*): Requested by the UI every 3s
(**): Requested by the UI when visiting Remotes > Tasks
(***): E.g. when scrolling towards the bottom of 'Remotes > Tasks'
(****): e.g. when the journal has not been applied for a while. Reading tasks
from the journal is a bit less efficient than from the task archive, since
we have to fully load it into memory so that we can sort the tasks and
also remove potential duplicates
In the old implementation, the archive file was *always* fully deserialized and
loaded into RAM; this is the reason why the time needed is pretty much identical
for all scenarios.
The new implementation reads the archive files only line by line, and only 500
tasks are loaded into RAM at a time. The higher the offset, the more
archive lines/files we have to scan, which increases the time needed to access
the data. The tasks are sorted descending by starttime; as a result, the
requests get slower the further you go back in history.
The 'before' times do NOT include the time needed for actually fetching the
task data.
This series was preceded by [1], however almost all of the code has changed,
which is the reason why I am sending this as a new series.
[1] https://lore.proxmox.com/pdm-devel/20250128122520.167796-1-l.wagner@proxmox.com/
Changes since v6:
- Incorporate additional review feedback from @Dominik:
- Log error in case of task panic
- create `active` file when `init` is called; this avoids an error in the
logs when calling the tasks API before the first task fetching round
- minor style suggestions/fixes
Changes since v5:
- Incorporate review feedback from @Dominik:
- Poll tracked tasks individually instead of doing a full task refresh with the
oldest running task as cutoff. This should be much more efficient
for long-running tasks.
- Change state-file representation
- Improved some doc comments
- Use timestamps instead of cycle counter for the fetching task
- make total connection semaphore allocation more efficient
- Use dedicated types for (read/write)-locked task cache, encoding
the locking requirements in Rust's type system. Neat!
- Keep track of cut-off timestamps per node, not per remote.
- This makes sure that we don't refetch tasks that we already have
if one node in a cluster is offline for a longer period of time
- Instead of writing new tasks directly into the archive files, append
them to a journal/write-ahead-log file, which is then applied at regular intervals.
This should reduce disk writes, since every single time an archive file is
changed, it has to be completely rewritten (tasks might arrive out-of-order and
the contents of the archive are sorted by the task's starttime). The journal allows
us to write more tasks at once.
- Compress older archive files using zstd - this greatly reduces disk usage
of task data
Changes since v4:
- Rebased onto latest master, adapting to Gabriel's section config changes
Changes since v3:
- Include benchmark results in commit message
- Remove unneeded and potentially unsafe `pub` (thx Wolfgang)
Changes since v2:
- Change locking approach as suggested by Wolfgang
- Incorporated feedback from Wolfgang
- see patch notes for details
- Added some .context/.with_context for better error messages
Changes since v1:
- Drop already applied patches
- Some code style improvements, see individual patch changelogs
- Move task fetching task to bin/proxmox-datacenter-api/tasks/remote_task.rs
- Make sure that remote_tasks::get_tasks does not block the async executor
proxmox-datacenter-manager:
Lukas Wagner (7):
remote tasks: implement improved cache for remote tasks
remote tasks: add background task for task polling, use new task cache
pdm-api-types: remote tasks: add new_from_str constructor for
TaskStateType
fake remote: make the fake_remote feature compile again
fake remote: clippy fixes
remote tasks: task cache: create `active` file in init
remote tasks: log error in case of task panic, instead of cancelling
all tasks
Cargo.toml | 2 +-
lib/pdm-api-types/src/lib.rs | 15 +
server/Cargo.toml | 1 +
server/src/api/pve/lxc.rs | 10 +-
server/src/api/pve/mod.rs | 4 +-
server/src/api/pve/qemu.rs | 6 +-
server/src/api/remote_tasks.rs | 11 +-
server/src/bin/proxmox-datacenter-api/main.rs | 1 +
.../bin/proxmox-datacenter-api/tasks/mod.rs | 1 +
.../tasks/remote_tasks.rs | 560 +++++++
server/src/remote_tasks/mod.rs | 632 ++-----
server/src/remote_tasks/task_cache.rs | 1491 +++++++++++++++++
server/src/test_support/fake_remote.rs | 39 +-
13 files changed, 2234 insertions(+), 539 deletions(-)
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
create mode 100644 server/src/remote_tasks/task_cache.rs
Summary over all repositories:
13 files changed, 2234 insertions(+), 539 deletions(-)
--
Generated by murpp 0.9.0
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pdm-devel] [PATCH proxmox-datacenter-manager v7 1/7] remote tasks: implement improved cache for remote tasks
2025-08-20 12:43 [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Lukas Wagner
@ 2025-08-20 12:43 ` Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 2/7] remote tasks: add background task for task polling, use new task cache Lukas Wagner
` (6 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-08-20 12:43 UTC (permalink / raw)
To: pdm-devel
This commit adds a new implementation for a cache for remote tasks, one
that should improve performance characteristics for pretty much all
use cases.
In general, storage works pretty similarly to the task archive we already
have for (local) PDM tasks.
root@pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
total 40
-rw-r--r-- 1 www-data www-data 0 Mar 13 13:18 active
-rw-r--r-- 1 www-data www-data 1676 Mar 11 14:51 archive.1741355462.zst
-rw-r--r-- 1 www-data www-data 0 Mar 11 14:51 archive.1741441862.zst
-rw-r--r-- 1 www-data www-data 2538 Mar 11 14:51 archive.1741528262.zst
-rw-r--r-- 1 www-data www-data 8428 Mar 11 15:07 archive.1741614662.zst
-rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
-rw-r--r-- 1 www-data www-data 3364 Mar 13 13:18 archive.1741788270
-rw-r--r-- 1 www-data www-data 0 Mar 13 13:18 journal
-rw-r--r-- 1 www-data www-data 287 Mar 13 13:18 state
Tasks are stored in the 'active' and multiple 'archive' files. Running
tasks are placed into the 'active' file, tasks that are finished are
persisted into one of the archive files. The archive files are suffixed
with a UNIX timestamp which serves as a lower-bound for start times for
tasks stored in this file. Encoding this lower-bound in the file name
instead of using a more traditional increasing file index (.1, .2, .3)
gives us the benefit that we can decide in which file a newly arrived
task belongs from a single readdir call, without even reading the file
itself.
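To illustrate the idea (a minimal sketch, not the code from this patch; the
function name and the slice-based representation are made up), picking the
target archive file boils down to finding the newest lower bound that does not
exceed the task's start time:

    // `lower_bounds` holds the timestamps parsed from the archive file names,
    // sorted ascending, e.g. [1741355462, 1741441862, 1741528262].
    // Returns the index of the file the task belongs to, or `None` if the task
    // is older than the oldest archive file (and would be discarded).
    fn pick_archive_file(lower_bounds: &[i64], task_starttime: i64) -> Option<usize> {
        lower_bounds.iter().rposition(|&bound| bound <= task_starttime)
    }

    // pick_archive_file(&[1000, 2000, 3000], 2500) == Some(1)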
Older archive files are additionally compressed using zstd.
The size of the entire archive can be controlled by doing file
'rotation', where the archive file with the oldest timestamp is deleted
and a new file with the current timestamp is added. If 'rotation'
happens at fixed intervals (e.g. once daily), this gives us a
configurable number of days of task history. There is no direct cap for
the total number of tasks, but this could be easily added by checking
the size of the most recent archive file and by rotating early if a
threshold is exceeded.
The format inside the files is also similar to the local task archive,
with one task corresponding to one line in the file. One key difference
is that here each line is a JSON object; this makes it easier to
add additional data later, if needed. The JSON object contains the task's
UPID, status, endtime and starttime (the starttime is technically also
encoded in the UPID, but having it as a separate value simplifies a
couple of things). Each file is sorted by the tasks' start times, the
youngest task coming first.
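For illustration, this is roughly what two entries look like on disk (the UPIDs
and timestamps below are made up; the field names follow the TaskCacheItem
struct added in this patch). Finished tasks carry status and endtime, while a
running task, as stored in the 'active' file, omits them:

    {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","starttime":1739893201,"status":"OK","endtime":1739893211}
    {"upid":"pve-remote!UPID:pve:0000A1B2:00C3D4E5:67B4AA00:qmstart:101:root@pam:","starttime":1739893248}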
One key difference between this task cache and the local task archive is
that we need to handle tasks coming in out-of-order, e.g. if remotes
were not reachable for a certain time. To maintain the ordering in the
file, we have to merge the newly arrived tasks into the existing task
file. This was implemented in a way that avoids reading the entire file
into memory at once, exploiting the fact that the contents of the
existing file are already sorted. This allows us to use a zipper/merge-sort-
like approach (see MergeTaskIterator for details). The existing file is
only read line-by-line and finally replaced atomically.
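The zipper idea itself, reduced to plain start times, looks like the following
minimal sketch; the real, generic implementation is the MergeTaskIterator in
the diff below, which works lazily on task iterators instead of slices:

    // Merge two slices that are already sorted descending, dropping duplicates.
    fn zipper_merge(mut a: &[i64], mut b: &[i64]) -> Vec<i64> {
        let mut out = Vec::new();
        while let (Some(&x), Some(&y)) = (a.first(), b.first()) {
            if x > y {
                out.push(x);
                a = &a[1..];
            } else if y > x {
                out.push(y);
                b = &b[1..];
            } else {
                // equal entries are emitted only once (deduplication)
                out.push(x);
                a = &a[1..];
                b = &b[1..];
            }
        }
        out.extend_from_slice(a);
        out.extend_from_slice(b);
        out
    }

    // zipper_merge(&[9, 7, 3], &[8, 7, 2]) == vec![9, 8, 7, 3, 2]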
Since we always need to rewrite the entire archive file, even if there
is only a single new task added, there is also a journal/WAL file.
When adding new tasks, they will be first appended to the journal file
and then later applied to the actual archive files. The journal is
applied after a certain time or if the journal file itself grows too
large.
The cache also has a separate state file, containing additional
information, e.g. cut-off timestamps for the polling task. Some of the
data in the statefile is technically also contained in the archive
files, but reading the state file is much faster.
This commit also adds an elaborate suite of unit tests for this new
cache. While they added some additional work up front, the tests paid off
quite quickly during development, since the overall approach
for the cache changed a couple of times. The test suite gave me
confidence that the design changes didn't screw anything up, catching a
couple of bugs in the process.
Finally, some concrete numbers about the performance improvements.
Benchmarking was done using the 'fake-remote' feature. There were 100
remotes, 10 PVE nodes per remote. The task cache contained
about 1.5 million tasks.
                                            before    after
list of active tasks (*):                   ~1.3s     ~300µs
list of 500 tasks, offset 0 (**):           ~1.3s     ~1.5ms
list of 500 tasks, offset 1 million (***):  ~1.3s     ~200ms
list of 500 tasks, offset 0,
  2000 tasks in journal (****):                       ~4.5ms
Size on disk:                               ~500MB    ~40MB
(*): Requested by the UI every 3s
(**): Requested by the UI when visiting Remotes > Tasks
(***): E.g. when scrolling towards the bottom of 'Remotes > Tasks'
(****): e.g. when the journal has not been applied for a while. Reading tasks
from the journal is a bit less efficient than from the task archive, since
we have to fully load it into memory so that we can sort the tasks and
also remove potential duplicates
In the old implementation, the archive file was *always* fully
deserialized and loaded into RAM; this is the reason why the time needed
is pretty much identical for all scenarios. The new implementation reads the
archive files only line by line, and only 500 tasks are loaded into RAM
at a time. The higher the offset, the more archive lines/files we
have to scan, which increases the time needed to access the data. The
tasks are sorted descending by starttime; as a result, the requests get
slower the further you go back in history.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Dominik Csapak <d.csapak@proxmox.com>
---
Notes:
Changes since v6:
- Remove superfluous ZSTD_EXTENSION const
- Refactor common code into start_new_file
- Make compare_tasks_reverse (maybe) (slightly) more efficient by
not using .reverse, but just switching both params
Changes since v5:
- Change format of state file, include a per-node cutoff time
- Change locking approach, using separate types to represent
a locked cached (ReadableTaskCache, WritableTaskCache)
(as suggested by Wolfgang and Dominik independently)
- Rename 'add_tasks' to 'update', since it also drops
tracked tasks from the state file, moves tasks from
the active file to the archive, etc.
- Instead of adding new tasks directly to the archive, they
are added to an append-only journal at first. This allows us to
better recover from a crash/shutdown while we are adding tasks,
as well as reduce disk writes. The latter is due to the fact that
we have to rewrite the archive file in its entirety, even if
only a single task is added.
- Use zstd compression for older archive files, greatly reducing
space consumption
- Some general refactoring to make the code a bit nicer
Changes since v4:
- Rebased
Changes since v3:
- Included benchmark results in the commit message
- Drop `pub` for TaskCache::write_state
Changes since v2:
- Incorporate feedback from Wolfgang (thx!)
- eliminate one .clone()
- get_tasks_with_lock: directly create iterator over PathBuf
instead of first creating ArchiveFile vec and the mapping
- Fixed TOCTOU race condition (checked Path::exists instead of
trying the actual operation and reacting to ErrorKind::NotFound)
- handle error when replacing 'active' file
- unlink temp file when replacing the archive file did not work
- avoid UTF8 decoding where not really necessary
- Added a couple of .context()/.with_context() for better error
messages
- Extracted the file names into consts
- Fixed some clippy issues (.clone() for CreateOptions - when
this series was written, CreateOptions were not Copy yet)
Cargo.toml | 2 +-
server/Cargo.toml | 1 +
server/src/remote_tasks/mod.rs | 2 +
server/src/remote_tasks/task_cache.rs | 1483 +++++++++++++++++++++++++
4 files changed, 1487 insertions(+), 1 deletion(-)
create mode 100644 server/src/remote_tasks/task_cache.rs
diff --git a/Cargo.toml b/Cargo.toml
index 658c53a7..08b93737 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -128,7 +128,7 @@ url = "2.1"
walkdir = "2"
webauthn-rs-core = "0.5"
xdg = "2.2"
-zstd = { version = "0.12", features = [ "bindgen" ] }
+zstd = { version = "0.13" }
# Local path overrides
# NOTE: You must run `cargo update` after changing this for it to take effect!
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 2d1fa5d2..24a2e40c 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -32,6 +32,7 @@ tokio = { workspace = true, features = [ "fs", "io-util", "io-std", "macros", "n
tokio-stream.workspace = true
tracing.workspace = true
url.workspace = true
+zstd.workspace = true
proxmox-access-control = { workspace = true, features = [ "impl" ] }
proxmox-async.workspace = true
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 234ffa76..7c8e31ef 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -18,6 +18,8 @@ use tokio::task::JoinHandle;
use crate::{api::pve, task_utils};
+mod task_cache;
+
/// Get tasks for all remotes
// FIXME: filter for privileges
pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
new file mode 100644
index 00000000..9e6a65cd
--- /dev/null
+++ b/server/src/remote_tasks/task_cache.rs
@@ -0,0 +1,1483 @@
+//! Task cache implementation, based on rotating files.
+use std::{
+ cmp::Ordering,
+ collections::{HashMap, HashSet},
+ fs::{File, OpenOptions},
+ io::{BufRead, BufReader, BufWriter, ErrorKind, Lines, Write},
+ iter::Peekable,
+ os::unix::fs::MetadataExt,
+ path::{Path, PathBuf},
+ time::{Duration, Instant},
+};
+
+use anyhow::{Context, Error};
+use serde::{Deserialize, Serialize};
+
+use proxmox_sys::fs::CreateOptions;
+
+use pdm_api_types::RemoteUpid;
+use pve_api_types::PveUpid;
+
+/// Filename for the file containing running tasks.
+const ACTIVE_FILENAME: &str = "active";
+/// Filename prefix for archive files.
+const ARCHIVE_FILENAME_PREFIX: &str = "archive.";
+/// Filename for the state file.
+const STATE_FILENAME: &str = "state";
+/// Filename of the archive lockfile.
+const LOCKFILE_FILENAME: &str = ".lock";
+/// Write-ahead log.
+const WAL_FILENAME: &str = "journal";
+
+/// File name extension for zstd compressed archive files
+const ZSTD_EXTENSION_WITH_DOT: &str = ".zst";
+
+/// Item which can be put into the task cache.
+#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
+#[serde(rename_all = "kebab-case")]
+pub struct TaskCacheItem {
+ /// The task's UPID
+ pub upid: RemoteUpid,
+ /// The time at which the task was started (seconds since the UNIX epoch).
+ /// Technically this is also contained within the UPID, duplicating it here
+ /// allows us to directly access it when sorting in new tasks, without having
+ /// to parse the UPID.
+ pub starttime: i64,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ /// The task's status.
+ pub status: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ /// The task's endtime (seconds since the UNIX epoch).
+ pub endtime: Option<i64>,
+}
+
+#[derive(Serialize, Deserialize, Default)]
+#[serde(rename_all = "kebab-case")]
+/// Per remote state.
+struct RemoteState {
+ /// Per-node state for this remote.
+ node_state: HashMap<String, NodeState>,
+}
+
+#[derive(Serialize, Deserialize, Default)]
+#[serde(rename_all = "kebab-case")]
+struct NodeState {
+ /// Cutoff timestamp for this node when fetching archived tasks.
+ cutoff: i64,
+}
+
+/// State needed for task polling.
+#[derive(Serialize, Deserialize, Default)]
+#[serde(rename_all = "kebab-case")]
+pub struct State {
+ /// Map of remote -> most recent task starttime (UNIX epoch) in the archive.
+ /// This can be used as a cut-off when requesting new task data.
+ #[serde(default)]
+ remote_state: HashMap<String, RemoteState>,
+ /// Tracked tasks which are polled in short intervals.
+ #[serde(default)]
+ tracked_tasks: HashSet<RemoteUpid>,
+}
+
+impl State {
+ /// Get tracked tasks.
+ pub fn tracked_tasks(&self) -> impl Iterator<Item = &RemoteUpid> {
+ self.tracked_tasks.iter()
+ }
+
+ /// Get the cutoff timestamp for a node of a remote.
+ pub fn cutoff_timestamp(&self, remote_id: &str, node: &str) -> Option<i64> {
+ self.remote_state
+ .get(remote_id)
+ .and_then(|remote_state| remote_state.node_state.get(node))
+ .map(|state| state.cutoff)
+ }
+
+ /// Add a new tracked task.
+ fn add_tracked_task(&mut self, upid: RemoteUpid) {
+ self.tracked_tasks.insert(upid);
+ }
+
+ /// Remove a tracked task.
+ fn remove_tracked_task(&mut self, upid: &RemoteUpid) {
+ self.tracked_tasks.remove(upid);
+ }
+
+ /// Update the per-node cutoff timestamp if it is higher than the current one.
+ fn update_cutoff_timestamp(&mut self, remote_id: &str, node: &str, starttime: i64) {
+ match self.remote_state.get_mut(remote_id) {
+ Some(remote_state) => match remote_state.node_state.get_mut(node) {
+ Some(node_state) => {
+ node_state.cutoff = node_state.cutoff.max(starttime);
+ }
+ None => {
+ remote_state
+ .node_state
+ .insert(node.to_string(), NodeState { cutoff: starttime });
+ }
+ },
+ None => {
+ let node_state =
+ HashMap::from_iter([(node.to_string(), NodeState { cutoff: starttime })]);
+
+ self.remote_state
+ .insert(remote_id.to_string(), RemoteState { node_state });
+ }
+ }
+ }
+}
+
+/// Cache for remote tasks.
+#[derive(Clone)]
+pub struct TaskCache {
+ /// Path where the cache's files should be placed.
+ base_path: PathBuf,
+ /// File permissions for the cache's files.
+ create_options: CreateOptions,
+
+ /// Maximum size of the journal. If it grows larger than size after
+ /// tasks have been added, it will be applied immediately.
+ journal_max_size: u64,
+
+ /// Maximum number of archive files. If the archive is rotated and `max_files` is exceeded, the
+ /// oldest file will be dropped.
+ max_files: u32,
+
+ /// Number of uncompressed archive files to keep. These will be the most recent ones.
+ uncompressed_files: u32,
+
+ /// Rotate archive file if it is older than this number of seconds.
+ rotate_after: u64,
+}
+
+/// A [`TaskCache`] locked for writing.
+pub struct WritableTaskCache {
+ cache: TaskCache,
+ lock: TaskCacheLock,
+}
+
+/// A [`TaskCache`] locked for reading.
+pub struct ReadableTaskCache {
+ cache: TaskCache,
+ lock: TaskCacheLock,
+}
+
+/// Lock for the cache.
+#[allow(dead_code)]
+struct TaskCacheLock(File);
+
+/// Which tasks to fetch from the archive.
+pub enum GetTasks {
+ /// Get all tasks, finished and running.
+ All,
+ /// Only get running (active) tasks.
+ Active,
+ #[cfg(test)] // Used by tests, might be used by production code in the future
+ /// Only get finished (archived) tasks.
+ Archived,
+}
+
+/// Map that stores whether a remote node's tasks were successfully
+/// fetched.
+#[derive(Default)]
+pub struct NodeFetchSuccessMap(HashMap<(String, String), bool>);
+
+impl NodeFetchSuccessMap {
+ /// Mark a node of a given remote as successful.
+ pub fn set_node_success(&mut self, remote: String, node: String) {
+ self.0.insert((remote, node), true);
+ }
+
+ /// Mark a node of a given remote as failed.
+ pub fn set_node_failure(&mut self, remote: String, node: String) {
+ self.0.insert((remote, node), false);
+ }
+
+ /// Returns whether tasks from a given node of a remote were successfully fetched.
+ pub fn node_successful(&self, remote: &str, node: &str) -> bool {
+ matches!(self.0.get(&(remote.into(), node.into())), Some(true))
+ }
+
+ /// Merge this map with another.
+ pub fn merge(&mut self, other: Self) {
+ self.0.extend(other.0);
+ }
+}
+
+impl ReadableTaskCache {
+ /// Iterate over cached tasks.
+ pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
+ self.cache
+ .get_tasks_impl(mode, &self.lock)
+ .context("failed to create task archive iterator")
+ }
+}
+
+impl WritableTaskCache {
+ /// Create initial task archives that can be backfilled with the
+ /// recent task history from a remote.
+ ///
+ /// This function only has an effect if there are no archive files yet.
+ pub fn init(&self, now: i64) -> Result<(), Error> {
+ if self.cache.archive_files(&self.lock)?.is_empty() {
+ for i in 0..self.cache.max_files {
+ self.new_file(
+ now - (i as u64 * self.cache.rotate_after) as i64,
+ i >= self.cache.uncompressed_files,
+ )?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Start a new archive file with a given timestamp.
+ /// `now` is supposed to be a UNIX timestamp (seconds).
+ fn new_file(&self, now: i64, compress: bool) -> Result<ArchiveFile, Error> {
+ let suffix = if compress {
+ ZSTD_EXTENSION_WITH_DOT
+ } else {
+ ""
+ };
+
+ let new_path = self
+ .cache
+ .base_path
+ .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}{suffix}"));
+
+ let mut file = File::create(&new_path)?;
+ self.cache.create_options.apply_to(&mut file, &new_path)?;
+
+ if compress {
+ let encoder = zstd::stream::write::Encoder::new(file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
+ encoder.finish()?;
+ }
+
+ Ok(ArchiveFile {
+ path: new_path,
+ compressed: compress,
+ starttime: now,
+ })
+ }
+
+ /// Rotate task archive if the newest archive file is older than `rotate_after`.
+ ///
+ /// The oldest archive files are removed if the total number of archive files exceeds
+ /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
+ pub fn rotate(&self, now: i64) -> Result<bool, Error> {
+ let mut did_rotate = false;
+ let mut archive_files = self.cache.archive_files(&self.lock)?;
+
+ let mut start_new_file = |files: &mut Vec<ArchiveFile>| -> Result<(), Error> {
+ let new_file = self.new_file(now, self.cache.uncompressed_files == 0)?;
+ files.insert(0, new_file);
+ self.apply_journal()?;
+
+ did_rotate = true;
+ Ok(())
+ };
+
+ match archive_files.first() {
+ Some(bound) => {
+ if now > bound.starttime && now - bound.starttime > self.cache.rotate_after as i64 {
+ start_new_file(&mut archive_files)?;
+ }
+ }
+ None => start_new_file(&mut archive_files)?,
+ }
+
+ while archive_files.len() > self.cache.max_files as usize {
+ // Unwrap is safe because of the length check above
+ let to_remove = archive_files.pop().unwrap();
+ std::fs::remove_file(&to_remove.path)
+ .with_context(|| format!("failed to remove {}", to_remove.path.display()))?;
+ }
+
+ for file in archive_files
+ .iter_mut()
+ .skip(self.cache.uncompressed_files as usize)
+ {
+ if !file.compressed {
+ file.compress(self.cache.create_options)
+ .with_context(|| format!("failed to compress {}", file.path.display()))?;
+ }
+ }
+
+ Ok(did_rotate)
+ }
+
+ /// Iterate over cached tasks.
+ pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
+ self.cache
+ .get_tasks_impl(mode, &self.lock)
+ .context("failed to create task archive iterator")
+ }
+
+ /// Update task cache contents.
+ ///
+ /// This is mostly used for adding new tasks to the cache, but
+ /// will also handle dropping finished/failed tracked tasks from the
+ /// state file and active file. This is done so that we don't have to update
+ /// these files multiple times.
+ ///
+ /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
+ /// task cache base directory. Finished tasks are sorted into `archive.<starttime>` archive
+ /// files, where `<starttime>` denotes the lowest permitted start time timestamp for a given
+ /// archive file. If a task which was added as running previously is added again, this time in
+ /// a finished state, it will be removed from the `active` file and also sorted into
+ /// one of the archive files.
+ /// Same goes for the list of tracked tasks; the entry in the state file will be removed.
+ ///
+ /// Crash consistency:
+ ///
+ /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the
+ /// end after all tasks have been added into the archive. Adding tasks is an idempotent
+ /// operation; adding the *same* task multiple times does not lead to duplicated entries in the
+ /// task archive. Individual archive files are updated atomically, but since
+ /// adding tasks can involve updating multiple archive files, the archive could end up
+ /// in a partially-updated, inconsistent state in case of a crash.
+ /// However, since the state file with the cut-off timestamps is updated last,
+ /// the consistency of the archive should be restored at the next update cycle of the archive.
+ pub fn update(
+ &self,
+ new_tasks: Vec<TaskCacheItem>,
+ update_state_for_remote: &NodeFetchSuccessMap,
+ drop_tracked: HashSet<RemoteUpid>,
+ ) -> Result<(), Error> {
+ let task_iter = self
+ .get_tasks(GetTasks::Active)
+ .context("failed to create archive iterator for active tasks")?;
+
+ let mut active_tasks = HashMap::from_iter(task_iter.filter_map(|task| {
+ if !drop_tracked.contains(&task.upid) {
+ Some((task.upid.clone(), task))
+ } else {
+ None
+ }
+ }));
+
+ let mut new_finished_tasks = Vec::new();
+
+ for task in new_tasks {
+ if task.endtime.is_none() {
+ active_tasks.insert(task.upid.clone(), task);
+ } else {
+ new_finished_tasks.push(task);
+ }
+ }
+
+ let mut state = self.read_state();
+
+ for upid in drop_tracked {
+ state.remove_tracked_task(&upid);
+ }
+
+ self.write_tasks_to_journal(
+ new_finished_tasks,
+ &mut active_tasks,
+ update_state_for_remote,
+ &mut state,
+ )?;
+
+ let mut active: Vec<TaskCacheItem> = active_tasks.into_values().collect();
+
+ active.sort_by(compare_tasks_reverse);
+ self.write_active_tasks(active.into_iter())
+ .context("failed to write active task file when adding tasks")?;
+ self.write_state(state)
+ .context("failed to update task archive state file when adding tasks")?;
+
+ self.apply_journal_if_too_large()
+ .context("could not apply journal early")?;
+
+ Ok(())
+ }
+
+ fn write_tasks_to_journal(
+ &self,
+ tasks: Vec<TaskCacheItem>,
+ active_tasks: &mut HashMap<RemoteUpid, TaskCacheItem>,
+ node_success_map: &NodeFetchSuccessMap,
+ state: &mut State,
+ ) -> Result<(), Error> {
+ let filename = self.cache.base_path.join(WAL_FILENAME);
+ let mut file = OpenOptions::new()
+ .append(true)
+ .create(true)
+ .open(filename)?;
+
+ for task in tasks {
+ // Remove this finished task from our set of active tasks.
+ active_tasks.remove(&task.upid);
+
+ // TODO:: Handle PBS tasks correctly.
+ // TODO: This is awkward, maybe overhaul RemoteUpid type to make this easier
+ match task.upid.upid.parse::<PveUpid>() {
+ Ok(upid) => {
+ let node = &upid.node;
+ let remote = task.upid.remote();
+
+ if node_success_map.node_successful(remote, node) {
+ state.update_cutoff_timestamp(task.upid.remote(), node, task.starttime);
+ }
+ }
+ Err(error) => {
+ log::error!("could not parse PVE UPID - not saving to task cache: {error:#}");
+ continue;
+ }
+ }
+
+ serde_json::to_writer(&mut file, &task)?;
+ writeln!(&file)?;
+ }
+
+ file.sync_all()?;
+
+ Ok(())
+ }
+
+ /// Returns the current size of the journal file.
+ fn journal_size(&self) -> Result<u64, Error> {
+ let metadata = self
+ .cache
+ .base_path
+ .join(WAL_FILENAME)
+ .metadata()
+ .context("failed to read metadata of journal file")?;
+
+ Ok(metadata.size())
+ }
+
+ /// Apply the journal early if it has grown larger than the maximum allowed size.
+ fn apply_journal_if_too_large(&self) -> Result<(), Error> {
+ let size = self.journal_size()?;
+
+ if size > self.cache.journal_max_size {
+ log::info!("task cache journal size {size} bytes, applying early");
+ self.apply_journal()?;
+ }
+
+ Ok(())
+ }
+
+ /// Apply the task journal.
+ ///
+ /// This will merge all tasks in the journal file into the task archive.
+ pub fn apply_journal(&self) -> Result<(), Error> {
+ let start = Instant::now();
+ let filename = self.cache.base_path.join(WAL_FILENAME);
+
+ let file = match File::open(&filename) {
+ Ok(file) => Box::new(BufReader::new(file)),
+ Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
+ Err(err) => return Err(err.into()),
+ };
+
+ log::info!("applying task cache journal");
+ let iterator = ArchiveIterator::new(file);
+
+ let mut tasks: Vec<TaskCacheItem> = iterator
+ .filter_map(|task| match task {
+ Ok(task) => Some(task),
+ Err(err) => {
+ log::error!("could not read task from journal file: {err:#}");
+ None
+ }
+ })
+ .collect();
+
+ // The WAL contains tasks in arbitrary order since we always append.
+ tasks.sort_by(compare_tasks_reverse);
+ tasks.dedup();
+
+ let count = tasks.len();
+
+ self.merge_tasks_into_archive(tasks)?;
+
+ // truncate the journal file
+ OpenOptions::new()
+ .write(true)
+ .truncate(true)
+ .open(filename)
+ .context("failed to truncate journal file")?;
+
+ log::info!(
+ "commited {count} tasks in {:.3}.s to task cache archive",
+ start.elapsed().as_secs_f32()
+ );
+
+ Ok(())
+ }
+
+ /// Merge a list of *finished* tasks into the remote task archive files.
+ /// The list of task in `tasks` *must* be sorted by their timestamp and UPID (descending by
+ /// timestamp, ascending by UPID).
+ fn merge_tasks_into_archive(&self, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
+ debug_assert!(tasks
+ .iter()
+ .is_sorted_by(|a, b| compare_tasks(a, b).is_ge()));
+
+ let files = self
+ .cache
+ .archive_files(&self.lock)
+ .context("failed to read achive files")?;
+
+ let mut files = files.iter().peekable();
+
+ let mut current = files.next();
+ let mut next = files.peek();
+
+ let mut tasks_for_current_file = Vec::new();
+
+ // Tasks are sorted youngest to oldest (biggest start time first)
+ for task in tasks {
+ // Skip ahead until we have found the correct file.
+ while next.is_some() {
+ if let Some(current) = current {
+ if task.starttime >= current.starttime {
+ break;
+ }
+ // The next entry's cut-off is larger than the task's start time, which means
+ // we want to finalize the current file by merging all tasks that
+ // should be stored in it...
+ self.merge_single_archive_file(
+ std::mem::take(&mut tasks_for_current_file),
+ current,
+ )
+ .with_context(|| {
+ format!("failed to merge archive file {}", current.path.display())
+ })?;
+ }
+
+ // ... and then advance `current` to the next entry.
+ current = files.next();
+ next = files.peek();
+ }
+
+ if let Some(current) = current {
+ if task.starttime < current.starttime {
+ continue;
+ }
+ }
+ tasks_for_current_file.push(task);
+ }
+
+ // Merge tasks for the last file.
+ if let Some(current) = current {
+ self.merge_single_archive_file(tasks_for_current_file, current)
+ .with_context(|| {
+ format!("failed to merge archive file {}", current.path.display())
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Add a new tracked task.
+ ///
+ /// This will insert the task in the list of tracked tasks in the state file,
+ /// as well as create an entry in the `active` file.
+ pub fn add_tracked_task(&self, task: TaskCacheItem) -> Result<(), Error> {
+ let mut state = self.read_state();
+
+ let mut tasks: Vec<TaskCacheItem> = self
+ .get_tasks(GetTasks::Active)
+ .context("failed to create active task iterator")?
+ .collect();
+
+ tasks.push(task.clone());
+ tasks.sort_by(compare_tasks_reverse);
+
+ state.add_tracked_task(task.upid);
+
+ self.write_active_tasks(tasks.into_iter())
+ .context("failed to write active tasks file when adding tracked task")?;
+
+ self.write_state(state)
+ .context("failed to write state when adding tracked task")?;
+
+ Ok(())
+ }
+
+ /// Read the state file.
+ /// If the state file could not be read or does not exist, the default (empty) state
+ /// is returned.
+ pub fn read_state(&self) -> State {
+ self.cache.read_state()
+ }
+
+ /// Write the state file.
+ fn write_state(&self, state: State) -> Result<(), Error> {
+ let path = self.cache.base_path.join(STATE_FILENAME);
+
+ let data = serde_json::to_vec_pretty(&state)?;
+
+ proxmox_sys::fs::replace_file(path, &data, self.cache.create_options, true)?;
+
+ Ok(())
+ }
+
+ /// Write the provided tasks to the 'active' file.
+ ///
+ /// The tasks are first written to a temporary file, which is then used
+ /// to atomically replace the original.
+ fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
+ let (fd, path) = proxmox_sys::fs::make_tmp_file(
+ self.cache.base_path.join(ACTIVE_FILENAME),
+ self.cache.create_options,
+ )?;
+ let mut fd = BufWriter::new(fd);
+
+ Self::write_tasks(&mut fd, tasks)?;
+
+ if let Err(err) = fd.flush() {
+ log::error!("could not flush 'active' file: {err:#}");
+ }
+ drop(fd);
+
+ let target = self.cache.base_path.join(ACTIVE_FILENAME);
+
+ let res = std::fs::rename(&path, &target).with_context(|| {
+ format!(
+ "failed to replace {} with {}",
+ target.display(),
+ path.display(),
+ )
+ });
+
+ if let Err(err) = res {
+ if let Err(err) = std::fs::remove_file(&path) {
+ log::error!(
+ "failed to cleanup temporary file {}: {err:#}",
+ path.display()
+ );
+ }
+
+ return Err(err);
+ }
+
+ Ok(())
+ }
+
+ /// Merge `tasks` with an existing archive file.
+ /// This function assumes that `tasks` and the pre-existing contents of the archive
+ /// file are both sorted descending by starttime (most recent tasks come first).
+ /// The task archive must be locked when calling this function.
+ fn merge_single_archive_file(
+ &self,
+ tasks: Vec<TaskCacheItem>,
+ file: &ArchiveFile,
+ ) -> Result<(), Error> {
+ if tasks.is_empty() {
+ return Ok(());
+ }
+
+ // TODO: Might be nice to also move this to ArchiveFile
+ let (temp_file, temp_file_path) =
+ proxmox_sys::fs::make_tmp_file(&file.path, self.cache.create_options)?;
+ let mut writer = if file.compressed {
+ let encoder =
+ zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?
+ .auto_finish();
+ Box::new(BufWriter::new(encoder)) as Box<dyn Write>
+ } else {
+ Box::new(BufWriter::new(temp_file)) as Box<dyn Write>
+ };
+
+ let archive_iter = file
+ .iter()?
+ .flat_map(|item| match item {
+ Ok(item) => Some(item),
+ Err(err) => {
+ log::error!("could not read task cache item while merging: {err:#}");
+ None
+ }
+ })
+ .peekable();
+ let task_iter = tasks.into_iter().peekable();
+
+ Self::write_tasks(&mut writer, MergeTaskIterator::new(archive_iter, task_iter))?;
+
+ if let Err(err) = writer.flush() {
+ log::error!("could not flush BufWriter for {file:?}: {err:#}");
+ }
+ drop(writer);
+
+ if let Err(err) = std::fs::rename(&temp_file_path, &file.path).with_context(|| {
+ format!(
+ "failed to replace {} with {}",
+ file.path.display(),
+ temp_file_path.display()
+ )
+ }) {
+ if let Err(err) = std::fs::remove_file(&temp_file_path) {
+ log::error!(
+ "failed to clean up temporary file {}: {err:#}",
+ temp_file_path.display()
+ );
+ }
+
+ return Err(err);
+ }
+
+ Ok(())
+ }
+
+ /// Write an iterator of [`TaskCacheItem`] to something that implements [`Write`].
+ /// The individual items are encoded as JSON followed by a newline.
+ fn write_tasks(
+ writer: &mut impl Write,
+ tasks: impl Iterator<Item = TaskCacheItem>,
+ ) -> Result<(), Error> {
+ for task in tasks {
+ serde_json::to_writer(&mut *writer, &task)?;
+ writeln!(writer)?;
+ }
+
+ Ok(())
+ }
+}
+
+impl TaskCache {
+ /// Create a new task cache instance.
+ ///
+ /// Remember to call `init` or `new_file` on a locked, writable TaskCache
+ /// to create the initial archive files.
+ pub fn new<P: AsRef<Path>>(
+ path: P,
+ create_options: CreateOptions,
+ max_files: u32,
+ uncompressed: u32,
+ rotate_after: u64,
+ journal_max_size: u64,
+ ) -> Result<Self, Error> {
+ Ok(Self {
+ base_path: path.as_ref().into(),
+ create_options,
+ journal_max_size,
+ max_files,
+ rotate_after,
+ uncompressed_files: uncompressed,
+ })
+ }
+
+ /// Lock the cache for reading.
+ pub fn read(self) -> Result<ReadableTaskCache, Error> {
+ let lock = self.lock_impl(false)?;
+
+ Ok(ReadableTaskCache { cache: self, lock })
+ }
+
+ /// Lock the cache for writing.
+ pub fn write(self) -> Result<WritableTaskCache, Error> {
+ let lock = self.lock_impl(true)?;
+
+ Ok(WritableTaskCache { cache: self, lock })
+ }
+
+ fn lock_impl(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
+ let lockfile = self.base_path.join(LOCKFILE_FILENAME);
+
+ Ok(TaskCacheLock(proxmox_sys::fs::open_file_locked(
+ lockfile,
+ Duration::from_secs(10),
+ exclusive,
+ self.create_options,
+ )?))
+ }
+
+ /// Read the state file.
+ /// If the state file could not be read or does not exist, the default (empty) state
+ /// is returned.
+ pub fn read_state(&self) -> State {
+ fn do_read_state(path: &Path) -> Result<State, Error> {
+ match std::fs::read(path) {
+ Ok(content) => serde_json::from_slice(&content).map_err(|err| err.into()),
+ Err(err) if err.kind() == ErrorKind::NotFound => Ok(Default::default()),
+ Err(err) => Err(err.into()),
+ }
+ }
+
+ let path = self.base_path.join(STATE_FILENAME);
+ do_read_state(&path).unwrap_or_else(|err| {
+ log::error!("could not read state file: {err:#}");
+ Default::default()
+ })
+ }
+
+ fn get_tasks_impl<'a>(
+ &self,
+ mode: GetTasks,
+ lock: &'a TaskCacheLock,
+ ) -> Result<TaskArchiveIterator<'a>, Error> {
+ let journal_file = self.base_path.join(WAL_FILENAME);
+ let active_path = self.base_path.join(ACTIVE_FILENAME);
+
+ match mode {
+ GetTasks::All => {
+ let mut archive_files = self.archive_files(lock)?;
+ archive_files.reverse();
+
+ if active_path.exists() {
+ archive_files.push(ArchiveFile {
+ path: self.base_path.join(ACTIVE_FILENAME),
+ compressed: false,
+ starttime: 0,
+ });
+ }
+
+ TaskArchiveIterator::new(Some(journal_file), archive_files, lock)
+ }
+ GetTasks::Active => {
+ let mut archive_files = Vec::new();
+
+ if active_path.exists() {
+ archive_files.push(ArchiveFile {
+ path: self.base_path.join(ACTIVE_FILENAME),
+ compressed: false,
+ starttime: 0,
+ });
+ }
+
+ TaskArchiveIterator::new(None, archive_files, lock)
+ }
+ #[cfg(test)]
+ GetTasks::Archived => {
+ let mut files = self.archive_files(lock)?;
+ files.reverse();
+
+ TaskArchiveIterator::new(Some(journal_file), files, lock)
+ }
+ }
+ }
+
+ /// Returns a list of existing archive files, together with their respective
+ /// cut-off timestamp. The result is sorted descending by cut-off timestamp (most recent one
+ /// first).
+ /// The task archive should be locked for reading when calling this function.
+ fn archive_files(&self, _lock: &TaskCacheLock) -> Result<Vec<ArchiveFile>, Error> {
+ let mut names = Vec::new();
+
+ for entry in std::fs::read_dir(&self.base_path)? {
+ let entry = entry?;
+
+ let path = entry.path();
+
+ if let Some(file) = Self::parse_archive_filename(&path) {
+ names.push(file);
+ }
+ }
+
+ names.sort_by_key(|e| -e.starttime);
+
+ Ok(names)
+ }
+
+ fn parse_archive_filename(path: &Path) -> Option<ArchiveFile> {
+ let filename = path.file_name()?.to_str()?;
+ let filename = filename.strip_prefix(ARCHIVE_FILENAME_PREFIX)?;
+
+ if let Some(starttime) = filename.strip_suffix(ZSTD_EXTENSION_WITH_DOT) {
+ let starttime: i64 = starttime.parse().ok()?;
+
+ Some(ArchiveFile {
+ path: path.to_path_buf(),
+ compressed: true,
+ starttime,
+ })
+ } else {
+ let starttime: i64 = filename.parse().ok()?;
+
+ Some(ArchiveFile {
+ path: path.to_path_buf(),
+ compressed: false,
+ starttime,
+ })
+ }
+ }
+}
+
+/// Comparison function for sorting tasks.
+/// The tasks are compared based on the task's start time, falling
+/// back to the task's UPID as a secondary criterion in case the
+/// start times are equal.
+fn compare_tasks(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
+ a.starttime
+ .cmp(&b.starttime)
+ .then_with(|| b.upid.to_string().cmp(&a.upid.to_string()))
+}
+
+/// Comparison function for sorting tasks, reversed
+/// The tasks are compared based on the task's start time, falling
+/// back to the task's UPID as a secondary criterion in case the
+/// start times are equal.
+fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
+ compare_tasks(b, a)
+}
+
+/// Iterator over the task archive.
+pub struct TaskArchiveIterator<'a> {
+ inner: Box<dyn Iterator<Item = TaskCacheItem>>,
+
+ /// Lock for this archive. This contains the lock in case we
+ /// need to keep the archive locked while iterating over it.
+ _lock: &'a TaskCacheLock,
+}
+
+impl<'a> TaskArchiveIterator<'a> {
+ /// Create a new task archive iterator.
+ ///
+ /// `files` should be sorted with the most recent archive file *last*.
+ fn new(
+ journal: Option<PathBuf>,
+ files: Vec<ArchiveFile>,
+ lock: &'a TaskCacheLock,
+ ) -> Result<Self, Error> {
+ let inner = InnerTaskArchiveIterator::new(files)
+ .filter_map(|res| match res {
+ Ok(task) => Some(task),
+ Err(err) => {
+ log::error!("could not read task from archive file: {err:#}");
+ None
+ }
+ })
+ .peekable();
+
+ if let Some(journal) = journal {
+ let journal_reader = Box::new(BufReader::new(File::open(journal)?));
+ let journal_task_iterator = JournalIterator::new(journal_reader).peekable();
+ let merge_task_iter = MergeTaskIterator::new(journal_task_iterator, inner);
+
+ Ok(Self {
+ inner: Box::new(merge_task_iter),
+ _lock: lock,
+ })
+ } else {
+ Ok(Self {
+ inner: Box::new(inner),
+ _lock: lock,
+ })
+ }
+ }
+}
+
+impl Iterator for TaskArchiveIterator<'_> {
+ type Item = TaskCacheItem;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.inner.next()
+ }
+}
+
+struct InnerTaskArchiveIterator {
+ /// Archive files to read.
+ files: Vec<ArchiveFile>,
+ /// Archive iterator we are currently using, if any
+ current: Option<ArchiveIterator>,
+}
+
+impl InnerTaskArchiveIterator {
+ /// Create a new task archive iterator.
+ pub fn new(files: Vec<ArchiveFile>) -> Self {
+ Self {
+ files,
+ current: None,
+ }
+ }
+}
+
+impl Iterator for InnerTaskArchiveIterator {
+ type Item = Result<TaskCacheItem, Error>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ match &mut self.current {
+ Some(current) => {
+ let next = current.next();
+ if next.is_some() {
+ return next;
+ } else {
+ self.current = None;
+ }
+ }
+ None => 'inner: loop {
+ // Returns `None` if no more files are available, stopping iteration.
+ let next_file = self.files.pop()?;
+
+ match next_file.iter() {
+ Ok(iter) => {
+ self.current = Some(iter);
+ break 'inner;
+ }
+ Err(err) => {
+ log::error!("could not create archive iterator while iteration over task archive files, skipping: {err:#}")
+ }
+ }
+ },
+ }
+ }
+ }
+}
+
+/// Archive file.
+#[derive(Clone, Debug)]
+struct ArchiveFile {
+ /// The path to the archive file.
+ path: PathBuf,
+ /// This archive file is compressed using zstd.
+ compressed: bool,
+ /// The archive's lowest permitted starttime (seconds since UNIX epoch).
+ starttime: i64,
+}
+
+impl ArchiveFile {
+ /// Create an [`ArchiveIterator`] for this file.
+ fn iter(&self) -> Result<ArchiveIterator, Error> {
+ let fd = File::open(&self.path)
+ .with_context(|| format!("failed to open archive file {}", self.path.display()))?;
+
+ let iter = if self.compressed {
+ let reader = zstd::stream::read::Decoder::new(fd).with_context(|| {
+ format!(
+ "failed to create zstd decoder for archive file {}",
+ self.path.display()
+ )
+ })?;
+ ArchiveIterator::new(Box::new(BufReader::new(reader)))
+ } else {
+ ArchiveIterator::new(Box::new(BufReader::new(fd)))
+ };
+
+ Ok(iter)
+ }
+
+ fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
+ let uncompressed_file_path = &self.path;
+
+ let (temp_file, temp_file_path) =
+ proxmox_sys::fs::make_tmp_file(uncompressed_file_path, options)
+ .context("failed to create temporary file")?;
+
+ let uncompressed_file =
+ File::open(uncompressed_file_path).context("failed to open uncompressed file")?;
+
+ zstd::stream::copy_encode(
+ uncompressed_file,
+ temp_file,
+ zstd::DEFAULT_COMPRESSION_LEVEL,
+ )
+ .context("zstd::stream::copy_encode failed")?;
+
+ let mut new_path_for_compressed = uncompressed_file_path.clone();
+ new_path_for_compressed
+ .set_extension(format!("{}{ZSTD_EXTENSION_WITH_DOT}", self.starttime));
+
+ std::fs::rename(&temp_file_path, &new_path_for_compressed)
+ .context("failed to move compressed task achive file")?;
+ std::fs::remove_file(uncompressed_file_path)
+ .context("failed to remove uncompressed archive file")?;
+
+ self.path = new_path_for_compressed;
+ self.compressed = true;
+
+ Ok(())
+ }
+}
+
+/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
+/// from both iterators sorted.
+/// The two iterators are expected to be sorted descendingly based on the task's starttime and
+/// ascendingly based on the task's UPID's string representation. This can be
+/// achieved by using the [`compare_tasks_reverse`] function when sorting an array of tasks.
+struct MergeTaskIterator<T: Iterator<Item = TaskCacheItem>, U: Iterator<Item = TaskCacheItem>> {
+ left: Peekable<T>,
+ right: Peekable<U>,
+}
+
+impl<T, U> MergeTaskIterator<T, U>
+where
+ T: Iterator<Item = TaskCacheItem>,
+ U: Iterator<Item = TaskCacheItem>,
+{
+ /// Create a new merging iterator.
+ fn new(left: Peekable<T>, right: Peekable<U>) -> Self {
+ Self { left, right }
+ }
+}
+
+impl<T, U> Iterator for MergeTaskIterator<T, U>
+where
+ T: Iterator<Item = TaskCacheItem>,
+ U: Iterator<Item = TaskCacheItem>,
+{
+ type Item = T::Item;
+
+ fn next(&mut self) -> Option<T::Item> {
+ let order = match (self.left.peek(), self.right.peek()) {
+ (Some(l), Some(r)) => Some(compare_tasks(l, r)),
+ (Some(_), None) => Some(Ordering::Greater),
+ (None, Some(_)) => Some(Ordering::Less),
+ (None, None) => None,
+ };
+
+ match order {
+ Some(Ordering::Greater) => self.left.next(),
+ Some(Ordering::Less) => self.right.next(),
+ Some(Ordering::Equal) => {
+ // Dedup by consuming the other iterator as well
+ let _ = self.right.next();
+ self.left.next()
+ }
+ None => None,
+ }
+ }
+}
+
+/// Iterator for a single task archive file.
+///
+/// This iterator implements `Iterator<Item = Result<TaskCacheItem, Error>>`. When iterating,
+/// tasks are read line by line, without loading the entire archive file into memory.
+struct ArchiveIterator {
+ iter: Lines<Box<dyn BufRead>>,
+}
+
+impl ArchiveIterator {
+ /// Create a new iterator.
+ pub fn new(reader: Box<dyn BufRead>) -> Self {
+ let lines = reader.lines();
+
+ Self { iter: lines }
+ }
+}
+
+impl Iterator for ArchiveIterator {
+ type Item = Result<TaskCacheItem, Error>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.iter.next().map(|result| {
+ result
+ .and_then(|line| Ok(serde_json::from_str(&line)?))
+ .map_err(Into::into)
+ })
+ }
+}
+
+/// Iterator for journal files. This iterator uses [`ArchiveIterator`] internally, but will eagerly
+/// load all tasks into memory to sort and deduplicate them.
+struct JournalIterator {
+ inner: Box<dyn Iterator<Item = TaskCacheItem>>,
+}
+
+impl JournalIterator {
+ fn new(file: Box<dyn BufRead>) -> Self {
+ let iter = ArchiveIterator::new(file);
+
+ let mut tasks: Vec<TaskCacheItem> = iter
+ .flat_map(|task| match task {
+ Ok(task) => Some(task),
+ Err(err) => {
+ log::error!("could not read task while iterating over archive file: {err:#}");
+ None
+ }
+ })
+ .collect();
+
+ tasks.sort_by(compare_tasks_reverse);
+ tasks.dedup();
+
+ Self {
+ inner: Box::new(tasks.into_iter()),
+ }
+ }
+}
+
+impl Iterator for JournalIterator {
+ type Item = TaskCacheItem;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.inner.next()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io::Cursor;
+
+ use crate::test_support::temp::NamedTempDir;
+
+ use super::*;
+
+ #[test]
+ fn archive_filename() {
+ let a = TaskCache::parse_archive_filename(&PathBuf::from("/tmp/archive.10000")).unwrap();
+
+ assert_eq!(a.path, PathBuf::from("/tmp/archive.10000"));
+ assert_eq!(a.starttime, 10000);
+ assert!(!a.compressed);
+
+ let a = TaskCache::parse_archive_filename(&PathBuf::from("/tmp/archive.1234.zst")).unwrap();
+
+ assert_eq!(a.path, PathBuf::from("/tmp/archive.1234.zst"));
+ assert_eq!(a.starttime, 1234);
+ assert!(a.compressed);
+ }
+
+ #[test]
+ fn archive_iterator() -> Result<(), Error> {
+ let file = r#"
+ {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","status":"OK","endtime":12345, "starttime": 1234}
+ {"upid":"pbs-remote!UPID:pbs:000002B2:00000158:00000000:674D828C:logrotate::root@pam:","status":"OK","endtime":12345, "starttime": 1234}
+ invalid"#
+ .trim_start();
+
+ let cursor = Box::new(Cursor::new(file.as_bytes()));
+ let mut iter = ArchiveIterator::new(cursor);
+
+ assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pve-remote");
+ assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pbs-remote");
+ assert!(iter.next().unwrap().is_err());
+ assert!(iter.next().is_none());
+
+ Ok(())
+ }
+
+ fn task(starttime: i64, ended: bool) -> TaskCacheItem {
+ let (status, endtime) = if ended {
+ (Some("OK".into()), Some(starttime + 10))
+ } else {
+ (None, None)
+ };
+
+ TaskCacheItem {
+ upid: format!(
+ "pve-remote!UPID:pve:00039E4D:002638B8:{starttime:08X}:stopall::root@pam:"
+ )
+ .parse()
+ .unwrap(),
+ starttime,
+ status,
+ endtime,
+ }
+ }
+
+ fn assert_starttimes(cache: &WritableTaskCache, starttimes: &[i64]) {
+ let tasks: Vec<i64> = cache
+ .get_tasks(GetTasks::All)
+ .unwrap()
+ .map(|task| task.starttime)
+ .collect();
+
+ assert_eq!(&tasks, starttimes);
+ }
+
+ fn add_tasks(cache: &WritableTaskCache, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
+ let mut node_map = NodeFetchSuccessMap::default();
+ node_map.set_node_success("pve-remote".to_string(), "pve".to_string());
+
+ cache.update(tasks, &node_map, HashSet::new())
+ }
+
+ const DEFAULT_MAX_SIZE: u64 = 10000;
+
+ #[test]
+ fn test_add_tasks() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+ let cache = TaskCache::new(
+ tmp_dir.path(),
+ CreateOptions::new(),
+ 3,
+ 1,
+ 0,
+ DEFAULT_MAX_SIZE,
+ )
+ .unwrap()
+ .write()?;
+
+ cache.new_file(1000, false)?;
+ assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 1);
+
+ add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
+
+ assert_eq!(
+ cache.read_state().cutoff_timestamp("pve-remote", "pve"),
+ Some(1001)
+ );
+
+ cache.rotate(1500)?;
+
+ assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 2);
+
+ add_tasks(&cache, vec![task(1500, true), task(1501, true)])?;
+ add_tasks(&cache, vec![task(1200, true), task(1300, true)])?;
+
+ assert_eq!(
+ cache.read_state().cutoff_timestamp("pve-remote", "pve"),
+ Some(1501),
+ );
+
+ cache.rotate(2000)?;
+ assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 3);
+
+ add_tasks(&cache, vec![task(2000, true)])?;
+ add_tasks(&cache, vec![task(1502, true)])?;
+ add_tasks(&cache, vec![task(1002, true)])?;
+
+ // These are before the cut-off of 1000, so they will be discarded.
+ // add_tasks(&cache, vec![task(800, true), task(900, true)])?;
+
+ // This one should be deduped
+ add_tasks(&cache, vec![task(1000, true)])?;
+
+ assert_starttimes(
+ &cache,
+ &[2000, 1502, 1501, 1500, 1300, 1200, 1002, 1001, 1000],
+ );
+
+ cache.rotate(2500)?;
+
+ assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 3);
+
+ assert_starttimes(&cache, &[2000, 1502, 1501, 1500]);
+
+ cache.rotate(3000)?;
+ assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 3);
+
+ assert_starttimes(&cache, &[2000]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_active_tasks_are_migrated_to_archive() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+ let cache = TaskCache::new(
+ tmp_dir.path(),
+ CreateOptions::new(),
+ 3,
+ 1,
+ 0,
+ DEFAULT_MAX_SIZE,
+ )
+ .unwrap()
+ .write()?;
+
+ cache.new_file(1000, false)?;
+ add_tasks(&cache, vec![task(1000, false), task(1001, false)])?;
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
+
+ add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
+
+ assert_starttimes(&cache, &[1001, 1000]);
+
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_init() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+ let cache = TaskCache::new(
+ tmp_dir.path(),
+ CreateOptions::new(),
+ 3,
+ 1,
+ 100,
+ DEFAULT_MAX_SIZE,
+ )
+ .unwrap()
+ .write()?;
+
+ cache.init(1000)?;
+ assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 3);
+
+ add_tasks(
+ &cache,
+ vec![task(1050, true), task(950, true), task(850, true)],
+ )?;
+
+ assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 3);
+
+ Ok(())
+ }
+
+ fn add_finished_tracked(cache: &WritableTaskCache, starttime: i64) -> Result<(), Error> {
+ let t = task(starttime, true);
+ let upid = t.upid.clone();
+
+ let mut node_map = NodeFetchSuccessMap::default();
+ node_map.set_node_success("pve-remote".to_string(), "pve".to_string());
+
+ cache.update(vec![t], &node_map, HashSet::from_iter([upid]))
+ }
+
+ #[test]
+ fn test_tracking_tasks() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+ let cache = TaskCache::new(
+ tmp_dir.path(),
+ CreateOptions::new(),
+ 3,
+ 1,
+ 100,
+ DEFAULT_MAX_SIZE,
+ )
+ .unwrap()
+ .write()?;
+
+ cache.init(1000)?;
+
+ cache.add_tracked_task(task(1050, false))?;
+
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
+ cache.add_tracked_task(task(1060, false))?;
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
+
+ assert_eq!(cache.read_state().tracked_tasks().count(), 2);
+
+ // Mark first task as finished
+ add_finished_tracked(&cache, 1050)?;
+
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
+ assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 1);
+ assert_eq!(cache.read_state().tracked_tasks().count(), 1);
+
+ // Mark second task as finished
+ add_finished_tracked(&cache, 1060)?;
+
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
+ assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 2);
+ assert_eq!(cache.read_state().tracked_tasks().count(), 0);
+
+ Ok(())
+ }
+
+ #[test]
+ fn journal_is_applied_if_max_size_exceeded() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+
+ // Should be *just* enough to fit a single task, which means that we apply the journal
+ // after adding a second one.
+ const ENOUGH_FOR_SINGLE_TASK: u64 = 200;
+
+ let cache = TaskCache::new(
+ tmp_dir.path(),
+ CreateOptions::new(),
+ 3,
+ 1,
+ 100,
+ ENOUGH_FOR_SINGLE_TASK,
+ )
+ .unwrap()
+ .write()?;
+
+ add_tasks(&cache, vec![task(1000, true)])?;
+ assert!(cache.journal_size()? > 0);
+
+ add_tasks(&cache, vec![task(1000, true)])?;
+
+ assert_eq!(cache.journal_size()?, 0);
+
+ Ok(())
+ }
+}
--
2.47.2
* [pdm-devel] [PATCH proxmox-datacenter-manager v7 2/7] remote tasks: add background task for task polling, use new task cache
2025-08-20 12:43 [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 1/7] remote tasks: implement improved cache for remote tasks Lukas Wagner
@ 2025-08-20 12:43 ` Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 3/7] pdm-api-types: remote tasks: add new_from_str constructor for TaskStateType Lukas Wagner
` (5 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-08-20 12:43 UTC (permalink / raw)
To: pdm-devel
This commit changes the remote task module as follows:
- Add a new background task for regular polling of task data
Instead of triggering fetching of task data from the `get_tasks` function,
which is usually called by an API handler, we move the fetching to a
new background task. The task fetches the latest tasks from all remotes
and stores them in the task cache at regular intervals (10 minutes).
The `get_tasks` function itself only reads from the cache.
The main rationale for this change is that for large setups, fetching
tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
each with a >100ms connection - adds up to minutes quickly).
If we do this from within `get_tasks`, the API handler calling the
function is also blocked for the entire time.
The `get_tasks` API is called every couple of seconds by the UI to get
a list of running remote tasks, so this *must* be quick.
- Tracked tasks are also polled in the same background task, but with
a short polling delay (10 seconds). If a tracked task finishes,
an out-of-order fetch of tasks for a remote is performed to update
the cache with all task data from the finished task.
- Only finished tasks are requested from the remotes. This avoids a
foreign (as in, not started by PDM) running task appearing stuck in
the running state until the next regular task cache refresh.
The tracked task polling could be extended to also poll running foreign
tasks, but this is an easy addition for the future.
- Tasks are now stored in the new improved task cache implementation.
This should make retrieving tasks much quicker and avoid
unneeded disk IO.
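To illustrate the scheduling described above, here is a minimal sketch of the
polling loop (assuming tokio; `poll_tracked_tasks`, `fetch_all_remotes` and
`polling_loop` are placeholder names, the actual implementation lives in the
new tasks/remote_tasks.rs further down):

    use std::time::{Duration, Instant};

    const POLL_INTERVAL: Duration = Duration::from_secs(10);
    const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);

    async fn poll_tracked_tasks() { /* placeholder: check tracked tasks */ }
    async fn fetch_all_remotes() { /* placeholder: full task fetch */ }

    async fn polling_loop() {
        let mut interval = tokio::time::interval(POLL_INTERVAL);
        // Start out "due" so the first tick performs a full fetch right away.
        let mut last_fetch = Instant::now() - TASK_FETCH_INTERVAL;

        loop {
            interval.tick().await;

            // Tracked tasks (started by PDM) are polled on every tick (10s).
            poll_tracked_tasks().await;

            // A full fetch from all remotes only happens every 10 minutes.
            if last_fetch.elapsed() >= TASK_FETCH_INTERVAL {
                fetch_all_remotes().await;
                last_fetch = Instant::now();
            }
        }
    }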
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Dominik Csapak <d.csapak@proxmox.com>
---
Notes:
Changes since v5:
- Incorporate review feedback from @Dominik (thx!)
- Change task tracking approach:
- Instead of using the oldest running task as a cutoff and
switching to a lower fetching interval if there is a tracked task,
we poll tracked tasks directly with a 10 second interval.
Once a tracked task finishes, we do a regular task fetch once
to get full task data (endtime, status).
This is a nicer approach for long running tasks, since we do
not repeatedly request the same tasks over and over again.
- Use proxmox_product_config to get CreateOptions where
it makes sense.
- Use timestamps instead of cycle counts to keep track
of when we want to rotate the task archive or do a full
task fetch
- Be more clever about how we request the semaphores. Instead
of requesting all semaphores that we could potentially need to poll
multiple nodes of a remote in parallel, request them
on demand.
- Keep track of per-node failures while fetching tasks and
feed this information to the cache implementation
so that it can maintain the per-node cutoff timestamp.
- Make documentation of public constants a bit easier
to understand.
Changes since v4:
- Rebase onto latest master, adapting to changes in
the section config type
Changes since v2:
- Adapt to new locking approach (only drops a `mut`)
Changes since v1:
- use const Duration instead of u64s for durations, using
Duration::as_secs() where needed
- Move the remote_task fetching task functions to
src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
- remote_tasks::get_tasks: wrap function body in a
tokio::task::spawn_blocking. Using the TaskCache::get_tasks
iterator does disk IO and could block the executor
- Added some doc strings to make the purpose/workings of
some functions clearer
- Couple of variables have been renamed for more clarity
server/src/api/pve/lxc.rs | 10 +-
server/src/api/pve/mod.rs | 4 +-
server/src/api/pve/qemu.rs | 6 +-
server/src/api/remote_tasks.rs | 11 +-
server/src/bin/proxmox-datacenter-api/main.rs | 1 +
.../bin/proxmox-datacenter-api/tasks/mod.rs | 1 +
.../tasks/remote_tasks.rs | 559 ++++++++++++++++
server/src/remote_tasks/mod.rs | 625 ++++--------------
8 files changed, 706 insertions(+), 511 deletions(-)
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
index f1c31425..83f9f4aa 100644
--- a/server/src/api/pve/lxc.rs
+++ b/server/src/api/pve/lxc.rs
@@ -209,7 +209,7 @@ pub async fn lxc_start(
let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -242,7 +242,7 @@ pub async fn lxc_stop(
let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
.shutdown_lxc_async(&node, vmid, Default::default())
.await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -357,7 +357,7 @@ pub async fn lxc_migrate(
};
let upid = pve.migrate_lxc(&node, vmid, params).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
log::info!("migrating vm {vmid} of node {node:?}");
let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
- new_remote_upid(source, upid)
+ new_remote_upid(source, upid).await
}
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index dd7cf382..d472cf58 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -76,9 +76,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
// converts a remote + PveUpid into a RemoteUpid and starts tracking it
-fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
+async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
- remote_tasks::track_running_task(remote_upid.clone());
+ remote_tasks::track_running_task(remote_upid.clone()).await?;
Ok(remote_upid)
}
diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
index 5a41a69e..54ede112 100644
--- a/server/src/api/pve/qemu.rs
+++ b/server/src/api/pve/qemu.rs
@@ -216,7 +216,7 @@ pub async fn qemu_start(
.start_qemu_async(&node, vmid, Default::default())
.await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -377,7 +377,7 @@ pub async fn qemu_migrate(
};
let upid = pve.migrate_qemu(&node, vmid, params).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -564,5 +564,5 @@ pub async fn qemu_remote_migrate(
log::info!("migrating vm {vmid} of node {node:?}");
let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
- new_remote_upid(source, upid)
+ new_remote_upid(source, upid).await
}
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index e629000c..05ce3666 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
},
input: {
properties: {
- "max-age": {
- type: Integer,
- optional: true,
- // TODO: sensible default max-age
- default: 300,
- description: "Maximum age of cached task data",
- },
filters: {
type: TaskFilters,
flatten: true,
@@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
},
)]
/// Get the list of tasks for all remotes.
-async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
- let tasks = remote_tasks::get_tasks(max_age, filters).await?;
+async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+ let tasks = remote_tasks::get_tasks(filters).await?;
Ok(tasks)
}
diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
index db6b2585..42bc0e1e 100644
--- a/server/src/bin/proxmox-datacenter-api/main.rs
+++ b/server/src/bin/proxmox-datacenter-api/main.rs
@@ -376,6 +376,7 @@ async fn run(debug: bool) -> Result<(), Error> {
metric_collection::start_task();
tasks::remote_node_mapping::start_task();
resource_cache::start_task();
+ tasks::remote_tasks::start_task()?;
server.await?;
log::info!("server shutting down, waiting for active workers to complete");
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
index e6ead882..a6b1f439 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
@@ -1 +1,2 @@
pub mod remote_node_mapping;
+pub mod remote_tasks;
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
new file mode 100644
index 00000000..4701a935
--- /dev/null
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -0,0 +1,559 @@
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+ time::{Duration, Instant},
+};
+
+use anyhow::{format_err, Error};
+use nix::sys::stat::Mode;
+use tokio::{sync::Semaphore, task::JoinSet};
+
+use pdm_api_types::{
+ remotes::{Remote, RemoteType},
+ RemoteUpid,
+};
+use proxmox_section_config::typed::SectionConfigData;
+use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
+
+use server::{
+ api::pve,
+ remote_tasks::{
+ self,
+ task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
+ KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER,
+ },
+ task_utils,
+};
+
+/// Tick interval for the remote task fetching task.
+/// This is also the rate at which we check on tracked tasks.
+const POLL_INTERVAL: Duration = Duration::from_secs(10);
+
+/// Interval at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);
+
+/// Interval at which to check for task cache rotation.
+const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
+
+/// Interval at which the task cache journal should be applied.
+///
+/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes.
+/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing
+/// it will be slower than the task archive itself, as the entire journal must be loaded into
+/// memory and then sorted by task starttime. Applying the journal more often might
+/// lead to more writes, but should yield better read performance.
+const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600);
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+
+/// Maximum number of total concurrent connections.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
+/// (Ephemeral) Remote task fetching task state.
+struct TaskState {
+ /// Time at which we last checked for archive rotation.
+ last_rotate_check: Instant,
+ /// Time at which we last fetched tasks.
+ last_fetch: Instant,
+ /// Time at which we last applied the journal.
+ last_journal_apply: Instant,
+}
+
+impl TaskState {
+ fn new() -> Self {
+ let now = Instant::now();
+
+ Self {
+ last_rotate_check: now - CHECK_ROTATE_INTERVAL,
+ last_fetch: now - TASK_FETCH_INTERVAL,
+ last_journal_apply: now - APPLY_JOURNAL_INTERVAL,
+ }
+ }
+
+ /// Reset the task archive rotation timestamp.
+ fn reset_rotate_check(&mut self) {
+ self.last_rotate_check = Instant::now();
+ }
+
+ /// Reset the task fetch timestamp.
+ fn reset_fetch(&mut self) {
+ self.last_fetch = Instant::now();
+ }
+
+ /// Reset the journal apply timestamp.
+ fn reset_journal_apply(&mut self) {
+ self.last_journal_apply = Instant::now();
+ }
+
+ /// Should we check for archive rotation?
+ fn is_due_for_rotate_check(&self) -> bool {
+ Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL
+ }
+
+ /// Should we fetch tasks?
+ fn is_due_for_fetch(&self) -> bool {
+ Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL
+ }
+
+ /// Should we apply the task archive's journal?
+ fn is_due_for_journal_apply(&self) -> bool {
+ Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL
+ }
+}
+
+/// Start the remote task fetching task
+pub fn start_task() -> Result<(), Error> {
+ let dir_options =
+ proxmox_product_config::default_create_options().perm(Mode::from_bits_truncate(0o0750));
+
+ proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(dir_options))?;
+
+ tokio::spawn(async move {
+ let task_scheduler = std::pin::pin!(remote_task_fetching_task());
+ let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(task_scheduler, abort_future).await;
+ });
+
+ Ok(())
+}
+
+/// Task which handles fetching remote tasks and task archive rotation.
+/// This function never returns.
+async fn remote_task_fetching_task() -> ! {
+ let mut task_state = TaskState::new();
+
+ let mut interval = tokio::time::interval(POLL_INTERVAL);
+ interval.reset_at(task_utils::next_aligned_instant(POLL_INTERVAL.as_secs()).into());
+
+ // We don't really care about catching up on missed ticks, we just want
+ // a steady tick rate.
+ interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+ if let Err(err) = init_cache().await {
+ log::error!("error when initialized task cache: {err:#}");
+ }
+
+ loop {
+ interval.tick().await;
+ if let Err(err) = do_tick(&mut task_state).await {
+ log::error!("error when fetching remote tasks: {err:#}");
+ }
+ }
+}
+
+/// Handle a single timer tick.
+/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
+async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
+ let cache = remote_tasks::get_cache()?;
+
+ if task_state.is_due_for_rotate_check() {
+ log::debug!("checking if remote task archive should be rotated");
+ if rotate_cache(cache.clone()).await? {
+ log::info!("rotated remote task archive");
+ }
+
+ task_state.reset_rotate_check();
+ }
+
+ if task_state.is_due_for_journal_apply() {
+ apply_journal(cache.clone()).await?;
+ task_state.reset_journal_apply();
+ }
+
+ let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+ let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+
+ let cache_state = cache.read_state();
+ let poll_results = poll_tracked_tasks(
+ &remote_config,
+ cache_state.tracked_tasks(),
+ Arc::clone(&total_connections_semaphore),
+ )
+ .await?;
+
+ // Get a list of remotes that we should poll in this cycle.
+ let remotes = if task_state.is_due_for_fetch() {
+ task_state.reset_fetch();
+ get_all_remotes(&remote_config)
+ } else {
+ get_remotes_with_finished_tasks(&remote_config, &poll_results)
+ };
+
+ let (all_tasks, update_state_for_remote) = fetch_remotes(
+ remotes,
+ Arc::new(cache_state),
+ Arc::clone(&total_connections_semaphore),
+ )
+ .await;
+
+ if !all_tasks.is_empty() {
+ update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
+ }
+
+ Ok(())
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are not
+/// any archive files yet.
+///
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+async fn init_cache() -> Result<(), Error> {
+ tokio::task::spawn_blocking(|| {
+ let cache = remote_tasks::get_cache()?;
+ cache.write()?.init(proxmox_time::epoch_i64())?;
+ Ok(())
+ })
+ .await?
+}
+
+/// Fetch tasks from a list of remotes.
+///
+/// Returns a list of tasks and a map that shows whether we want to update the
+/// cutoff timestamp in the statefile. We don't want to update the cutoff if
+/// the connection to one remote failed or if we could not reach all nodes in a cluster.
+async fn fetch_remotes(
+ remotes: Vec<Remote>,
+ cache_state: Arc<State>,
+ total_connections_semaphore: Arc<Semaphore>,
+) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
+ let mut join_set = JoinSet::new();
+
+ for remote in remotes {
+ let semaphore = Arc::clone(&total_connections_semaphore);
+ let state_clone = Arc::clone(&cache_state);
+
+ join_set.spawn(async move {
+ log::debug!("fetching remote tasks for '{}'", remote.id);
+ fetch_tasks(&remote, state_clone, semaphore)
+ .await
+ .map_err(|err| {
+ format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
+ })
+ });
+ }
+
+ let mut all_tasks = Vec::new();
+ let mut update_state_for_remote = NodeFetchSuccessMap::default();
+
+ while let Some(res) = join_set.join_next().await {
+ match res {
+ Ok(Ok(FetchedTasks {
+ tasks,
+ node_results,
+ })) => {
+ all_tasks.extend(tasks);
+ update_state_for_remote.merge(node_results);
+ }
+ Ok(Err(err)) => log::error!("{err:#}"),
+ Err(err) => log::error!("could not join task fetching future: {err:#}"),
+ }
+ }
+
+ (all_tasks, update_state_for_remote)
+}
+
+/// Return all remotes from the given config.
+fn get_all_remotes(remote_config: &SectionConfigData<Remote>) -> Vec<Remote> {
+ remote_config
+ .into_iter()
+ .map(|(_, section)| section)
+ .cloned()
+ .collect()
+}
+
+/// Return all remotes that correspond to a list of finished tasks.
+fn get_remotes_with_finished_tasks(
+ remote_config: &SectionConfigData<Remote>,
+ poll_results: &HashMap<RemoteUpid, PollResult>,
+) -> Vec<Remote> {
+ let remotes_with_finished_tasks: HashSet<&str> = poll_results
+ .iter()
+ .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote()))
+ .collect();
+
+ remote_config
+ .into_iter()
+ .filter_map(|(name, remote)| {
+ remotes_with_finished_tasks
+ .contains(&name)
+ .then_some(remote)
+ })
+ .cloned()
+ .collect()
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+ tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await?
+}
+
+/// Apply the task cache journal.
+async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
+}
+
+/// Fetched tasks from a single remote.
+struct FetchedTasks {
+ /// List of tasks.
+ tasks: Vec<TaskCacheItem>,
+ /// Contains whether a cluster node was fetched successfully.
+ node_results: NodeFetchSuccessMap,
+}
+
+/// Fetch tasks (active and finished) from a remote.
+async fn fetch_tasks(
+ remote: &Remote,
+ state: Arc<State>,
+ total_connections_semaphore: Arc<Semaphore>,
+) -> Result<FetchedTasks, Error> {
+ let mut tasks = Vec::new();
+
+ let mut node_results = NodeFetchSuccessMap::default();
+
+ match remote.ty {
+ RemoteType::Pve => {
+ let client = pve::connect(remote)?;
+
+ let nodes = {
+ // This permit *must* be dropped before we acquire the permits for the
+ // per-node connections - otherwise we risk a deadlock.
+ let _permit = total_connections_semaphore.acquire().await.unwrap();
+ client.list_nodes().await?
+ };
+
+ // This second semaphore is used to limit the number of concurrent connections
+ // *per remote*, not in total.
+ let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
+ let mut join_set = JoinSet::new();
+
+ for node in nodes {
+ let node_name = node.node.to_string();
+
+ let since = state
+ .cutoff_timestamp(&remote.id, &node_name)
+ .unwrap_or_else(|| {
+ proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
+ });
+
+ let params = ListTasks {
+ source: Some(ListTasksSource::Archive),
+ since: Some(since),
+ // If `limit` is not provided, we only receive 50 tasks
+ limit: Some(MAX_TASKS_TO_FETCH),
+ ..Default::default()
+ };
+
+ let per_remote_permit = Arc::clone(&per_remote_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap();
+
+ let total_connections_permit = Arc::clone(&total_connections_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap();
+
+ let remote_clone = remote.clone();
+
+ join_set.spawn(async move {
+ let res = async {
+ let client = pve::connect(&remote_clone)?;
+ let task_list =
+ client
+ .get_task_list(&node.node, params)
+ .await
+ .map_err(|err| {
+ format_err!(
+ "remote '{}', node '{}': {err}",
+ remote_clone.id,
+ node.node
+ )
+ })?;
+ Ok::<Vec<_>, Error>(task_list)
+ }
+ .await;
+
+ drop(total_connections_permit);
+ drop(per_remote_permit);
+
+ (node_name, res)
+ });
+ }
+
+ while let Some(result) = join_set.join_next().await {
+ match result {
+ Ok((node_name, result)) => match result {
+ Ok(task_list) => {
+ let mapped =
+ task_list.into_iter().filter_map(|task| {
+ match map_pve_task(task, &remote.id) {
+ Ok(task) => Some(task),
+ Err(err) => {
+ log::error!(
+ "could not map task data, skipping: {err:#}"
+ );
+ None
+ }
+ }
+ });
+
+ tasks.extend(mapped);
+ node_results.set_node_success(remote.id.clone(), node_name);
+ }
+ Err(error) => {
+ log::error!("could not fetch tasks: {error:#}");
+ node_results.set_node_failure(remote.id.clone(), node_name);
+ }
+ },
+ Err(err) => return Err(err.into()),
+ }
+ }
+ }
+ RemoteType::Pbs => {
+ // TODO: Add code for PBS
+ }
+ }
+
+ Ok(FetchedTasks {
+ tasks,
+ node_results,
+ })
+}
+
+#[derive(PartialEq, Debug)]
+/// Outcome from polling a tracked task.
+enum PollResult {
+ /// Task is still running.
+ Running,
+ /// Task is finished, poll remote tasks to get final status/endtime.
+ Finished,
+ /// Should be dropped from the active file.
+ RequestError,
+ /// Remote does not exist any more -> remove immediately from tracked task list.
+ RemoteGone,
+}
+
+/// Poll all tracked tasks.
+async fn poll_tracked_tasks(
+ remote_config: &SectionConfigData<Remote>,
+ tracked_tasks: impl Iterator<Item = &RemoteUpid>,
+ total_connections_semaphore: Arc<Semaphore>,
+) -> Result<HashMap<RemoteUpid, PollResult>, Error> {
+ let mut join_set = JoinSet::new();
+
+ for task in tracked_tasks.cloned() {
+ let permit = Arc::clone(&total_connections_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap();
+
+ let remote = remote_config.get(task.remote()).cloned();
+
+ join_set.spawn(async move {
+ // Move permit into this async block.
+ let _permit = permit;
+
+ match remote {
+ Some(remote) => poll_single_tracked_task(remote, task).await,
+ None => {
+ log::info!(
+ "remote {} does not exist any more, dropping tracked task",
+ task.remote()
+ );
+ (task, PollResult::RemoteGone)
+ }
+ }
+ });
+ }
+
+ let mut results = HashMap::new();
+ while let Some(task_result) = join_set.join_next().await {
+ let (upid, result) = task_result?;
+ results.insert(upid, result);
+ }
+
+ Ok(results)
+}
+
+/// Poll a single tracked task.
+async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) {
+ match remote.ty {
+ RemoteType::Pve => {
+ log::debug!("polling tracked task {}", task);
+
+ let status = match server::api::pve::tasks::get_task_status(
+ remote.id.clone(),
+ task.clone(),
+ false,
+ )
+ .await
+ {
+ Ok(status) => status,
+ Err(err) => {
+ log::error!("could not get status from remote: {err:#}");
+ return (task, PollResult::RequestError);
+ }
+ };
+
+ let result = if status.exitstatus.is_some() {
+ PollResult::Finished
+ } else {
+ PollResult::Running
+ };
+
+ (task, result)
+ }
+ RemoteType::Pbs => {
+ // TODO: Implement for PBS
+ (task, PollResult::RequestError)
+ }
+ }
+}
+
+/// Map a `ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
+ let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
+
+ Ok(TaskCacheItem {
+ upid: remote_upid,
+ starttime: task.starttime,
+ endtime: task.endtime,
+ status: task.status,
+ })
+}
+
+/// Update task cache with results from tracked task polling & regular task fetching.
+async fn update_task_cache(
+ cache: TaskCache,
+ new_tasks: Vec<TaskCacheItem>,
+ update_state_for_remote: NodeFetchSuccessMap,
+ poll_results: HashMap<RemoteUpid, PollResult>,
+) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || {
+ let drop_tracked = poll_results
+ .into_iter()
+ .filter_map(|(upid, result)| match result {
+ PollResult::Running => None,
+ PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => {
+ Some(upid)
+ }
+ })
+ .collect();
+
+ cache
+ .write()?
+ .update(new_tasks, &update_state_for_remote, drop_tracked)?;
+
+ Ok(())
+ })
+ .await?
+}
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 7c8e31ef..cec2cc1e 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,515 +1,156 @@
-use std::{
- collections::{HashMap, HashSet},
- fs::File,
- path::{Path, PathBuf},
- sync::{LazyLock, RwLock},
- time::Duration,
-};
+use std::path::Path;
use anyhow::Error;
-use pdm_api_types::{
- remotes::{Remote, RemoteType},
- RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
-};
-use proxmox_sys::fs::CreateOptions;
-use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
-use tokio::task::JoinHandle;
-use crate::{api::pve, task_utils};
+use pdm_api_types::{RemoteUpid, TaskFilters, TaskListItem, TaskStateType};
+use pve_api_types::PveUpid;
-mod task_cache;
+pub mod task_cache;
+
+use task_cache::{GetTasks, TaskCache, TaskCacheItem};
+
+/// Base directory for the remote task cache.
+pub const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
+
+/// Maximum size at which the journal will be applied early when adding new tasks.
+const JOURNAL_MAX_SIZE: u64 = 5 * 1024 * 1024;
+
+/// Rotate once the most recent archive file is at least 24 hours old.
+pub const ROTATE_AFTER: u64 = 24 * 3600;
+
+/// Keep 7 days worth of tasks.
+pub const KEEP_OLD_FILES: u32 = 7;
+
+/// Number of uncompressed archive files. These will be the most recent ones.
+const NUMBER_OF_UNCOMPRESSED_FILES: u32 = 2;
/// Get tasks for all remotes
// FIXME: filter for privileges
-pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
- let (remotes, _) = pdm_config::remotes::config()?;
+pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+ tokio::task::spawn_blocking(move || {
+ let cache = get_cache()?.read()?;
- let mut all_tasks = Vec::new();
-
- let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let mut cache = TaskCache::new(cache_path)?;
-
- // Force a refresh for all tasks of a remote if a task is finished.
- // Not super nice, but saves us from persisting finished tasks. Also,
- // the /nodes/<node>/tasks/<upid>/status endpoint does not return
- // a task's endtime, which is only returned by
- // /nodes/<node>/tasks...
- // Room for improvements in the future.
- invalidate_cache_for_finished_tasks(&mut cache);
-
- for (remote_name, remote) in remotes.iter() {
- let now = proxmox_time::epoch_i64();
-
- if let Some(tasks) = cache.get_tasks(remote_name, now, max_age) {
- // Data in cache is recent enough and has not been invalidated.
- all_tasks.extend(tasks);
+ let which = if filters.running {
+ GetTasks::Active
} else {
- let tasks = match fetch_tasks(remote).await {
- Ok(tasks) => tasks,
- Err(err) => {
- // ignore errors for not reachable remotes
- continue;
+ GetTasks::All
+ };
+
+ let returned_tasks = cache
+ .get_tasks(which)?
+ .skip(filters.start as usize)
+ .take(filters.limit as usize)
+ .filter_map(|task| {
+ // TODO: Handle PBS tasks
+ let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
+ match pve_upid {
+ Ok(pve_upid) => Some(TaskListItem {
+ upid: task.upid.to_string(),
+ node: pve_upid.node,
+ pid: pve_upid.pid as i64,
+ pstart: pve_upid.pstart,
+ starttime: pve_upid.starttime,
+ worker_type: pve_upid.worker_type,
+ worker_id: None,
+ user: pve_upid.auth_id,
+ endtime: task.endtime,
+ status: task.status,
+ }),
+ Err(err) => {
+ log::error!("could not parse UPID: {err:#}");
+ None
+ }
}
- };
- cache.set_tasks(remote_name, tasks.clone(), now);
-
- all_tasks.extend(tasks);
- }
- }
-
- let mut returned_tasks = add_running_tasks(all_tasks)?;
- returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
- let returned_tasks = returned_tasks
- .into_iter()
- .filter(|item| {
- if filters.running && item.endtime.is_some() {
- return false;
- }
-
- if let Some(until) = filters.until {
- if item.starttime > until {
+ })
+ .filter(|item| {
+ if filters.running && item.endtime.is_some() {
return false;
}
- }
- if let Some(since) = filters.since {
- if item.starttime < since {
- return false;
- }
- }
-
- if let Some(needle) = &filters.userfilter {
- if !item.user.contains(needle) {
- return false;
- }
- }
-
- if let Some(typefilter) = &filters.typefilter {
- if !item.worker_type.contains(typefilter) {
- return false;
- }
- }
-
- let state = item.status.as_ref().map(|status| tasktype(status));
-
- match (state, &filters.statusfilter) {
- (Some(TaskStateType::OK), _) if filters.errors => return false,
- (Some(state), Some(filters)) => {
- if !filters.contains(&state) {
+ if let Some(until) = filters.until {
+ if item.starttime > until {
return false;
}
}
- (None, Some(_)) => return false,
- _ => {}
- }
- true
- })
- .skip(filters.start as usize)
- .take(filters.limit as usize)
- .collect();
-
- // We don't need to wait for this task to finish
- tokio::task::spawn_blocking(move || {
- if let Err(e) = cache.save() {
- log::error!("could not save task cache: {e}");
- }
- });
-
- Ok(returned_tasks)
-}
-
-/// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
- let mut tasks = Vec::new();
-
- match remote.ty {
- RemoteType::Pve => {
- let client = pve::connect(remote)?;
-
- // N+1 requests - we could use /cluster/tasks, but that one
- // only gives a limited task history
- for node in client.list_nodes().await? {
- let params = ListTasks {
- // Include running tasks
- source: Some(ListTasksSource::All),
- // TODO: How much task history do we want? Right now we just hard-code it
- // to 7 days.
- since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
- ..Default::default()
- };
-
- let list = client.get_task_list(&node.node, params).await?;
- let mapped = map_tasks(list, &remote.id)?;
-
- tasks.extend(mapped);
- }
- }
- RemoteType::Pbs => {
- // TODO: Add code for PBS
- }
- }
-
- Ok(tasks)
-}
-
-/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
-fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
- let mut mapped = Vec::new();
-
- for task in tasks {
- let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
-
- mapped.push(TaskListItem {
- upid: remote_upid.to_string(),
- node: task.node,
- pid: task.pid,
- pstart: task.pstart as u64,
- starttime: task.starttime,
- worker_type: task.ty,
- worker_id: Some(task.id),
- user: task.user,
- endtime: task.endtime,
- status: task.status,
- })
- }
-
- Ok(mapped)
-}
-
-/// Drops the cached task list of a remote for all finished tasks.
-///
-/// We use this to force a refresh so that we get the full task
-/// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
- let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
-
- // If a task is finished, we force a refresh for the remote - otherwise
- // we don't get the 'endtime' for the task.
- for task in finished.drain() {
- cache.invalidate_cache_for_remote(task.remote());
- }
-}
-
-/// Supplement the list of tasks that we received from the remote with
-/// the tasks that were started by PDM and are currently running.
-fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
- let mut returned_tasks = Vec::new();
-
- let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
- for task in cached_tasks {
- let remote_upid = task.upid.parse()?;
-
- if running_tasks.contains(&remote_upid) {
- if task.endtime.is_some() {
- // Task is finished but we still think it is running ->
- // Drop it from RUNNING_FOREIGN_TASKS
- running_tasks.remove(&remote_upid);
-
- // No need to put it in FINISHED_TASKS, since we already
- // got its state recently enough (we know the status and endtime)
- }
- } else {
- returned_tasks.push(task);
- }
- }
-
- for task in running_tasks.iter() {
- let upid: PveUpid = task.upid.parse()?;
- returned_tasks.push(TaskListItem {
- upid: task.to_string(),
- node: upid.node,
- pid: upid.pid as i64,
- pstart: upid.pstart,
- starttime: upid.starttime,
- worker_type: upid.worker_type,
- worker_id: upid.worker_id,
- user: upid.auth_id,
- endtime: None,
- status: None,
- });
- }
-
- Ok(returned_tasks)
-}
-
-/// A cache for fetched remote tasks.
-struct TaskCache {
- /// Cache entries
- content: TaskCacheContent,
-
- /// Entries that were added or updated - these will be persistet
- /// when `save` is called.
- new_or_updated: TaskCacheContent,
-
- /// Cache entries were changed/removed.
- dirty: bool,
-
- /// File-location at which the cached tasks are stored.
- cachefile_path: PathBuf,
-}
-
-impl TaskCache {
- /// Create a new tasks cache instance by loading
- /// the cache from disk.
- fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
- Ok(Self {
- content: Self::load_content()?,
- new_or_updated: Default::default(),
- dirty: false,
- cachefile_path,
- })
- }
-
- /// Load the task cache contents from disk.
- fn load_content() -> Result<TaskCacheContent, Error> {
- let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
-
- let content = if let Some(content) = content {
- serde_json::from_str(&content)?
- } else {
- Default::default()
- };
-
- Ok(content)
- }
-
- /// Get path for the cache's lockfile.
- fn lockfile_path(&self) -> PathBuf {
- let mut path = self.cachefile_path.clone();
- path.set_extension("lock");
- path
- }
-
- /// Persist the task cache
- ///
- /// This method requests an exclusive lock for the task cache lockfile.
- fn save(&mut self) -> Result<(), Error> {
- // if we have not updated anything, we don't have to update the cache file
- if !self.dirty {
- return Ok(());
- }
-
- let _guard = self.lock(Duration::from_secs(5))?;
-
- // Read content again, in case somebody has changed it in the meanwhile
- let mut content = Self::load_content()?;
-
- for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
- if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
- // Only update entry if nobody else has updated it in the meanwhile
- if existing_entry.timestamp < entry.timestamp {
- *existing_entry = entry;
- }
- } else {
- content.remote_tasks.insert(remote_name, entry);
- }
- }
-
- let bytes = serde_json::to_vec_pretty(&content)?;
-
- let api_uid = pdm_config::api_user()?.uid;
- let api_gid = pdm_config::api_group()?.gid;
-
- let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
- proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
-
- self.dirty = false;
-
- Ok(())
- }
-
- // Update task data for a given remote.
- fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
- self.dirty = true;
- self.new_or_updated
- .remote_tasks
- .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
- }
-
- // Get task data for a given remote.
- fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
- if let Some(entry) = self.content.remote_tasks.get(remote) {
- if (entry.timestamp + max_age) < now {
- return None;
- }
-
- Some(entry.tasks.clone())
- } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
- if (entry.timestamp + max_age) < now {
- return None;
- }
- Some(entry.tasks.clone())
- } else {
- None
- }
- }
-
- // Invalidate cache for a given remote.
- fn invalidate_cache_for_remote(&mut self, remote: &str) {
- self.dirty = true;
- self.content.remote_tasks.remove(remote);
- }
-
- // Lock the cache for modification.
- //
- // While the cache is locked, other users can still read the cache
- // without a lock, since the cache file is replaced atomically
- // when updating.
- fn lock(&self, duration: Duration) -> Result<File, Error> {
- let api_uid = pdm_config::api_user()?.uid;
- let api_gid = pdm_config::api_group()?.gid;
-
- let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
- proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
- }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
- timestamp: i64,
- tasks: Vec<TaskListItem>,
-}
-
-#[derive(Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
- remote_tasks: HashMap<String, TaskCacheEntry>,
-}
-
-/// Interval at which tracked tasks are polled
-const RUNNING_CHECK_INTERVAL_S: u64 = 10;
-
-/// Tasks which were started by PDM and are still running
-static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-/// Tasks which were started by PDM and w
-static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-
-fn init() -> RwLock<HashSet<RemoteUpid>> {
- RwLock::new(HashSet::new())
-}
-
-/// Insert a remote UPID into the running list
-///
-/// If it is the first entry in the list, a background task is started to track its state
-///
-/// Returns the [`JoinHandle`] if a task was started.
-///
-/// panics on a poisoned mutex
-pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
- let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
-
- // the call inserting the first task in the list needs to start the checking task
- let need_start_task = tasks.is_empty();
- tasks.insert(task);
-
- if !need_start_task {
- return None;
- }
- drop(tasks);
-
- Some(tokio::spawn(async move {
- loop {
- let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
- let finished_tasks = get_finished_tasks().await;
-
- // skip iteration if we still have tasks, just not finished ones
- if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
- continue;
- }
-
- let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
- // we either have finished tasks, or the running task list was empty
- let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
-
- for (upid, _status) in finished_tasks {
- if set.remove(&upid) {
- finished.insert(upid);
- } else {
- // someone else removed & persisted the task in the meantime
- }
- }
-
- // if no task remains, end the current task
- // it will be restarted by the next caller that inserts one
- if set.is_empty() {
- return;
- }
- }
- }))
-}
-
-/// Get a list of running foreign tasks
-///
-/// panics on a poisoned mutex
-pub fn get_running_tasks() -> Vec<RemoteUpid> {
- RUNNING_FOREIGN_TASKS
- .read()
- .unwrap()
- .iter()
- .cloned()
- .collect()
-}
-
-/// Checks all current saved UPIDs if they're still running, and if not,
-/// returns their upids + status
-///
-/// panics on a poisoned mutex
-pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
- let mut finished = Vec::new();
- let config = match pdm_config::remotes::config() {
- Ok((config, _)) => config,
- Err(err) => {
- log::error!("could not open remotes config: {err}");
- return Vec::new();
- }
- };
- for task in get_running_tasks() {
- match config.get(task.remote()) {
- Some(remote) => match remote.ty {
- RemoteType::Pve => {
- let status = match crate::api::pve::tasks::get_task_status(
- remote.id.clone(),
- task.clone(),
- false,
- )
- .await
- {
- Ok(status) => status,
- Err(err) => {
- log::error!("could not get status from remote: {err}");
- finished.push((task, "could not get status".to_string()));
- continue;
- }
- };
- if let Some(status) = status.exitstatus {
- finished.push((task, status.to_string()));
+ if let Some(since) = filters.since {
+ if item.starttime < since {
+ return false;
}
}
- RemoteType::Pbs => {
- let _client = match crate::pbs_client::connect(remote) {
- Ok(client) => client,
- Err(err) => {
- log::error!("could not get status from remote: {err}");
- finished.push((task, "could not get status".to_string()));
- continue;
- }
- };
- // FIXME implement get task status
- finished.push((task, "unknown state".to_string()));
- }
- },
- None => finished.push((task, "unknown remote".to_string())),
- }
- }
- finished
+ if let Some(needle) = &filters.userfilter {
+ if !item.user.contains(needle) {
+ return false;
+ }
+ }
+
+ if let Some(typefilter) = &filters.typefilter {
+ if !item.worker_type.contains(typefilter) {
+ return false;
+ }
+ }
+
+ let state = item.status.as_ref().map(|status| tasktype(status));
+
+ match (state, &filters.statusfilter) {
+ (Some(TaskStateType::OK), _) if filters.errors => return false,
+ (Some(state), Some(filters)) => {
+ if !filters.contains(&state) {
+ return false;
+ }
+ }
+ (None, Some(_)) => return false,
+ _ => {}
+ }
+
+ true
+ })
+ .collect();
+
+ Ok(returned_tasks)
+ })
+ .await?
+}
+
+/// Insert a newly created task into the list of tracked tasks.
+///
+/// Any tracked task will be polled with a short interval until the task
+/// has finished.
+pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || {
+ let cache = get_cache()?.write()?;
+ // TODO: Handle PBS tasks correctly.
+ let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
+ let task = TaskCacheItem {
+ upid: task.clone(),
+ starttime: pve_upid.starttime,
+ status: None,
+ endtime: None,
+ };
+ cache.add_tracked_task(task)
+ })
+ .await?
+}
+
+/// Get a new [`TaskCache`] instance.
+///
+/// No heavy-weight operations are done here; it's fine to call this regularly as part of the
+/// update loop.
+pub fn get_cache() -> Result<TaskCache, Error> {
+ let file_options = proxmox_product_config::default_create_options();
+
+ let cache_path = Path::new(REMOTE_TASKS_DIR);
+ let cache = TaskCache::new(
+ cache_path,
+ file_options,
+ KEEP_OLD_FILES,
+ NUMBER_OF_UNCOMPRESSED_FILES,
+ ROTATE_AFTER,
+ JOURNAL_MAX_SIZE,
+ )?;
+
+ Ok(cache)
}
/// Parses a task status string into a TaskStateType
--
2.47.2
* [pdm-devel] [PATCH proxmox-datacenter-manager v7 3/7] pdm-api-types: remote tasks: add new_from_str constructor for TaskStateType
2025-08-20 12:43 [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 1/7] remote tasks: implement improved cache for remote tasks Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 2/7] remote tasks: add background task for task polling, use new task cache Lukas Wagner
@ 2025-08-20 12:43 ` Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 4/7] fake remote: make the fake_remote feature compile again Lukas Wagner
` (4 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-08-20 12:43 UTC (permalink / raw)
To: pdm-devel
This allows us to get rid of the `tasktype` helper in
server::remote_tasks.
We don't impl `From<&str>` because those should be value-preserving,
lossless and obvious. Also, the function is not called from_str to
avoid any confusion with FromStr.
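For illustration, a short usage sketch (not part of the patch; assumes
`pdm_api_types::TaskStateType` is in scope):

    use pdm_api_types::TaskStateType;

    assert!(matches!(TaskStateType::new_from_str("OK"), TaskStateType::OK));
    assert!(matches!(
        TaskStateType::new_from_str("WARNINGS: 2"),
        TaskStateType::Warning
    ));
    assert!(matches!(TaskStateType::new_from_str(""), TaskStateType::Unknown));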
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Dominik Csapak <d.csapak@proxmox.com>
---
Notes:
No changes in v6
No changes in v5
No changes in v4
Changes in v3:
- move the function to TaskStateType::new_from_str instead
of From<&str>
lib/pdm-api-types/src/lib.rs | 15 +++++++++++++++
server/src/remote_tasks/mod.rs | 15 +--------------
2 files changed, 16 insertions(+), 14 deletions(-)
diff --git a/lib/pdm-api-types/src/lib.rs b/lib/pdm-api-types/src/lib.rs
index 38449071..9373725c 100644
--- a/lib/pdm-api-types/src/lib.rs
+++ b/lib/pdm-api-types/src/lib.rs
@@ -232,6 +232,21 @@ pub enum TaskStateType {
Unknown,
}
+impl TaskStateType {
+ /// Construct a new instance from a `&str`.
+ pub fn new_from_str(status: &str) -> Self {
+ if status == "unknown" || status.is_empty() {
+ TaskStateType::Unknown
+ } else if status == "OK" {
+ TaskStateType::OK
+ } else if status.starts_with("WARNINGS: ") {
+ TaskStateType::Warning
+ } else {
+ TaskStateType::Error
+ }
+ }
+}
+
#[api(
properties: {
upid: { schema: UPID::API_SCHEMA },
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index cec2cc1e..8638ebd8 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -91,7 +91,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
}
}
- let state = item.status.as_ref().map(|status| tasktype(status));
+ let state = item.status.as_deref().map(TaskStateType::new_from_str);
match (state, &filters.statusfilter) {
(Some(TaskStateType::OK), _) if filters.errors => return false,
@@ -152,16 +152,3 @@ pub fn get_cache() -> Result<TaskCache, Error> {
Ok(cache)
}
-
-/// Parses a task status string into a TaskStateType
-pub fn tasktype(status: &str) -> TaskStateType {
- if status == "unknown" || status.is_empty() {
- TaskStateType::Unknown
- } else if status == "OK" {
- TaskStateType::OK
- } else if status.starts_with("WARNINGS: ") {
- TaskStateType::Warning
- } else {
- TaskStateType::Error
- }
-}
--
2.47.2
* [pdm-devel] [PATCH proxmox-datacenter-manager v7 4/7] fake remote: make the fake_remote feature compile again
2025-08-20 12:43 [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Lukas Wagner
` (2 preceding siblings ...)
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 3/7] pdm-api-types: remote tasks: add new_from_str constructor for TaskStateType Lukas Wagner
@ 2025-08-20 12:43 ` Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 5/7] fake remote: clippy fixes Lukas Wagner
` (3 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-08-20 12:43 UTC (permalink / raw)
To: pdm-devel
The ClientFactory trait was changed in Wolfgang's multi-client patches;
this commit adapts the fake remote feature to the changes.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Dominik Csapak <d.csapak@proxmox.com>
Tested-by: Dominik Csapak <d.csapak@proxmox.com>
---
Notes:
Changes in v6:
- add missing memhost value for ClusterResource
No changes in v5
No changes in v4
No changes in v3
new in v2
server/src/test_support/fake_remote.rs | 33 +++++++++++++++-----------
1 file changed, 19 insertions(+), 14 deletions(-)
diff --git a/server/src/test_support/fake_remote.rs b/server/src/test_support/fake_remote.rs
index 0161d8e6..02d11f88 100644
--- a/server/src/test_support/fake_remote.rs
+++ b/server/src/test_support/fake_remote.rs
@@ -1,18 +1,22 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use anyhow::{bail, Error};
+use serde::Deserialize;
+
use pdm_api_types::{remotes::Remote, Authid, ConfigDigest};
use pdm_config::remotes::RemoteConfig;
use proxmox_product_config::ApiLockGuard;
use proxmox_section_config::typed::SectionConfigData;
use pve_api_types::{
- client::PveClient, ClusterMetrics, ClusterMetricsData, ClusterNodeIndexResponse,
- ClusterNodeIndexResponseStatus, ClusterResource, ClusterResourceKind, ClusterResourceType,
- ListTasks, ListTasksResponse, PveUpid, StorageContent,
+ ClusterMetrics, ClusterMetricsData, ClusterNodeIndexResponse, ClusterNodeIndexResponseStatus,
+ ClusterResource, ClusterResourceKind, ClusterResourceType, ListTasks, ListTasksResponse,
+ PveUpid, StorageContent,
};
-use serde::Deserialize;
-use crate::{connection::ClientFactory, pbs_client::PbsClient};
+use crate::{
+ connection::{ClientFactory, PveClient},
+ pbs_client::PbsClient,
+};
#[derive(Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
@@ -74,8 +78,8 @@ impl FakeRemoteConfig {
#[async_trait::async_trait]
impl ClientFactory for FakeClientFactory {
- fn make_pve_client(&self, _remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
- Ok(Box::new(FakePveClient {
+ fn make_pve_client(&self, _remote: &Remote) -> Result<Arc<PveClient>, Error> {
+ Ok(Arc::new(FakePveClient {
nr_of_vms: self.config.vms_per_pve_remote,
nr_of_cts: self.config.cts_per_pve_remote,
nr_of_nodes: self.config.nodes_per_pve_remote,
@@ -88,7 +92,7 @@ impl ClientFactory for FakeClientFactory {
&self,
_remote: &Remote,
_target_endpoint: Option<&str>,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+ ) -> Result<Arc<PveClient>, Error> {
bail!("not implemented")
}
@@ -96,10 +100,7 @@ impl ClientFactory for FakeClientFactory {
bail!("not implemented")
}
- async fn make_pve_client_and_login(
- &self,
- _remote: &Remote,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+ async fn make_pve_client_and_login(&self, _remote: &Remote) -> Result<Arc<PveClient>, Error> {
bail!("not implemented")
}
@@ -118,7 +119,7 @@ struct FakePveClient {
}
#[async_trait::async_trait]
-impl PveClient for FakePveClient {
+impl pve_api_types::client::PveClient for FakePveClient {
async fn cluster_resources(
&self,
_ty: Option<ClusterResourceKind>,
@@ -143,6 +144,7 @@ impl PveClient for FakePveClient {
maxdisk: Some(100 * 1024 * 1024),
maxmem: Some(8 * 1024 * 1024 * 1024),
mem: Some(3 * 1024 * 1024 * 1024),
+ memhost: Some(4 * 1024 * 1024),
name: Some(format!("vm-{vmid}")),
netin: Some(1034),
netout: Some(1034),
@@ -175,6 +177,7 @@ impl PveClient for FakePveClient {
maxcpu: Some(4.),
maxdisk: Some(100 * 1024 * 1024),
maxmem: Some(8 * 1024 * 1024 * 1024),
+ memhost: Some(4 * 1024 * 1024),
mem: Some(3 * 1024 * 1024 * 1024),
name: Some(format!("ct-{vmid}")),
netin: Some(1034),
@@ -208,6 +211,7 @@ impl PveClient for FakePveClient {
maxdisk: Some(100 * 1024 * 1024),
maxmem: Some(8 * 1024 * 1024 * 1024),
mem: Some(3 * 1024 * 1024 * 1024),
+ memhost: None,
name: None,
netin: None,
netout: None,
@@ -240,6 +244,7 @@ impl PveClient for FakePveClient {
maxdisk: Some(100 * 1024 * 1024),
maxmem: None,
mem: None,
+ memhost: None,
name: None,
netin: None,
netout: None,
--
2.47.2
* [pdm-devel] [PATCH proxmox-datacenter-manager v7 5/7] fake remote: clippy fixes
2025-08-20 12:43 [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Lukas Wagner
` (3 preceding siblings ...)
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 4/7] fake remote: make the fake_remote feature compile again Lukas Wagner
@ 2025-08-20 12:43 ` Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 6/7] remote tasks: task cache: create `active` file in init Lukas Wagner
` (2 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-08-20 12:43 UTC (permalink / raw)
To: pdm-devel
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Dominik Csapak <d.csapak@proxmox.com>
---
Notes:
No changes in v6
No changes in v5
No changes in v4
Added in v3
server/src/test_support/fake_remote.rs | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/server/src/test_support/fake_remote.rs b/server/src/test_support/fake_remote.rs
index 02d11f88..e32a88a1 100644
--- a/server/src/test_support/fake_remote.rs
+++ b/server/src/test_support/fake_remote.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
use anyhow::{bail, Error};
use serde::Deserialize;
@@ -129,7 +129,7 @@ impl pve_api_types::client::PveClient for FakePveClient {
let mut vmid = 100;
for _ in 0..self.nr_of_vms {
- vmid = vmid + 1;
+ vmid += 1;
result.push(ClusterResource {
cgroup_mode: None,
content: None,
@@ -163,7 +163,7 @@ impl pve_api_types::client::PveClient for FakePveClient {
}
for _ in 0..self.nr_of_cts {
- vmid = vmid + 1;
+ vmid += 1;
result.push(ClusterResource {
cgroup_mode: None,
content: None,
@@ -358,7 +358,6 @@ impl pve_api_types::client::PveClient for FakePveClient {
async fn list_nodes(&self) -> Result<Vec<ClusterNodeIndexResponse>, proxmox_client::Error> {
tokio::time::sleep(Duration::from_millis(self.api_delay_ms as u64)).await;
Ok((0..self.nr_of_nodes)
- .into_iter()
.map(|i| ClusterNodeIndexResponse {
node: format!("pve-{i}"),
cpu: None,
@@ -415,7 +414,6 @@ impl pve_api_types::client::PveClient for FakePveClient {
let number_of_tasks = limit.min(number_of_tasks as u64);
Ok((0..number_of_tasks)
- .into_iter()
.map(|i| make_task(now - i as i64 * NEW_TASK_EVERY * 60))
.collect())
}
--
2.47.2
* [pdm-devel] [PATCH proxmox-datacenter-manager v7 6/7] remote tasks: task cache: create `active` file in init
2025-08-20 12:43 [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Lukas Wagner
` (4 preceding siblings ...)
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 5/7] fake remote: clippy fixes Lukas Wagner
@ 2025-08-20 12:43 ` Lukas Wagner
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 7/7] remote tasks: log error in case of task panic, instead of cancelling all tasks Lukas Wagner
2025-08-21 9:20 ` [pdm-devel] applied: [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Dominik Csapak
7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-08-20 12:43 UTC (permalink / raw)
To: pdm-devel
This avoids a 'could not create task archive iterator' error before the
first round of task fetching.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
New in v7
server/src/remote_tasks/task_cache.rs | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 9e6a65cd..1afeaee4 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -214,11 +214,24 @@ impl ReadableTaskCache {
}
impl WritableTaskCache {
- /// Create initial task archives that can be backfilled with the
+ /// Create initial task archive files that can be backfilled with the
/// recent task history from a remote.
///
/// This function only has an effect if there are no archive files yet.
pub fn init(&self, now: i64) -> Result<(), Error> {
+ let active_filename = self.cache.base_path.join(ACTIVE_FILENAME);
+
+ if !active_filename.exists() {
+ let mut file = OpenOptions::new()
+ .create(true)
+ .write(true)
+ .open(&active_filename)?;
+
+ self.cache
+ .create_options
+ .apply_to(&mut file, &active_filename)?;
+ }
+
if self.cache.archive_files(&self.lock)?.is_empty() {
for i in 0..self.cache.max_files {
self.new_file(
--
2.47.2
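A hypothetical usage sketch, assuming `WritableTaskCache` from server/src/remote_tasks/task_cache.rs is in scope and that the background fetching task calls `init()` once at startup; the helper name and the `proxmox_time::epoch_i64()` call are assumptions, not taken from this patch:
    // Initialize the cache once before the first fetch round so readers can
    // build the archive iterator even when no tasks have been stored yet.
    // With this patch, init() also creates the `active` file; it is
    // effectively a no-op on subsequent runs.
    fn prepare_cache(cache: &WritableTaskCache) -> Result<(), anyhow::Error> {
        cache.init(proxmox_time::epoch_i64())?;
        Ok(())
    }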
* [pdm-devel] [PATCH proxmox-datacenter-manager v7 7/7] remote tasks: log error in case of task panic, instead of cancelling all tasks
2025-08-20 12:43 [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Lukas Wagner
` (5 preceding siblings ...)
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 6/7] remote tasks: task cache: create `active` file in init Lukas Wagner
@ 2025-08-20 12:43 ` Lukas Wagner
2025-08-21 9:20 ` [pdm-devel] applied: [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Dominik Csapak
7 siblings, 0 replies; 9+ messages in thread
From: Lukas Wagner @ 2025-08-20 12:43 UTC (permalink / raw)
To: pdm-devel
This should hopefully never happen, but for cases like these we still want
to persist the tasks from other nodes instead of failing completely.
Also remove the `set_node_failure` function from NodeFetchSuccessMap. For
panicked tasks we don't have a straightforward way to get the node name,
and we only ever check for success anyway, not for failure.
Suggested-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
New in v7.
server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs | 5 +++--
server/src/remote_tasks/task_cache.rs | 5 -----
2 files changed, 3 insertions(+), 7 deletions(-)
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
index 4701a935..04c51dac 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -411,10 +411,11 @@ async fn fetch_tasks(
}
Err(error) => {
log::error!("could not fetch tasks: {error:#}");
- node_results.set_node_failure(remote.id.clone(), node_name);
}
},
- Err(err) => return Err(err.into()),
+ Err(error) => {
+ log::error!("could not join task fetching task: {error:#}");
+ }
}
}
}
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 1afeaee4..e9e708e4 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -188,11 +188,6 @@ impl NodeFetchSuccessMap {
self.0.insert((remote, node), true);
}
- /// Mark a node of a given remote as failed.
- pub fn set_node_failure(&mut self, remote: String, node: String) {
- self.0.insert((remote, node), false);
- }
-
/// Returns whether tasks from a given node of a remote were successfully fetched.
pub fn node_successful(&self, remote: &str, node: &str) -> bool {
matches!(self.0.get(&(remote.into(), node.into())), Some(true))
--
2.47.2
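A hedged sketch of the resulting join-error handling pattern, simplified and with assumed types (the real fetch loop and its result type differ; only the match structure mirrors the hunk above):
    use tokio::task::JoinSet;
    // Collect per-node fetch results; log fetch errors and join errors
    // (i.e. panicked tasks) instead of aborting the whole fetch run.
    async fn collect_results(mut set: JoinSet<Result<Vec<String>, anyhow::Error>>) -> Vec<String> {
        let mut all = Vec::new();
        while let Some(joined) = set.join_next().await {
            match joined {
                Ok(Ok(tasks)) => all.extend(tasks),
                Ok(Err(error)) => log::error!("could not fetch tasks: {error:#}"),
                Err(error) => log::error!("could not join task fetching task: {error:#}"),
            }
        }
        all
    }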
* [pdm-devel] applied: [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend
2025-08-20 12:43 [pdm-devel] [PATCH proxmox-datacenter-manager v7 0/7] remote task cache fetching task / better cache backend Lukas Wagner
` (6 preceding siblings ...)
2025-08-20 12:43 ` [pdm-devel] [PATCH proxmox-datacenter-manager v7 7/7] remote tasks: log error in case of task panic, instead of cancelling all tasks Lukas Wagner
@ 2025-08-21 9:20 ` Dominik Csapak
7 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2025-08-21 9:20 UTC (permalink / raw)
To: Proxmox Datacenter Manager development discussion, Lukas Wagner
applied the whole series, thanks!