From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter
Date: Thu, 2 Jul 2026 11:22:53 +0200 [thread overview]
Message-ID: <20260702092258.174740-11-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>
This nicely abstracts the handling of compressed and uncompressed archive
files behind a common interface, and atomically replaces any
existing archive file using the usual tmpfile + rename approach.
The 'active' file has the same file structure as regular archive files,
so we can use ArchiveFileWriter for these as well.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/remote_tasks/task_cache.rs | 251 +++++++++++++++++---------
1 file changed, 163 insertions(+), 88 deletions(-)
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index c7057786..969a8124 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -12,6 +12,7 @@ use std::{
use anyhow::{Context, Error};
use serde::{Deserialize, Serialize};
+use zstd::Encoder;
use proxmox_sys::fs::CreateOptions;
@@ -665,36 +666,13 @@ impl<'a> WritableTaskCache<'a> {
/// The tasks are first written to a temporary file, which is then used
/// to atomically replace the original.
fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
- let target = self.cache.active_path();
+ let file = self.cache.active_file();
+ let mut writer = file.writer(self.cache.create_options)?;
- let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?;
- let mut fd = BufWriter::new(fd);
-
- Self::write_tasks(&mut fd, tasks)?;
-
- if let Err(err) = fd.flush() {
- log::error!("could not flush 'active' file: {err:#}");
- }
- drop(fd);
-
- let res = std::fs::rename(&path, target).with_context(|| {
- format!(
- "failed to replace {} with {}",
- target.display(),
- path.display(),
- )
- });
-
- if let Err(err) = res {
- if let Err(err) = std::fs::remove_file(&path) {
- log::error!(
- "failed to cleanup temporary file {}: {err:#}",
- path.display()
- );
- }
-
- return Err(err);
- }
+ writer.write_tasks(tasks)?;
+ writer
+ .finalize_and_replace()
+ .context("could not finalize 'active' file")?;
Ok(())
}
@@ -712,17 +690,7 @@ impl<'a> WritableTaskCache<'a> {
return Ok(());
}
- // TODO: Might be nice to also move this to ArchiveFile
- let (temp_file, temp_file_path) =
- proxmox_sys::fs::make_tmp_file(&file.path, self.cache.create_options)?;
- let mut writer = if file.compressed {
- let encoder =
- zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?
- .auto_finish();
- Box::new(BufWriter::new(encoder)) as Box<dyn Write>
- } else {
- Box::new(BufWriter::new(temp_file)) as Box<dyn Write>
- };
+ let mut writer = file.writer(self.cache.create_options)?;
let archive_iter = file
.iter()?
@@ -742,43 +710,8 @@ impl<'a> WritableTaskCache<'a> {
.peekable();
let task_iter = tasks.into_iter().peekable();
- Self::write_tasks(&mut writer, MergeTaskIterator::new(archive_iter, task_iter))?;
-
- if let Err(err) = writer.flush() {
- log::error!("could not flush BufWriter for {file:?}: {err:#}");
- }
- drop(writer);
-
- if let Err(err) = std::fs::rename(&temp_file_path, &file.path).with_context(|| {
- format!(
- "failed to replace {} with {}",
- file.path.display(),
- temp_file_path.display()
- )
- }) {
- if let Err(err) = std::fs::remove_file(&temp_file_path) {
- log::error!(
- "failed to clean up temporary file {}: {err:#}",
- temp_file_path.display()
- );
- }
-
- return Err(err);
- }
-
- Ok(())
- }
-
- /// Write an iterator of [`TaskCacheItem`] to a something that implements [`Write`].
- /// The individual items are encoded as JSON followed by a newline.
- fn write_tasks(
- writer: &mut impl Write,
- tasks: impl Iterator<Item = TaskCacheItem>,
- ) -> Result<(), Error> {
- for task in tasks {
- serde_json::to_writer(&mut *writer, &task)?;
- writeln!(writer)?;
- }
+ writer.write_tasks(MergeTaskIterator::new(archive_iter, task_iter))?;
+ writer.finalize_and_replace()?;
Ok(())
}
@@ -872,21 +805,12 @@ impl TaskCache {
GetTasks::All => {
let mut archive_files = self.archive_files(lock)?;
archive_files.reverse();
-
- archive_files.push(ArchiveFile {
- path: self.active_path().into(),
- compressed: false,
- starttime: 0,
- });
+ archive_files.push(self.active_file());
TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
}
GetTasks::Active => {
- let archive_files = vec![ArchiveFile {
- path: self.active_path().into(),
- compressed: false,
- starttime: 0,
- }];
+ let archive_files = vec![self.active_file()];
TaskArchiveIterator::new(None, archive_files, lock)
}
@@ -977,6 +901,15 @@ impl TaskCache {
self.base_path
.join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}"))
}
+
+    /// Build the [`ArchiveFile`] descriptor for the uncompressed `active` file.
+    fn active_file(&self) -> ArchiveFile {
+        let path = self.active_path().into();
+        ArchiveFile {
+            starttime: 0,
+            compressed: false,
+            path,
+        }
+    }
}
/// Comparison function for sorting tasks.
@@ -1174,6 +1107,28 @@ impl ArchiveFile {
Ok(Some(iter))
}
+    /// Create an [`ArchiveFileWriter`] for this archive file.
+    ///
+    /// The new contents go to a temporary file obtained from
+    /// `proxmox_sys::fs::make_tmp_file` for `self.path`. If `self.compressed` is set, they are
+    /// zstd-compressed at `zstd::DEFAULT_COMPRESSION_LEVEL`. The temporary file is removed
+    /// again if setting up the writer fails after it was created.
+    fn writer(&self, create_options: CreateOptions) -> Result<ArchiveFileWriter<'_>, Error> {
+        let (temp_file, temp_file_path) =
+            proxmox_sys::fs::make_tmp_file(&self.path, create_options)?;
+
+        let inner = if self.compressed {
+            match zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL) {
+                Ok(encoder) => ArchiveFileWriterInner::ZstdEncoder(encoder),
+                Err(err) => {
+                    // The temporary file already exists at this point, don't leak it.
+                    if let Err(rm_err) = std::fs::remove_file(&temp_file_path) {
+                        log::error!(
+                            "failed to clean up temporary file {path}: {rm_err:#}",
+                            path = temp_file_path.display()
+                        );
+                    }
+                    return Err(err.into());
+                }
+            }
+        } else {
+            ArchiveFileWriterInner::Plain(BufWriter::new(temp_file))
+        };
+
+        Ok(ArchiveFileWriter {
+            archive_file: self,
+            writer: Some(inner),
+            path: temp_file_path,
+        })
+    }
+
fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
let uncompressed_file_path = &self.path;
@@ -1207,6 +1162,98 @@ impl ArchiveFile {
}
}
+/// Writer for an [`ArchiveFile`].
+///
+/// Instantiate this via [`ArchiveFile::writer`]. All contents are written to a temporary file.
+/// Only [`Self::finalize_and_replace`] replaces the original archive file with the written
+/// contents.
+///
+/// Dropping the writer without finalizing it discards the temporary file and leaves the original
+/// archive file untouched. This way, partially written contents never replace a valid file, for
+/// example after an early return caused by a write error.
+struct ArchiveFileWriter<'a> {
+    /// The archive file that is replaced on successful finalization.
+    archive_file: &'a ArchiveFile,
+    /// The inner writer; `None` once the writer has been finalized or discarded.
+    writer: Option<ArchiveFileWriterInner<'a>>,
+    /// Path of the temporary file that receives the new contents.
+    path: PathBuf,
+}
+
+/// The concrete writer, depending on whether the archive file is compressed.
+enum ArchiveFileWriterInner<'a> {
+    ZstdEncoder(Encoder<'a, File>),
+    Plain(BufWriter<File>),
+}
+
+impl<'a> ArchiveFileWriter<'a> {
+    /// Write the contents of an iterator of [`TaskCacheItem`] to this archive file.
+    /// The individual items are encoded as JSON followed by a newline.
+    fn write_tasks(&mut self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
+        if let Some(writer) = self.writer.as_mut() {
+            let writer = match writer {
+                ArchiveFileWriterInner::ZstdEncoder(encoder) => encoder as &mut dyn Write,
+                ArchiveFileWriterInner::Plain(buf_writer) => buf_writer as &mut dyn Write,
+            };
+
+            for task in tasks {
+                serde_json::to_writer(&mut *writer, &task)?;
+                writeln!(writer)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Finish the inner writer, fsync the temporary file, and rename it over the original
+    /// archive file.
+    ///
+    /// If any of these steps fails, the temporary file is removed and the original archive
+    /// file is left as it was. Does nothing if the writer was already finalized or discarded.
+    fn finalize_impl(&mut self) -> Result<(), Error> {
+        let writer = match self.writer.take() {
+            Some(writer) => writer,
+            None => return Ok(()),
+        };
+
+        let res = self.finish_and_sync(writer).and_then(|()| {
+            std::fs::rename(&self.path, &self.archive_file.path).with_context(|| {
+                format!(
+                    "failed to replace {} with {}",
+                    self.archive_file.path.display(),
+                    self.path.display(),
+                )
+            })
+        });
+
+        if res.is_err() {
+            // Never leave a stale temporary file behind, whichever step failed.
+            self.remove_temp_file();
+        }
+
+        res
+    }
+
+    /// Flush and finish `writer`, then fsync the underlying temporary file.
+    fn finish_and_sync(&self, writer: ArchiveFileWriterInner<'a>) -> Result<(), Error> {
+        let file = match writer {
+            ArchiveFileWriterInner::ZstdEncoder(mut encoder) => {
+                encoder.flush()?;
+                encoder.finish()?
+            }
+            ArchiveFileWriterInner::Plain(mut buf_writer) => {
+                buf_writer.flush()?;
+                buf_writer.into_inner()?
+            }
+        };
+
+        file.sync_all()
+            .with_context(|| format!("could not fsync {}", self.path.display()))
+    }
+
+    /// Best-effort removal of the temporary file; failures are only logged.
+    fn remove_temp_file(&self) {
+        if let Err(err) = std::fs::remove_file(&self.path) {
+            log::error!(
+                "failed to clean up temporary file {path}: {err:#}",
+                path = self.path.display()
+            );
+        }
+    }
+
+    /// Finalize and drop the internal writer, then replace the original archive file with the
+    /// new contents. This method consumes `self`.
+    ///
+    /// This is the only way to replace the original archive file. Dropping the writer instead
+    /// discards everything that was written.
+    fn finalize_and_replace(mut self) -> Result<(), Error> {
+        self.finalize_impl()
+    }
+}
+
+impl Drop for ArchiveFileWriter<'_> {
+    /// Discard an unfinalized writer: remove the temporary file and keep the original archive
+    /// file untouched, so partially written contents never replace it.
+    fn drop(&mut self) {
+        if let Some(writer) = self.writer.take() {
+            drop(writer);
+            self.remove_temp_file();
+        }
+    }
+}
+
/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
/// from both iterators sorted.
/// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1715,4 +1762,32 @@ mod tests {
assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0);
}
+
+    /// The temporary file created by [`ArchiveFile::writer`] must not be listed by
+    /// `archive_files` while the writer is still alive.
+    #[test]
+    fn temporary_file_when_writer_lives_is_not_enumerated() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 2;
+
+        let cache = cache.write().unwrap();
+        cache.init(1000).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(files.len(), 2);
+
+        let first = files
+            .first()
+            .expect("cache was initialized with two archive files");
+        let _writer = first.writer(cache.cache.create_options).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(
+            files.len(),
+            2,
+            "the tmp file created by the writer should not show up in the list of archive files"
+        );
+    }
}
--
2.47.3
next prev parent reply other threads:[~2026-07-02 9:23 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-07-02 9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02 9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02 9:29 ` Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02 9:22 ` Lukas Wagner [this message]
2026-07-02 9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260702092258.174740-11-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox