From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Date: Tue, 28 Jan 2025 13:25:10 +0100
Message-Id: <20250128122520.167796-6-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250128122520.167796-1-l.wagner@proxmox.com>
References: <20250128122520.167796-1-l.wagner@proxmox.com>
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 05/15] task cache: add FIFO cache replacement policy

The task cache now holds up to a fixed number of tasks *per remote*
(CACHED_TASKS_PER_REMOTE, currently 2000). If that capacity is
exceeded, the oldest tasks, ordered by the task's starttime, are
dropped.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/task_cache.rs | 193 +++++++++++++++++++++++++++++++--------
 1 file changed, 153 insertions(+), 40 deletions(-)
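A note for reviewers: the core of the change is a classic two-way merge
over two starttime-sorted task lists, truncated at the per-remote
capacity. The following is a minimal, self-contained sketch of that
idea - it uses bare i64 starttimes instead of TaskListItem, a
hypothetical merge_newest() helper, and leaves out the UPID dedup that
the real MergeTaskIterator performs:

fn merge_newest(mut a: Vec<i64>, mut b: Vec<i64>, limit: usize) -> Vec<i64> {
    // Sort both inputs newest-first, as merge_tasks() below does via
    // sort_by_key(|task| -task.starttime).
    a.sort_by_key(|t| std::cmp::Reverse(*t));
    b.sort_by_key(|t| std::cmp::Reverse(*t));

    let (mut left, mut right) = (a.into_iter().peekable(), b.into_iter().peekable());
    let mut out = Vec::new();

    // Two-way merge: repeatedly take the newer head element and stop
    // once `limit` tasks have been emitted. Everything older simply
    // falls off the end - that is the FIFO replacement policy.
    while out.len() < limit {
        match (left.peek(), right.peek()) {
            (Some(l), Some(r)) if l >= r => out.push(left.next().unwrap()),
            (Some(_), Some(_)) => out.push(right.next().unwrap()),
            (Some(_), None) => out.push(left.next().unwrap()),
            (None, Some(_)) => out.push(right.next().unwrap()),
            (None, None) => break,
        }
    }

    out
}

fn main() {
    // Same shape as the merge_tasks() unit test at the end of the patch.
    let merged = merge_newest(vec![30, 10, 20], vec![25, 15, 35, 5], 5);
    assert_eq!(merged, vec![35, 30, 25, 20, 15]);
}

The patch implements the same merge lazily, as an Iterator
(MergeTaskIterator) combined with .take(limit), so no intermediate
vector beyond the final collect() is built.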
diff --git a/server/src/task_cache.rs b/server/src/task_cache.rs
index f24af3f..032f2a4 100644
--- a/server/src/task_cache.rs
+++ b/server/src/task_cache.rs
@@ -1,6 +1,8 @@
 use std::{
+    cmp::Ordering,
     collections::{HashMap, HashSet},
     fs::File,
+    iter::Peekable,
     path::{Path, PathBuf},
     sync::{LazyLock, RwLock},
     time::Duration,
@@ -18,6 +20,9 @@ use tokio::task::JoinHandle;
 
 use crate::{api::pve, task_utils};
 
+// TODO: Does this number make sense?
+const CACHED_TASKS_PER_REMOTE: usize = 2000;
+
 /// Get tasks for all remotes
 // FIXME: filter for privileges
 pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
@@ -31,7 +36,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error
     let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
 
     let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
-    let mut cache = TaskCache::new(cache_path, file_options)?;
+    let mut cache = TaskCache::new(cache_path, file_options, CACHED_TASKS_PER_REMOTE)?;
 
     // Force a refresh for all tasks of a remote if a task is finished.
     // Not super nice, but saves us from persisting finished tasks. Also,
@@ -53,7 +58,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error
                 continue;
             }
         };
-        cache.set_tasks(remote_name.as_str(), tasks.clone());
+        cache.add_tasks(remote_name.as_str(), tasks.clone());
 
         all_tasks.extend(tasks);
     }
@@ -240,10 +245,6 @@ struct TaskCache {
     /// Cache entries
     content: TaskCacheContent,
 
-    /// Entries that were added or updated - these will be persistet
-    /// when `save` is called.
-    new_or_updated: TaskCacheContent,
-
     /// Cache entries were changed/removed.
     dirty: bool,
 
@@ -252,18 +253,25 @@ struct TaskCache {
     /// File mode/owner/group for the cache file.
     cachefile_options: CreateOptions,
+
+    /// Max. tasks per remote
+    max_tasks_per_remote: usize,
 }
 
 impl TaskCache {
     /// Create a new tasks cache instance by loading
     /// the cache from disk.
-    fn new(cachefile_path: PathBuf, cachefile_options: CreateOptions) -> Result<Self, Error> {
+    fn new(
+        cachefile_path: PathBuf,
+        cachefile_options: CreateOptions,
+        max_tasks_per_remote: usize,
+    ) -> Result<Self, Error> {
         Ok(Self {
             content: Self::load_content(&cachefile_path)?,
-            new_or_updated: Default::default(),
             dirty: false,
             cachefile_path,
             cachefile_options,
+            max_tasks_per_remote,
         })
     }
 
@@ -301,15 +309,14 @@ impl TaskCache {
         // Read content again, in case somebody has changed it in the meanwhile
         let mut content = Self::load_content(&self.cachefile_path)?;
 
-        for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
-            if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
-                *existing_entry = entry;
-            } else {
-                content.remote_tasks.insert(remote_name, entry);
+        for (remote, entry) in self.content.remote_tasks.iter_mut() {
+            if let Some(other) = content.remote_tasks.remove(remote) {
+                entry.tasks =
+                    Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote);
             }
         }
 
-        let bytes = serde_json::to_vec_pretty(&content)?;
+        let bytes = serde_json::to_vec_pretty(&self.content)?;
 
         proxmox_sys::fs::replace_file(
             &self.cachefile_path,
@@ -323,20 +330,22 @@ impl TaskCache {
         Ok(())
     }
 
-    // Update task data for a given remote.
-    fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+    /// Add tasks to the cache.
+    ///
+    /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
+    /// oldest ones are dropped.
+    fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+        let entry = self.content.remote_tasks.entry(remote.into()).or_default();
+
+        entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
+
         self.dirty = true;
-        self.new_or_updated
-            .remote_tasks
-            .insert(remote.to_string(), TaskCacheEntry { tasks });
     }
 
     // Get task data for a given remote.
     fn get_tasks(&self, remote: &str) -> Option<Vec<TaskListItem>> {
         if let Some(entry) = self.content.remote_tasks.get(remote) {
             Some(entry.tasks.clone())
-        } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
-            Some(entry.tasks.clone())
         } else {
             None
         }
@@ -361,9 +370,72 @@ impl TaskCache {
             self.cachefile_options.clone(),
         )
     }
+
+    fn merge_tasks(
+        mut a: Vec<TaskListItem>,
+        mut b: Vec<TaskListItem>,
+        limit: usize,
+    ) -> Vec<TaskListItem> {
+        a.sort_by_key(|task| -task.starttime);
+        b.sort_by_key(|task| -task.starttime);
+
+        MergeTaskIterator {
+            left: a.into_iter().peekable(),
+            right: b.into_iter().peekable(),
+        }
+        .take(limit)
+        .collect()
+    }
+}
+
+struct MergeTaskIterator<T: Iterator<Item = TaskListItem>> {
+    left: Peekable<T>,
+    right: Peekable<T>,
+}
+
+impl<T> Iterator for MergeTaskIterator<T>
+where
+    T: Iterator<Item = TaskListItem>,
+{
+    type Item = T::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let order = match (self.left.peek(), self.right.peek()) {
+            (Some(l), Some(r)) => Some(l.starttime.cmp(&r.starttime)),
+            (Some(_), None) => Some(Ordering::Greater),
+            (None, Some(_)) => Some(Ordering::Less),
+            (None, None) => None,
+        };
+
+        match order {
+            Some(Ordering::Greater) => self.left.next(),
+            Some(Ordering::Less) => self.right.next(),
+            Some(Ordering::Equal) => {
+                // Both unwraps are safe, the preceding peek/match
+                // guarantees that there is an element.
+                let l = self.left.peek().unwrap();
+                let r = self.right.peek().unwrap();
+
+                // Dedup if both lists contain the same task
+                if l.upid == r.upid {
+                    // Take the one where the task is already finished
+                    if l.endtime.is_some() {
+                        let _ = self.right.next();
+                        self.left.next()
+                    } else {
+                        let _ = self.left.next();
+                        self.right.next()
+                    }
+                } else {
+                    self.left.next()
+                }
+            }
+            None => None,
+        }
+    }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Default, Debug, Serialize, Deserialize)]
 /// Per-remote entry in the task cache.
 struct TaskCacheEntry {
     tasks: Vec<TaskListItem>,
 }
@@ -524,6 +596,29 @@ mod tests {
     use super::*;
     use crate::test_support::temp::NamedTempFile;
 
+    fn make_upid(
+        starttime: i64,
+        endtime: Option<i64>,
+        status: Option<String>,
+    ) -> Result<TaskListItem, Error> {
+        let upid: PveUpid =
+            format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root@pam:",)
+                .parse()?;
+
+        Ok(TaskListItem {
+            upid: upid.to_string(),
+            node: upid.node,
+            pid: upid.pid as i64,
+            pstart: upid.pstart,
+            starttime: upid.starttime,
+            worker_type: upid.worker_type,
+            worker_id: upid.worker_id,
+            user: upid.auth_id,
+            endtime,
+            status,
+        })
+    }
+
     #[test]
     fn basic_task_cache() -> Result<(), Error> {
         let options = CreateOptions::new()
@@ -533,37 +628,55 @@ mod tests {
 
         let temp_file = NamedTempFile::new(options.clone())?;
 
-        let mut cache = TaskCache::new(temp_file.path().into(), options.clone())?;
+        let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?;
 
         let mut tasks = Vec::new();
        let now = proxmox_time::epoch_i64();
 
         for i in (0..20).rev() {
-            let upid: PveUpid =
-                "UPID:pve-node:0000C530:001C9BEC:677E934A:stopall::root@pam:".parse()?;
-
-            tasks.push(TaskListItem {
-                upid: upid.to_string(),
-                node: upid.node,
-                pid: upid.pid as i64,
-                pstart: upid.pstart,
-                starttime: now - 10 * i,
-                worker_type: upid.worker_type,
-                worker_id: upid.worker_id,
-                user: upid.auth_id,
-                endtime: None,
-                status: None,
-            });
+            tasks.push(make_upid(now - 10 * i, None, None)?);
         }
 
-        cache.set_tasks("some-remote", tasks.clone());
+        cache.add_tasks("some-remote", tasks.clone());
         cache.save()?;
 
-        let cache = TaskCache::new(temp_file.path().into(), options)?;
+        let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
         let res = cache.get_tasks("some-remote").unwrap();
 
+        tasks.reverse();
         assert_eq!(tasks, res);
 
         Ok(())
     }
+
+    #[test]
+    fn merge_tasks() -> Result<(), Error> {
+        // Arrange
+        let mut a = Vec::new();
+        for i in [30, 10, 20] {
+            a.push(make_upid(i, None, None)?);
+        }
+
+        let mut b = Vec::new();
+        for i in [25, 15, 35, 5] {
+            b.push(make_upid(i, None, None)?);
+        }
+
+        a.push(make_upid(40, None, None)?);
+        b.push(make_upid(40, Some(50), Some("some status".into()))?);
+
+        // Act
+        let tasks = TaskCache::merge_tasks(a, b, 5);
+
+        // Assert
+        assert_eq!(tasks.len(), 5);
+        assert_eq!(tasks[0].starttime, 40);
+        assert_eq!(tasks[0].endtime, Some(50));
+        assert_eq!(tasks[1].starttime, 35);
+        assert_eq!(tasks[2].starttime, 30);
+        assert_eq!(tasks[3].starttime, 25);
+        assert_eq!(tasks[4].starttime, 20);
+
+        Ok(())
+    }
 }
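A second reviewer note, on the Ordering::Equal branch above: when the
same task (identical UPID, identical starttime) appears in both input
lists, the merge keeps the entry that already has an endtime, so a
finished task is not shadowed by its still-running duplicate. A tiny
illustration of just that tie-break, using a hypothetical pared-down
Task type rather than the real TaskListItem:

#[derive(Debug)]
struct Task {
    upid: String,
    endtime: Option<i64>,
}

// Mirrors the dedup rule in MergeTaskIterator::next(): on a starttime
// tie with equal UPIDs, prefer whichever side is already finished.
fn pick<'a>(l: &'a Task, r: &'a Task) -> &'a Task {
    if l.upid == r.upid && l.endtime.is_none() {
        r
    } else {
        l
    }
}

fn main() {
    let running = Task { upid: "UPID:demo".into(), endtime: None };
    let finished = Task { upid: "UPID:demo".into(), endtime: Some(50) };
    assert_eq!(pick(&running, &finished).endtime, Some(50));
}

This is exactly what the merge_tasks() test checks: the starttime-40
task appears in both lists, and the merged result carries endtime
Some(50).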
-- 
2.39.5

_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel