* [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend
@ 2025-04-17 13:22 Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 1/6] remote tasks: implement improved cache for remote tasks Lukas Wagner
From: Lukas Wagner @ 2025-04-17 13:22 UTC (permalink / raw)
To: pdm-devel
The aim of this patch series is to greatly improve the performance of the
remote task cache for big PDM setups.
The initial, 'dumb' cache implementation had the following problems:
1.) cache was populated as part of the `get_tasks` API, leading to hanging
API calls while fetching task data from remotes
2.) all tasks were stored in a single file, which was completely rewritten
for any change to the cache's contents
3.) The caching mechanism was pretty simple, using only a max-age mechanism,
re-requesting all task data if max-age was exceeded
Now, these characteristics are not really problematic for *small* PDM setups
with only a couple of remotes. However, for big setups (e.g. 100 remotes,
each remote being a PVE cluster with 10 nodes), this completely falls apart:
1.) fetching remote tasks takes a considerable amount of time, especially
on connections with a high latency. Since the data is requested
from *within* the `get_tasks` function, which is called by the
`remote-tasks/list` API handler, the API call is blocked until
*all* task data is requested.
2.) The single file approach leads to significant writes to the disk
3.) The max-age approach leads to unnecessary network IO, as we re-request
data that we already have locally.
To rectify the situation, this series performs the following changes:
- `get_tasks` never does any fetching, it only reads the most recent
data from the cache
- There is a new background task which periodically fetches tasks
from all remotes (every 10mins at the moment). Only the latest
missing tasks are requested, not the full task history as before
- The new background task also takes over the 'tracked task' polling
duty, where we fetch the status for any task started by PDM on
a remote (short polling interval, 10s at the moment).
- The task cache storage implementation has been completely overhauled
and is now optimized for the most common accesses to the cache.
It is also more storage efficient, occupying roughly 50% of the disk
space for the same number of tasks (achieved by avoiding duplicate
information in the files)
- The size of the task cache is 'limited' by doing file rotation.
We keep 7 days of task history.
For details on *how* the cache itself works, please refer to the full
commit message of
remote tasks: implement improved cache for remote tasks
# Benchmarks
Finally, some concrete data to back up the claimed performance improvements.
The times were measured *inside* the `get_tasks` function and not at
the API level, so the times do not include JSON serialization and
data transfer.
Benchmarking was done using the 'fake-remote' feature. There were 100
remotes, 10 PVE nodes per remote. The task cache contained
about 1.5 million tasks.
                                             before    after
list of active tasks (*):                    ~1.3s     ~30µs
list of 500 tasks, offset 0 (**):            ~1.3s     ~500µs
list of 500 tasks, offset 1 million (***):   ~1.3s     ~200ms
Size on disk:                                ~500MB    ~200MB
(*): Requested by the UI every 3s
(**): Requested by the UI when visiting Remotes > Tasks
(***): E.g. when scrolling towards the bottom of 'Remotes > Tasks'
In the old implementation, the archive file was *always* fully deserialized
and loaded into RAM; this is why the time needed is pretty much
identical for all scenarios.
The new implementation reads the archive files line by line,
and only 500 tasks are held in RAM at any time. The higher the offset,
the more archive lines/files we have to scan, which increases the
time needed to access the data. The tasks are sorted descending
by starttime; as a result, the requests get slower the further you
go back in history.
The 'before' times do NOT include the time needed for actually fetching
the task data.
This series was preceded by [1]; however, almost all of the code has changed, which
is why I am sending this as a new series.
[1] https://lore.proxmox.com/pdm-devel/20250128122520.167796-1-l.wagner@proxmox.com/
Changes since v2:
- Change locking approach as suggested by Wolfgang
- Incorporated feedback from Wolfgang
- see patch notes for details
- Added some .context/.with_context for better error messages
Changes since v1:
- Drop already applied patches
- Some code style improvements, see individual patch changelogs
- Move task fetching task to bin/proxmox-datacenter-api/tasks/remote_task.rs
- Make sure that remote_tasks::get_tasks does not block the async executor
proxmox-datacenter-manager:
Lukas Wagner (6):
remote tasks: implement improved cache for remote tasks
remote tasks: add background task for task polling, use new task cache
remote tasks: improve locking for task archive iterator
pdm-api-types: remote tasks: add new_from_str constructor for
TaskStateType
fake remote: make the fake_remote feature compile again
fake remote: clippy fixes
lib/pdm-api-types/src/lib.rs | 15 +
server/src/api/pve/lxc.rs | 10 +-
server/src/api/pve/mod.rs | 4 +-
server/src/api/pve/qemu.rs | 6 +-
server/src/api/remote_tasks.rs | 11 +-
server/src/bin/proxmox-datacenter-api/main.rs | 1 +
.../bin/proxmox-datacenter-api/tasks/mod.rs | 1 +
.../tasks/remote_tasks.rs | 364 ++++++
server/src/remote_tasks/mod.rs | 612 ++--------
server/src/remote_tasks/task_cache.rs | 1020 +++++++++++++++++
server/src/test_support/fake_remote.rs | 35 +-
11 files changed, 1549 insertions(+), 530 deletions(-)
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
create mode 100644 server/src/remote_tasks/task_cache.rs
Summary over all repositories:
11 files changed, 1549 insertions(+), 530 deletions(-)
--
Generated by git-murpp 0.8.1
* [pdm-devel] [PATCH proxmox-datacenter-manager v3 1/6] remote tasks: implement improved cache for remote tasks
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
@ 2025-04-17 13:22 ` Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 2/6] remote tasks: add background task for task polling, use new task cache Lukas Wagner
From: Lukas Wagner @ 2025-04-17 13:22 UTC (permalink / raw)
To: pdm-devel
This commit adds a new implementation for a cache for remote tasks, one
that should improve performance characteristics for pretty much all
use cases.
In general, storage works pretty similarly to the task archive we already
have for (local) PDM tasks.
root@pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
total 40
-rw-r--r-- 1 www-data www-data 0 Mar 13 13:18 active
-rw-r--r-- 1 www-data www-data 1676 Mar 11 14:51 archive.1741355462
-rw-r--r-- 1 www-data www-data 0 Mar 11 14:51 archive.1741441862
-rw-r--r-- 1 www-data www-data 2538 Mar 11 14:51 archive.1741528262
-rw-r--r-- 1 www-data www-data 8428 Mar 11 15:07 archive.1741614662
-rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
-rw-r--r-- 1 www-data www-data 3364 Mar 13 13:18 archive.1741788270
-rw-r--r-- 1 www-data www-data 287 Mar 13 13:18 state
Tasks are stored in the 'active' and multiple 'archive' files.
Running tasks are placed into the 'active' file, tasks that are finished
are persisted into one of the archive files.
The archive files are suffixed with a UNIX timestamp which serves
as a lower-bound for start times for tasks stored in this file.
Encoding this lower-bound in the file name instead of using a more
traditional increasing file index (.1, .2, .3) gives us the benefit
that we can decide in which file a newly arrived task belongs from
a single readdir call, without even reading the file itself.
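To illustrate the idea with a minimal sketch (not code from this patch; the
helper and its signature are made up for illustration):

    // Pick the target archive file for a finished task, given the archive
    // files sorted descending by their cut-off timestamp (newest first).
    // The first file whose cut-off is <= the task's starttime is the target;
    // tasks older than the oldest cut-off are simply discarded.
    fn pick_archive_file(
        files: &[(i64, std::path::PathBuf)], // (cut-off, path), sorted descending
        starttime: i64,
    ) -> Option<&std::path::PathBuf> {
        files
            .iter()
            .find(|(cutoff, _)| starttime >= *cutoff)
            .map(|(_, path)| path)
    }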
The size of the entire archive can be controlled by doing file
'rotation', where the archive file with the oldest timestamp is deleted
and a new file with the current timestamp is added.
If 'rotation' happens at fixed intervals (e.g. once daily), this gives
us a configurable number of days of task history.
There is no direct cap for the total number
of tasks, but this could be easily added by checking the size of the
most recent archive file and by rotating early if a threshold is
exceeded.
The format inside the files is also similar to the local task archive,
with one task corresponding to one line in the file.
One key difference is that here each line is a JSON object; this makes
it easier to add additional data later, if needed.
The JSON object contains the task's UPID, status, endtime and starttime
(the starttime is technically also encoded in the UPID, but having it as
a separate value simplifies a couple of things).
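For illustration, a single finished task is stored as one JSON object per
line, for example (taken from the unit tests at the end of this patch):

    {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","status":"OK","endtime":12345, "starttime": 1234}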
Each file is sorted by task start time, with the youngest tasks coming first.
One key difference between this task cache and the local task archive is
that we need to handle tasks coming in out-of-order, e.g. if remotes
were not reachable for a certain time. To maintain the ordering
in the file, we have to merge the newly arrived tasks into the existing
task file. This was implemented in a way that avoids reading the entire
file into memory at once, exploiting the fact that the contents of the
existing file are already sorted. This allows a
zipper/merge-sort-like approach (see MergeTaskIterator for
details). The existing file is only read line-by-line and finally
replaced atomically.
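Conceptually, the merge works like the following simplified sketch (plain
integers instead of tasks; the actual MergeTaskIterator additionally compares
UPIDs and deduplicates equal entries):

    // Zipper-merge of two sequences that are individually sorted descending.
    fn merge_desc(a: Vec<i64>, b: Vec<i64>) -> Vec<i64> {
        let mut a = a.into_iter().peekable();
        let mut b = b.into_iter().peekable();
        let mut out = Vec::new();
        loop {
            // Decide which side to pull from, then advance only that iterator.
            let take_left = match (a.peek(), b.peek()) {
                (Some(x), Some(y)) => x >= y,
                (Some(_), None) => true,
                (None, Some(_)) => false,
                (None, None) => break,
            };
            if take_left {
                out.push(a.next().unwrap());
            } else {
                out.push(b.next().unwrap());
            }
        }
        out
    }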
The cache also has a separate state file, containing additional
information, e.g. cut-off timestamps for the polling task.
Some of the data in the statefile is technically also contained in the
archive files, but reading the state file is much faster.
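Assuming the `State` struct added by this patch (with its kebab-case serde
renaming), a state file would look roughly like this (values are made up):

    {
      "most-recent-archive-starttime": { "pve-remote": 1741788270 },
      "oldest-active-task": { "pve-remote": 1741788300 },
      "tracked-tasks": {
        "pve-remote": ["pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:"]
      }
    }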
This commit also adds an elaborate suite of unit tests for this new
cache. While they added some additional work up front, the tests quickly
paid off during development, since the overall approach for the
cache changed a couple of times. The test suite gave me confidence that
the design changes didn't screw anything up, catching a couple of bugs
in the process.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Changes since v2:
- Incorporate feedback from Wolfgang (thx!)
- eliminate one .clone()
- get_tasks_with_lock: directly create iterator over PathBuf
instead of first creating ArchiveFile vec and the mapping
- Fixed TOCTOU race condition (checked Path::exists instead of
trying the actual operation and reacting to ErrorKind::NotFound)
- handle error when replacing 'active' file
- unlink temp file when replacing the archive file did not work
- avoid UTF8 decoding where not really necessary
- Added a couple of .context()/.with_context() for better error
messages
- Extracted the file names into consts
- Fixed some clippy issues (.clone() for CreateOptions - when
this series was written, CreateOptions were not Copy yet)
server/src/remote_tasks/mod.rs | 2 +
server/src/remote_tasks/task_cache.rs | 1022 +++++++++++++++++++++++++
2 files changed, 1024 insertions(+)
create mode 100644 server/src/remote_tasks/task_cache.rs
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 7ded5408..2062f2b7 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -18,6 +18,8 @@ use tokio::task::JoinHandle;
use crate::{api::pve, task_utils};
+mod task_cache;
+
/// Get tasks for all remotes
// FIXME: filter for privileges
pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
new file mode 100644
index 00000000..f847d441
--- /dev/null
+++ b/server/src/remote_tasks/task_cache.rs
@@ -0,0 +1,1022 @@
+//! Task cache implementation, based on rotating files.
+
+use std::{
+ cmp::Ordering,
+ collections::{HashMap, HashSet},
+ fs::File,
+ io::{BufRead, BufReader, BufWriter, ErrorKind, Lines, Write},
+ iter::Peekable,
+ path::{Path, PathBuf},
+ time::Duration,
+};
+
+use anyhow::{Context, Error};
+use proxmox_sys::fs::CreateOptions;
+use serde::{Deserialize, Serialize};
+
+use pdm_api_types::RemoteUpid;
+
+/// Filename for the file containing running tasks.
+const ACTIVE_FILENAME: &str = "active";
+/// Filename prefix for archive files.
+const ARCHIVE_FILENAME_PREFIX: &str = "archive.";
+/// Filename for the state file.
+const STATE_FILENAME: &str = "state";
+/// Filename of the archive lockfile.
+const LOCKFILE_FILENAME: &str = ".lock";
+
+/// Item which can be put into the task cache.
+#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
+pub struct TaskCacheItem {
+ /// The task's UPID
+ pub upid: RemoteUpid,
+ /// The time at which the task was started (seconds since the UNIX epoch).
+ /// Technically this is also contained within the UPID, duplicating it here
+ /// allows us to directly access it when sorting in new tasks, without having
+ /// to parse the UPID.
+ pub starttime: i64,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ /// The task's status.
+ pub status: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ /// The task's endtime (seconds since the UNIX epoch).
+ pub endtime: Option<i64>,
+}
+
+/// State needed for task polling.
+#[derive(Serialize, Deserialize, Default)]
+#[serde(rename_all = "kebab-case")]
+pub struct State {
+ /// Map of remote -> most recent task starttime (UNIX epoch) in the archive.
+ /// This can be used as a cut-off when requesting new task data.
+ pub most_recent_archive_starttime: HashMap<String, i64>,
+ /// Oldest running task. Useful as another cut-off for fetching tasks.
+ /// The timestamp is based on seconds since the UNIX epoch.
+ pub oldest_active_task: HashMap<String, i64>,
+ /// Tracked tasks per remote.
+ pub tracked_tasks: HashMap<String, HashSet<RemoteUpid>>,
+}
+
+/// Cache for remote tasks.
+#[derive(Clone)]
+pub struct TaskCache {
+ /// Path where the cache's files should be placed.
+ base_path: PathBuf,
+ /// File permissions for the cache's files.
+ create_options: CreateOptions,
+}
+
+/// Tasks that should be added to the cache via [`TaskCache::add_tasks`].
+pub struct AddTasks {
+ /// Update the remote's entry in the statefile which contains the
+ /// timestamp of the most recent *finished* task from the task archive.
+ pub update_most_recent_archive_timestamp: bool,
+ /// The tasks to add.
+ pub tasks: Vec<TaskCacheItem>,
+}
+
+/// Lock for the cache.
+#[allow(dead_code)]
+pub struct TaskCacheLock(File);
+
+/// Which tasks to fetch from the archive.
+pub enum GetTasks {
+ /// Get all tasks, finished and running.
+ All,
+ /// Only get running (active) tasks.
+ Active,
+ #[cfg(test)] // Used by tests, might be used by production code in the future
+ /// Only get finished (archived) tasks.
+ Archived,
+}
+
+impl TaskCache {
+ /// Create a new task cache instance.
+ ///
+ /// Remember to call `init` or `new_file` to create initial archive files before
+ /// adding any tasks.
+ pub fn new<P: AsRef<Path>>(path: P, create_options: CreateOptions) -> Result<Self, Error> {
+ Ok(Self {
+ base_path: path.as_ref().into(),
+ create_options,
+ })
+ }
+
+ /// Create initial task archives that can be backfilled with the
+ /// recent task history from a remote.
+ ///
+ /// This function only has an effect if there are no archive files yet.
+ pub fn init(&self, now: i64, number_of_files: u64, period_per_file: u64) -> Result<(), Error> {
+ let _lock = self.lock(true).context("failed to lock archive")?;
+
+ if self.archive_files()?.is_empty() {
+ for i in 0..number_of_files {
+ self.new_file(now - (i * period_per_file) as i64)?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Start a new archive file with a given timestamp.
+ /// `now` is supposed to be a UNIX timestamp (seconds).
+ fn new_file(&self, now: i64) -> Result<(), Error> {
+ let new_path = self
+ .base_path
+ .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}"));
+ let mut file = File::create(&new_path)?;
+ self.create_options.apply_to(&mut file, &new_path)?;
+
+ Ok(())
+ }
+
+ /// Lock the cache.
+ fn lock(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
+ let lockfile = self.base_path.join(LOCKFILE_FILENAME);
+
+ let fd = proxmox_sys::fs::open_file_locked(
+ lockfile,
+ Duration::from_secs(15),
+ exclusive,
+ self.create_options,
+ )?;
+
+ Ok(TaskCacheLock(fd))
+ }
+
+ /// Rotate task archive if the newest archive file is older than `rotate_after`.
+ ///
+ /// The oldest archive files are removed if the total number of archive files exceeds
+ /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
+ ///
+ /// This function requests an exclusive lock, don't call if you already hold a lock.
+ pub fn rotate(&self, now: i64, rotate_after: u64, max_files: u64) -> Result<bool, Error> {
+ let _lock = self.lock(true).context("failed to lock archive")?;
+ let mut did_rotate = false;
+
+ let mut bounds = self.archive_files()?;
+
+ match bounds.first() {
+ Some(bound) => {
+ if now > bound.starttime && now - bound.starttime > rotate_after as i64 {
+ self.new_file(now)?;
+ did_rotate = true;
+ }
+ }
+ None => {
+ self.new_file(now)?;
+ did_rotate = true;
+ }
+ }
+
+ while bounds.len() >= max_files as usize {
+ // Unwrap is safe because of the length check above
+ let to_remove = bounds.pop().unwrap();
+ std::fs::remove_file(&to_remove.file)
+ .with_context(|| format!("failed to remove {}", to_remove.file.display()))?;
+ }
+
+ Ok(did_rotate)
+ }
+
+ /// Add new tasks to the task archive.
+ ///
+ /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
+ /// task cache base directory. Finished tasks are sorted into `archive.<starttime>` archive
+ /// files, where `<starttime>` denotes the lowest permitted start time timestamp for a given
+ /// archive file. If a task which was added as running previously is added again, this time in
+ /// a finished state, it will be removed from the `active` file and also sorted into
+ /// one of the archive files.
+ /// Same goes for the list of tracked tasks; the entry in the state file will be removed.
+ ///
+ /// Crash consistency:
+ ///
+ /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the
+ /// end after all tasks have been added into the archive. Adding tasks is an idempotent
+ /// operation; adding the *same* task multiple times does not lead to duplicated entries in the
+ /// task archive. Individual archive files are updated atomically, but since
+ /// adding tasks can involve updating multiple archive files, the archive could end up
+ /// in a partially-updated, inconsistent state in case of a crash.
+ /// However, since the state file with the cut-off timestamps is updated last,
+ /// the consistency of the archive should be restored at the next update cycle of the archive.
+ pub fn add_tasks(&self, mut added: HashMap<String, AddTasks>) -> Result<(), Error> {
+ let lock = self.lock(true).context("failed to lock archive")?;
+
+ // Get a flat `Vec` of all new tasks
+ let tasks: Vec<_> = added
+ .iter_mut()
+ .flat_map(|(_, tasks)| std::mem::take(&mut tasks.tasks))
+ .collect();
+
+ let update_state_for_remote: HashMap<String, bool> = added
+ .into_iter()
+ .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
+ .collect();
+
+ let mut task_iter = self
+ .get_tasks_with_lock(GetTasks::Active, lock)
+ .context("failed to create archive iterator for active tasks")?;
+
+ let mut active_tasks =
+ HashSet::from_iter(task_iter.flat_map(|active_task| match active_task {
+ Ok(task) => Some((task.upid, task.starttime)),
+ Err(err) => {
+ log::error!("failed to read task cache entry from active file: {err}");
+ None
+ }
+ }));
+
+ // Consume the iterator to get back the lock. The lock is held
+ // until _lock is finally dropped at the end of the function.
+ let _lock = task_iter.into_lock();
+
+ let mut new_finished_tasks = Vec::new();
+
+ for task in tasks {
+ if task.endtime.is_none() {
+ active_tasks.insert((task.upid, task.starttime));
+ } else {
+ new_finished_tasks.push(task);
+ }
+ }
+
+ let mut state = self.read_state();
+
+ new_finished_tasks.sort_by(compare_tasks_reverse);
+ self.merge_tasks_into_archive(
+ new_finished_tasks,
+ &mut active_tasks,
+ update_state_for_remote,
+ &mut state,
+ )?;
+ self.update_oldest_active(&active_tasks, &mut state);
+
+ let mut active: Vec<TaskCacheItem> = active_tasks
+ .into_iter()
+ .map(|(upid, starttime)| TaskCacheItem {
+ upid,
+ starttime,
+ status: None,
+ endtime: None,
+ })
+ .collect();
+
+ active.sort_by(compare_tasks_reverse);
+ self.write_active_tasks(active.into_iter())
+ .context("failed to write active task file when adding tasks")?;
+ self.write_state(state)
+ .context("failed to update task archive state file when adding tasks")?;
+
+ Ok(())
+ }
+
+ /// Update the timestamp of the oldest running task in `state`.
+ fn update_oldest_active(&self, active_tasks: &HashSet<(RemoteUpid, i64)>, state: &mut State) {
+ // Update the state with timestamp of the oldest running task,
+ // we also use this as a cut-off when fetching tasks, so we
+ // know for sure when a task finishes.
+ let mut oldest_active_task_per_remote = HashMap::new();
+ for (upid, startime) in active_tasks.iter() {
+ oldest_active_task_per_remote
+ .entry(upid.remote().to_owned())
+ .and_modify(|time| *time = (*startime).min(*time))
+ .or_insert(*startime);
+ }
+ state.oldest_active_task = oldest_active_task_per_remote;
+ }
+
+ /// Merge a list of *finished* tasks into the remote task archive files.
+ /// The list of tasks in `tasks` *must* be sorted by their timestamp and UPID (descending by
+ /// timestamp, ascending by UPID).
+ ///
+ /// The task archive should be locked when calling this.
+ fn merge_tasks_into_archive(
+ &self,
+ tasks: Vec<TaskCacheItem>,
+ active_tasks: &mut HashSet<(RemoteUpid, i64)>,
+ update_state_for_remote: HashMap<String, bool>,
+ state: &mut State,
+ ) -> Result<(), Error> {
+ debug_assert!(tasks
+ .iter()
+ .is_sorted_by(|a, b| compare_tasks(a, b).is_ge()));
+
+ let files = self
+ .archive_files()
+ .context("failed to read achive files")?;
+
+ let mut files = files.iter().peekable();
+
+ let mut current = files.next();
+ let mut next = files.peek();
+
+ let mut tasks_for_current_file = Vec::new();
+
+ // Tasks are sorted youngest to oldest (biggest starttime first)
+ for mut task in tasks {
+ // Using a partial move to avoid having to clone task.upid
+ let upid_and_starttime = (task.upid, task.starttime);
+ active_tasks.remove(&upid_and_starttime);
+ task.upid = upid_and_starttime.0;
+
+ if let Some(tracked_tasks) = state.tracked_tasks.get_mut(task.upid.remote()) {
+ tracked_tasks.remove(&task.upid);
+ }
+
+ if let Some(true) = update_state_for_remote.get(task.upid.remote()) {
+ // Update the most recent startime per remote, the task polling logic uses it as a
+ // cut-off.
+ state
+ .most_recent_archive_starttime
+ .entry(task.upid.remote().to_owned())
+ .and_modify(|time| *time = (task.starttime).max(*time))
+ .or_insert(task.starttime);
+ }
+
+ // Skip ahead until we have found the correct file.
+ while next.is_some() {
+ if let Some(current) = current {
+ if task.starttime >= current.starttime {
+ break;
+ }
+ // The current file's cut-off is larger than the task's start time, which means
+ // we want to finalize the current file by merging all tasks that
+ // should be stored in it...
+ self.merge_single_archive_file(
+ std::mem::take(&mut tasks_for_current_file),
+ ¤t.file,
+ )
+ .with_context(|| {
+ format!("failed to merge archive file {}", current.file.display())
+ })?;
+ }
+
+ // ... and advance `current` to the next entry.
+ current = files.next();
+ next = files.peek();
+ }
+
+ if let Some(current) = current {
+ if task.starttime < current.starttime {
+ continue;
+ }
+ }
+ tasks_for_current_file.push(task);
+ }
+
+ // Merge tasks for the last file.
+ if let Some(current) = current {
+ self.merge_single_archive_file(tasks_for_current_file, ¤t.file)
+ .with_context(|| {
+ format!("failed to merge archive file {}", current.file.display())
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Add a new tracked task.
+ ///
+ /// This will insert the task in the list of tracked tasks in the state file,
+ /// as well as create an entry in the `active` file.
+ ///
+ /// This function will request an exclusive lock for the task cache,
+ /// do not call if you are already holding a lock.
+ pub fn add_tracked_task(&self, task: TaskCacheItem) -> Result<(), Error> {
+ let lock = self.lock(true).context("failed to lock archive")?;
+
+ let mut tasks = Vec::new();
+ let mut task_iter = self
+ .get_tasks_with_lock(GetTasks::Active, lock)
+ .context("failed to create active task iterator")?;
+
+ for task in &mut task_iter {
+ match task {
+ Ok(task) => tasks.push(task),
+ Err(err) => {
+ log::error!(
+ "could not read existing task cache entry from 'active' file: {err}"
+ );
+ }
+ }
+ }
+
+ tasks.push(task.clone());
+ tasks.sort_by(compare_tasks_reverse);
+
+ let _lock = task_iter.into_lock();
+
+ let mut state = self.read_state();
+
+ state
+ .oldest_active_task
+ .entry(task.upid.remote().to_owned())
+ .and_modify(|a| *a = (*a).min(task.starttime))
+ .or_insert(task.starttime);
+
+ let tracked_per_remote = state
+ .tracked_tasks
+ .entry(task.upid.remote().to_owned())
+ .or_default();
+ tracked_per_remote.insert(task.upid);
+
+ self.write_active_tasks(tasks.into_iter())
+ .context("failed to write active tasks file when adding tracked task")?;
+ self.write_state(state)
+ .context("failed to write state file when adding tracked tasks")?;
+
+ Ok(())
+ }
+
+ /// Iterate over cached tasks.
+ ///
+ /// This function will request a non-exclusive read-lock, don't call if
+ /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
+ pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
+ let lock = self
+ .lock(false)
+ .context("get_tasks: failed to acquire lock")?;
+ self.get_tasks_with_lock(mode, lock)
+ .context("failed to create task archive iterator")
+ }
+
+ /// Iterate over cached tasks.
+ ///
+ /// This function requires you to pass a lock. If you want to continue to hold the lock
+ /// after iterating, you can consume the iterator by calling
+ /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
+ pub fn get_tasks_with_lock(
+ &self,
+ mode: GetTasks,
+ lock: TaskCacheLock,
+ ) -> Result<TaskArchiveIterator, Error> {
+ match mode {
+ GetTasks::All => {
+ let archive_files = self.archive_files()?.into_iter().map(|pair| pair.file);
+ let all_files = [self.base_path.join(ACTIVE_FILENAME)]
+ .into_iter()
+ .chain(archive_files);
+
+ Ok(TaskArchiveIterator::new(Box::new(all_files), lock))
+ }
+ GetTasks::Active => {
+ let files = [self.base_path.join(ACTIVE_FILENAME)];
+ Ok(TaskArchiveIterator::new(Box::new(files.into_iter()), lock))
+ }
+ #[cfg(test)]
+ GetTasks::Archived => {
+ let files = self.archive_files()?;
+
+ Ok(TaskArchiveIterator::new(
+ Box::new(files.into_iter().map(|pair| pair.file)),
+ lock,
+ ))
+ }
+ }
+ }
+
+ /// Write the provided tasks to the 'active' file.
+ ///
+ /// The tasks are first written to a temporary file, which is then used
+ /// to atomically replace the original.
+ fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
+ let (fd, path) = proxmox_sys::fs::make_tmp_file(
+ self.base_path.join(ACTIVE_FILENAME),
+ self.create_options,
+ )?;
+ let mut fd = BufWriter::new(fd);
+
+ Self::write_tasks(&mut fd, tasks)?;
+
+ if let Err(err) = fd.flush() {
+ log::error!("could not flush 'active' file: {err}");
+ }
+ drop(fd);
+
+ let target = self.base_path.join(ACTIVE_FILENAME);
+
+ let res = std::fs::rename(&path, &target).with_context(|| {
+ format!(
+ "failed to replace {} with {}",
+ target.display(),
+ path.display(),
+ )
+ });
+
+ if let Err(err) = res {
+ if let Err(err) = std::fs::remove_file(&path) {
+ log::error!("failed to cleanup temporary file {}: {err}", path.display());
+ }
+
+ return Err(err);
+ }
+
+ Ok(())
+ }
+
+ /// Read the state file.
+ /// If the state file could not be read or does not exist, the default (empty) state
+ /// is returned.
+ /// A lock is only necessary if the returned state is modified and later passed
+ /// to [`Self::write_state`]. In case of a read-only access no lock is necessary, as
+ /// the state file is always replaced atomically.
+ pub fn read_state(&self) -> State {
+ fn do_read_state(path: &Path) -> Result<State, Error> {
+ match std::fs::read(path) {
+ Ok(content) => serde_json::from_slice(&content).map_err(|err| err.into()),
+ Err(err) if err.kind() == ErrorKind::NotFound => Ok(Default::default()),
+ Err(err) => Err(err.into()),
+ }
+ }
+
+ let path = self.base_path.join(STATE_FILENAME);
+ do_read_state(&path).unwrap_or_else(|err| {
+ log::error!("could not read state file: {err}");
+ Default::default()
+ })
+ }
+
+ /// Write the state file.
+ /// The task archive should be locked for writing when calling this function.
+ pub fn write_state(&self, state: State) -> Result<(), Error> {
+ let path = self.base_path.join(STATE_FILENAME);
+
+ let data = serde_json::to_vec_pretty(&state)?;
+
+ proxmox_sys::fs::replace_file(path, &data, self.create_options, true)?;
+
+ Ok(())
+ }
+
+ /// Returns a list of existing archive files, together with their respective
+ /// cut-off timestamp. The result is sorted descending by cut-off timestamp (most recent one
+ /// first).
+ /// The task archive should be locked for reading when calling this function.
+ fn archive_files(&self) -> Result<Vec<ArchiveFile>, Error> {
+ let mut names = Vec::new();
+
+ for entry in std::fs::read_dir(&self.base_path)? {
+ let entry = entry?;
+
+ if let Some(endtime) = entry
+ .path()
+ .file_name()
+ .and_then(|s| s.to_str())
+ .and_then(|s| s.strip_prefix(ARCHIVE_FILENAME_PREFIX))
+ {
+ match endtime.parse() {
+ Ok(starttime) => {
+ names.push(ArchiveFile {
+ starttime,
+ file: entry.path(),
+ });
+ }
+ Err(err) => log::error!(
+ "could not parse archive timestamp for file {}: {err}",
+ entry.path().display()
+ ),
+ }
+ }
+ }
+
+ names.sort_by_key(|e| -e.starttime);
+
+ Ok(names)
+ }
+
+ /// Merge `tasks` with an existing archive file.
+ /// This function assumes that `tasks` and the pre-existing contents of the archive
+ /// file are both sorted descending by starttime (most recent tasks come first).
+ /// The task archive must be locked when calling this function.
+ fn merge_single_archive_file(
+ &self,
+ tasks: Vec<TaskCacheItem>,
+ file: &Path,
+ ) -> Result<(), Error> {
+ if tasks.is_empty() {
+ return Ok(());
+ }
+
+ let (fd, new_path) = proxmox_sys::fs::make_tmp_file(file, self.create_options)?;
+ let mut writer = BufWriter::new(fd);
+
+ match File::open(file) {
+ Ok(file) => {
+ let archive_reader = BufReader::new(file);
+ let archive_iter = ArchiveIterator::new(archive_reader)
+ .flat_map(|item| match item {
+ Ok(item) => Some(item),
+ Err(err) => {
+ log::error!("could not read task cache item while merging: {err}");
+ None
+ }
+ })
+ .peekable();
+ let task_iter = tasks.into_iter().peekable();
+
+ Self::write_tasks(&mut writer, MergeTaskIterator::new(archive_iter, task_iter))?;
+ }
+ Err(err) if err.kind() == ErrorKind::NotFound => {
+ Self::write_tasks(&mut writer, tasks.into_iter())?;
+ }
+ Err(err) => return Err(err.into()),
+ }
+
+ if let Err(err) = writer.flush() {
+ log::error!("could not flush BufWriter for {file:?}: {err}");
+ }
+ drop(writer);
+
+ if let Err(err) = std::fs::rename(&new_path, file).with_context(|| {
+ format!(
+ "failed to replace {} with {}",
+ file.display(),
+ new_path.display()
+ )
+ }) {
+ if let Err(err) = std::fs::remove_file(&new_path) {
+ log::error!(
+ "failed to clean up temporary file {}: {err}",
+ new_path.display()
+ );
+ }
+
+ return Err(err);
+ }
+
+ Ok(())
+ }
+
+ /// Write an iterator of [`TaskCacheItem`] to something that implements [`Write`].
+ /// The individual items are encoded as JSON followed by a newline.
+ /// The task archive should be locked when calling this function.
+ fn write_tasks(
+ writer: &mut impl Write,
+ tasks: impl Iterator<Item = TaskCacheItem>,
+ ) -> Result<(), Error> {
+ for task in tasks {
+ serde_json::to_writer(&mut *writer, &task)?;
+ writeln!(writer)?;
+ }
+
+ Ok(())
+ }
+}
+
+/// Comparison function for sorting tasks.
+/// The tasks are compared based on the task's start time, falling
+/// back to the task's UPID as a secondary criterion in case the
+/// start times are equal.
+pub fn compare_tasks(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
+ a.starttime
+ .cmp(&b.starttime)
+ .then_with(|| b.upid.to_string().cmp(&a.upid.to_string()))
+}
+
+/// Comparison function for sorting tasks, reversed
+/// The tasks are compared based on the task's start time, falling
+/// back to the task's UPID as a secondary criterion in case the
+/// start times are equal.
+pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
+ compare_tasks(a, b).reverse()
+}
+
+/// Iterator over the task archive.
+pub struct TaskArchiveIterator {
+ /// Archive files to read.
+ files: Box<dyn Iterator<Item = PathBuf>>,
+ /// Archive iterator we are currently using, if any
+ current: Option<ArchiveIterator<BufReader<File>>>,
+ /// Lock for this archive.
+ lock: TaskCacheLock,
+}
+
+impl TaskArchiveIterator {
+ /// Create a new task archive iterator.
+ pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
+ Self {
+ files,
+ current: None,
+ lock,
+ }
+ }
+
+ /// Return the task archive lock, consuming `self`.
+ pub fn into_lock(self) -> TaskCacheLock {
+ self.lock
+ }
+}
+
+impl Iterator for &mut TaskArchiveIterator {
+ type Item = Result<TaskCacheItem, Error>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ match &mut self.current {
+ Some(current) => {
+ let next = current.next();
+ if next.is_some() {
+ return next;
+ } else {
+ self.current = None;
+ }
+ }
+ None => 'inner: loop {
+ // Returns `None` if no more files are available, stopping iteration.
+ let next_file = self.files.next()?;
+
+ match File::open(&next_file) {
+ Ok(file) => {
+ let archive_reader = BufReader::new(file);
+ let archive_iter = ArchiveIterator::new(archive_reader);
+ self.current = Some(archive_iter);
+ break 'inner;
+ }
+ Err(err) => {
+ log::error!("could not open {next_file:?} while iteration over task archive files, skipping: {err}")
+ }
+ }
+ },
+ }
+ }
+ }
+}
+
+/// Archive file.
+#[derive(Clone, Debug)]
+struct ArchiveFile {
+ /// The path to the archive file.
+ file: PathBuf,
+ /// The archive's lowest permitted starttime (seconds since UNIX epoch).
+ starttime: i64,
+}
+
+/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
+/// from both iterators sorted.
+/// The two iterators are expected to be sorted descendingly based on the task's starttime and
+/// ascendingly based on the task's UPID's string representation. This can be
+/// achieved by using the [`compare_tasks_reverse`] function when sorting an array of tasks.
+pub struct MergeTaskIterator<T: Iterator<Item = TaskCacheItem>, U: Iterator<Item = TaskCacheItem>> {
+ left: Peekable<T>,
+ right: Peekable<U>,
+}
+
+impl<T, U> MergeTaskIterator<T, U>
+where
+ T: Iterator<Item = TaskCacheItem>,
+ U: Iterator<Item = TaskCacheItem>,
+{
+ /// Create a new merging iterator.
+ pub fn new(left: Peekable<T>, right: Peekable<U>) -> Self {
+ Self { left, right }
+ }
+}
+
+impl<T, U> Iterator for MergeTaskIterator<T, U>
+where
+ T: Iterator<Item = TaskCacheItem>,
+ U: Iterator<Item = TaskCacheItem>,
+{
+ type Item = T::Item;
+
+ fn next(&mut self) -> Option<T::Item> {
+ let order = match (self.left.peek(), self.right.peek()) {
+ (Some(l), Some(r)) => Some(compare_tasks(l, r)),
+ (Some(_), None) => Some(Ordering::Greater),
+ (None, Some(_)) => Some(Ordering::Less),
+ (None, None) => None,
+ };
+
+ match order {
+ Some(Ordering::Greater) => self.left.next(),
+ Some(Ordering::Less) => self.right.next(),
+ Some(Ordering::Equal) => {
+ // Dedup by consuming the other iterator as well
+ let _ = self.right.next();
+ self.left.next()
+ }
+ None => None,
+ }
+ }
+}
+
+/// Iterator for a single task archive file.
+///
+/// This iterator implements `Iterator<Item = Result<TaskCacheItem, Error>>`. When iterating,
+/// tasks are read line by line, without loading the entire archive file into memory.
+pub struct ArchiveIterator<T> {
+ iter: Lines<T>,
+}
+
+impl<T: BufRead> ArchiveIterator<T> {
+ /// Create a new iterator.
+ pub fn new(file: T) -> Self {
+ let reader = file.lines();
+
+ Self { iter: reader }
+ }
+}
+
+impl<T: BufRead> Iterator for ArchiveIterator<T> {
+ type Item = Result<TaskCacheItem, Error>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.iter.next().map(|result| {
+ result
+ .and_then(|line| Ok(serde_json::from_str(&line)?))
+ .map_err(Into::into)
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io::Cursor;
+
+ use crate::test_support::temp::NamedTempDir;
+
+ use super::*;
+
+ #[test]
+ fn archive_iterator() -> Result<(), Error> {
+ let file = r#"
+ {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","status":"OK","endtime":12345, "starttime": 1234}
+ {"upid":"pbs-remote!UPID:pbs:000002B2:00000158:00000000:674D828C:logrotate::root@pam:","status":"OK","endtime":12345, "starttime": 1234}
+ invalid"#
+ .trim_start();
+
+ let cursor = Cursor::new(file.as_bytes());
+ let mut iter = ArchiveIterator::new(cursor);
+
+ assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pve-remote");
+ assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pbs-remote");
+ assert!(iter.next().unwrap().is_err());
+ assert!(iter.next().is_none());
+
+ Ok(())
+ }
+
+ fn task(starttime: i64, ended: bool) -> TaskCacheItem {
+ let (status, endtime) = if ended {
+ (Some("OK".into()), Some(starttime + 10))
+ } else {
+ (None, None)
+ };
+
+ TaskCacheItem {
+ upid: format!(
+ "pve-remote!UPID:pve:00039E4D:002638B8:{starttime:08X}:stopall::root@pam:"
+ )
+ .parse()
+ .unwrap(),
+ starttime,
+ status,
+ endtime,
+ }
+ }
+
+ fn assert_starttimes(cache: &TaskCache, starttimes: &[i64]) {
+ let tasks: Vec<i64> = cache
+ .get_tasks(GetTasks::All)
+ .unwrap()
+ .map(|task| task.unwrap().starttime)
+ .collect();
+
+ assert_eq!(&tasks, starttimes);
+ }
+
+ fn add_tasks(cache: &TaskCache, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
+ let param = AddTasks {
+ update_most_recent_archive_timestamp: true,
+ tasks,
+ };
+ let mut a = HashMap::new();
+ a.insert("pve-remote".into(), param);
+
+ cache.add_tasks(a)
+ }
+
+ #[test]
+ fn test_add_tasks() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+ let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+ cache.new_file(1000)?;
+ assert_eq!(cache.archive_files()?.len(), 1);
+
+ add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
+
+ cache.rotate(1500, 0, 3)?;
+ assert_eq!(cache.archive_files()?.len(), 2);
+
+ add_tasks(&cache, vec![task(1500, true), task(1501, true)])?;
+ add_tasks(&cache, vec![task(1200, true), task(1300, true)])?;
+
+ cache.rotate(2000, 0, 3)?;
+ assert_eq!(cache.archive_files()?.len(), 3);
+
+ add_tasks(&cache, vec![task(2000, true)])?;
+ add_tasks(&cache, vec![task(1502, true)])?;
+ add_tasks(&cache, vec![task(1002, true)])?;
+
+ // These are before the cut-off of 1000, so they will be discarded.
+ add_tasks(&cache, vec![task(800, true), task(900, true)])?;
+
+ // This one should be deduped
+ add_tasks(&cache, vec![task(1000, true)])?;
+
+ assert_starttimes(
+ &cache,
+ &[2000, 1502, 1501, 1500, 1300, 1200, 1002, 1001, 1000],
+ );
+
+ cache.rotate(2500, 0, 3)?;
+
+ assert_eq!(cache.archive_files()?.len(), 3);
+
+ assert_starttimes(&cache, &[2000, 1502, 1501, 1500]);
+
+ cache.rotate(3000, 0, 3)?;
+ assert_eq!(cache.archive_files()?.len(), 3);
+
+ assert_starttimes(&cache, &[2000]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_active_tasks_are_migrated_to_archive() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+ let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+ cache.new_file(1000)?;
+ add_tasks(&cache, vec![task(1000, false), task(1001, false)])?;
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
+
+ let state = cache.read_state();
+ assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1000);
+
+ add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
+
+ assert_starttimes(&cache, &[1001, 1000]);
+
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_init() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+ let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+ cache.init(1000, 3, 100)?;
+ assert_eq!(cache.archive_files()?.len(), 3);
+
+ add_tasks(
+ &cache,
+ vec![
+ task(1050, true),
+ task(950, true),
+ task(850, true),
+ task(750, true), // This one is discarded
+ ],
+ )?;
+
+ assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 3);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_tracking_tasks() -> Result<(), Error> {
+ let tmp_dir = NamedTempDir::new()?;
+ let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+ cache.init(1000, 3, 100)?;
+
+ cache.add_tracked_task(task(1050, false))?;
+
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
+ cache.add_tracked_task(task(1060, false))?;
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
+
+ let state = cache.read_state();
+ assert_eq!(state.tracked_tasks.get("pve-remote").unwrap().len(), 2);
+ assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1050);
+
+ // Mark first task as finished
+ add_tasks(&cache, vec![task(1050, true)])?;
+
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
+ assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 1);
+
+ // Mark second task as finished
+ add_tasks(&cache, vec![task(1060, true)])?;
+
+ assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
+ assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 2);
+
+ Ok(())
+ }
+}
--
2.39.5
* [pdm-devel] [PATCH proxmox-datacenter-manager v3 2/6] remote tasks: add background task for task polling, use new task cache
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 1/6] remote tasks: implement improved cache for remote tasks Lukas Wagner
@ 2025-04-17 13:22 ` Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 3/6] remote tasks: improve locking for task archive iterator Lukas Wagner
From: Lukas Wagner @ 2025-04-17 13:22 UTC (permalink / raw)
To: pdm-devel
This commit changes the remote task module as follows:
- Add a new background task for regular polling of task data
Instead of triggering fetching of task data from the `get_tasks` function,
which is usually called by an API handler, we move the fetching to a
new background task. The task fetches the latest tasks from all remotes
and stores them in the task cache in regular intervals (10 minutes).
The `get_tasks` function itself only reads from the cache.
The main rationale for this change is that for large setups, fetching
tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
each with a >100ms connection - adds up to minutes quickly).
If we do this from within `get_tasks`, the API handler calling the
function is also blocked for the entire time.
The `get_tasks` API is called every couple of seconds by the UI to get
a list of running remote tasks, so this *must* be quick.
- Tracked tasks are also polled in the same background task, but with
a short polling delay (10 seconds). Instead of polling the status of each
specific tracked task UPID, we simply fetch *all* tasks since the tracked task started.
While this increases the amount of transmitted data a bit for tracked tasks
that run for a very long time, the new approach makes the whole
task tracking functionality much more elegant; it integrates better with
the 'regular' task fetching, which happens at long intervals.
- Tasks are now stored in the new improved task cache implementation.
This should make retrieving tasks much quicker and avoid
unneeded disk IO.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Changes since v2:
- Adapt to new locking approach (only drops a `mut`)
Changes since v1:
- use const Duration instead of u64s for durations, using
Duration::as_secs() where needed
- Move the remote_task fetching task functions to
src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
- remote_tasks::get_tasks: wrap function body in a
tokio::task::spawn_blocking. using the TaskCache::get_tasks
iterator does disk IO and could block the executor
- Added some doc strings to make the purpose/workings of
some functions clearer
- Couple of variables have been renamed for more clarity
server/src/api/pve/lxc.rs | 10 +-
server/src/api/pve/mod.rs | 4 +-
server/src/api/pve/qemu.rs | 6 +-
server/src/api/remote_tasks.rs | 11 +-
server/src/bin/proxmox-datacenter-api/main.rs | 1 +
.../bin/proxmox-datacenter-api/tasks/mod.rs | 1 +
.../tasks/remote_tasks.rs | 364 +++++++++++
server/src/remote_tasks/mod.rs | 605 ++++--------------
8 files changed, 499 insertions(+), 503 deletions(-)
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
index f1c31425..83f9f4aa 100644
--- a/server/src/api/pve/lxc.rs
+++ b/server/src/api/pve/lxc.rs
@@ -209,7 +209,7 @@ pub async fn lxc_start(
let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -242,7 +242,7 @@ pub async fn lxc_stop(
let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
.shutdown_lxc_async(&node, vmid, Default::default())
.await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -357,7 +357,7 @@ pub async fn lxc_migrate(
};
let upid = pve.migrate_lxc(&node, vmid, params).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
log::info!("migrating vm {vmid} of node {node:?}");
let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
- new_remote_upid(source, upid)
+ new_remote_upid(source, upid).await
}
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index dd7cf382..d472cf58 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -76,9 +76,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
// converts a remote + PveUpid into a RemoteUpid and starts tracking it
-fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
+async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
- remote_tasks::track_running_task(remote_upid.clone());
+ remote_tasks::track_running_task(remote_upid.clone()).await?;
Ok(remote_upid)
}
diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
index dea0550c..54e310d2 100644
--- a/server/src/api/pve/qemu.rs
+++ b/server/src/api/pve/qemu.rs
@@ -216,7 +216,7 @@ pub async fn qemu_start(
.start_qemu_async(&node, vmid, Default::default())
.await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -376,7 +376,7 @@ pub async fn qemu_migrate(
};
let upid = pve.migrate_qemu(&node, vmid, params).await?;
- new_remote_upid(remote, upid)
+ new_remote_upid(remote, upid).await
}
#[api(
@@ -563,5 +563,5 @@ pub async fn qemu_remote_migrate(
log::info!("migrating vm {vmid} of node {node:?}");
let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
- new_remote_upid(source, upid)
+ new_remote_upid(source, upid).await
}
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index e629000c..05ce3666 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
},
input: {
properties: {
- "max-age": {
- type: Integer,
- optional: true,
- // TODO: sensible default max-age
- default: 300,
- description: "Maximum age of cached task data",
- },
filters: {
type: TaskFilters,
flatten: true,
@@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
},
)]
/// Get the list of tasks for all remotes.
-async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
- let tasks = remote_tasks::get_tasks(max_age, filters).await?;
+async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+ let tasks = remote_tasks::get_tasks(filters).await?;
Ok(tasks)
}
diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
index 49499980..70e489d0 100644
--- a/server/src/bin/proxmox-datacenter-api/main.rs
+++ b/server/src/bin/proxmox-datacenter-api/main.rs
@@ -292,6 +292,7 @@ async fn run(debug: bool) -> Result<(), Error> {
metric_collection::start_task();
tasks::remote_node_mapping::start_task();
resource_cache::start_task();
+ tasks::remote_tasks::start_task()?;
server.await?;
log::info!("server shutting down, waiting for active workers to complete");
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
index e6ead882..a6b1f439 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
@@ -1 +1,2 @@
pub mod remote_node_mapping;
+pub mod remote_tasks;
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
new file mode 100644
index 00000000..dc590871
--- /dev/null
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -0,0 +1,364 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use anyhow::{format_err, Error};
+use nix::sys::stat::Mode;
+use tokio::{sync::Semaphore, task::JoinSet};
+
+use pdm_api_types::{
+ remotes::{Remote, RemoteType},
+ RemoteUpid,
+};
+use proxmox_sys::fs::CreateOptions;
+use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
+use server::{
+ api::pve,
+ remote_tasks::{
+ self,
+ task_cache::{AddTasks, State, TaskCache, TaskCacheItem},
+ REMOTE_TASKS_DIR,
+ },
+ task_utils,
+};
+
+/// Tick interval for the remote task fetching task.
+/// This is also the rate at which we check on tracked tasks.
+const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
+
+/// Interval at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
+/// Number of cycles until a regular refresh.
+const REGULAR_REFRESH_CYCLES: u64 =
+ REGULAR_REFRESH_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
+
+/// Check if we want to rotate once every hour.
+const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
+/// Number of cycles before we want to check if we should rotate the task archives.
+const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
+
+/// Rotate once the most recent archive file is at least 24 hours old.
+const ROTATE_AFTER: Duration = Duration::from_secs(24 * 3600);
+
+/// Keep 7 days worth of tasks.
+const KEEP_OLD_FILES: u64 = 7;
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
+/// consideration when accounting for the total number of connections.
+/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
+/// to 4 PVE remotes in parallel.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
+/// Start the remote task fetching task
+pub fn start_task() -> Result<(), Error> {
+ let api_uid = pdm_config::api_user()?.uid;
+ let api_gid = pdm_config::api_group()?.gid;
+ let file_options = CreateOptions::new()
+ .owner(api_uid)
+ .group(api_gid)
+ .perm(Mode::from_bits_truncate(0o0750));
+ proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
+
+ tokio::spawn(async move {
+ let task_scheduler = std::pin::pin!(remote_task_fetching_task());
+ let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(task_scheduler, abort_future).await;
+ });
+
+ Ok(())
+}
+
+/// Task which handles fetching remote tasks and task archive rotation.
+/// This function never returns.
+async fn remote_task_fetching_task() -> ! {
+ let mut cycle = 0u64;
+ let mut interval = tokio::time::interval(TASK_REFRESH_INTERVAL);
+ interval.reset_at(task_utils::next_aligned_instant(TASK_REFRESH_INTERVAL.as_secs()).into());
+
+ // We don't really care about catching up to missed ticks, we just want
+ // a steady tick rate.
+ interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+ if let Err(err) = init_cache().await {
+ log::error!("error when initialized task cache: {err}");
+ }
+
+ loop {
+ interval.tick().await;
+ if let Err(err) = do_tick(cycle).await {
+ log::error!("error when fetching remote tasks: {err}");
+ }
+
+ // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
+ // better be safe and use .wrapping_add(1) :)
+ cycle = cycle.wrapping_add(1);
+ }
+}
+
+/// Handle a single timer tick.
+/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
+async fn do_tick(cycle: u64) -> Result<(), Error> {
+ let cache = remote_tasks::get_cache()?;
+
+ if should_check_for_cache_rotation(cycle) {
+ log::debug!("checking if remote task archive should be rotated");
+ if rotate_cache(cache.clone()).await? {
+ log::info!("rotated remote task archive");
+ }
+ }
+
+ let state = cache.read_state();
+
+ let mut all_tasks = HashMap::new();
+
+ let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+ let mut join_set = JoinSet::new();
+
+ // Get a list of remotes that we should poll in this cycle.
+ let remotes = remotes_to_check(cycle, &state).await?;
+ for remote in remotes {
+ let since = get_cutoff_timestamp(&remote, &state);
+
+ let permit = if remote.ty == RemoteType::Pve {
+ // Acquire multiple permits; for PVE remotes we want
+ // to connect to multiple nodes in parallel.
+ //
+ // `.unwrap()` is safe, we never close the semaphore.
+ Arc::clone(&total_connections_semaphore)
+ .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
+ .await
+ .unwrap()
+ } else {
+ // For PBS remotes we only have a single outgoing connection
+ //
+ // `.unwrap()` is safe, we never close the semaphore.
+ Arc::clone(&total_connections_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap()
+ };
+
+ join_set.spawn(async move {
+ log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
+ let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
+ format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
+ });
+
+ drop(permit);
+ tasks
+ });
+ }
+
+ while let Some(res) = join_set.join_next().await {
+ match res {
+ Ok(Ok((remote, request))) => {
+ all_tasks.insert(remote, request);
+ }
+ Ok(Err(err)) => log::error!("{err}"),
+ Err(err) => log::error!("could not join task fetching future: {err}"),
+ }
+ }
+
+ if !all_tasks.is_empty() {
+ save_tasks(cache, all_tasks).await?;
+ }
+
+ Ok(())
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are not
+/// any archive files yet.
+///
+/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
+/// spaced `ROTATE_AFTER` apart.
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+async fn init_cache() -> Result<(), Error> {
+ tokio::task::spawn_blocking(|| {
+ let cache = remote_tasks::get_cache()?;
+ cache.init(
+ proxmox_time::epoch_i64(),
+ KEEP_OLD_FILES,
+ ROTATE_AFTER.as_secs(),
+ )?;
+ Ok(())
+ })
+ .await?
+}
+
+/// Return list of remotes that are to be polled in this cycle.
+///
+/// If `cycle` is a multiple of `REGULAR_REFRESH_CYCLES`, the function will
+/// return all remotes from the remote config. This ensures that
+/// all remotes are polled at regular intervals.
+/// In any other case we only return remotes which currently have a tracked
+/// task.
+/// On daemon startup (when cycle is 0) we return all remotes to ensure
+/// that we get an up-to-date task list from all remotes.
+async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
+ let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+ let all = cycle % REGULAR_REFRESH_CYCLES == 0;
+
+ if all {
+ Ok(config.sections.into_values().collect())
+ } else {
+ Ok(config
+ .sections
+ .into_iter()
+ .filter_map(|(name, remote)| {
+ if let Some(tracked) = state.tracked_tasks.get(&name) {
+ if !tracked.is_empty() {
+ Some(remote)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ })
+ .collect())
+ }
+}
+
+/// Get the timestamp from which on we should fetch tasks for a given remote.
+/// The returned timestamp is a UNIX timestamp (in seconds).
+fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
+ let oldest_active = state.oldest_active_task.get(&remote.id).copied();
+ let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
+
+ match (oldest_active, youngest_archived) {
+ (None, None) => 0,
+ (None, Some(youngest_archived)) => youngest_archived,
+ (Some(oldest_active), None) => oldest_active,
+ (Some(oldest_active), Some(youngest_archived)) => oldest_active.min(youngest_archived),
+ }
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+ tokio::task::spawn_blocking(move || {
+ cache.rotate(
+ proxmox_time::epoch_i64(),
+ ROTATE_AFTER.as_secs(),
+ KEEP_OLD_FILES,
+ )
+ })
+ .await?
+}
+
+/// Fetch tasks (active and finished) from a remote
+/// `since` is a UNIX timestamp (seconds).
+async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
+ let mut tasks = Vec::new();
+
+ let mut all_successful = true;
+
+ match remote.ty {
+ RemoteType::Pve => {
+ let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
+ let mut join_set = JoinSet::new();
+
+ let client = pve::connect(remote)?;
+
+ // N+1 requests - we could use /cluster/tasks, but that one
+ // only gives a limited task history
+ for node in client.list_nodes().await? {
+ let params = ListTasks {
+ // Include running tasks
+ source: Some(ListTasksSource::All),
+ since: Some(since),
+ // If `limit` is not provided, we only receive 50 tasks
+ limit: Some(MAX_TASKS_TO_FETCH),
+ ..Default::default()
+ };
+
+ let permit = Arc::clone(&per_remote_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap();
+
+ let r = remote.clone();
+
+ join_set.spawn(async move {
+ let client = pve::connect(&r)?;
+ let task_list =
+ client
+ .get_task_list(&node.node, params)
+ .await
+ .map_err(|err| {
+ format_err!("remote '{}', node '{}': {err}", r.id, node.node)
+ })?;
+
+ drop(permit);
+
+ Ok::<Vec<_>, Error>(task_list)
+ });
+ }
+
+ while let Some(res) = join_set.join_next().await {
+ match res {
+ Ok(Ok(list)) => {
+ let mapped = list.into_iter().filter_map(|task| {
+ match map_pve_task(task, &remote.id) {
+ Ok(task) => Some(task),
+ Err(err) => {
+ log::error!("could not map task data, skipping: {err}");
+ None
+ }
+ }
+ });
+
+ tasks.extend(mapped);
+ }
+ Ok(Err(err)) => {
+ all_successful = false;
+ log::error!("could not fetch tasks: {err:?}");
+ }
+ Err(err) => return Err(err.into()),
+ }
+ }
+ }
+ RemoteType::Pbs => {
+ // TODO: Add code for PBS
+ }
+ }
+
+ let new_tasks = AddTasks {
+ update_most_recent_archive_timestamp: all_successful,
+ tasks,
+ };
+
+ Ok((remote.id.clone(), new_tasks))
+}
+
+/// Check if a cache rotation check is due.
+///
+/// If `cycle` is 0, a cache rotation check is forced. This is
+/// only relevant at daemon startup.
+fn should_check_for_cache_rotation(cycle: u64) -> bool {
+ cycle % CHECK_ROTATE_CYCLES == 0
+}
+
+/// Map a `ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
+ let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
+
+ Ok(TaskCacheItem {
+ upid: remote_upid,
+ starttime: task.starttime,
+ endtime: task.endtime,
+ status: task.status,
+ })
+}
+
+/// Add newly fetched tasks to the cache.
+async fn save_tasks(cache: TaskCache, tasks: HashMap<String, AddTasks>) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || cache.add_tasks(tasks)).await?
+}
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 2062f2b7..126c9ad3 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,515 +1,152 @@
-use std::{
- collections::{HashMap, HashSet},
- fs::File,
- path::{Path, PathBuf},
- sync::{LazyLock, RwLock},
- time::Duration,
-};
+use std::path::Path;
use anyhow::Error;
-use pdm_api_types::{
- remotes::{Remote, RemoteType},
- RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
-};
+use pdm_api_types::{RemoteUpid, TaskFilters, TaskListItem, TaskStateType};
use proxmox_sys::fs::CreateOptions;
-use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
-use tokio::task::JoinHandle;
+use pve_api_types::PveUpid;
+use task_cache::{GetTasks, TaskCache, TaskCacheItem};
-use crate::{api::pve, task_utils};
+pub mod task_cache;
-mod task_cache;
+pub const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
/// Get tasks for all remotes
// FIXME: filter for privileges
-pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
- let (remotes, _) = pdm_config::remotes::config()?;
+pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+ tokio::task::spawn_blocking(move || {
+ let cache = get_cache()?;
- let mut all_tasks = Vec::new();
+ let mut returned_tasks = Vec::new();
- let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let mut cache = TaskCache::new(cache_path)?;
-
- // Force a refresh for all tasks of a remote if a task is finished.
- // Not super nice, but saves us from persisting finished tasks. Also,
- // the /nodes/<node>/tasks/<upid>/status endpoint does not return
- // a task's endtime, which is only returned by
- // /nodes/<node>/tasks...
- // Room for improvements in the future.
- invalidate_cache_for_finished_tasks(&mut cache);
-
- for (remote_name, remote) in &remotes.sections {
- let now = proxmox_time::epoch_i64();
-
- if let Some(tasks) = cache.get_tasks(remote_name.as_str(), now, max_age) {
- // Data in cache is recent enough and has not been invalidated.
- all_tasks.extend(tasks);
+ let which = if filters.running {
+ GetTasks::Active
} else {
- let tasks = match fetch_tasks(remote).await {
- Ok(tasks) => tasks,
+ GetTasks::All
+ };
+
+ for task in &mut cache
+ .get_tasks(which)?
+ .skip(filters.start as usize)
+ .take(filters.limit as usize)
+ {
+ let task = match task {
+ Ok(task) => task,
Err(err) => {
- // ignore errors for not reachable remotes
+ log::error!("could not read task from remote task cache, skipping: {err}");
continue;
}
};
- cache.set_tasks(remote_name.as_str(), tasks.clone(), now);
- all_tasks.extend(tasks);
+ // TODO: Handle PBS tasks
+ let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
+ match pve_upid {
+ Ok(pve_upid) => {
+ returned_tasks.push(TaskListItem {
+ upid: task.upid.to_string(),
+ node: pve_upid.node,
+ pid: pve_upid.pid as i64,
+ pstart: pve_upid.pstart,
+ starttime: pve_upid.starttime,
+ worker_type: pve_upid.worker_type,
+ worker_id: None,
+ user: pve_upid.auth_id,
+ endtime: task.endtime,
+ status: task.status,
+ });
+ }
+ Err(err) => {
+ log::error!("could not parse UPID: {err}");
+ }
+ }
}
- }
- let mut returned_tasks = add_running_tasks(all_tasks)?;
- returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
- let returned_tasks = returned_tasks
- .into_iter()
- .filter(|item| {
- if filters.running && item.endtime.is_some() {
- return false;
- }
-
- if let Some(until) = filters.until {
- if item.starttime > until {
+ let returned_tasks = returned_tasks
+ .into_iter()
+ .filter(|item| {
+ if filters.running && item.endtime.is_some() {
return false;
}
- }
- if let Some(since) = filters.since {
- if item.starttime < since {
- return false;
- }
- }
-
- if let Some(needle) = &filters.userfilter {
- if !item.user.contains(needle) {
- return false;
- }
- }
-
- if let Some(typefilter) = &filters.typefilter {
- if !item.worker_type.contains(typefilter) {
- return false;
- }
- }
-
- let state = item.status.as_ref().map(|status| tasktype(status));
-
- match (state, &filters.statusfilter) {
- (Some(TaskStateType::OK), _) if filters.errors => return false,
- (Some(state), Some(filters)) => {
- if !filters.contains(&state) {
+ if let Some(until) = filters.until {
+ if item.starttime > until {
return false;
}
}
- (None, Some(_)) => return false,
- _ => {}
- }
- true
- })
- .skip(filters.start as usize)
- .take(filters.limit as usize)
- .collect();
-
- // We don't need to wait for this task to finish
- tokio::task::spawn_blocking(move || {
- if let Err(e) = cache.save() {
- log::error!("could not save task cache: {e}");
- }
- });
-
- Ok(returned_tasks)
-}
-
-/// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
- let mut tasks = Vec::new();
-
- match remote.ty {
- RemoteType::Pve => {
- let client = pve::connect(remote)?;
-
- // N+1 requests - we could use /cluster/tasks, but that one
- // only gives a limited task history
- for node in client.list_nodes().await? {
- let params = ListTasks {
- // Include running tasks
- source: Some(ListTasksSource::All),
- // TODO: How much task history do we want? Right now we just hard-code it
- // to 7 days.
- since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
- ..Default::default()
- };
-
- let list = client.get_task_list(&node.node, params).await?;
- let mapped = map_tasks(list, &remote.id)?;
-
- tasks.extend(mapped);
- }
- }
- RemoteType::Pbs => {
- // TODO: Add code for PBS
- }
- }
-
- Ok(tasks)
-}
-
-/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
-fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
- let mut mapped = Vec::new();
-
- for task in tasks {
- let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
-
- mapped.push(TaskListItem {
- upid: remote_upid.to_string(),
- node: task.node,
- pid: task.pid,
- pstart: task.pstart as u64,
- starttime: task.starttime,
- worker_type: task.ty,
- worker_id: Some(task.id),
- user: task.user,
- endtime: task.endtime,
- status: task.status,
- })
- }
-
- Ok(mapped)
-}
-
-/// Drops the cached task list of a remote for all finished tasks.
-///
-/// We use this to force a refresh so that we get the full task
-/// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
- let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
-
- // If a task is finished, we force a refresh for the remote - otherwise
- // we don't get the 'endtime' for the task.
- for task in finished.drain() {
- cache.invalidate_cache_for_remote(task.remote());
- }
-}
-
-/// Supplement the list of tasks that we received from the remote with
-/// the tasks that were started by PDM and are currently running.
-fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
- let mut returned_tasks = Vec::new();
-
- let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
- for task in cached_tasks {
- let remote_upid = task.upid.parse()?;
-
- if running_tasks.contains(&remote_upid) {
- if task.endtime.is_some() {
- // Task is finished but we still think it is running ->
- // Drop it from RUNNING_FOREIGN_TASKS
- running_tasks.remove(&remote_upid);
-
- // No need to put it in FINISHED_TASKS, since we already
- // got its state recently enough (we know the status and endtime)
- }
- } else {
- returned_tasks.push(task);
- }
- }
-
- for task in running_tasks.iter() {
- let upid: PveUpid = task.upid.parse()?;
- returned_tasks.push(TaskListItem {
- upid: task.to_string(),
- node: upid.node,
- pid: upid.pid as i64,
- pstart: upid.pstart,
- starttime: upid.starttime,
- worker_type: upid.worker_type,
- worker_id: upid.worker_id,
- user: upid.auth_id,
- endtime: None,
- status: None,
- });
- }
-
- Ok(returned_tasks)
-}
-
-/// A cache for fetched remote tasks.
-struct TaskCache {
- /// Cache entries
- content: TaskCacheContent,
-
- /// Entries that were added or updated - these will be persistet
- /// when `save` is called.
- new_or_updated: TaskCacheContent,
-
- /// Cache entries were changed/removed.
- dirty: bool,
-
- /// File-location at which the cached tasks are stored.
- cachefile_path: PathBuf,
-}
-
-impl TaskCache {
- /// Create a new tasks cache instance by loading
- /// the cache from disk.
- fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
- Ok(Self {
- content: Self::load_content()?,
- new_or_updated: Default::default(),
- dirty: false,
- cachefile_path,
- })
- }
-
- /// Load the task cache contents from disk.
- fn load_content() -> Result<TaskCacheContent, Error> {
- let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
-
- let content = if let Some(content) = content {
- serde_json::from_str(&content)?
- } else {
- Default::default()
- };
-
- Ok(content)
- }
-
- /// Get path for the cache's lockfile.
- fn lockfile_path(&self) -> PathBuf {
- let mut path = self.cachefile_path.clone();
- path.set_extension("lock");
- path
- }
-
- /// Persist the task cache
- ///
- /// This method requests an exclusive lock for the task cache lockfile.
- fn save(&mut self) -> Result<(), Error> {
- // if we have not updated anything, we don't have to update the cache file
- if !self.dirty {
- return Ok(());
- }
-
- let _guard = self.lock(Duration::from_secs(5))?;
-
- // Read content again, in case somebody has changed it in the meanwhile
- let mut content = Self::load_content()?;
-
- for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
- if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
- // Only update entry if nobody else has updated it in the meanwhile
- if existing_entry.timestamp < entry.timestamp {
- *existing_entry = entry;
- }
- } else {
- content.remote_tasks.insert(remote_name, entry);
- }
- }
-
- let bytes = serde_json::to_vec_pretty(&content)?;
-
- let api_uid = pdm_config::api_user()?.uid;
- let api_gid = pdm_config::api_group()?.gid;
-
- let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
- proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
-
- self.dirty = false;
-
- Ok(())
- }
-
- // Update task data for a given remote.
- fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
- self.dirty = true;
- self.new_or_updated
- .remote_tasks
- .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
- }
-
- // Get task data for a given remote.
- fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
- if let Some(entry) = self.content.remote_tasks.get(remote) {
- if (entry.timestamp + max_age) < now {
- return None;
- }
-
- Some(entry.tasks.clone())
- } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
- if (entry.timestamp + max_age) < now {
- return None;
- }
- Some(entry.tasks.clone())
- } else {
- None
- }
- }
-
- // Invalidate cache for a given remote.
- fn invalidate_cache_for_remote(&mut self, remote: &str) {
- self.dirty = true;
- self.content.remote_tasks.remove(remote);
- }
-
- // Lock the cache for modification.
- //
- // While the cache is locked, other users can still read the cache
- // without a lock, since the cache file is replaced atomically
- // when updating.
- fn lock(&self, duration: Duration) -> Result<File, Error> {
- let api_uid = pdm_config::api_user()?.uid;
- let api_gid = pdm_config::api_group()?.gid;
-
- let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
- proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
- }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
- timestamp: i64,
- tasks: Vec<TaskListItem>,
-}
-
-#[derive(Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
- remote_tasks: HashMap<String, TaskCacheEntry>,
-}
-
-/// Interval at which tracked tasks are polled
-const RUNNING_CHECK_INTERVAL_S: u64 = 10;
-
-/// Tasks which were started by PDM and are still running
-static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-/// Tasks which were started by PDM and w
-static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-
-fn init() -> RwLock<HashSet<RemoteUpid>> {
- RwLock::new(HashSet::new())
-}
-
-/// Insert a remote UPID into the running list
-///
-/// If it is the first entry in the list, a background task is started to track its state
-///
-/// Returns the [`JoinHandle`] if a task was started.
-///
-/// panics on a poisoned mutex
-pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
- let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
-
- // the call inserting the first task in the list needs to start the checking task
- let need_start_task = tasks.is_empty();
- tasks.insert(task);
-
- if !need_start_task {
- return None;
- }
- drop(tasks);
-
- Some(tokio::spawn(async move {
- loop {
- let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
- let finished_tasks = get_finished_tasks().await;
-
- // skip iteration if we still have tasks, just not finished ones
- if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
- continue;
- }
-
- let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
- // we either have finished tasks, or the running task list was empty
- let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
-
- for (upid, _status) in finished_tasks {
- if set.remove(&upid) {
- finished.insert(upid);
- } else {
- // someone else removed & persisted the task in the meantime
- }
- }
-
- // if no task remains, end the current task
- // it will be restarted by the next caller that inserts one
- if set.is_empty() {
- return;
- }
- }
- }))
-}
-
-/// Get a list of running foreign tasks
-///
-/// panics on a poisoned mutex
-pub fn get_running_tasks() -> Vec<RemoteUpid> {
- RUNNING_FOREIGN_TASKS
- .read()
- .unwrap()
- .iter()
- .cloned()
- .collect()
-}
-
-/// Checks all current saved UPIDs if they're still running, and if not,
-/// returns their upids + status
-///
-/// panics on a poisoned mutex
-pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
- let mut finished = Vec::new();
- let config = match pdm_config::remotes::config() {
- Ok((config, _)) => config,
- Err(err) => {
- log::error!("could not open remotes config: {err}");
- return Vec::new();
- }
- };
- for task in get_running_tasks() {
- match config.get(task.remote()) {
- Some(remote) => match remote.ty {
- RemoteType::Pve => {
- let status = match crate::api::pve::tasks::get_task_status(
- remote.id.clone(),
- task.clone(),
- false,
- )
- .await
- {
- Ok(status) => status,
- Err(err) => {
- log::error!("could not get status from remote: {err}");
- finished.push((task, "could not get status".to_string()));
- continue;
- }
- };
- if let Some(status) = status.exitstatus {
- finished.push((task, status.to_string()));
+ if let Some(since) = filters.since {
+ if item.starttime < since {
+ return false;
}
}
- RemoteType::Pbs => {
- let _client = match crate::pbs_client::connect(remote) {
- Ok(client) => client,
- Err(err) => {
- log::error!("could not get status from remote: {err}");
- finished.push((task, "could not get status".to_string()));
- continue;
- }
- };
- // FIXME implement get task status
- finished.push((task, "unknown state".to_string()));
- }
- },
- None => finished.push((task, "unknown remote".to_string())),
- }
- }
- finished
+ if let Some(needle) = &filters.userfilter {
+ if !item.user.contains(needle) {
+ return false;
+ }
+ }
+
+ if let Some(typefilter) = &filters.typefilter {
+ if !item.worker_type.contains(typefilter) {
+ return false;
+ }
+ }
+
+ let state = item.status.as_ref().map(|status| tasktype(status));
+
+ match (state, &filters.statusfilter) {
+ (Some(TaskStateType::OK), _) if filters.errors => return false,
+ (Some(state), Some(filters)) => {
+ if !filters.contains(&state) {
+ return false;
+ }
+ }
+ (None, Some(_)) => return false,
+ _ => {}
+ }
+
+ true
+ })
+ .collect();
+
+ Ok(returned_tasks)
+ })
+ .await?
+}
+
+/// Insert a newly created task into the list of tracked tasks.
+///
+/// Any remote with associated tracked tasks will be polled at a short interval
+/// until all tracked tasks have finished.
+pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
+ tokio::task::spawn_blocking(move || {
+ let cache = get_cache()?;
+ // TODO: Handle PBS tasks correctly.
+ let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
+ let task = TaskCacheItem {
+ upid: task.clone(),
+ starttime: pve_upid.starttime,
+ status: None,
+ endtime: None,
+ };
+ cache.add_tracked_task(task)
+ })
+ .await?
+}
+
+/// Get a new [`TaskCache`] instance.
+///
+/// No heavy-weight operations are done here, it's fine to call this regularly as part of the
+/// update loop.
+pub fn get_cache() -> Result<TaskCache, Error> {
+ let api_uid = pdm_config::api_user()?.uid;
+ let api_gid = pdm_config::api_group()?.gid;
+
+ let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+ let cache_path = Path::new(REMOTE_TASKS_DIR);
+ let cache = TaskCache::new(cache_path, file_options)?;
+
+ Ok(cache)
}
/// Parses a task status string into a TaskStateType
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* [pdm-devel] [PATCH proxmox-datacenter-manager v3 3/6] remote tasks: improve locking for task archive iterator
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 1/6] remote tasks: implement improved cache for remote tasks Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 2/6] remote tasks: add background task for task polling, use new task cache Lukas Wagner
@ 2025-04-17 13:22 ` Lukas Wagner
2025-04-18 7:12 ` Wolfgang Bumiller
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 4/6] pdm-api-types: remote tasks: add new_from_str constructor for TaskStateType Lukas Wagner
` (4 subsequent siblings)
7 siblings, 1 reply; 13+ messages in thread
From: Lukas Wagner @ 2025-04-17 13:22 UTC (permalink / raw)
To: pdm-devel
Instead of awkwardly using into_lock for keeping the archive locked,
pass the lock as a reference to TaskCache::get_tasks_with_lock. The
iterator itself only holds the lock if we use the auto-locking
TaskCache::get_tasks, otherwise we ensure that the lock lives long
enough via lifetimes and PhantomData in the iterator.
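A rough sketch of the pattern, with simplified made-up types (not the
actual task_cache.rs API), to illustrate the idea:

struct Lock;

struct Iter<'a> {
    /// Some(..) when the iterator was created via the auto-locking path.
    _owned: Option<Lock>,
    /// Ties the iterator's lifetime to an externally held lock.
    _borrow: std::marker::PhantomData<&'a ()>,
}

/// The caller already holds the lock; the iterator merely borrows it.
fn iter_with_lock(_lock: &Lock) -> Iter<'_> {
    Iter { _owned: None, _borrow: std::marker::PhantomData }
}

/// Auto-locking variant; the iterator owns the lock itself.
fn iter_auto() -> Iter<'static> {
    Iter { _owned: Some(Lock), _borrow: std::marker::PhantomData }
}

fn main() {
    let lock = Lock;
    let _iter = iter_with_lock(&lock); // cannot outlive `lock`
    let _auto = iter_auto();           // carries its own lock
}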
Suggested-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
New in v3
server/src/remote_tasks/mod.rs | 2 +-
server/src/remote_tasks/task_cache.rs | 62 +++++++++++++--------------
2 files changed, 31 insertions(+), 33 deletions(-)
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 126c9ad3..b0fc052f 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -24,7 +24,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
GetTasks::All
};
- for task in &mut cache
+ for task in cache
.get_tasks(which)?
.skip(filters.start as usize)
.take(filters.limit as usize)
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index f847d441..c59c5235 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -6,6 +6,7 @@ use std::{
fs::File,
io::{BufRead, BufReader, BufWriter, ErrorKind, Lines, Write},
iter::Peekable,
+ marker::PhantomData,
path::{Path, PathBuf},
time::Duration,
};
@@ -213,8 +214,8 @@ impl TaskCache {
.map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
.collect();
- let mut task_iter = self
- .get_tasks_with_lock(GetTasks::Active, lock)
+ let task_iter = self
+ .get_tasks_with_lock(GetTasks::Active, &lock)
.context("failed to create archive iterator for active tasks")?;
let mut active_tasks =
@@ -226,10 +227,6 @@ impl TaskCache {
}
}));
- // Consume the iterator to get back the lock. The lock is held
- // until _lock is finally dropped at the end of the function.
- let _lock = task_iter.into_lock();
-
let mut new_finished_tasks = Vec::new();
for task in tasks {
@@ -387,7 +384,7 @@ impl TaskCache {
let mut tasks = Vec::new();
let mut task_iter = self
- .get_tasks_with_lock(GetTasks::Active, lock)
+ .get_tasks_with_lock(GetTasks::Active, &lock)
.context("failed to create active task iterator")?;
for task in &mut task_iter {
@@ -404,8 +401,6 @@ impl TaskCache {
tasks.push(task.clone());
tasks.sort_by(compare_tasks_reverse);
- let _lock = task_iter.into_lock();
-
let mut state = self.read_state();
state
@@ -432,24 +427,28 @@ impl TaskCache {
///
/// This function will request a non-exclusive read-lock, don't call if
/// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
- pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
- let lock = self
- .lock(false)
- .context("get_tasks: failed to acquire lock")?;
- self.get_tasks_with_lock(mode, lock)
+ pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'static>, Error> {
+ let lock = self.lock(false).context("failed to lock archive")?;
+ self.get_tasks_impl(mode, Some(lock))
.context("failed to create task archive iterator")
}
/// Iterate over cached tasks.
///
- /// This function requires you to pass a lock. If you want to continue to hold the lock
- /// after iterating, you can consume the iterator by calling
- /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
- pub fn get_tasks_with_lock(
+ /// This function requires you to pass a lock.
+ pub fn get_tasks_with_lock<'a>(
&self,
mode: GetTasks,
- lock: TaskCacheLock,
- ) -> Result<TaskArchiveIterator, Error> {
+ _lock: &'a TaskCacheLock,
+ ) -> Result<TaskArchiveIterator<'a>, Error> {
+ self.get_tasks_impl(mode, None)
+ }
+
+ pub fn get_tasks_impl<'a>(
+ &self,
+ mode: GetTasks,
+ lock: Option<TaskCacheLock>,
+ ) -> Result<TaskArchiveIterator<'a>, Error> {
match mode {
GetTasks::All => {
let archive_files = self.archive_files()?.into_iter().map(|pair| pair.file);
@@ -682,32 +681,31 @@ pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
}
/// Iterator over the task archive.
-pub struct TaskArchiveIterator {
+pub struct TaskArchiveIterator<'a> {
/// Archive files to read.
files: Box<dyn Iterator<Item = PathBuf>>,
/// Archive iterator we are currently using, if any
current: Option<ArchiveIterator<BufReader<File>>>,
- /// Lock for this archive.
- lock: TaskCacheLock,
+ /// Lock for this archive. This contains the lock in case we
+ /// need to keep the archive locked while iterating over it.
+ _lock: Option<TaskCacheLock>,
+ /// PhantomData to bind the lifetime of the iterator to an externally held lock.
+ _lifetime: PhantomData<&'a ()>,
}
-impl TaskArchiveIterator {
+impl TaskArchiveIterator<'_> {
/// Create a new task archive iterator.
- pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
+ pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: Option<TaskCacheLock>) -> Self {
Self {
files,
current: None,
- lock,
+ _lock: lock,
+ _lifetime: PhantomData,
}
}
-
- /// Return the task archive lock, consuming `self`.
- pub fn into_lock(self) -> TaskCacheLock {
- self.lock
- }
}
-impl Iterator for &mut TaskArchiveIterator {
+impl Iterator for TaskArchiveIterator<'_> {
type Item = Result<TaskCacheItem, Error>;
fn next(&mut self) -> Option<Self::Item> {
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* [pdm-devel] [PATCH proxmox-datacenter-manager v3 4/6] pdm-api-types: remote tasks: add new_from_str constructor for TaskStateType
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
` (2 preceding siblings ...)
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 3/6] remote tasks: improve locking for task archive iterator Lukas Wagner
@ 2025-04-17 13:22 ` Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 5/6] fake remote: make the fake_remote feature compile again Lukas Wagner
` (3 subsequent siblings)
7 siblings, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2025-04-17 13:22 UTC (permalink / raw)
To: pdm-devel
This allows us to get rid of the `tasktype` helper in
server::remote_tasks.
We don't impl `From<&str>` because those should be value-preserving,
lossless and obvious. Also, the function is not called from_str to
avoid any confusion with FromStr.
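A hypothetical usage sketch (the mapping mirrors the implementation added
below):

    assert!(matches!(TaskStateType::new_from_str("OK"), TaskStateType::OK));
    assert!(matches!(TaskStateType::new_from_str("WARNINGS: 2"), TaskStateType::Warning));
    assert!(matches!(TaskStateType::new_from_str("command failed"), TaskStateType::Error));
    assert!(matches!(TaskStateType::new_from_str(""), TaskStateType::Unknown));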
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Changes in v3:
- move the function to TaskStateType::new_from_str instead
of From<&str>
lib/pdm-api-types/src/lib.rs | 15 +++++++++++++++
server/src/remote_tasks/mod.rs | 15 +--------------
2 files changed, 16 insertions(+), 14 deletions(-)
diff --git a/lib/pdm-api-types/src/lib.rs b/lib/pdm-api-types/src/lib.rs
index 38449071..9373725c 100644
--- a/lib/pdm-api-types/src/lib.rs
+++ b/lib/pdm-api-types/src/lib.rs
@@ -232,6 +232,21 @@ pub enum TaskStateType {
Unknown,
}
+impl TaskStateType {
+ /// Construct a new instance from a `&str`.
+ pub fn new_from_str(status: &str) -> Self {
+ if status == "unknown" || status.is_empty() {
+ TaskStateType::Unknown
+ } else if status == "OK" {
+ TaskStateType::OK
+ } else if status.starts_with("WARNINGS: ") {
+ TaskStateType::Warning
+ } else {
+ TaskStateType::Error
+ }
+ }
+}
+
#[api(
properties: {
upid: { schema: UPID::API_SCHEMA },
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index b0fc052f..f4b92809 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -91,7 +91,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
}
}
- let state = item.status.as_ref().map(|status| tasktype(status));
+ let state = item.status.as_deref().map(TaskStateType::new_from_str);
match (state, &filters.statusfilter) {
(Some(TaskStateType::OK), _) if filters.errors => return false,
@@ -148,16 +148,3 @@ pub fn get_cache() -> Result<TaskCache, Error> {
Ok(cache)
}
-
-/// Parses a task status string into a TaskStateType
-pub fn tasktype(status: &str) -> TaskStateType {
- if status == "unknown" || status.is_empty() {
- TaskStateType::Unknown
- } else if status == "OK" {
- TaskStateType::OK
- } else if status.starts_with("WARNINGS: ") {
- TaskStateType::Warning
- } else {
- TaskStateType::Error
- }
-}
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* [pdm-devel] [PATCH proxmox-datacenter-manager v3 5/6] fake remote: make the fake_remote feature compile again
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
` (3 preceding siblings ...)
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 4/6] pdm-api-types: remote tasks: add new_from_str constructor for TaskStateType Lukas Wagner
@ 2025-04-17 13:22 ` Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 6/6] fake remote: clippy fixes Lukas Wagner
` (2 subsequent siblings)
7 siblings, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2025-04-17 13:22 UTC (permalink / raw)
To: pdm-devel
The ClientFactory trait was changed in Wolfgang's multi-client patches,
this commit adapts the fake remote feature to the changes.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
new in v2
server/src/test_support/fake_remote.rs | 29 +++++++++++++-------------
1 file changed, 15 insertions(+), 14 deletions(-)
diff --git a/server/src/test_support/fake_remote.rs b/server/src/test_support/fake_remote.rs
index 40f68807..840c2131 100644
--- a/server/src/test_support/fake_remote.rs
+++ b/server/src/test_support/fake_remote.rs
@@ -1,18 +1,22 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use anyhow::{bail, Error};
+use serde::Deserialize;
+
use pdm_api_types::{remotes::Remote, Authid, ConfigDigest};
use pdm_config::remotes::RemoteConfig;
use proxmox_product_config::ApiLockGuard;
use proxmox_section_config::typed::SectionConfigData;
use pve_api_types::{
- client::PveClient, ClusterMetrics, ClusterMetricsData, ClusterNodeIndexResponse,
- ClusterNodeIndexResponseStatus, ClusterResource, ClusterResourceKind, ClusterResourceType,
- ListTasks, ListTasksResponse, PveUpid, StorageContent,
+ ClusterMetrics, ClusterMetricsData, ClusterNodeIndexResponse, ClusterNodeIndexResponseStatus,
+ ClusterResource, ClusterResourceKind, ClusterResourceType, ListTasks, ListTasksResponse,
+ PveUpid, StorageContent,
};
-use serde::Deserialize;
-use crate::{connection::ClientFactory, pbs_client::PbsClient};
+use crate::{
+ connection::{ClientFactory, PveClient},
+ pbs_client::PbsClient,
+};
#[derive(Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
@@ -77,8 +81,8 @@ impl FakeRemoteConfig {
#[async_trait::async_trait]
impl ClientFactory for FakeClientFactory {
- fn make_pve_client(&self, _remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
- Ok(Box::new(FakePveClient {
+ fn make_pve_client(&self, _remote: &Remote) -> Result<Arc<PveClient>, Error> {
+ Ok(Arc::new(FakePveClient {
nr_of_vms: self.config.vms_per_pve_remote,
nr_of_cts: self.config.cts_per_pve_remote,
nr_of_nodes: self.config.nodes_per_pve_remote,
@@ -91,7 +95,7 @@ impl ClientFactory for FakeClientFactory {
&self,
_remote: &Remote,
_target_endpoint: Option<&str>,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+ ) -> Result<Arc<PveClient>, Error> {
bail!("not implemented")
}
@@ -99,10 +103,7 @@ impl ClientFactory for FakeClientFactory {
bail!("not implemented")
}
- async fn make_pve_client_and_login(
- &self,
- _remote: &Remote,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+ async fn make_pve_client_and_login(&self, _remote: &Remote) -> Result<Arc<PveClient>, Error> {
bail!("not implemented")
}
@@ -121,7 +122,7 @@ struct FakePveClient {
}
#[async_trait::async_trait]
-impl PveClient for FakePveClient {
+impl pve_api_types::client::PveClient for FakePveClient {
async fn cluster_resources(
&self,
_ty: Option<ClusterResourceKind>,
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* [pdm-devel] [PATCH proxmox-datacenter-manager v3 6/6] fake remote: clippy fixes
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
` (4 preceding siblings ...)
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 5/6] fake remote: make the fake_remote feature compile again Lukas Wagner
@ 2025-04-17 13:22 ` Lukas Wagner
2025-04-17 15:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Thomas Lamprecht
2025-04-18 8:33 ` [pdm-devel] superseded: " Lukas Wagner
7 siblings, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2025-04-17 13:22 UTC (permalink / raw)
To: pdm-devel
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Added in v3
server/src/test_support/fake_remote.rs | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/server/src/test_support/fake_remote.rs b/server/src/test_support/fake_remote.rs
index 840c2131..37c1992a 100644
--- a/server/src/test_support/fake_remote.rs
+++ b/server/src/test_support/fake_remote.rs
@@ -132,7 +132,7 @@ impl pve_api_types::client::PveClient for FakePveClient {
let mut vmid = 100;
for _ in 0..self.nr_of_vms {
- vmid = vmid + 1;
+ vmid += 1;
result.push(ClusterResource {
cgroup_mode: None,
content: None,
@@ -165,7 +165,7 @@ impl pve_api_types::client::PveClient for FakePveClient {
}
for _ in 0..self.nr_of_cts {
- vmid = vmid + 1;
+ vmid += 1;
result.push(ClusterResource {
cgroup_mode: None,
content: None,
@@ -357,7 +357,6 @@ impl pve_api_types::client::PveClient for FakePveClient {
async fn list_nodes(&self) -> Result<Vec<ClusterNodeIndexResponse>, proxmox_client::Error> {
tokio::time::sleep(Duration::from_millis(self.api_delay_ms as u64)).await;
Ok((0..self.nr_of_nodes)
- .into_iter()
.map(|i| ClusterNodeIndexResponse {
node: format!("pve-{i}"),
cpu: None,
@@ -414,7 +413,6 @@ impl pve_api_types::client::PveClient for FakePveClient {
let number_of_tasks = limit.min(number_of_tasks as u64);
Ok((0..number_of_tasks)
- .into_iter()
.map(|i| make_task(now - i as i64 * NEW_TASK_EVERY * 60))
.collect())
}
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
` (5 preceding siblings ...)
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 6/6] fake remote: clippy fixes Lukas Wagner
@ 2025-04-17 15:31 ` Thomas Lamprecht
2025-04-18 7:12 ` Wolfgang Bumiller
2025-04-18 7:24 ` Lukas Wagner
2025-04-18 8:33 ` [pdm-devel] superseded: " Lukas Wagner
7 siblings, 2 replies; 13+ messages in thread
From: Thomas Lamprecht @ 2025-04-17 15:31 UTC (permalink / raw)
To: Proxmox Datacenter Manager development discussion, Lukas Wagner
Am 17.04.25 um 15:22 schrieb Lukas Wagner:
> Benchmarking was done using the 'fake-remote' feature. There were 100
> remotes, 10 PVE nodes per remote. The task cache contained
> about 1.5 million tasks.
> before after
> list of active tasks (*): ~1.3s ~30µs
> list of 500 tasks, offset 0 (**): ~1.3s ~500µs
> list of 500 tasks, offset 1 million (***): ~1.3s ~200ms
> Size on disk: ~500MB ~200MB
>
> (*): Requested by the UI every 3s
> (**): Requested by the UI when visiting Remotes > Tasks
> (***): E.g. when scrolling towars the bottom of 'Remotes > Tasks'
Did you regenerate these for the current revision? Just out of interest
whether there is any measurable effect of Wolfgangs proposed changes.
And FWIW, I'd like to encode these also in the commit message, albeit
with the lore links that b4 adds it's less of a problem, but IMO would
still not really hurt to do. Depending on your answer on the actuality
of above data I could amend that or newly provided measurements into
the commit message on applying though, so definitively no need for a v+1
just for that.
>
> In the old implementation, the archive file was *always* fully deserialized
> and loaded into RAM, this is the reason why the time needed is pretty
> idential for all scenarios.
> The new implementation reads the archive files only line by line,
> and only 500 tasks were loaded into RAM at the same time. The higher the offset,
> the more archive lines/files we have to scan, which increases the
> time needed to access the data. The tasks are sorted descending
> by starttime, as a result the requests get slower the further you
> go back in history.
>
> The 'before' times do NOT include the time needed for actually fetching
> the task data.
>
> This series was preseded by [1], however almost all of the code has changes, which
> is the reason why I send this as a new series.
>
> [1] https://lore.proxmox.com/pdm-devel/20250128122520.167796-1-l.wagner@proxmox.com/
>
> Changes since v2:
> - Change locking approach as suggested by Wolfgang
> - Incorporated feedback from Wolfang
> - see patch notes for details
> - Added some .context/.with_context for better error messages
>
Looks alright to me now, if Wolfgang sees nothing of w.r.t. the changes
since the last version I'd apply this series tomorrow.
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend
2025-04-17 15:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Thomas Lamprecht
@ 2025-04-18 7:12 ` Wolfgang Bumiller
2025-04-18 7:24 ` Lukas Wagner
1 sibling, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2025-04-18 7:12 UTC (permalink / raw)
To: Thomas Lamprecht; +Cc: Proxmox Datacenter Manager development discussion
On Thu, Apr 17, 2025 at 05:31:56PM +0200, Thomas Lamprecht wrote:
> Am 17.04.25 um 15:22 schrieb Lukas Wagner:
> > Benchmarking was done using the 'fake-remote' feature. There were 100
> > remotes, 10 PVE nodes per remote. The task cache contained
> > about 1.5 million tasks.
> > before after
> > list of active tasks (*): ~1.3s ~30µs
> > list of 500 tasks, offset 0 (**): ~1.3s ~500µs
> > list of 500 tasks, offset 1 million (***): ~1.3s ~200ms
> > Size on disk: ~500MB ~200MB
> >
> > (*): Requested by the UI every 3s
> > (**): Requested by the UI when visiting Remotes > Tasks
> > (***): E.g. when scrolling towars the bottom of 'Remotes > Tasks'
>
> Did you regenerate these for the current revision? Just out of interest
> whether there is any measurable effect of Wolfgangs proposed changes.
>
> And FWIW, I'd like to encode these also in the commit message, albeit
> with the lore links that b4 adds it's less of a problem, but IMO would
> still not really hurt to do. Depending on your answer on the actuality
> of above data I could amend that or newly provided measurements into
> the commit message on applying though, so definitively no need for a v+1
> just for that.
>
> >
> > In the old implementation, the archive file was *always* fully deserialized
> > and loaded into RAM, this is the reason why the time needed is pretty
> > idential for all scenarios.
> > The new implementation reads the archive files only line by line,
> > and only 500 tasks were loaded into RAM at the same time. The higher the offset,
> > the more archive lines/files we have to scan, which increases the
> > time needed to access the data. The tasks are sorted descending
> > by starttime, as a result the requests get slower the further you
> > go back in history.
> >
> > The 'before' times do NOT include the time needed for actually fetching
> > the task data.
> >
> > This series was preseded by [1], however almost all of the code has changes, which
> > is the reason why I send this as a new series.
> >
> > [1] https://lore.proxmox.com/pdm-devel/20250128122520.167796-1-l.wagner@proxmox.com/
> >
> > Changes since v2:
> > - Change locking approach as suggested by Wolfgang
> > - Incorporated feedback from Wolfang
> > - see patch notes for details
> > - Added some .context/.with_context for better error messages
> >
>
> Looks alright to me now, if Wolfgang sees nothing of w.r.t. the changes
> since the last version I'd apply this series tomorrow.
Mostly LGTM, just minor issues. (A bunch of `pub`s to drop, one of which
is important in patch 3.)
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pdm-devel] [PATCH proxmox-datacenter-manager v3 3/6] remote tasks: improve locking for task archive iterator
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 3/6] remote tasks: improve locking for task archive iterator Lukas Wagner
@ 2025-04-18 7:12 ` Wolfgang Bumiller
0 siblings, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2025-04-18 7:12 UTC (permalink / raw)
To: Lukas Wagner; +Cc: pdm-devel
On Thu, Apr 17, 2025 at 03:22:53PM +0200, Lukas Wagner wrote:
> Instead of awkwardly using into_lock for keeping the archive locked,
> pass the lock as a reference to TaskCache::get_tasks_with_lock. The
> iterator itself only holds the lock if we use the auto-locking
> TaskCache::get_tasks, otherwise we ensure that the lock lives long
> enough via lifetimes and PhantomData in the iterator.
>
> Suggested-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
> Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
> ---
>
> Notes:
> New in v3
>
> server/src/remote_tasks/mod.rs | 2 +-
> server/src/remote_tasks/task_cache.rs | 62 +++++++++++++--------------
> 2 files changed, 31 insertions(+), 33 deletions(-)
>
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 126c9ad3..b0fc052f 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -24,7 +24,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
> GetTasks::All
> };
>
> - for task in &mut cache
> + for task in cache
> .get_tasks(which)?
> .skip(filters.start as usize)
> .take(filters.limit as usize)
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> index f847d441..c59c5235 100644
> --- a/server/src/remote_tasks/task_cache.rs
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -6,6 +6,7 @@ use std::{
> fs::File,
> io::{BufRead, BufReader, BufWriter, ErrorKind, Lines, Write},
> iter::Peekable,
> + marker::PhantomData,
> path::{Path, PathBuf},
> time::Duration,
> };
> @@ -213,8 +214,8 @@ impl TaskCache {
> .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
> .collect();
>
> - let mut task_iter = self
> - .get_tasks_with_lock(GetTasks::Active, lock)
> + let task_iter = self
> + .get_tasks_with_lock(GetTasks::Active, &lock)
> .context("failed to create archive iterator for active tasks")?;
>
> let mut active_tasks =
> @@ -226,10 +227,6 @@ impl TaskCache {
> }
> }));
>
> - // Consume the iterator to get back the lock. The lock is held
> - // until _lock is finally dropped at the end of the function.
> - let _lock = task_iter.into_lock();
> -
> let mut new_finished_tasks = Vec::new();
>
> for task in tasks {
> @@ -387,7 +384,7 @@ impl TaskCache {
>
> let mut tasks = Vec::new();
> let mut task_iter = self
> - .get_tasks_with_lock(GetTasks::Active, lock)
> + .get_tasks_with_lock(GetTasks::Active, &lock)
> .context("failed to create active task iterator")?;
>
> for task in &mut task_iter {
> @@ -404,8 +401,6 @@ impl TaskCache {
> tasks.push(task.clone());
> tasks.sort_by(compare_tasks_reverse);
>
> - let _lock = task_iter.into_lock();
> -
> let mut state = self.read_state();
>
> state
> @@ -432,24 +427,28 @@ impl TaskCache {
> ///
> /// This function will request a non-exclusive read-lock, don't call if
> /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
> - pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
> - let lock = self
> - .lock(false)
> - .context("get_tasks: failed to acquire lock")?;
> - self.get_tasks_with_lock(mode, lock)
> + pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'static>, Error> {
> + let lock = self.lock(false).context("failed to lock archive")?;
> + self.get_tasks_impl(mode, Some(lock))
> .context("failed to create task archive iterator")
> }
>
> /// Iterate over cached tasks.
> ///
> - /// This function requires you to pass a lock. If you want to continue to hold the lock
> - /// after iterating, you can consume the iterator by calling
> - /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
> - pub fn get_tasks_with_lock(
> + /// This function requires you to pass a lock.
> + pub fn get_tasks_with_lock<'a>(
> &self,
> mode: GetTasks,
> - lock: TaskCacheLock,
> - ) -> Result<TaskArchiveIterator, Error> {
> + _lock: &'a TaskCacheLock,
> + ) -> Result<TaskArchiveIterator<'a>, Error> {
> + self.get_tasks_impl(mode, None)
> + }
> +
> + pub fn get_tasks_impl<'a>(
^ This must not be `pub`, given the lifetime is chosen "at will" by the
caller and the relation to the lock's lifetime is not compile-time
enforced. Only the non-`_impl` functions enforce this.
> + &self,
> + mode: GetTasks,
> + lock: Option<TaskCacheLock>,
> + ) -> Result<TaskArchiveIterator<'a>, Error> {
> match mode {
> GetTasks::All => {
> let archive_files = self.archive_files()?.into_iter().map(|pair| pair.file);
> @@ -682,32 +681,31 @@ pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> }
>
> /// Iterator over the task archive.
> -pub struct TaskArchiveIterator {
> +pub struct TaskArchiveIterator<'a> {
> /// Archive files to read.
> files: Box<dyn Iterator<Item = PathBuf>>,
> /// Archive iterator we are currently using, if any
> current: Option<ArchiveIterator<BufReader<File>>>,
> - /// Lock for this archive.
> - lock: TaskCacheLock,
> + /// Lock for this archive. This contains the lock in case we
> + /// need to keep the archive locked while iterating over it.
> + _lock: Option<TaskCacheLock>,
> + /// PhantomData to bind the lifetime of the iterator to an externally held lock.
> + _lifetime: PhantomData<&'a ()>,
> }
>
> -impl TaskArchiveIterator {
> +impl TaskArchiveIterator<'_> {
> /// Create a new task archive iterator.
> - pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
> + pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: Option<TaskCacheLock>) -> Self {
> Self {
> files,
> current: None,
> - lock,
> + _lock: lock,
> + _lifetime: PhantomData,
> }
> }
> -
> - /// Return the task archive lock, consuming `self`.
> - pub fn into_lock(self) -> TaskCacheLock {
> - self.lock
> - }
> }
>
> -impl Iterator for &mut TaskArchiveIterator {
> +impl Iterator for TaskArchiveIterator<'_> {
> type Item = Result<TaskCacheItem, Error>;
>
> fn next(&mut self) -> Option<Self::Item> {
> --
> 2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend
2025-04-17 15:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Thomas Lamprecht
2025-04-18 7:12 ` Wolfgang Bumiller
@ 2025-04-18 7:24 ` Lukas Wagner
2025-04-18 8:08 ` Lukas Wagner
1 sibling, 1 reply; 13+ messages in thread
From: Lukas Wagner @ 2025-04-18 7:24 UTC (permalink / raw)
To: Thomas Lamprecht, Proxmox Datacenter Manager development discussion
On 2025-04-17 17:31, Thomas Lamprecht wrote:
> Am 17.04.25 um 15:22 schrieb Lukas Wagner:
>> Benchmarking was done using the 'fake-remote' feature. There were 100
>> remotes, 10 PVE nodes per remote. The task cache contained
>> about 1.5 million tasks.
>> before after
>> list of active tasks (*): ~1.3s ~30µs
>> list of 500 tasks, offset 0 (**): ~1.3s ~500µs
>> list of 500 tasks, offset 1 million (***): ~1.3s ~200ms
>> Size on disk: ~500MB ~200MB
>>
>> (*): Requested by the UI every 3s
>> (**): Requested by the UI when visiting Remotes > Tasks
>> (***): E.g. when scrolling towars the bottom of 'Remotes > Tasks'
>
> Did you regenerate these for the current revision? Just out of interest
> whether there is any measurable effect of Wolfgangs proposed changes.
No, these are still from v1. I'll repeat the benchmark and check if there are any noteworthy
changes.
My prediction is that the impact is probably very small, but let's see if there
are any surprises.
>
> And FWIW, I'd like to encode these also in the commit message, albeit
> with the lore links that b4 adds it's less of a problem, but IMO would
> still not really hurt to do. Depending on your answer on the actuality
> of above data I could amend that or newly provided measurements into
> the commit message on applying though, so definitively no need for a v+1
> just for that.
>
Since there is one more thing to fix as reported by Wolfgang, I'll post a v4 with the fix
and an amended commit message which includes the updated benchmark results. Shouldn't take too
long, I hope!
>>
>> In the old implementation, the archive file was *always* fully deserialized
>> and loaded into RAM, this is the reason why the time needed is pretty
>> idential for all scenarios.
>> The new implementation reads the archive files only line by line,
>> and only 500 tasks were loaded into RAM at the same time. The higher the offset,
>> the more archive lines/files we have to scan, which increases the
>> time needed to access the data. The tasks are sorted descending
>> by starttime, as a result the requests get slower the further you
>> go back in history.
>>
>> The 'before' times do NOT include the time needed for actually fetching
>> the task data.
>>
>> This series was preseded by [1], however almost all of the code has changes, which
>> is the reason why I send this as a new series.
>>
>> [1] https://lore.proxmox.com/pdm-devel/20250128122520.167796-1-l.wagner@proxmox.com/
>>
>> Changes since v2:
>> - Change locking approach as suggested by Wolfgang
>> - Incorporated feedback from Wolfang
>> - see patch notes for details
>> - Added some .context/.with_context for better error messages
>>
>
> Looks alright to me now, if Wolfgang sees nothing of w.r.t. the changes
> since the last version I'd apply this series tomorrow.
--
- Lukas
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend
2025-04-18 7:24 ` Lukas Wagner
@ 2025-04-18 8:08 ` Lukas Wagner
0 siblings, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2025-04-18 8:08 UTC (permalink / raw)
To: Thomas Lamprecht, Proxmox Datacenter Manager development discussion
On 2025-04-18 09:24, Lukas Wagner wrote:
>
>
> On 2025-04-17 17:31, Thomas Lamprecht wrote:
>> Am 17.04.25 um 15:22 schrieb Lukas Wagner:
>>> Benchmarking was done using the 'fake-remote' feature. There were 100
>>> remotes, 10 PVE nodes per remote. The task cache contained
>>> about 1.5 million tasks.
>>> before after
>>> list of active tasks (*): ~1.3s ~30µs
>>> list of 500 tasks, offset 0 (**): ~1.3s ~500µs
>>> list of 500 tasks, offset 1 million (***): ~1.3s ~200ms
>>> Size on disk: ~500MB ~200MB
>>>
>>> (*): Requested by the UI every 3s
>>> (**): Requested by the UI when visiting Remotes > Tasks
>>> (***): E.g. when scrolling towars the bottom of 'Remotes > Tasks'
>>
>> Did you regenerate these for the current revision? Just out of interest
>> whether there is any measurable effect of Wolfgangs proposed changes.
>
> No, these are still from v1. I'll repeat the benchmark and check if there are any noteworthy
> changes.
> My prediction is that the impact is probably very small, but let's see if there
> are any surprises.
>
Results are in:
(before) (v1) (v3)
list of active tasks (*): ~1.3s ~30µs ~300µs
list of 500 tasks, offset 0 (**): ~1.3s ~500µs ~1450µs
list of 500 tasks, offset 1 million (***): ~1.3s ~200ms ~175ms
Size on disk: ~500MB ~200MB ~200MB
I think the slowdown for the first two scenarios can be explained by the
changes in v2, where I wrapped the body of get_tasks in a tokio::task::spawn_blocking,
which I assume introduces some overhead.
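Roughly, the wrapping in question looks like this (simplified sketch;
`get_tasks_sketch` and `read_tasks_from_cache` are placeholder names, not the
real code):

    async fn get_tasks_sketch(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
        // Move the synchronous, file-based cache read off the async executor.
        tokio::task::spawn_blocking(move || {
            let cache = get_cache()?;
            read_tasks_from_cache(&cache, &filters)
        })
        // The JoinError from the blocking task converts into anyhow::Error via `?`.
        .await?
    }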
With the third one we might actually see the slight improvements from Wolfgang's suggestions.
I'll include these results in the commit message in v4.
--
- Lukas
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* [pdm-devel] superseded: [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
` (6 preceding siblings ...)
2025-04-17 15:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Thomas Lamprecht
@ 2025-04-18 8:33 ` Lukas Wagner
7 siblings, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2025-04-18 8:33 UTC (permalink / raw)
To: pdm-devel
Superseded by v4:
https://lore.proxmox.com/pdm-devel/20250418083210.74982-1-l.wagner@proxmox.com/T/#t
--
- Lukas
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
end of thread, other threads:[~2025-04-18 8:33 UTC | newest]
Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-04-17 13:22 [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 1/6] remote tasks: implement improved cache for remote tasks Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 2/6] remote tasks: add background task for task polling, use new task cache Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 3/6] remote tasks: improve locking for task archive iterator Lukas Wagner
2025-04-18 7:12 ` Wolfgang Bumiller
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 4/6] pdm-api-types: remote tasks: add new_from_str constructor for TaskStateType Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 5/6] fake remote: make the fake_remote feature compile again Lukas Wagner
2025-04-17 13:22 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 6/6] fake remote: clippy fixes Lukas Wagner
2025-04-17 15:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 0/6] remote task cache fetching task / better cache backend Thomas Lamprecht
2025-04-18 7:12 ` Wolfgang Bumiller
2025-04-18 7:24 ` Lukas Wagner
2025-04-18 8:08 ` Lukas Wagner
2025-04-18 8:33 ` [pdm-devel] superseded: " Lukas Wagner