Date: Wed, 16 Apr 2025 15:15:55 +0200
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: Lukas Wagner <l.wagner@proxmox.com>
Cc: pdm-devel@lists.proxmox.com
Subject: Re: [pdm-devel] [PATCH proxmox-datacenter-manager v2 1/4] remote tasks: implement improved cache for remote tasks
Message-ID: <xzwsvjovdoitx4j62mxxvy5aeucrcmrqfaixng7wugb3vcectl@2nop5ijlnwgb>
References: <20250411110117.199543-1-l.wagner@proxmox.com> <20250411110117.199543-2-l.wagner@proxmox.com>
In-Reply-To: <20250411110117.199543-2-l.wagner@proxmox.com>

looks mostly fine, some minor things noted inline...

On Fri, Apr 11, 2025 at 01:01:14PM +0200, Lukas Wagner wrote:
> This commit adds a new implementation of a cache for remote tasks, one
> that should improve performance characteristics for pretty much all
> use cases.
>
> In general, storage works pretty similarly to the task archive we
> already have for (local) PDM tasks.
>
> root@pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
> total 40
> -rw-r--r-- 1 www-data www-data     0 Mar 13 13:18 active
> -rw-r--r-- 1 www-data www-data  1676 Mar 11 14:51 archive.1741355462
> -rw-r--r-- 1 www-data www-data     0 Mar 11 14:51 archive.1741441862
> -rw-r--r-- 1 www-data www-data  2538 Mar 11 14:51 archive.1741528262
> -rw-r--r-- 1 www-data www-data  8428 Mar 11 15:07 archive.1741614662
> -rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
> -rw-r--r-- 1 www-data www-data  3364 Mar 13 13:18 archive.1741788270
> -rw-r--r-- 1 www-data www-data   287 Mar 13 13:18 state
>
> Tasks are stored in the 'active' file and multiple 'archive' files.
> Running tasks are placed into the 'active' file; tasks that are finished
> are persisted into one of the archive files.
> The archive files are suffixed with a UNIX timestamp which serves
> as a lower bound for the start times of tasks stored in that file.
> Encoding this lower bound in the file name instead of using a more
> traditional increasing file index (.1, .2, .3) gives us the benefit
> that we can decide in which file a newly arrived task belongs from
> a single readdir call, without even reading the file itself.
>
> The size of the entire archive can be controlled by doing file
> 'rotation', where the archive file with the oldest timestamp is deleted
> and a new file with the current timestamp is added.
> If 'rotation' happens at fixed intervals (e.g. once daily), this gives
> us a configurable number of days of task history.
> There is no direct cap for the total number
> of tasks, but this could be easily added by checking the size of the
> most recent archive file and rotating early if a threshold is
> exceeded.
>
> The format inside the files is also similar to the local task archive,
> with one task corresponding to one line in the file.
> One key difference is that here each line is a JSON object; this makes
> it easier to add additional data later, if needed.
> The JSON object contains the task's UPID, status, endtime and starttime
> (the starttime is technically also encoded in the UPID, but having it as
> a separate value simplifies a couple of things).
> Each file is sorted by the tasks' start times, the youngest task coming
> first.
>
> One key difference between this task cache and the local task archive is
> that we need to handle tasks coming in out-of-order, e.g. if remotes
> were not reachable for a certain time. To maintain the ordering
> in the file, we have to merge the newly arrived tasks into the existing
> task file. This was implemented in a way that avoids reading the entire
> file into memory at once, exploiting the fact that the contents of the
> existing file are already sorted. This allows for a
> zipper/merge-sort-like approach (see MergeTaskIterator for
> details). The existing file is only read line-by-line and finally
> replaced atomically.
>
> The cache also has a separate state file, containing additional
> information, e.g. cut-off timestamps for the polling task.
> Some of the data in the state file is technically also contained in the
> archive files, but reading the state file is much faster.
>
> This commit also adds an elaborate suite of unit tests for this new
> cache. While they added some additional work up front, they paid off
> quickly during development, since the overall approach for the
> cache changed a couple of times.
The test suite gave me confidence that > the design changes didn't screw anything up, catching a couple of bugs > in the process. > > Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> > Reviewed-by: Max Carrara <m.carrara@proxmox.com> > --- > server/src/remote_tasks/mod.rs | 2 + > server/src/remote_tasks/task_cache.rs | 964 ++++++++++++++++++++++++++ > 2 files changed, 966 insertions(+) > create mode 100644 server/src/remote_tasks/task_cache.rs > > diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs > index 7ded5408..2062f2b7 100644 > --- a/server/src/remote_tasks/mod.rs > +++ b/server/src/remote_tasks/mod.rs > @@ -18,6 +18,8 @@ use tokio::task::JoinHandle; > > use crate::{api::pve, task_utils}; > > +mod task_cache; > + > /// Get tasks for all remotes > // FIXME: filter for privileges > pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> { > diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs > new file mode 100644 > index 00000000..628a9a40 > --- /dev/null > +++ b/server/src/remote_tasks/task_cache.rs > @@ -0,0 +1,964 @@ > +//! Task cache implementation, based on rotating files. > + > +use std::{ > + cmp::Ordering, > + collections::{HashMap, HashSet}, > + fs::File, > + io::{BufRead, BufReader, BufWriter, Lines, Write}, > + iter::Peekable, > + path::{Path, PathBuf}, > + time::Duration, > +}; > + > +use anyhow::Error; > +use proxmox_sys::fs::CreateOptions; > +use serde::{Deserialize, Serialize}; > + > +use pdm_api_types::RemoteUpid; > + > +/// Item which can be put into the task cache. > +#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)] > +pub struct TaskCacheItem { > + /// The task's UPID > + pub upid: RemoteUpid, > + /// The time at which the task was started (seconds since the UNIX epoch). > + /// Technically this is also contained within the UPID, duplicating it here > + /// allows us to directly access it when sorting in new tasks, without having > + /// to parse the UPID. > + pub starttime: i64, > + #[serde(skip_serializing_if = "Option::is_none")] > + /// The task's status. > + pub status: Option<String>, > + #[serde(skip_serializing_if = "Option::is_none")] > + /// The task's endtime (seconds since the UNIX epoch). > + pub endtime: Option<i64>, > +} > + > +/// State needed for task polling. > +#[derive(Serialize, Deserialize, Default)] > +#[serde(rename_all = "kebab-case")] > +pub struct State { > + /// Map of remote -> most recent task starttime (UNIX epoch) in the archive. > + /// This can be used as a cut-off when requesting new task data. > + pub most_recent_archive_starttime: HashMap<String, i64>, > + /// Oldest running task. Useful as another cut-off for fetching tasks. > + /// The timestamp is based on seconds since the UNIX epoch. > + pub oldest_active_task: HashMap<String, i64>, > + /// Tracked tasks per remote. > + pub tracked_tasks: HashMap<String, HashSet<RemoteUpid>>, > +} > + > +/// Cache for remote tasks. > +#[derive(Clone)] > +pub struct TaskCache { > + /// Path where the cache's files should be placed. > + base_path: PathBuf, > + /// File permissions for the cache's files. > + create_options: CreateOptions, > +} > + > +/// Tasks that should be added to the cache via [`TaskCache::add_tasks`]. > +pub struct AddTasks { > + /// Update most recent archived task in state file. > + pub update_most_recent_archive_timestamp: bool, > + /// The tasks to add. > + pub tasks: Vec<TaskCacheItem>, > +} > + > +/// Lock for the cache. 
> +#[allow(dead_code)]
> +pub struct TaskCacheLock(File);
> +
> +/// Which tasks to fetch from the archive.
> +pub enum GetTasks {
> +    /// Get all tasks, finished and running.
> +    All,
> +    /// Only get running (active) tasks.
> +    Active,
> +    /// Only get finished (archived) tasks.
> +    #[allow(dead_code)]
> +    Archived,
> +}
> +
> +impl TaskCache {
> +    /// Create a new task cache instance.
> +    ///
> +    /// Remember to call `init` or `new_file` to create initial archive files before
> +    /// adding any tasks.
> +    pub fn new<P: AsRef<Path>>(path: P, create_options: CreateOptions) -> Result<Self, Error> {
> +        Ok(Self {
> +            base_path: path.as_ref().into(),
> +            create_options,
> +        })
> +    }
> +
> +    /// Create initial task archives that can be backfilled with the
> +    /// recent task history from a remote.
> +    ///
> +    /// This function only has an effect if there are no archive files yet.
> +    pub fn init(&self, now: i64, number_of_files: u64, period_per_file: u64) -> Result<(), Error> {
> +        let _lock = self.lock(true)?;
> +
> +        if self.archive_files()?.is_empty() {
> +            for i in 0..number_of_files {
> +                self.new_file(now - (i * period_per_file) as i64)?;
> +            }
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Start a new archive file with a given timestamp.
> +    /// `now` is supposed to be a UNIX timestamp (seconds).
> +    fn new_file(&self, now: i64) -> Result<(), Error> {
> +        let new_path = self.base_path.join(format!("archive.{now}"));
> +        let mut file = File::create(&new_path)?;
> +        self.create_options.apply_to(&mut file, &new_path)?;
> +
> +        Ok(())
> +    }
> +
> +    /// Lock the cache.
> +    fn lock(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
> +        let lockfile = self.base_path.join(".lock");
> +
> +        let fd = proxmox_sys::fs::open_file_locked(
> +            lockfile,
> +            Duration::from_secs(15),
> +            exclusive,
> +            self.create_options.clone(),
> +        )?;
> +
> +        Ok(TaskCacheLock(fd))
> +    }
> +
> +    /// Rotate the task archive if the newest archive file is older than `rotate_after`.
> +    ///
> +    /// The oldest archive files are removed if the total number of archive files exceeds
> +    /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
> +    ///
> +    /// This function requests an exclusive lock; don't call it if you already hold a lock.
> +    pub fn rotate(&self, now: i64, rotate_after: u64, max_files: u64) -> Result<bool, Error> {
> +        let _lock = self.lock(true)?;
> +        let mut did_rotate = false;
> +
> +        let mut bounds = self.archive_files()?;
> +
> +        match bounds.first() {
> +            Some(bound) => {
> +                if now > bound.starttime && now - bound.starttime > rotate_after as i64 {
> +                    self.new_file(now)?;
> +                    did_rotate = true;
> +                }
> +            }
> +            None => {
> +                self.new_file(now)?;
> +                did_rotate = true;
> +            }
> +        }
> +
> +        while bounds.len() >= max_files as usize {
> +            // Unwrap is safe because of the length check above
> +            let to_remove = bounds.pop().unwrap();
> +            std::fs::remove_file(&to_remove.file)?;
> +        }
> +
> +        Ok(did_rotate)
> +    }
> +
> +    /// Add new tasks to the task archive.
> +    ///
> +    /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
> +    /// task cache base directory. Finished tasks are sorted into `archive.<starttime>` archive
> +    /// files, where `<starttime>` denotes the lowest permitted start time timestamp for a given
> +    /// archive file. If a task which was added as running previously is added again, this time in
> +    /// a finished state, it will be removed from the `active` file and also sorted into
> +    /// one of the archive files.
> + /// Same goes for the list of tracked tasks; the entry in the state file will be removed. > + /// > + /// Crash consistency: > + /// > + /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the > + /// end after all tasks have been added into the archive. Adding tasks is an idempotent > + /// operation; adding the *same* task multiple times does not lead to duplicated entries in the > + /// task archive. Individual archive files are updated atomically, but since > + /// adding tasks can involve updating multiple archive files, the archive could end up > + /// in a partially-updated, inconsistent state in case of a crash. > + /// However, since the state file with the cut-off timestamps is updated last, > + /// the consistency of the archive should be restored at the next update cycle of the archive. > + pub fn add_tasks(&self, mut added: HashMap<String, AddTasks>) -> Result<(), Error> { > + let lock = self.lock(true)?; > + > + // Get a flat `Vec` of all new tasks > + let tasks: Vec<_> = added > + .iter_mut() > + .flat_map(|(_, tasks)| std::mem::take(&mut tasks.tasks)) > + .collect(); > + > + let update_state_for_remote: HashMap<String, bool> = added > + .into_iter() > + .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp)) > + .collect(); > + > + let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?; > + > + let mut active_tasks = > + HashSet::from_iter(task_iter.flat_map(|active_task| match active_task { > + Ok(task) => Some((task.upid, task.starttime)), > + Err(err) => { > + log::error!("failed to read task cache entry from active file: {err}"); > + None > + } > + })); > + > + // Consume the iterator to get back the lock. The lock is held > + // until _lock is finally dropped at the end of the function. > + let _lock = task_iter.into_lock(); > + > + active_tasks.extend( > + tasks > + .iter() > + .filter(|task| task.endtime.is_none()) > + .map(|a| (a.upid.clone(), a.starttime)), > + ); > + > + let mut state = self.read_state(); > + let mut new_finished_tasks = tasks > + .into_iter() > + .filter(|task| task.endtime.is_some()) > + .collect::<Vec<_>>(); > + > + new_finished_tasks.sort_by(compare_tasks_reverse); > + self.merge_tasks_into_archive( > + new_finished_tasks, > + &mut active_tasks, > + update_state_for_remote, > + &mut state, > + )?; > + self.update_oldest_active(&active_tasks, &mut state); > + > + let mut active: Vec<TaskCacheItem> = active_tasks > + .into_iter() > + .map(|(upid, starttime)| TaskCacheItem { > + upid, > + starttime, > + status: None, > + endtime: None, > + }) > + .collect(); > + > + active.sort_by(compare_tasks_reverse); > + self.write_active_tasks(active.into_iter())?; > + self.write_state(state)?; > + > + Ok(()) > + } > + > + /// Update the timestamp of the oldest running task in `state`. > + fn update_oldest_active(&self, active_tasks: &HashSet<(RemoteUpid, i64)>, state: &mut State) { > + // Update the state with timestamp of the oldest running task, > + // we also use this as cut-off when fetching tasks, so we > + // for sure know when a task finishes. > + let mut oldest_active_task_per_remote = HashMap::new(); > + for (upid, startime) in active_tasks.iter() { > + oldest_active_task_per_remote > + .entry(upid.remote().to_owned()) > + .and_modify(|time| *time = (*startime).min(*time)) > + .or_insert(*startime); > + } > + state.oldest_active_task = oldest_active_task_per_remote; > + } > + > + /// Merge a list of *finished* tasks into the remote task archive files. 
> +    /// The list of tasks in `tasks` *must* be sorted by their timestamp and UPID (descending by
> +    /// timestamp, ascending by UPID).
> +    ///
> +    /// The task archive should be locked when calling this.
> +    fn merge_tasks_into_archive(
> +        &self,
> +        tasks: Vec<TaskCacheItem>,
> +        active_tasks: &mut HashSet<(RemoteUpid, i64)>,
> +        update_state_for_remote: HashMap<String, bool>,
> +        state: &mut State,
> +    ) -> Result<(), Error> {
> +        debug_assert!(tasks
> +            .iter()
> +            .is_sorted_by(|a, b| compare_tasks(a, b).is_ge()));
> +
> +        let files = self.archive_files()?;
> +
> +        let mut files = files.iter().peekable();
> +
> +        let mut current = files.next();
> +        let mut next = files.peek();
> +
> +        let mut tasks_for_current_file = Vec::new();
> +
> +        // Tasks are sorted youngest to oldest (biggest starttime first)
> +        for task in tasks {
> +            active_tasks.remove(&(task.upid.clone(), task.starttime));

^ As mentioned off-list, Rust nowadays actually understands temporary
partial moves, so we could avoid the clone with:

    let upid_starttime = (task.upid, task.starttime);
    active_tasks.remove(&upid_starttime);
    task.upid = upid_starttime.0;

(small addendum on the loop binding after this hunk)

> +
> +            if let Some(tracked_tasks) = state.tracked_tasks.get_mut(task.upid.remote()) {
> +                tracked_tasks.remove(&task.upid);
> +            }
> +
> +            if let Some(true) = update_state_for_remote.get(task.upid.remote()) {
> +                // Update the most recent starttime per remote; the task polling logic uses it as a
> +                // cut-off.
> +                state
> +                    .most_recent_archive_starttime
> +                    .entry(task.upid.remote().to_owned())
> +                    .and_modify(|time| *time = (task.starttime).max(*time))
> +                    .or_insert(task.starttime);
> +            }
> +
> +            // Skip ahead until we have found the correct file.
> +            while next.is_some() {
> +                if let Some(current) = current {
> +                    if task.starttime >= current.starttime {
> +                        break;
> +                    }
> +                    // The next entry's cut-off is larger than the task's start time, which means
> +                    // we want to finalize the current file by merging all tasks that
> +                    // should be stored in it...
> +                    self.merge_single_archive_file(
> +                        std::mem::take(&mut tasks_for_current_file),
> +                        &current.file,
> +                    )?;
> +                }
> +
> +                // ... and advance the `current` file to the next entry.
> +                current = files.next();
> +                next = files.peek();
> +            }
> +
> +            if let Some(current) = current {
> +                if task.starttime < current.starttime {
> +                    continue;
> +                }
> +            }
> +            tasks_for_current_file.push(task);
> +        }
> +
> +        // Merge tasks for the last file.
> +        if let Some(current) = current {
> +            self.merge_single_archive_file(tasks_for_current_file, &current.file)?;
> +        }
> +
> +        Ok(())
> +    }
> +
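Addendum to the partial-move note above: for the `task.upid`
reassignment to compile, the loop head needs a mutable binding, i.e.
(untested):

    for mut task in tasks {
        let upid_starttime = (task.upid, task.starttime);
        active_tasks.remove(&upid_starttime);
        task.upid = upid_starttime.0;
        // `task` is fully initialized again from this point on
        ...
    }
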
> +    /// Add a new tracked task.
> +    ///
> +    /// This will insert the task in the list of tracked tasks in the state file,
> +    /// as well as create an entry in the `active` file.
> +    ///
> +    /// This function will request an exclusive lock for the task cache;
> +    /// do not call it if you are already holding a lock.
> +    pub fn add_tracked_task(&self, task: TaskCacheItem) -> Result<(), Error> {
> +        let lock = self.lock(true)?;
> +
> +        let mut tasks = Vec::new();
> +        let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
> +
> +        for task in &mut task_iter {
> +            match task {
> +                Ok(task) => tasks.push(task),
> +                Err(err) => {
> +                    log::error!(
> +                        "could not read existing task cache entry from 'active' file: {err}"
> +                    );
> +                }
> +            }
> +        }
> +
> +        tasks.push(task.clone());
> +        tasks.sort_by(compare_tasks_reverse);
> +
> +        let _lock = task_iter.into_lock();
> +
> +        let mut state = self.read_state();
> +
> +        state
> +            .oldest_active_task
> +            .entry(task.upid.remote().to_owned())
> +            .and_modify(|a| *a = (*a).min(task.starttime))
> +            .or_insert(task.starttime);
> +
> +        let tracked_per_remote = state
> +            .tracked_tasks
> +            .entry(task.upid.remote().to_owned())
> +            .or_default();
> +        tracked_per_remote.insert(task.upid);
> +
> +        self.write_active_tasks(tasks.into_iter())?;
> +        self.write_state(state)?;
> +
> +        Ok(())
> +    }
> +
> +    /// Iterate over cached tasks.
> +    ///
> +    /// This function will request a non-exclusive read-lock; don't call it if
> +    /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
> +    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
> +        let lock = self.lock(false)?;
> +        self.get_tasks_with_lock(mode, lock)
> +    }
> +
> +    /// Iterate over cached tasks.
> +    ///
> +    /// This function requires you to pass a lock. If you want to continue to hold the lock
> +    /// after iterating, you can consume the iterator by calling
> +    /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
> +    pub fn get_tasks_with_lock(
> +        &self,
> +        mode: GetTasks,
> +        lock: TaskCacheLock,
> +    ) -> Result<TaskArchiveIterator, Error> {
> +        match mode {
> +            GetTasks::All => {
> +                let mut files = vec![ArchiveFile {
> +                    starttime: 0,

^ this is immediately thrown out

> +                    file: self.base_path.join("active"),
> +                }];
> +
> +                let archive_files = self.archive_files()?;
> +                files.extend(archive_files);

^ you could do the `.map` here in the extend call instead, which would
shorten the entire match arm to 3 lines (sketch after this hunk).

> +
> +                Ok(TaskArchiveIterator::new(
> +                    Box::new(files.into_iter().map(|pair| pair.file)),
> +                    lock,
> +                ))
> +            }
> +            GetTasks::Active => {
> +                let files = vec![ArchiveFile {
> +                    starttime: 0,

^ Same here, and here we don't even have a reason :)

> +                    file: self.base_path.join("active"),
> +                }];
> +                Ok(TaskArchiveIterator::new(
> +                    Box::new(files.into_iter().map(|pair| pair.file)),
> +                    lock,
> +                ))
> +            }
> +            GetTasks::Archived => {
> +                let files = self.archive_files()?;
> +
> +                Ok(TaskArchiveIterator::new(
> +                    Box::new(files.into_iter().map(|pair| pair.file)),
> +                    lock,
> +                ))
> +            }
> +        }
> +    }
> +
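For the notes on the match arms above, I was thinking of something
along these lines (untested, assuming `archive_files()` keeps its
current signature):

    GetTasks::All => {
        let mut files = vec![self.base_path.join("active")];
        files.extend(self.archive_files()?.into_iter().map(|pair| pair.file));
        Ok(TaskArchiveIterator::new(Box::new(files.into_iter()), lock))
    }
    GetTasks::Active => Ok(TaskArchiveIterator::new(
        Box::new(std::iter::once(self.base_path.join("active"))),
        lock,
    )),

which also drops the throwaway `ArchiveFile` wrappers with their dummy
`starttime: 0`.
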
> +    /// Write the provided tasks to the 'active' file.
> +    ///
> +    /// The tasks are first written to a temporary file, which is then used
> +    /// to atomically replace the original.
> +    fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
> +        let (fd, path) = proxmox_sys::fs::make_tmp_file(
> +            self.base_path.join("active"),
> +            self.create_options.clone(),
> +        )?;
> +        let mut fd = BufWriter::new(fd);
> +
> +        Self::write_tasks(&mut fd, tasks)?;
> +
> +        if let Err(err) = fd.flush() {
> +            log::error!("could not flush 'active' file: {err}");
> +        }
> +        drop(fd);
> +
> +        std::fs::rename(path, self.base_path.join("active"))?;

^ if the rename fails we should `unlink(&path)` (and if *that* fails
just log its error)

At some point we need a *stateful* tempfile helper which also gets the
destination path and does the whole rename-or-unlink-log-error dance in
a `.commit()`... (rough sketch at the end of this hunk)

> +
> +        Ok(())
> +    }
> +
> +    /// Read the state file.
> +    /// If the state file could not be read or does not exist, the default (empty) state
> +    /// is returned.
> +    /// A lock is only necessary if the returned state is modified and later passed
> +    /// to [`Self::write_state`]. In the case of read-only access no lock is necessary, as
> +    /// the state file is always replaced atomically.
> +    pub fn read_state(&self) -> State {
> +        fn do_read_state(path: &Path) -> Result<State, Error> {
> +            let data = proxmox_sys::fs::file_read_optional_string(path)?;

We could also skip the UTF-8 check and use `serde_json::from_slice()`
instead... (since we were concerned about performance?)

> +            match data {
> +                Some(data) => Ok(serde_json::from_str(&data)?),
> +                None => Ok(Default::default()),
> +            }
> +        }
> +
> +        let path = self.base_path.join("state");
> +
> +        do_read_state(&path).unwrap_or_else(|err| {
> +            log::error!("could not read state file: {err}");
> +            Default::default()
> +        })
> +    }
> +
> +    /// Write the state file.
> +    /// The task archive should be locked for writing when calling this function.
> +    pub fn write_state(&self, state: State) -> Result<(), Error> {
> +        let path = self.base_path.join("state");
> +
> +        let data = serde_json::to_vec_pretty(&state)?;
> +
> +        proxmox_sys::fs::replace_file(path, &data, self.create_options.clone(), true)?;
> +
> +        Ok(())
> +    }
> +
> +    /// Returns a list of existing archive files, together with their respective
> +    /// cut-off timestamp. The result is sorted descending by cut-off timestamp (most recent one
> +    /// first).
> +    /// The task archive should be locked for reading when calling this function.
> +    fn archive_files(&self) -> Result<Vec<ArchiveFile>, Error> {
> +        let mut names = Vec::new();
> +
> +        for entry in std::fs::read_dir(&self.base_path)? {
> +            let entry = entry?;
> +
> +            if let Some(endtime) = entry
> +                .path()
> +                .file_name()
> +                .and_then(|s| s.to_str())
> +                .and_then(|s| s.strip_prefix("archive."))
> +            {
> +                match endtime.parse() {
> +                    Ok(starttime) => {
> +                        names.push(ArchiveFile {
> +                            starttime,
> +                            file: entry.path(),
> +                        });
> +                    }
> +                    Err(err) => log::error!("could not parse archive timestamp: {err}"),
> +                }
> +            }
> +        }
> +
> +        names.sort_by_key(|e| -e.starttime);
> +
> +        Ok(names)
> +    }
> +
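Back to the tempfile topic from above, roughly what I have in mind
(hypothetical helper, untested, names are placeholders):

    struct TmpReplaceFile {
        writer: BufWriter<File>,
        tmp_path: PathBuf,
        dest: PathBuf,
    }

    impl TmpReplaceFile {
        fn commit(mut self) -> Result<(), Error> {
            self.writer.flush()?;
            if let Err(err) = std::fs::rename(&self.tmp_path, &self.dest) {
                // best-effort cleanup, only log if even that fails
                if let Err(err) = std::fs::remove_file(&self.tmp_path) {
                    log::error!("failed to remove {:?}: {err}", self.tmp_path);
                }
                return Err(err.into());
            }
            Ok(())
        }
    }

A `Drop` impl could additionally unlink the temp file whenever
`commit()` was never called.
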
> +    /// Merge `tasks` with an existing archive file.
> +    /// This function assumes that `tasks` and the pre-existing contents of the archive
> +    /// file are both sorted descending by starttime (most recent tasks come first).
> +    /// The task archive must be locked when calling this function.
> +    fn merge_single_archive_file(
> +        &self,
> +        tasks: Vec<TaskCacheItem>,
> +        file: &Path,
> +    ) -> Result<(), Error> {
> +        if tasks.is_empty() {
> +            return Ok(());
> +        }
> +
> +        let (fd, new_path) = proxmox_sys::fs::make_tmp_file(file, self.create_options.clone())?;
> +        let mut fd = BufWriter::new(fd);
> +
> +        if file.exists() {

^ Bad pattern. Use `File::open` directly and deal with
`err.kind() == NotFound` instead. (sketch after this hunk)

> +            let archive_reader = BufReader::new(File::open(file)?);
> +            let archive_iter = ArchiveIterator::new(archive_reader)
> +                .flat_map(|item| match item {
> +                    Ok(item) => Some(item),
> +                    Err(err) => {
> +                        log::error!("could not read task cache item while merging: {err}");
> +                        None
> +                    }
> +                })
> +                .peekable();
> +            let task_iter = tasks.into_iter().peekable();
> +
> +            Self::write_tasks(&mut fd, MergeTaskIterator::new(archive_iter, task_iter))?;
> +        } else {
> +            Self::write_tasks(&mut fd, tasks.into_iter())?;
> +        }
> +
> +        if let Err(err) = fd.flush() {
> +            log::error!("could not flush BufWriter for {file:?}: {err}");
> +        }
> +        drop(fd);
> +
> +        if let Err(err) = std::fs::rename(&new_path, file) {
> +            log::error!("could not replace archive file {new_path:?}: {err}");

^ as above, should also attempt to unlink(new_path).

> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Write an iterator of [`TaskCacheItem`] to something that implements [`Write`].
> +    /// The individual items are encoded as JSON followed by a newline.
> +    /// The task archive should be locked when calling this function.
> +    fn write_tasks(
> +        writer: &mut impl Write,
> +        tasks: impl Iterator<Item = TaskCacheItem>,
> +    ) -> Result<(), Error> {
> +        for task in tasks {
> +            serde_json::to_writer(&mut *writer, &task)?;
> +            writeln!(writer)?;
> +        }
> +
> +        Ok(())
> +    }
> +}
> +
> +/// Comparison function for sorting tasks.
> +/// The tasks are compared based on the task's start time, falling
> +/// back to the task's UPID as a secondary criterion in case the
> +/// start times are equal.
> +pub fn compare_tasks(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> +    a.starttime
> +        .cmp(&b.starttime)
> +        .then_with(|| b.upid.to_string().cmp(&a.upid.to_string()))
> +}
> +
> +/// Comparison function for sorting tasks, reversed.
> +/// The tasks are compared based on the task's start time, falling
> +/// back to the task's UPID as a secondary criterion in case the
> +/// start times are equal.
> +pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> +    compare_tasks(a, b).reverse()
> +}
> +
> +/// Iterator over the task archive.
> +pub struct TaskArchiveIterator {
> +    /// Archive files to read.
> +    files: Box<dyn Iterator<Item = PathBuf>>,
> +    /// Archive iterator we are currently using, if any.
> +    current: Option<ArchiveIterator<BufReader<File>>>,
> +    /// Lock for this archive.
> +    lock: TaskCacheLock,
> +}
> +
> +impl TaskArchiveIterator {
> +    /// Create a new task archive iterator.
> +    pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
> +        Self {
> +            files,
> +            current: None,
> +            lock,
> +        }
> +    }
> +
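Re the `file.exists()` pattern above, something like this (untested):

    let archive_reader = match File::open(file) {
        Ok(file) => Some(BufReader::new(file)),
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => None,
        Err(err) => return Err(err.into()),
    };

and then match on `Some(reader)`/`None` where the `if file.exists()`
branches are now. That also avoids the race between the `exists()`
check and the `open()` call.
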
> +    /// Return the task archive lock, consuming `self`.
> +    pub fn into_lock(self) -> TaskCacheLock {
> +        self.lock
> +    }
> +}
> +
> +impl Iterator for &mut TaskArchiveIterator {
> +    type Item = Result<TaskCacheItem, Error>;
> +
> +    fn next(&mut self) -> Option<Self::Item> {
> +        loop {
> +            match &mut self.current {
> +                Some(current) => {
> +                    let next = current.next();
> +                    if next.is_some() {
> +                        return next;
> +                    } else {
> +                        self.current = None;
> +                    }
> +                }
> +                None => 'inner: loop {
> +                    // Returns `None` if no more files are available, stopping iteration.
> +                    let next_file = self.files.next()?;
> +
> +                    match File::open(&next_file) {
> +                        Ok(file) => {
> +                            let archive_reader = BufReader::new(file);
> +                            let archive_iter = ArchiveIterator::new(archive_reader);
> +                            self.current = Some(archive_iter);
> +                            break 'inner;
> +                        }
> +                        Err(err) => {
> +                            log::error!("could not open {next_file:?} while iterating over task archive files, skipping: {err}")
> +                        }
> +                    }
> +                },
> +            }
> +        }
> +    }
> +}
> +
> +/// Archive file.
> +#[derive(Clone, Debug)]
> +struct ArchiveFile {
> +    /// The path to the archive file.
> +    file: PathBuf,
> +    /// The archive's lowest permitted starttime (seconds since UNIX epoch).
> +    starttime: i64,
> +}
> +
> +/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
> +/// from both iterators sorted.
> +/// The two iterators are expected to be sorted descending based on the task's starttime and
> +/// ascending based on the task's UPID's string representation. This can be
> +/// achieved by using the [`compare_tasks_reverse`] function when sorting an array of tasks.
> +pub struct MergeTaskIterator<T: Iterator<Item = TaskCacheItem>, U: Iterator<Item = TaskCacheItem>> {
> +    left: Peekable<T>,
> +    right: Peekable<U>,
> +}
> +
> +impl<T, U> MergeTaskIterator<T, U>
> +where
> +    T: Iterator<Item = TaskCacheItem>,
> +    U: Iterator<Item = TaskCacheItem>,
> +{
> +    /// Create a new merging iterator.
> +    pub fn new(left: Peekable<T>, right: Peekable<U>) -> Self {
> +        Self { left, right }
> +    }
> +}
> +
> +impl<T, U> Iterator for MergeTaskIterator<T, U>
> +where
> +    T: Iterator<Item = TaskCacheItem>,
> +    U: Iterator<Item = TaskCacheItem>,
> +{
> +    type Item = T::Item;
> +
> +    fn next(&mut self) -> Option<T::Item> {
> +        let order = match (self.left.peek(), self.right.peek()) {
> +            (Some(l), Some(r)) => Some(compare_tasks(l, r)),
> +            (Some(_), None) => Some(Ordering::Greater),
> +            (None, Some(_)) => Some(Ordering::Less),
> +            (None, None) => None,
> +        };
> +
> +        match order {
> +            Some(Ordering::Greater) => self.left.next(),
> +            Some(Ordering::Less) => self.right.next(),
> +            Some(Ordering::Equal) => {
> +                // Dedup by consuming the other iterator as well.
> +                let _ = self.right.next();
> +                self.left.next()
> +            }
> +            None => None,
> +        }
> +    }
> +}
> +
> +/// Iterator for a single task archive file.
> +///
> +/// This iterator implements `Iterator<Item = Result<TaskCacheItem, Error>>`. When iterating,
> +/// tasks are read line by line, without loading the entire archive file into memory.
> +pub struct ArchiveIterator<T> {
> +    iter: Lines<T>,
> +}
> +
> +impl<T: BufRead> ArchiveIterator<T> {
> +    /// Create a new iterator.
> + pub fn new(file: T) -> Self { > + let reader = file.lines(); > + > + Self { iter: reader } > + } > +} > + > +impl<T: BufRead> Iterator for ArchiveIterator<T> { > + type Item = Result<TaskCacheItem, Error>; > + > + fn next(&mut self) -> Option<Self::Item> { > + self.iter.next().map(|result| { > + result > + .and_then(|line| Ok(serde_json::from_str(&line)?)) > + .map_err(Into::into) > + }) > + } > +} > + > +#[cfg(test)] > +mod tests { > + use std::io::Cursor; > + > + use crate::test_support::temp::NamedTempDir; > + > + use super::*; > + > + #[test] > + fn archive_iterator() -> Result<(), Error> { > + let file = r#" > + {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","status":"OK","endtime":12345, "starttime": 1234} > + {"upid":"pbs-remote!UPID:pbs:000002B2:00000158:00000000:674D828C:logrotate::root@pam:","status":"OK","endtime":12345, "starttime": 1234} > + invalid"# > + .trim_start(); > + > + let cursor = Cursor::new(file.as_bytes()); > + let mut iter = ArchiveIterator::new(cursor); > + > + assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pve-remote"); > + assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pbs-remote"); > + assert!(iter.next().unwrap().is_err()); > + assert!(iter.next().is_none()); > + > + Ok(()) > + } > + > + fn task(starttime: i64, ended: bool) -> TaskCacheItem { > + let (status, endtime) = if ended { > + (Some("OK".into()), Some(starttime + 10)) > + } else { > + (None, None) > + }; > + > + TaskCacheItem { > + upid: format!( > + "pve-remote!UPID:pve:00039E4D:002638B8:{starttime:08X}:stopall::root@pam:" > + ) > + .parse() > + .unwrap(), > + starttime, > + status, > + endtime, > + } > + } > + > + fn assert_starttimes(cache: &TaskCache, starttimes: &[i64]) { > + let tasks: Vec<i64> = cache > + .get_tasks(GetTasks::All) > + .unwrap() > + .map(|task| task.unwrap().starttime) > + .collect(); > + > + assert_eq!(&tasks, starttimes); > + } > + > + fn add_tasks(cache: &TaskCache, tasks: Vec<TaskCacheItem>) -> Result<(), Error> { > + let param = AddTasks { > + update_most_recent_archive_timestamp: true, > + tasks, > + }; > + let mut a = HashMap::new(); > + a.insert("pve-remote".into(), param); > + > + cache.add_tasks(a) > + } > + > + #[test] > + fn test_add_tasks() -> Result<(), Error> { > + let tmp_dir = NamedTempDir::new()?; > + let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap(); > + > + cache.new_file(1000)?; > + assert_eq!(cache.archive_files()?.len(), 1); > + > + add_tasks(&cache, vec![task(1000, true), task(1001, true)])?; > + > + cache.rotate(1500, 0, 3)?; > + assert_eq!(cache.archive_files()?.len(), 2); > + > + add_tasks(&cache, vec![task(1500, true), task(1501, true)])?; > + add_tasks(&cache, vec![task(1200, true), task(1300, true)])?; > + > + cache.rotate(2000, 0, 3)?; > + assert_eq!(cache.archive_files()?.len(), 3); > + > + add_tasks(&cache, vec![task(2000, true)])?; > + add_tasks(&cache, vec![task(1502, true)])?; > + add_tasks(&cache, vec![task(1002, true)])?; > + > + // These are before the cut-off of 1000, so they will be discarded. 
> + add_tasks(&cache, vec![task(800, true), task(900, true)])?; > + > + // This one should be deduped > + add_tasks(&cache, vec![task(1000, true)])?; > + > + assert_starttimes( > + &cache, > + &[2000, 1502, 1501, 1500, 1300, 1200, 1002, 1001, 1000], > + ); > + > + cache.rotate(2500, 0, 3)?; > + > + assert_eq!(cache.archive_files()?.len(), 3); > + > + assert_starttimes(&cache, &[2000, 1502, 1501, 1500]); > + > + cache.rotate(3000, 0, 3)?; > + assert_eq!(cache.archive_files()?.len(), 3); > + > + assert_starttimes(&cache, &[2000]); > + > + Ok(()) > + } > + > + #[test] > + fn test_active_tasks_are_migrated_to_archive() -> Result<(), Error> { > + let tmp_dir = NamedTempDir::new()?; > + let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap(); > + > + cache.new_file(1000)?; > + add_tasks(&cache, vec![task(1000, false), task(1001, false)])?; > + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2); > + > + let state = cache.read_state(); > + assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1000); > + > + add_tasks(&cache, vec![task(1000, true), task(1001, true)])?; > + > + assert_starttimes(&cache, &[1001, 1000]); > + > + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0); > + > + Ok(()) > + } > + > + #[test] > + fn test_init() -> Result<(), Error> { > + let tmp_dir = NamedTempDir::new()?; > + let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap(); > + > + cache.init(1000, 3, 100)?; > + assert_eq!(cache.archive_files()?.len(), 3); > + > + add_tasks( > + &cache, > + vec![ > + task(1050, true), > + task(950, true), > + task(850, true), > + task(750, true), // This one is discarded > + ], > + )?; > + > + assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 3); > + > + Ok(()) > + } > + > + #[test] > + fn test_tracking_tasks() -> Result<(), Error> { > + let tmp_dir = NamedTempDir::new()?; > + let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap(); > + > + cache.init(1000, 3, 100)?; > + > + cache.add_tracked_task(task(1050, false))?; > + > + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1); > + cache.add_tracked_task(task(1060, false))?; > + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2); > + > + let state = cache.read_state(); > + assert_eq!(state.tracked_tasks.get("pve-remote").unwrap().len(), 2); > + assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1050); > + > + // Mark first task as finished > + add_tasks(&cache, vec![task(1050, true)])?; > + > + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1); > + assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 1); > + > + // Mark second task as finished > + add_tasks(&cache, vec![task(1060, true)])?; > + > + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0); > + assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 2); > + > + Ok(()) > + } > +} > -- > 2.39.5 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel