public inbox for pdm-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization
Date: Thu,  2 Jul 2026 11:22:49 +0200	[thread overview]
Message-ID: <20260702092258.174740-7-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>

Compute all static paths during initialization, store them as members of
the TaskCache struct, and add methods to access them.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 118 +++++++++++++++++---------
 1 file changed, 80 insertions(+), 38 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index aa92fcc8..d97fa710 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -143,6 +143,19 @@ impl State {
 pub struct TaskCache {
     /// Path where the cache's files should be placed.
     base_path: PathBuf,
+
+    /// Path to the journal (WAL).
+    journal_path: PathBuf,
+
+    /// Path to the state file.
+    state_path: PathBuf,
+
+    /// Path to the lock file.
+    lock_path: PathBuf,
+
+    /// Path to the file containing active tasks.
+    active_path: PathBuf,
+
     /// File permissions for the cache's files.
     create_options: CreateOptions,
 
@@ -240,19 +253,10 @@ impl WritableTaskCache {
     /// Start a new archive file with a given timestamp.
     /// `now` is supposed to be a UNIX timestamp (seconds).
     fn new_file(&self, now: i64, compress: bool) -> Result<ArchiveFile, Error> {
-        let suffix = if compress {
-            ZSTD_EXTENSION_WITH_DOT
-        } else {
-            ""
-        };
+        let path = self.cache.archive_path(now, compress);
 
-        let new_path = self
-            .cache
-            .base_path
-            .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}{suffix}"));
-
-        let mut file = File::create(&new_path)?;
-        self.cache.create_options.apply_to(&mut file, &new_path)?;
+        let mut file = File::create(&path)?;
+        self.cache.create_options.apply_to(&mut file, &path)?;
 
         if compress {
             let encoder = zstd::stream::write::Encoder::new(file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
@@ -260,7 +264,7 @@ impl WritableTaskCache {
         }
 
         Ok(ArchiveFile {
-            path: new_path,
+            path,
             compressed: compress,
             starttime: now,
         })
@@ -407,11 +411,10 @@ impl WritableTaskCache {
         node_success_map: &NodeFetchSuccessMap,
         state: &mut State,
     ) -> Result<(), Error> {
-        let filename = self.cache.base_path.join(WAL_FILENAME);
         let mut file = OpenOptions::new()
             .append(true)
             .create(true)
-            .open(filename)?;
+            .open(self.cache.journal_path())?;
 
         for task in finished_tasks {
             // Remove this finished task from our set of active tasks.
@@ -479,8 +482,7 @@ impl WritableTaskCache {
     fn journal_size(&self) -> Result<u64, Error> {
         let metadata = self
             .cache
-            .base_path
-            .join(WAL_FILENAME)
+            .journal_path()
             .metadata()
             .context("failed to read metadata of journal file")?;
 
@@ -504,9 +506,9 @@ impl WritableTaskCache {
     /// This will merge all tasks in the journal file into the task archive.
     pub fn apply_journal(&self) -> Result<(), Error> {
         let start = Instant::now();
-        let filename = self.cache.base_path.join(WAL_FILENAME);
+        let journal_path = self.cache.journal_path();
 
-        let file = match File::open(&filename) {
+        let file = match File::open(journal_path) {
             Ok(file) => Box::new(BufReader::new(file)),
             Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
             Err(err) => return Err(err.into()),
@@ -537,7 +539,7 @@ impl WritableTaskCache {
         OpenOptions::new()
             .write(true)
             .truncate(true)
-            .open(filename)
+            .open(journal_path)
             .context("failed to truncate journal file")?;
 
         log::info!(
@@ -649,7 +651,7 @@ impl WritableTaskCache {
 
     /// Write the state file.
     fn write_state(&self, state: State) -> Result<(), Error> {
-        let path = self.cache.base_path.join(STATE_FILENAME);
+        let path = self.cache.state_path();
 
         let data = serde_json::to_vec_pretty(&state)?;
 
@@ -663,10 +665,9 @@ impl WritableTaskCache {
     /// The tasks are first written to a temporary file, which is then used
     /// to atomically replace the original.
     fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
-        let (fd, path) = proxmox_sys::fs::make_tmp_file(
-            self.cache.base_path.join(ACTIVE_FILENAME),
-            self.cache.create_options,
-        )?;
+        let target = self.cache.active_path();
+
+        let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?;
         let mut fd = BufWriter::new(fd);
 
         Self::write_tasks(&mut fd, tasks)?;
@@ -676,9 +677,7 @@ impl WritableTaskCache {
         }
         drop(fd);
 
-        let target = self.cache.base_path.join(ACTIVE_FILENAME);
-
-        let res = std::fs::rename(&path, &target).with_context(|| {
+        let res = std::fs::rename(&path, target).with_context(|| {
             format!(
                 "failed to replace {} with {}",
                 target.display(),
@@ -790,7 +789,7 @@ impl TaskCache {
     ///
     /// Remember to call `init` or `new_file` on a locked, writable TaskCache
     /// to create the initial archive files.
-    pub fn new<P: AsRef<Path>>(
+    pub fn new<P: Into<PathBuf>>(
         path: P,
         create_options: CreateOptions,
         max_files: u32,
@@ -798,8 +797,19 @@ impl TaskCache {
         rotate_after: u64,
         journal_max_size: u64,
     ) -> Result<Self, Error> {
+        let base_path = path.into();
+
+        let journal_path = base_path.join(WAL_FILENAME);
+        let state_path = base_path.join(STATE_FILENAME);
+        let active_path = base_path.join(ACTIVE_FILENAME);
+        let lock_path = base_path.join(LOCKFILE_FILENAME);
+
         Ok(Self {
-            base_path: path.as_ref().into(),
+            base_path,
+            journal_path,
+            state_path,
+            active_path,
+            lock_path,
             create_options,
             journal_max_size,
             max_files,
@@ -823,7 +833,7 @@ impl TaskCache {
     }
 
     fn lock_impl(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
-        let lockfile = self.base_path.join(LOCKFILE_FILENAME);
+        let lockfile = self.lock_path();
 
         Ok(TaskCacheLock(proxmox_sys::fs::open_file_locked(
             lockfile,
@@ -845,8 +855,7 @@ impl TaskCache {
             }
         }
 
-        let path = self.base_path.join(STATE_FILENAME);
-        do_read_state(&path).unwrap_or_else(|err| {
+        do_read_state(self.state_path()).unwrap_or_else(|err| {
             log::error!("could not read state file: {err:#}");
             Default::default()
         })
@@ -857,7 +866,7 @@ impl TaskCache {
         mode: GetTasks,
         lock: &'a TaskCacheLock,
     ) -> Result<TaskArchiveIterator<'a>, Error> {
-        let journal_file = self.base_path.join(WAL_FILENAME);
+        let journal_file = self.journal_path();
 
         match mode {
             GetTasks::All => {
@@ -865,16 +874,16 @@ impl TaskCache {
                 archive_files.reverse();
 
                 archive_files.push(ArchiveFile {
-                    path: self.base_path.join(ACTIVE_FILENAME),
+                    path: self.active_path().into(),
                     compressed: false,
                     starttime: 0,
                 });
 
-                TaskArchiveIterator::new(Some(journal_file), archive_files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
             }
             GetTasks::Active => {
                 let archive_files = vec![ArchiveFile {
-                    path: self.base_path.join(ACTIVE_FILENAME),
+                    path: self.active_path().into(),
                     compressed: false,
                     starttime: 0,
                 }];
@@ -886,7 +895,7 @@ impl TaskCache {
                 let mut files = self.archive_files(lock)?;
                 files.reverse();
 
-                TaskArchiveIterator::new(Some(journal_file), files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), files, lock)
             }
         }
     }
@@ -935,6 +944,39 @@ impl TaskCache {
             })
         }
     }
+
+    /// Return path to the task cache journal (WAL).
+    fn journal_path(&self) -> &Path {
+        &self.journal_path
+    }
+
+    /// Return path to the task cache state file.
+    fn state_path(&self) -> &Path {
+        &self.state_path
+    }
+
+    /// Return path to the file of active tasks.
+    fn active_path(&self) -> &Path {
+        &self.active_path
+    }
+
+    /// Return path to the task cache lock file.
+    fn lock_path(&self) -> &Path {
+        &self.lock_path
+    }
+
+    /// Construct an archive file path, given its lower bound timestamp and its compression
+    /// status.
+    fn archive_path(&self, starttime: i64, compressed: bool) -> PathBuf {
+        let suffix = if compressed {
+            ZSTD_EXTENSION_WITH_DOT
+        } else {
+            ""
+        };
+
+        self.base_path
+            .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}"))
+    }
 }
 
 /// Comparison function for sorting tasks.
-- 
2.47.3





  parent reply	other threads:[~2026-07-02  9:23 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` Lukas Wagner [this message]
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260702092258.174740-7-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal