From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 46BE71FF15E for ; Tue, 28 Jan 2025 13:26:54 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C77D3CE6E; Tue, 28 Jan 2025 13:26:52 +0100 (CET) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Date: Tue, 28 Jan 2025 13:25:12 +0100 Message-Id: <20250128122520.167796-8-l.wagner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20250128122520.167796-1-l.wagner@proxmox.com> References: <20250128122520.167796-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.015 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 07/15] task cache: move to its own submodule X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" No functional changes, only adapting method visibility where needed. 
Signed-off-by: Lukas Wagner --- server/src/remote_tasks/mod.rs | 309 +------------------------ server/src/remote_tasks/task_cache.rs | 312 ++++++++++++++++++++++++++ 2 files changed, 317 insertions(+), 304 deletions(-) create mode 100644 server/src/remote_tasks/task_cache.rs diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs index 032f2a4..4a0552c 100644 --- a/server/src/remote_tasks/mod.rs +++ b/server/src/remote_tasks/mod.rs @@ -1,11 +1,7 @@ use std::{ - cmp::Ordering, - collections::{HashMap, HashSet}, - fs::File, - iter::Peekable, - path::{Path, PathBuf}, + collections::HashSet, + path::Path, sync::{LazyLock, RwLock}, - time::Duration, }; use anyhow::Error; @@ -15,11 +11,13 @@ use pdm_api_types::{ }; use proxmox_sys::fs::CreateOptions; use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid}; -use serde::{Deserialize, Serialize}; use tokio::task::JoinHandle; use crate::{api::pve, task_utils}; +mod task_cache; +use task_cache::TaskCache; + // TODO: Does this number make sense? const CACHED_TASKS_PER_REMOTE: usize = 2000; @@ -240,213 +238,6 @@ fn add_running_tasks(cached_tasks: Vec) -> Result Result { - Ok(Self { - content: Self::load_content(&cachefile_path)?, - dirty: false, - cachefile_path, - cachefile_options, - max_tasks_per_remote, - }) - } - - /// Load the task cache contents from disk. - fn load_content(path: &Path) -> Result { - let content = proxmox_sys::fs::file_read_optional_string(path)?; - - let content = if let Some(content) = content { - serde_json::from_str(&content).unwrap_or_default() - } else { - Default::default() - }; - - Ok(content) - } - - /// Get path for the cache's lockfile. - fn lockfile_path(&self) -> PathBuf { - let mut path = self.cachefile_path.clone(); - path.set_extension("lock"); - path - } - - /// Persist the task cache - /// - /// This method requests an exclusive lock for the task cache lockfile. 
- fn save(&mut self) -> Result<(), Error> { - // if we have not updated anything, we don't have to update the cache file - if !self.dirty { - return Ok(()); - } - - let _guard = self.lock(Duration::from_secs(5))?; - - // Read content again, in case somebody has changed it in the meanwhile - let mut content = Self::load_content(&self.cachefile_path)?; - - for (remote, entry) in self.content.remote_tasks.iter_mut() { - if let Some(other) = content.remote_tasks.remove(remote) { - entry.tasks = - Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote); - } - } - - let bytes = serde_json::to_vec_pretty(&self.content)?; - - proxmox_sys::fs::replace_file( - &self.cachefile_path, - &bytes, - self.cachefile_options.clone(), - true, - )?; - - self.dirty = false; - - Ok(()) - } - - /// Add tasks to the cache. - /// - /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the - /// oldest ones are truncated. - fn add_tasks(&mut self, remote: &str, tasks: Vec) { - let entry = self.content.remote_tasks.entry(remote.into()).or_default(); - - entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote); - - self.dirty = true; - } - - // Get task data for a given remote. - fn get_tasks(&self, remote: &str) -> Option> { - if let Some(entry) = self.content.remote_tasks.get(remote) { - Some(entry.tasks.clone()) - } else { - None - } - } - - // Invalidate cache for a given remote. - fn invalidate_cache_for_remote(&mut self, remote: &str) { - self.dirty = true; - self.content.remote_tasks.remove(remote); - } - - // Lock the cache for modification. - // - // While the cache is locked, other users can still read the cache - // without a lock, since the cache file is replaced atomically - // when updating. 
- fn lock(&self, duration: Duration) -> Result { - proxmox_sys::fs::open_file_locked( - self.lockfile_path(), - duration, - true, - self.cachefile_options.clone(), - ) - } - - fn merge_tasks( - mut a: Vec, - mut b: Vec, - limit: usize, - ) -> Vec { - a.sort_by_key(|task| -task.starttime); - b.sort_by_key(|task| -task.starttime); - - MergeTaskIterator { - left: a.into_iter().peekable(), - right: b.into_iter().peekable(), - } - .take(limit) - .collect() - } -} - -struct MergeTaskIterator> { - left: Peekable, - right: Peekable, -} - -impl Iterator for MergeTaskIterator -where - T: Iterator, -{ - type Item = T::Item; - - fn next(&mut self) -> Option { - let order = match (self.left.peek(), self.right.peek()) { - (Some(l), Some(r)) => Some(l.starttime.cmp(&r.starttime)), - (Some(_), None) => Some(Ordering::Greater), - (None, Some(_)) => Some(Ordering::Less), - (None, None) => None, - }; - - match order { - Some(Ordering::Greater) => self.left.next(), - Some(Ordering::Less) => self.right.next(), - Some(Ordering::Equal) => { - // Both unwraps are safe, the former peek/match - // guarantess that there is an element. - let l = self.left.peek().unwrap(); - let r = self.right.peek().unwrap(); - - // Dedup if both lists contain the same task - if l.upid == r.upid { - // Take the one where the task is already finished - if l.endtime.is_some() { - let _ = self.right.next(); - self.left.next() - } else { - let _ = self.left.next(); - self.right.next() - } - } else { - self.left.next() - } - } - None => None, - } - } -} - -#[derive(Default, Debug, Serialize, Deserialize)] -/// Per-remote entry in the task cache. -struct TaskCacheEntry { - tasks: Vec, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -/// Content of the task cache file. 
-struct TaskCacheContent { - remote_tasks: HashMap, -} - /// Interval at which tracked tasks are polled const RUNNING_CHECK_INTERVAL_S: u64 = 10; @@ -590,93 +381,3 @@ pub fn tasktype(status: &str) -> TaskStateType { TaskStateType::Error } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::test_support::temp::NamedTempFile; - - fn make_upid( - starttime: i64, - endtime: Option, - status: Option, - ) -> Result { - let upid: PveUpid = - format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root@pam:",) - .parse()?; - - Ok(TaskListItem { - upid: upid.to_string(), - node: upid.node, - pid: upid.pid as i64, - pstart: upid.pstart, - starttime: upid.starttime, - worker_type: upid.worker_type, - worker_id: upid.worker_id, - user: upid.auth_id, - endtime, - status, - }) - } - - #[test] - fn basic_task_cache() -> Result<(), Error> { - let options = CreateOptions::new() - .owner(nix::unistd::Uid::effective()) - .group(nix::unistd::Gid::effective()) - .perm(nix::sys::stat::Mode::from_bits_truncate(0o600)); - - let temp_file = NamedTempFile::new(options.clone())?; - - let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?; - - let mut tasks = Vec::new(); - - let now = proxmox_time::epoch_i64(); - for i in (0..20).rev() { - tasks.push(make_upid(now - 10 * i, None, None)?); - } - - cache.add_tasks("some-remote", tasks.clone()); - cache.save()?; - - let cache = TaskCache::new(temp_file.path().into(), options, 50)?; - - let res = cache.get_tasks("some-remote").unwrap(); - tasks.reverse(); - assert_eq!(tasks, res); - - Ok(()) - } - - #[test] - fn merge_tasks() -> Result<(), Error> { - // Arrange - let mut a = Vec::new(); - for i in [30, 10, 20] { - a.push(make_upid(i, None, None)?); - } - - let mut b = Vec::new(); - for i in [25, 15, 35, 5] { - b.push(make_upid(i, None, None)?); - } - - a.push(make_upid(40, None, None)?); - b.push(make_upid(40, Some(50), Some("some status".into()))?); - - // Act - let tasks = TaskCache::merge_tasks(a, b, 
5); - - // Assert - assert_eq!(tasks.len(), 5); - assert_eq!(tasks[0].starttime, 40); - assert_eq!(tasks[0].endtime, Some(50)); - assert_eq!(tasks[1].starttime, 35); - assert_eq!(tasks[2].starttime, 30); - assert_eq!(tasks[3].starttime, 25); - assert_eq!(tasks[4].starttime, 20); - - Ok(()) - } -} diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs new file mode 100644 index 0000000..8a98876 --- /dev/null +++ b/server/src/remote_tasks/task_cache.rs @@ -0,0 +1,312 @@ +use std::{ + cmp::Ordering, + collections::HashMap, + fs::File, + iter::Peekable, + path::{Path, PathBuf}, + time::Duration, +}; + +use anyhow::Error; +use serde::{Deserialize, Serialize}; + +use pdm_api_types::TaskListItem; +use proxmox_sys::fs::CreateOptions; + +/// A cache for fetched remote tasks. +pub(super) struct TaskCache { + /// Cache entries + content: TaskCacheContent, + + /// Cache entries were changed/removed. + dirty: bool, + + /// File-location at which the cached tasks are stored. + cachefile_path: PathBuf, + + /// File mode/owner/group for the cache file. + cachefile_options: CreateOptions, + + /// Max. tasks per remote + max_tasks_per_remote: usize, +} + +impl TaskCache { + /// Create a new tasks cache instance by loading + /// the cache from disk. + pub(super) fn new( + cachefile_path: PathBuf, + cachefile_options: CreateOptions, + max_tasks_per_remote: usize, + ) -> Result { + Ok(Self { + content: Self::load_content(&cachefile_path)?, + dirty: false, + cachefile_path, + cachefile_options, + max_tasks_per_remote, + }) + } + + /// Load the task cache contents from disk. + fn load_content(path: &Path) -> Result { + let content = proxmox_sys::fs::file_read_optional_string(path)?; + + let content = if let Some(content) = content { + serde_json::from_str(&content).unwrap_or_default() + } else { + Default::default() + }; + + Ok(content) + } + + /// Get path for the cache's lockfile. 
+ fn lockfile_path(&self) -> PathBuf { + let mut path = self.cachefile_path.clone(); + path.set_extension("lock"); + path + } + + /// Persist the task cache + /// + /// This method requests an exclusive lock for the task cache lockfile. + pub(super) fn save(&mut self) -> Result<(), Error> { + // if we have not updated anything, we don't have to update the cache file + if !self.dirty { + return Ok(()); + } + + let _guard = self.lock(Duration::from_secs(5))?; + + // Read content again, in case somebody has changed it in the meanwhile + let mut content = Self::load_content(&self.cachefile_path)?; + + for (remote, entry) in self.content.remote_tasks.iter_mut() { + if let Some(other) = content.remote_tasks.remove(remote) { + entry.tasks = + Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote); + } + } + + let bytes = serde_json::to_vec_pretty(&self.content)?; + + proxmox_sys::fs::replace_file( + &self.cachefile_path, + &bytes, + self.cachefile_options.clone(), + true, + )?; + + self.dirty = false; + + Ok(()) + } + + /// Add tasks to the cache. + /// + /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the + /// oldest ones are truncated. + pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec) { + let entry = self.content.remote_tasks.entry(remote.into()).or_default(); + + entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote); + + self.dirty = true; + } + + // Get task data for a given remote. + pub(super) fn get_tasks(&self, remote: &str) -> Option> { + self.content + .remote_tasks + .get(remote) + .map(|entry| entry.tasks.clone()) + } + + // Invalidate cache for a given remote. + pub(super) fn invalidate_cache_for_remote(&mut self, remote: &str) { + self.dirty = true; + self.content.remote_tasks.remove(remote); + } + + // Lock the cache for modification. 
+ // + // While the cache is locked, other users can still read the cache + // without a lock, since the cache file is replaced atomically + // when updating. + fn lock(&self, duration: Duration) -> Result { + proxmox_sys::fs::open_file_locked( + self.lockfile_path(), + duration, + true, + self.cachefile_options.clone(), + ) + } + + fn merge_tasks( + mut a: Vec, + mut b: Vec, + limit: usize, + ) -> Vec { + a.sort_by_key(|task| -task.starttime); + b.sort_by_key(|task| -task.starttime); + + MergeTaskIterator { + left: a.into_iter().peekable(), + right: b.into_iter().peekable(), + } + .take(limit) + .collect() + } +} + +struct MergeTaskIterator> { + left: Peekable, + right: Peekable, +} + +impl Iterator for MergeTaskIterator +where + T: Iterator, +{ + type Item = T::Item; + + fn next(&mut self) -> Option { + let order = match (self.left.peek(), self.right.peek()) { + (Some(l), Some(r)) => Some(l.starttime.cmp(&r.starttime)), + (Some(_), None) => Some(Ordering::Greater), + (None, Some(_)) => Some(Ordering::Less), + (None, None) => None, + }; + + match order { + Some(Ordering::Greater) => self.left.next(), + Some(Ordering::Less) => self.right.next(), + Some(Ordering::Equal) => { + // Both unwraps are safe, the former peek/match + // guarantees that there is an element. + let l = self.left.peek().unwrap(); + let r = self.right.peek().unwrap(); + + // Dedup if both lists contain the same task + if l.upid == r.upid { + // Take the one where the task is already finished + if l.endtime.is_some() { + let _ = self.right.next(); + self.left.next() + } else { + let _ = self.left.next(); + self.right.next() + } + } else { + self.left.next() + } + } + None => None, + } + } +} + +#[derive(Default, Debug, Serialize, Deserialize)] +/// Per-remote entry in the task cache. +struct TaskCacheEntry { + tasks: Vec, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +/// Content of the task cache file. 
+struct TaskCacheContent { + remote_tasks: HashMap, +} + +#[cfg(test)] +mod tests { + use pve_api_types::PveUpid; + + use super::*; + use crate::test_support::temp::NamedTempFile; + + fn make_upid( + starttime: i64, + endtime: Option, + status: Option, + ) -> Result { + let upid: PveUpid = + format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root@pam:",) + .parse()?; + + Ok(TaskListItem { + upid: upid.to_string(), + node: upid.node, + pid: upid.pid as i64, + pstart: upid.pstart, + starttime: upid.starttime, + worker_type: upid.worker_type, + worker_id: upid.worker_id, + user: upid.auth_id, + endtime, + status, + }) + } + + #[test] + fn basic_task_cache() -> Result<(), Error> { + let options = CreateOptions::new() + .owner(nix::unistd::Uid::effective()) + .group(nix::unistd::Gid::effective()) + .perm(nix::sys::stat::Mode::from_bits_truncate(0o600)); + + let temp_file = NamedTempFile::new(options.clone())?; + + let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?; + + let mut tasks = Vec::new(); + + let now = proxmox_time::epoch_i64(); + for i in (0..20).rev() { + tasks.push(make_upid(now - 10 * i, None, None)?); + } + + cache.add_tasks("some-remote", tasks.clone()); + cache.save()?; + + let cache = TaskCache::new(temp_file.path().into(), options, 50)?; + + let res = cache.get_tasks("some-remote").unwrap(); + tasks.reverse(); + assert_eq!(tasks, res); + + Ok(()) + } + + #[test] + fn merge_tasks() -> Result<(), Error> { + // Arrange + let mut a = Vec::new(); + for i in [30, 10, 20] { + a.push(make_upid(i, None, None)?); + } + + let mut b = Vec::new(); + for i in [25, 15, 35, 5] { + b.push(make_upid(i, None, None)?); + } + + a.push(make_upid(40, None, None)?); + b.push(make_upid(40, Some(50), Some("some status".into()))?); + + // Act + let tasks = TaskCache::merge_tasks(a, b, 5); + + // Assert + assert_eq!(tasks.len(), 5); + assert_eq!(tasks[0].starttime, 40); + assert_eq!(tasks[0].endtime, Some(50)); + 
assert_eq!(tasks[1].starttime, 35); + assert_eq!(tasks[2].starttime, 30); + assert_eq!(tasks[3].starttime, 25); + assert_eq!(tasks[4].starttime, 20); + + Ok(()) + } +} -- 2.39.5 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel