From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization
Date: Thu, 2 Jul 2026 11:22:49 +0200 [thread overview]
Message-ID: <20260702092258.174740-7-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>
Compute all static paths during initialization, store them as members of
the TaskCache struct, and add methods to access them.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/remote_tasks/task_cache.rs | 118 +++++++++++++++++---------
1 file changed, 80 insertions(+), 38 deletions(-)
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index aa92fcc8..d97fa710 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -143,6 +143,19 @@ impl State {
pub struct TaskCache {
/// Path where the cache's files should be placed.
base_path: PathBuf,
+
+ /// Path to the journal (WAL).
+ journal_path: PathBuf,
+
+ /// Path to the state file.
+ state_path: PathBuf,
+
+ /// Path to the lock file.
+ lock_path: PathBuf,
+
+ /// Path to the file containing active tasks.
+ active_path: PathBuf,
+
/// File permissions for the cache's files.
create_options: CreateOptions,
@@ -240,19 +253,10 @@ impl WritableTaskCache {
/// Start a new archive file with a given timestamp.
/// `now` is supposed to be a UNIX timestamp (seconds).
fn new_file(&self, now: i64, compress: bool) -> Result<ArchiveFile, Error> {
- let suffix = if compress {
- ZSTD_EXTENSION_WITH_DOT
- } else {
- ""
- };
+ let path = self.cache.archive_path(now, compress);
- let new_path = self
- .cache
- .base_path
- .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}{suffix}"));
-
- let mut file = File::create(&new_path)?;
- self.cache.create_options.apply_to(&mut file, &new_path)?;
+ let mut file = File::create(&path)?;
+ self.cache.create_options.apply_to(&mut file, &path)?;
if compress {
let encoder = zstd::stream::write::Encoder::new(file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
@@ -260,7 +264,7 @@ impl WritableTaskCache {
}
Ok(ArchiveFile {
- path: new_path,
+ path,
compressed: compress,
starttime: now,
})
@@ -407,11 +411,10 @@ impl WritableTaskCache {
node_success_map: &NodeFetchSuccessMap,
state: &mut State,
) -> Result<(), Error> {
- let filename = self.cache.base_path.join(WAL_FILENAME);
let mut file = OpenOptions::new()
.append(true)
.create(true)
- .open(filename)?;
+ .open(self.cache.journal_path())?;
for task in finished_tasks {
// Remove this finished task from our set of active tasks.
@@ -479,8 +482,7 @@ impl WritableTaskCache {
fn journal_size(&self) -> Result<u64, Error> {
let metadata = self
.cache
- .base_path
- .join(WAL_FILENAME)
+ .journal_path()
.metadata()
.context("failed to read metadata of journal file")?;
@@ -504,9 +506,9 @@ impl WritableTaskCache {
/// This will merge all tasks in the journal file into the task archive.
pub fn apply_journal(&self) -> Result<(), Error> {
let start = Instant::now();
- let filename = self.cache.base_path.join(WAL_FILENAME);
+ let journal_path = self.cache.journal_path();
- let file = match File::open(&filename) {
+ let file = match File::open(journal_path) {
Ok(file) => Box::new(BufReader::new(file)),
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
Err(err) => return Err(err.into()),
@@ -537,7 +539,7 @@ impl WritableTaskCache {
OpenOptions::new()
.write(true)
.truncate(true)
- .open(filename)
+ .open(journal_path)
.context("failed to truncate journal file")?;
log::info!(
@@ -649,7 +651,7 @@ impl WritableTaskCache {
/// Write the state file.
fn write_state(&self, state: State) -> Result<(), Error> {
- let path = self.cache.base_path.join(STATE_FILENAME);
+ let path = self.cache.state_path();
let data = serde_json::to_vec_pretty(&state)?;
@@ -663,10 +665,9 @@ impl WritableTaskCache {
/// The tasks are first written to a temporary file, which is then used
/// to atomically replace the original.
fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
- let (fd, path) = proxmox_sys::fs::make_tmp_file(
- self.cache.base_path.join(ACTIVE_FILENAME),
- self.cache.create_options,
- )?;
+ let target = self.cache.active_path();
+
+ let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?;
let mut fd = BufWriter::new(fd);
Self::write_tasks(&mut fd, tasks)?;
@@ -676,9 +677,7 @@ impl WritableTaskCache {
}
drop(fd);
- let target = self.cache.base_path.join(ACTIVE_FILENAME);
-
- let res = std::fs::rename(&path, &target).with_context(|| {
+ let res = std::fs::rename(&path, target).with_context(|| {
format!(
"failed to replace {} with {}",
target.display(),
@@ -790,7 +789,7 @@ impl TaskCache {
///
/// Remember to call `init` or `new_file` on a locked, writable TaskCache
/// to create the initial archive files.
- pub fn new<P: AsRef<Path>>(
+ pub fn new<P: Into<PathBuf>>(
path: P,
create_options: CreateOptions,
max_files: u32,
@@ -798,8 +797,19 @@ impl TaskCache {
rotate_after: u64,
journal_max_size: u64,
) -> Result<Self, Error> {
+ let base_path = path.into();
+
+ let journal_path = base_path.join(WAL_FILENAME);
+ let state_path = base_path.join(STATE_FILENAME);
+ let active_path = base_path.join(ACTIVE_FILENAME);
+ let lock_path = base_path.join(LOCKFILE_FILENAME);
+
Ok(Self {
- base_path: path.as_ref().into(),
+ base_path,
+ journal_path,
+ state_path,
+ active_path,
+ lock_path,
create_options,
journal_max_size,
max_files,
@@ -823,7 +833,7 @@ impl TaskCache {
}
fn lock_impl(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
- let lockfile = self.base_path.join(LOCKFILE_FILENAME);
+ let lockfile = self.lock_path();
Ok(TaskCacheLock(proxmox_sys::fs::open_file_locked(
lockfile,
@@ -845,8 +855,7 @@ impl TaskCache {
}
}
- let path = self.base_path.join(STATE_FILENAME);
- do_read_state(&path).unwrap_or_else(|err| {
+ do_read_state(self.state_path()).unwrap_or_else(|err| {
log::error!("could not read state file: {err:#}");
Default::default()
})
@@ -857,7 +866,7 @@ impl TaskCache {
mode: GetTasks,
lock: &'a TaskCacheLock,
) -> Result<TaskArchiveIterator<'a>, Error> {
- let journal_file = self.base_path.join(WAL_FILENAME);
+ let journal_file = self.journal_path();
match mode {
GetTasks::All => {
@@ -865,16 +874,16 @@ impl TaskCache {
archive_files.reverse();
archive_files.push(ArchiveFile {
- path: self.base_path.join(ACTIVE_FILENAME),
+ path: self.active_path().into(),
compressed: false,
starttime: 0,
});
- TaskArchiveIterator::new(Some(journal_file), archive_files, lock)
+ TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
}
GetTasks::Active => {
let archive_files = vec![ArchiveFile {
- path: self.base_path.join(ACTIVE_FILENAME),
+ path: self.active_path().into(),
compressed: false,
starttime: 0,
}];
@@ -886,7 +895,7 @@ impl TaskCache {
let mut files = self.archive_files(lock)?;
files.reverse();
- TaskArchiveIterator::new(Some(journal_file), files, lock)
+ TaskArchiveIterator::new(Some(journal_file.into()), files, lock)
}
}
}
@@ -935,6 +944,39 @@ impl TaskCache {
})
}
}
+
+ /// Return path to the task cache journal (WAL).
+ fn journal_path(&self) -> &Path {
+ &self.journal_path
+ }
+
+ /// Return path to the task cache state file.
+ fn state_path(&self) -> &Path {
+ &self.state_path
+ }
+
+ /// Return path to the file of active tasks.
+ fn active_path(&self) -> &Path {
+ &self.active_path
+ }
+
+ /// Return path to the task cache lock file.
+ fn lock_path(&self) -> &Path {
+ &self.lock_path
+ }
+
+ /// Construct an archive file path, given its lower bound timestamp and its compression
+ /// status.
+ fn archive_path(&self, starttime: i64, compressed: bool) -> PathBuf {
+ let suffix = if compressed {
+ ZSTD_EXTENSION_WITH_DOT
+ } else {
+ ""
+ };
+
+ self.base_path
+ .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}"))
+ }
}
/// Comparison function for sorting tasks.
--
2.47.3
next prev parent reply other threads:[~2026-07-02 9:23 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-07-02 9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02 9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02 9:29 ` Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02 9:22 ` Lukas Wagner [this message]
2026-07-02 9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260702092258.174740-7-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox