From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from gate001.proxmox.com (gate001.proxmox.com [45.144.208.40]) by lore.proxmox.com (Postfix) with ESMTPS id E8D8C1FF135 for ; Thu, 02 Jul 2026 11:23:55 +0200 (CEST) Received: from gate001.proxmox.com (localhost.localdomain [127.0.0.1]) by gate001.proxmox.com (Proxmox) with ESMTP id ED65B2149E; Thu, 02 Jul 2026 11:23:52 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Date: Thu, 2 Jul 2026 11:22:49 +0200 Message-ID: <20260702092258.174740-7-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com> References: <20260702092258.174740-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1782984189973 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.000 Adjusted score from AWL reputation of From: address DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment (newer systems) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: FQF56RV72FOMGV64DOR32F2GGPPMM5HF X-Message-ID-Hash: FQF56RV72FOMGV64DOR32F2GGPPMM5HF X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Compute all static paths during initialization, store them as members of the TaskCache struct, and add methods to access them. Signed-off-by: Lukas Wagner --- server/src/remote_tasks/task_cache.rs | 118 +++++++++++++++++--------- 1 file changed, 80 insertions(+), 38 deletions(-) diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs index aa92fcc8..d97fa710 100644 --- a/server/src/remote_tasks/task_cache.rs +++ b/server/src/remote_tasks/task_cache.rs @@ -143,6 +143,19 @@ impl State { pub struct TaskCache { /// Path where the cache's files should be placed. base_path: PathBuf, + + /// Path to the journal (WAL). + journal_path: PathBuf, + + /// Path to the state file. + state_path: PathBuf, + + /// Path to the lock file. + lock_path: PathBuf, + + /// Path to the file containing active tasks. + active_path: PathBuf, + /// File permissions for the cache's files. create_options: CreateOptions, @@ -240,19 +253,10 @@ impl WritableTaskCache { /// Start a new archive file with a given timestamp. /// `now` is supposed to be a UNIX timestamp (seconds). fn new_file(&self, now: i64, compress: bool) -> Result { - let suffix = if compress { - ZSTD_EXTENSION_WITH_DOT - } else { - "" - }; + let path = self.cache.archive_path(now, compress); - let new_path = self - .cache - .base_path - .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}{suffix}")); - - let mut file = File::create(&new_path)?; - self.cache.create_options.apply_to(&mut file, &new_path)?; + let mut file = File::create(&path)?; + self.cache.create_options.apply_to(&mut file, &path)?; if compress { let encoder = zstd::stream::write::Encoder::new(file, zstd::DEFAULT_COMPRESSION_LEVEL)?; @@ -260,7 +264,7 @@ impl WritableTaskCache { } Ok(ArchiveFile { - path: new_path, + path, compressed: compress, starttime: now, }) @@ -407,11 +411,10 @@ impl WritableTaskCache { node_success_map: &NodeFetchSuccessMap, state: &mut State, ) -> Result<(), Error> { - let filename = self.cache.base_path.join(WAL_FILENAME); let mut file = OpenOptions::new() .append(true) .create(true) - .open(filename)?; + .open(self.cache.journal_path())?; for task in finished_tasks { // Remove this finished task from our set of active tasks. @@ -479,8 +482,7 @@ impl WritableTaskCache { fn journal_size(&self) -> Result { let metadata = self .cache - .base_path - .join(WAL_FILENAME) + .journal_path() .metadata() .context("failed to read metadata of journal file")?; @@ -504,9 +506,9 @@ impl WritableTaskCache { /// This will merge all tasks in the journal file into the task archive. pub fn apply_journal(&self) -> Result<(), Error> { let start = Instant::now(); - let filename = self.cache.base_path.join(WAL_FILENAME); + let journal_path = self.cache.journal_path(); - let file = match File::open(&filename) { + let file = match File::open(journal_path) { Ok(file) => Box::new(BufReader::new(file)), Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()), Err(err) => return Err(err.into()), @@ -537,7 +539,7 @@ impl WritableTaskCache { OpenOptions::new() .write(true) .truncate(true) - .open(filename) + .open(journal_path) .context("failed to truncate journal file")?; log::info!( @@ -649,7 +651,7 @@ impl WritableTaskCache { /// Write the state file. fn write_state(&self, state: State) -> Result<(), Error> { - let path = self.cache.base_path.join(STATE_FILENAME); + let path = self.cache.state_path(); let data = serde_json::to_vec_pretty(&state)?; @@ -663,10 +665,9 @@ impl WritableTaskCache { /// The tasks are first written to a temporary file, which is then used /// to atomically replace the original. fn write_active_tasks(&self, tasks: impl Iterator) -> Result<(), Error> { - let (fd, path) = proxmox_sys::fs::make_tmp_file( - self.cache.base_path.join(ACTIVE_FILENAME), - self.cache.create_options, - )?; + let target = self.cache.active_path(); + + let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?; let mut fd = BufWriter::new(fd); Self::write_tasks(&mut fd, tasks)?; @@ -676,9 +677,7 @@ impl WritableTaskCache { } drop(fd); - let target = self.cache.base_path.join(ACTIVE_FILENAME); - - let res = std::fs::rename(&path, &target).with_context(|| { + let res = std::fs::rename(&path, target).with_context(|| { format!( "failed to replace {} with {}", target.display(), @@ -790,7 +789,7 @@ impl TaskCache { /// /// Remember to call `init` or `new_file` on a locked, writable TaskCache /// to create the initial archive files. - pub fn new>( + pub fn new>( path: P, create_options: CreateOptions, max_files: u32, @@ -798,8 +797,19 @@ impl TaskCache { rotate_after: u64, journal_max_size: u64, ) -> Result { + let base_path = path.into(); + + let journal_path = base_path.join(WAL_FILENAME); + let state_path = base_path.join(STATE_FILENAME); + let active_path = base_path.join(ACTIVE_FILENAME); + let lock_path = base_path.join(LOCKFILE_FILENAME); + Ok(Self { - base_path: path.as_ref().into(), + base_path, + journal_path, + state_path, + active_path, + lock_path, create_options, journal_max_size, max_files, @@ -823,7 +833,7 @@ impl TaskCache { } fn lock_impl(&self, exclusive: bool) -> Result { - let lockfile = self.base_path.join(LOCKFILE_FILENAME); + let lockfile = self.lock_path(); Ok(TaskCacheLock(proxmox_sys::fs::open_file_locked( lockfile, @@ -845,8 +855,7 @@ impl TaskCache { } } - let path = self.base_path.join(STATE_FILENAME); - do_read_state(&path).unwrap_or_else(|err| { + do_read_state(self.state_path()).unwrap_or_else(|err| { log::error!("could not read state file: {err:#}"); Default::default() }) @@ -857,7 +866,7 @@ impl TaskCache { mode: GetTasks, lock: &'a TaskCacheLock, ) -> Result, Error> { - let journal_file = self.base_path.join(WAL_FILENAME); + let journal_file = self.journal_path(); match mode { GetTasks::All => { @@ -865,16 +874,16 @@ impl TaskCache { archive_files.reverse(); archive_files.push(ArchiveFile { - path: self.base_path.join(ACTIVE_FILENAME), + path: self.active_path().into(), compressed: false, starttime: 0, }); - TaskArchiveIterator::new(Some(journal_file), archive_files, lock) + TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock) } GetTasks::Active => { let archive_files = vec![ArchiveFile { - path: self.base_path.join(ACTIVE_FILENAME), + path: self.active_path().into(), compressed: false, starttime: 0, }]; @@ -886,7 +895,7 @@ impl TaskCache { let mut files = self.archive_files(lock)?; files.reverse(); - TaskArchiveIterator::new(Some(journal_file), files, lock) + TaskArchiveIterator::new(Some(journal_file.into()), files, lock) } } } @@ -935,6 +944,39 @@ impl TaskCache { }) } } + + /// Return path to the task cache journal (WAL). + fn journal_path(&self) -> &Path { + &self.journal_path + } + + /// Return path to the task cache state file. + fn state_path(&self) -> &Path { + &self.state_path + } + + /// Return path to the file of active tasks. + fn active_path(&self) -> &Path { + &self.active_path + } + + /// Return path to the task cache lock file. + fn lock_path(&self) -> &Path { + &self.lock_path + } + + /// Construct an archive file path, given it's lower bound timestamp and its compression + /// status. + fn archive_path(&self, starttime: i64, compressed: bool) -> PathBuf { + let suffix = if compressed { + ZSTD_EXTENSION_WITH_DOT + } else { + "" + }; + + self.base_path + .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}")) + } } /// Comparison function for sorting tasks. -- 2.47.3