all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization
Date: Thu,  2 Jul 2026 11:22:49 +0200	[thread overview]
Message-ID: <20260702092258.174740-7-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>

Compute all static paths during initialization, store them as members of
the TaskCache struct, and add methods to access them.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 118 +++++++++++++++++---------
 1 file changed, 80 insertions(+), 38 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index aa92fcc8..d97fa710 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -143,6 +143,19 @@ impl State {
 pub struct TaskCache {
     /// Path where the cache's files should be placed.
     base_path: PathBuf,
+
+    /// Path to the journal (WAL).
+    journal_path: PathBuf,
+
+    /// Path to the state file.
+    state_path: PathBuf,
+
+    /// Path to the lock file.
+    lock_path: PathBuf,
+
+    /// Path to the file containing active tasks.
+    active_path: PathBuf,
+
     /// File permissions for the cache's files.
     create_options: CreateOptions,
 
@@ -240,19 +253,10 @@ impl WritableTaskCache {
     /// Start a new archive file with a given timestamp.
     /// `now` is supposed to be a UNIX timestamp (seconds).
     fn new_file(&self, now: i64, compress: bool) -> Result<ArchiveFile, Error> {
-        let suffix = if compress {
-            ZSTD_EXTENSION_WITH_DOT
-        } else {
-            ""
-        };
+        let path = self.cache.archive_path(now, compress);
 
-        let new_path = self
-            .cache
-            .base_path
-            .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}{suffix}"));
-
-        let mut file = File::create(&new_path)?;
-        self.cache.create_options.apply_to(&mut file, &new_path)?;
+        let mut file = File::create(&path)?;
+        self.cache.create_options.apply_to(&mut file, &path)?;
 
         if compress {
             let encoder = zstd::stream::write::Encoder::new(file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
@@ -260,7 +264,7 @@ impl WritableTaskCache {
         }
 
         Ok(ArchiveFile {
-            path: new_path,
+            path,
             compressed: compress,
             starttime: now,
         })
@@ -407,11 +411,10 @@ impl WritableTaskCache {
         node_success_map: &NodeFetchSuccessMap,
         state: &mut State,
     ) -> Result<(), Error> {
-        let filename = self.cache.base_path.join(WAL_FILENAME);
         let mut file = OpenOptions::new()
             .append(true)
             .create(true)
-            .open(filename)?;
+            .open(self.cache.journal_path())?;
 
         for task in finished_tasks {
             // Remove this finished task from our set of active tasks.
@@ -479,8 +482,7 @@ impl WritableTaskCache {
     fn journal_size(&self) -> Result<u64, Error> {
         let metadata = self
             .cache
-            .base_path
-            .join(WAL_FILENAME)
+            .journal_path()
             .metadata()
             .context("failed to read metadata of journal file")?;
 
@@ -504,9 +506,9 @@ impl WritableTaskCache {
     /// This will merge all tasks in the journal file into the task archive.
     pub fn apply_journal(&self) -> Result<(), Error> {
         let start = Instant::now();
-        let filename = self.cache.base_path.join(WAL_FILENAME);
+        let journal_path = self.cache.journal_path();
 
-        let file = match File::open(&filename) {
+        let file = match File::open(journal_path) {
             Ok(file) => Box::new(BufReader::new(file)),
             Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
             Err(err) => return Err(err.into()),
@@ -537,7 +539,7 @@ impl WritableTaskCache {
         OpenOptions::new()
             .write(true)
             .truncate(true)
-            .open(filename)
+            .open(journal_path)
             .context("failed to truncate journal file")?;
 
         log::info!(
@@ -649,7 +651,7 @@ impl WritableTaskCache {
 
     /// Write the state file.
     fn write_state(&self, state: State) -> Result<(), Error> {
-        let path = self.cache.base_path.join(STATE_FILENAME);
+        let path = self.cache.state_path();
 
         let data = serde_json::to_vec_pretty(&state)?;
 
@@ -663,10 +665,9 @@ impl WritableTaskCache {
     /// The tasks are first written to a temporary file, which is then used
     /// to atomically replace the original.
     fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
-        let (fd, path) = proxmox_sys::fs::make_tmp_file(
-            self.cache.base_path.join(ACTIVE_FILENAME),
-            self.cache.create_options,
-        )?;
+        let target = self.cache.active_path();
+
+        let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?;
         let mut fd = BufWriter::new(fd);
 
         Self::write_tasks(&mut fd, tasks)?;
@@ -676,9 +677,7 @@ impl WritableTaskCache {
         }
         drop(fd);
 
-        let target = self.cache.base_path.join(ACTIVE_FILENAME);
-
-        let res = std::fs::rename(&path, &target).with_context(|| {
+        let res = std::fs::rename(&path, target).with_context(|| {
             format!(
                 "failed to replace {} with {}",
                 target.display(),
@@ -790,7 +789,7 @@ impl TaskCache {
     ///
     /// Remember to call `init` or `new_file` on a locked, writable TaskCache
     /// to create the initial archive files.
-    pub fn new<P: AsRef<Path>>(
+    pub fn new<P: Into<PathBuf>>(
         path: P,
         create_options: CreateOptions,
         max_files: u32,
@@ -798,8 +797,19 @@ impl TaskCache {
         rotate_after: u64,
         journal_max_size: u64,
     ) -> Result<Self, Error> {
+        let base_path = path.into();
+
+        let journal_path = base_path.join(WAL_FILENAME);
+        let state_path = base_path.join(STATE_FILENAME);
+        let active_path = base_path.join(ACTIVE_FILENAME);
+        let lock_path = base_path.join(LOCKFILE_FILENAME);
+
         Ok(Self {
-            base_path: path.as_ref().into(),
+            base_path,
+            journal_path,
+            state_path,
+            active_path,
+            lock_path,
             create_options,
             journal_max_size,
             max_files,
@@ -823,7 +833,7 @@ impl TaskCache {
     }
 
     fn lock_impl(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
-        let lockfile = self.base_path.join(LOCKFILE_FILENAME);
+        let lockfile = self.lock_path();
 
         Ok(TaskCacheLock(proxmox_sys::fs::open_file_locked(
             lockfile,
@@ -845,8 +855,7 @@ impl TaskCache {
             }
         }
 
-        let path = self.base_path.join(STATE_FILENAME);
-        do_read_state(&path).unwrap_or_else(|err| {
+        do_read_state(self.state_path()).unwrap_or_else(|err| {
             log::error!("could not read state file: {err:#}");
             Default::default()
         })
@@ -857,7 +866,7 @@ impl TaskCache {
         mode: GetTasks,
         lock: &'a TaskCacheLock,
     ) -> Result<TaskArchiveIterator<'a>, Error> {
-        let journal_file = self.base_path.join(WAL_FILENAME);
+        let journal_file = self.journal_path();
 
         match mode {
             GetTasks::All => {
@@ -865,16 +874,16 @@ impl TaskCache {
                 archive_files.reverse();
 
                 archive_files.push(ArchiveFile {
-                    path: self.base_path.join(ACTIVE_FILENAME),
+                    path: self.active_path().into(),
                     compressed: false,
                     starttime: 0,
                 });
 
-                TaskArchiveIterator::new(Some(journal_file), archive_files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
             }
             GetTasks::Active => {
                 let archive_files = vec![ArchiveFile {
-                    path: self.base_path.join(ACTIVE_FILENAME),
+                    path: self.active_path().into(),
                     compressed: false,
                     starttime: 0,
                 }];
@@ -886,7 +895,7 @@ impl TaskCache {
                 let mut files = self.archive_files(lock)?;
                 files.reverse();
 
-                TaskArchiveIterator::new(Some(journal_file), files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), files, lock)
             }
         }
     }
@@ -935,6 +944,39 @@ impl TaskCache {
             })
         }
     }
+
+    /// Return path to the task cache journal (WAL).
+    fn journal_path(&self) -> &Path {
+        &self.journal_path
+    }
+
+    /// Return path to the task cache state file.
+    fn state_path(&self) -> &Path {
+        &self.state_path
+    }
+
+    /// Return path to the file of active tasks.
+    fn active_path(&self) -> &Path {
+        &self.active_path
+    }
+
+    /// Return path to the task cache lock file.
+    fn lock_path(&self) -> &Path {
+        &self.lock_path
+    }
+
+    /// Construct an archive file path, given it's lower bound timestamp and its compression
+    /// status.
+    fn archive_path(&self, starttime: i64, compressed: bool) -> PathBuf {
+        let suffix = if compressed {
+            ZSTD_EXTENSION_WITH_DOT
+        } else {
+            ""
+        };
+
+        self.base_path
+            .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}"))
+    }
 }
 
 /// Comparison function for sorting tasks.
-- 
2.47.3





  parent reply	other threads:[~2026-07-02  9:23 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` Lukas Wagner [this message]
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260702092258.174740-7-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal