all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter
Date: Thu,  2 Jul 2026 11:22:53 +0200	[thread overview]
Message-ID: <20260702092258.174740-11-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>

This nicely abstracts handling of compressed and uncompressed archive
files behind a common interface, as well as atomically replacing any
existing archive file by using the usual tmpfile + rename approach.

The 'active' file has the same file structure as regular archive files,
so we can use ArchiveFileWriter for these as well.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 251 +++++++++++++++++---------
 1 file changed, 163 insertions(+), 88 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index c7057786..969a8124 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -12,6 +12,7 @@ use std::{
 
 use anyhow::{Context, Error};
 use serde::{Deserialize, Serialize};
+use zstd::Encoder;
 
 use proxmox_sys::fs::CreateOptions;
 
@@ -665,36 +666,13 @@ impl<'a> WritableTaskCache<'a> {
     /// The tasks are first written to a temporary file, which is then used
     /// to atomically replace the original.
     fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
-        let target = self.cache.active_path();
+        let file = self.cache.active_file();
+        let mut writer = file.writer(self.cache.create_options)?;
 
-        let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?;
-        let mut fd = BufWriter::new(fd);
-
-        Self::write_tasks(&mut fd, tasks)?;
-
-        if let Err(err) = fd.flush() {
-            log::error!("could not flush 'active' file: {err:#}");
-        }
-        drop(fd);
-
-        let res = std::fs::rename(&path, target).with_context(|| {
-            format!(
-                "failed to replace {} with {}",
-                target.display(),
-                path.display(),
-            )
-        });
-
-        if let Err(err) = res {
-            if let Err(err) = std::fs::remove_file(&path) {
-                log::error!(
-                    "failed to cleanup temporary file {}: {err:#}",
-                    path.display()
-                );
-            }
-
-            return Err(err);
-        }
+        writer.write_tasks(tasks)?;
+        writer
+            .finalize_and_replace()
+            .context("could not finalize 'active' file")?;
 
         Ok(())
     }
@@ -712,17 +690,7 @@ impl<'a> WritableTaskCache<'a> {
             return Ok(());
         }
 
-        // TODO: Might be nice to also move this to ArchiveFile
-        let (temp_file, temp_file_path) =
-            proxmox_sys::fs::make_tmp_file(&file.path, self.cache.create_options)?;
-        let mut writer = if file.compressed {
-            let encoder =
-                zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?
-                    .auto_finish();
-            Box::new(BufWriter::new(encoder)) as Box<dyn Write>
-        } else {
-            Box::new(BufWriter::new(temp_file)) as Box<dyn Write>
-        };
+        let mut writer = file.writer(self.cache.create_options)?;
 
         let archive_iter = file
             .iter()?
@@ -742,43 +710,8 @@ impl<'a> WritableTaskCache<'a> {
             .peekable();
         let task_iter = tasks.into_iter().peekable();
 
-        Self::write_tasks(&mut writer, MergeTaskIterator::new(archive_iter, task_iter))?;
-
-        if let Err(err) = writer.flush() {
-            log::error!("could not flush BufWriter for {file:?}: {err:#}");
-        }
-        drop(writer);
-
-        if let Err(err) = std::fs::rename(&temp_file_path, &file.path).with_context(|| {
-            format!(
-                "failed to replace {} with {}",
-                file.path.display(),
-                temp_file_path.display()
-            )
-        }) {
-            if let Err(err) = std::fs::remove_file(&temp_file_path) {
-                log::error!(
-                    "failed to clean up temporary file {}: {err:#}",
-                    temp_file_path.display()
-                );
-            }
-
-            return Err(err);
-        }
-
-        Ok(())
-    }
-
-    /// Write an iterator of [`TaskCacheItem`] to a something that implements [`Write`].
-    /// The individual items are encoded as JSON followed by a newline.
-    fn write_tasks(
-        writer: &mut impl Write,
-        tasks: impl Iterator<Item = TaskCacheItem>,
-    ) -> Result<(), Error> {
-        for task in tasks {
-            serde_json::to_writer(&mut *writer, &task)?;
-            writeln!(writer)?;
-        }
+        writer.write_tasks(MergeTaskIterator::new(archive_iter, task_iter))?;
+        writer.finalize_and_replace()?;
 
         Ok(())
     }
@@ -872,21 +805,12 @@ impl TaskCache {
             GetTasks::All => {
                 let mut archive_files = self.archive_files(lock)?;
                 archive_files.reverse();
-
-                archive_files.push(ArchiveFile {
-                    path: self.active_path().into(),
-                    compressed: false,
-                    starttime: 0,
-                });
+                archive_files.push(self.active_file());
 
                 TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
             }
             GetTasks::Active => {
-                let archive_files = vec![ArchiveFile {
-                    path: self.active_path().into(),
-                    compressed: false,
-                    starttime: 0,
-                }];
+                let archive_files = vec![self.active_file()];
 
                 TaskArchiveIterator::new(None, archive_files, lock)
             }
@@ -977,6 +901,15 @@ impl TaskCache {
         self.base_path
             .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}"))
     }
+
+    /// Return the [`ArchiveFile`] instance for the `active` file.
+    fn active_file(&self) -> ArchiveFile {
+        ArchiveFile {
+            path: self.active_path().into(),
+            compressed: false,
+            starttime: 0,
+        }
+    }
 }
 
 /// Comparison function for sorting tasks.
@@ -1174,6 +1107,28 @@ impl ArchiveFile {
         Ok(Some(iter))
     }
 
+    /// Create an [`ArchiveFileWriter`] for this archive file.
+    fn writer(&self, create_options: CreateOptions) -> Result<ArchiveFileWriter<'_>, Error> {
+        let (temp_file, temp_file_path) =
+            proxmox_sys::fs::make_tmp_file(&self.path, create_options)?;
+
+        if self.compressed {
+            let encoder =
+                zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
+            Ok(ArchiveFileWriter {
+                archive_file: self,
+                writer: Some(ArchiveFileWriterInner::ZstdEncoder(encoder)),
+                path: temp_file_path,
+            })
+        } else {
+            Ok(ArchiveFileWriter {
+                archive_file: self,
+                writer: Some(ArchiveFileWriterInner::Plain(BufWriter::new(temp_file))),
+                path: temp_file_path,
+            })
+        }
+    }
+
     fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
         let uncompressed_file_path = &self.path;
 
@@ -1207,6 +1162,98 @@ impl ArchiveFile {
     }
 }
 
+/// Writer for an [`ArchiveFile`].
+///
+/// Instantiate this via [`ArchiveFile::writer`]. When calling [`Self::finalize_and_replace`], the
+/// original archive file is replaced with the contents that were written to the writer.
+struct ArchiveFileWriter<'a> {
+    archive_file: &'a ArchiveFile,
+    // `None` once the inner writer has been taken out for finalization.
+    writer: Option<ArchiveFileWriterInner<'a>>,
+    path: PathBuf, // path of the temporary file that is written to
+}
+
+enum ArchiveFileWriterInner<'a> {
+    ZstdEncoder(Encoder<'a, File>),
+    Plain(BufWriter<File>),
+}
+
+impl<'a> ArchiveFileWriter<'a> {
+    /// Write the contents of an iterator of [`TaskCacheItem`] to this archive file.
+    /// The individual items are encoded as JSON followed by a newline.
+    fn write_tasks(&mut self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
+        if let Some(writer) = self.writer.as_mut() {
+            let writer = match writer {
+                ArchiveFileWriterInner::ZstdEncoder(encoder) => encoder as &mut dyn Write,
+                ArchiveFileWriterInner::Plain(buf_writer) => buf_writer as &mut dyn Write,
+            };
+
+            for task in tasks {
+                serde_json::to_writer(&mut *writer, &task)?;
+                writeln!(writer)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Flush and fsync the temporary file, then rename it over the original archive file.
+    /// No-op once finalized. If any step fails, the temporary file is removed (best effort).
+    fn finalize_impl(&mut self) -> Result<(), Error> {
+        if let Some(writer) = self.writer.take() {
+            // Run all fallible steps in a closure so every failure reaches the cleanup below.
+            let result = (|| -> Result<(), Error> {
+                let file = match writer {
+                    ArchiveFileWriterInner::ZstdEncoder(mut encoder) => {
+                        encoder.flush()?;
+                        encoder.finish()?
+                    }
+                    ArchiveFileWriterInner::Plain(mut buf_writer) => {
+                        buf_writer.flush()?;
+                        buf_writer.into_inner()?
+                    }
+                };
+                file.sync_all()
+                    .with_context(|| format!("could not fsync {}", self.path.display()))?;
+                std::fs::rename(&self.path, &self.archive_file.path).with_context(|| {
+                    format!(
+                        "failed to replace {} with {}",
+                        self.archive_file.path.display(),
+                        self.path.display()
+                    )
+                })
+            })();
+
+            if result.is_err() {
+                if let Err(err) = std::fs::remove_file(&self.path) {
+                    log::error!(
+                        "failed to clean up temporary file {path}: {err:#}",
+                        path = self.path.display()
+                    );
+                }
+            }
+            return result;
+        }
+
+        Ok(())
+    }
+
+    /// Finalize and drop the internal writer and replace the original
+    /// archive file with the new contents. This method consumes `self`.
+    ///
+    /// Dropping the writer without calling this method also finalizes it and replaces the
+    /// archive file, but any error that occurs is silently discarded.
+    fn finalize_and_replace(mut self) -> Result<(), Error> {
+        self.finalize_impl()
+    }
+}
+
+impl<'a> Drop for ArchiveFileWriter<'a> {
+    fn drop(&mut self) {
+        let _ = self.finalize_impl();
+    }
+}
+
 /// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
 /// from both iterators sorted.
 /// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1715,4 +1762,32 @@ mod tests {
 
         assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0);
     }
+
+    #[test]
+    fn temporary_file_when_writer_lives_is_not_enumerated() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 2;
+
+        let cache = cache.write().unwrap();
+        cache.init(1000).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        // Baseline: number of archive files before any writer exists.
+        assert_eq!(files.len(), 2);
+        // Keep the writer alive so its temporary file exists during the second listing.
+        let _writer = files
+            .first()
+            .unwrap()
+            .writer(cache.cache.create_options)
+            .unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(
+            files.len(),
+            2,
+            "the tmp file created by the writer should not show up in the list of archive files"
+        );
+    }
 }
-- 
2.47.3





  parent reply	other threads:[~2026-07-02  9:23 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` Lukas Wagner [this message]
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260702092258.174740-11-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal