public inbox for pdm-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter
Date: Thu,  2 Jul 2026 11:22:53 +0200	[thread overview]
Message-ID: <20260702092258.174740-11-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>

This nicely abstracts the handling of compressed and uncompressed archive
files behind a common interface, and atomically replaces any existing
archive file using the usual tmpfile + rename approach.

The 'active' file has the same structure as regular archive files,
so we can use ArchiveFileWriter for it as well.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 251 +++++++++++++++++---------
 1 file changed, 163 insertions(+), 88 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index c7057786..969a8124 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -12,6 +12,7 @@ use std::{
 
 use anyhow::{Context, Error};
 use serde::{Deserialize, Serialize};
+use zstd::Encoder;
 
 use proxmox_sys::fs::CreateOptions;
 
@@ -665,36 +666,13 @@ impl<'a> WritableTaskCache<'a> {
     /// The tasks are first written to a temporary file, which is then used
     /// to atomically replace the original.
     fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
-        let target = self.cache.active_path();
+        let file = self.cache.active_file();
+        let mut writer = file.writer(self.cache.create_options)?;
 
-        let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?;
-        let mut fd = BufWriter::new(fd);
-
-        Self::write_tasks(&mut fd, tasks)?;
-
-        if let Err(err) = fd.flush() {
-            log::error!("could not flush 'active' file: {err:#}");
-        }
-        drop(fd);
-
-        let res = std::fs::rename(&path, target).with_context(|| {
-            format!(
-                "failed to replace {} with {}",
-                target.display(),
-                path.display(),
-            )
-        });
-
-        if let Err(err) = res {
-            if let Err(err) = std::fs::remove_file(&path) {
-                log::error!(
-                    "failed to cleanup temporary file {}: {err:#}",
-                    path.display()
-                );
-            }
-
-            return Err(err);
-        }
+        writer.write_tasks(tasks)?;
+        writer
+            .finalize_and_replace()
+            .context("could not finalize 'active' file")?;
 
         Ok(())
     }
@@ -712,17 +690,7 @@ impl<'a> WritableTaskCache<'a> {
             return Ok(());
         }
 
-        // TODO: Might be nice to also move this to ArchiveFile
-        let (temp_file, temp_file_path) =
-            proxmox_sys::fs::make_tmp_file(&file.path, self.cache.create_options)?;
-        let mut writer = if file.compressed {
-            let encoder =
-                zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?
-                    .auto_finish();
-            Box::new(BufWriter::new(encoder)) as Box<dyn Write>
-        } else {
-            Box::new(BufWriter::new(temp_file)) as Box<dyn Write>
-        };
+        let mut writer = file.writer(self.cache.create_options)?;
 
         let archive_iter = file
             .iter()?
@@ -742,43 +710,8 @@ impl<'a> WritableTaskCache<'a> {
             .peekable();
         let task_iter = tasks.into_iter().peekable();
 
-        Self::write_tasks(&mut writer, MergeTaskIterator::new(archive_iter, task_iter))?;
-
-        if let Err(err) = writer.flush() {
-            log::error!("could not flush BufWriter for {file:?}: {err:#}");
-        }
-        drop(writer);
-
-        if let Err(err) = std::fs::rename(&temp_file_path, &file.path).with_context(|| {
-            format!(
-                "failed to replace {} with {}",
-                file.path.display(),
-                temp_file_path.display()
-            )
-        }) {
-            if let Err(err) = std::fs::remove_file(&temp_file_path) {
-                log::error!(
-                    "failed to clean up temporary file {}: {err:#}",
-                    temp_file_path.display()
-                );
-            }
-
-            return Err(err);
-        }
-
-        Ok(())
-    }
-
-    /// Write an iterator of [`TaskCacheItem`] to a something that implements [`Write`].
-    /// The individual items are encoded as JSON followed by a newline.
-    fn write_tasks(
-        writer: &mut impl Write,
-        tasks: impl Iterator<Item = TaskCacheItem>,
-    ) -> Result<(), Error> {
-        for task in tasks {
-            serde_json::to_writer(&mut *writer, &task)?;
-            writeln!(writer)?;
-        }
+        writer.write_tasks(MergeTaskIterator::new(archive_iter, task_iter))?;
+        writer.finalize_and_replace()?;
 
         Ok(())
     }
@@ -872,21 +805,12 @@ impl TaskCache {
             GetTasks::All => {
                 let mut archive_files = self.archive_files(lock)?;
                 archive_files.reverse();
-
-                archive_files.push(ArchiveFile {
-                    path: self.active_path().into(),
-                    compressed: false,
-                    starttime: 0,
-                });
+                archive_files.push(self.active_file());
 
                 TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
             }
             GetTasks::Active => {
-                let archive_files = vec![ArchiveFile {
-                    path: self.active_path().into(),
-                    compressed: false,
-                    starttime: 0,
-                }];
+                let archive_files = vec![self.active_file()];
 
                 TaskArchiveIterator::new(None, archive_files, lock)
             }
@@ -977,6 +901,15 @@ impl TaskCache {
         self.base_path
             .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}"))
     }
+
+    /// Describe the uncompressed `active` file as an [`ArchiveFile`] with `starttime` 0.
+    fn active_file(&self) -> ArchiveFile {
+        ArchiveFile {
+            starttime: 0,
+            compressed: false,
+            path: self.active_path().into(),
+        }
+    }
 }
 
 /// Comparison function for sorting tasks.
@@ -1174,6 +1107,28 @@ impl ArchiveFile {
         Ok(Some(iter))
     }
 
+    /// Create an [`ArchiveFileWriter`] for this archive file.
+    fn writer(&self, create_options: CreateOptions) -> Result<ArchiveFileWriter<'_>, Error> {
+        let (temp_file, tmp_path) = proxmox_sys::fs::make_tmp_file(&self.path, create_options)?;
+
+        let writer = if self.compressed {
+            zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)
+                .map(ArchiveFileWriterInner::ZstdEncoder)
+                .map_err(|err| {
+                    // best effort: don't leave the freshly created temporary file behind
+                    let _ = std::fs::remove_file(&tmp_path);
+                    err
+                })?
+        } else {
+            ArchiveFileWriterInner::Plain(BufWriter::new(temp_file))
+        };
+        Ok(ArchiveFileWriter {
+            archive_file: self,
+            writer: Some(writer),
+            path: tmp_path,
+        })
+    }
+
     fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
         let uncompressed_file_path = &self.path;
 
@@ -1207,6 +1162,98 @@ impl ArchiveFile {
     }
 }
 
+/// Writer for an [`ArchiveFile`].
+///
+/// Instantiate this via [`ArchiveFile::writer`]. Calling [`Self::finalize_and_replace`] replaces
+/// the original archive file; dropping the writer without doing so discards the temporary file.
+struct ArchiveFileWriter<'a> {
+    archive_file: &'a ArchiveFile,
+    /// Inner writer, `None` once the writer was finalized or discarded.
+    writer: Option<ArchiveFileWriterInner<'a>>,
+    path: PathBuf,
+}
+
+enum ArchiveFileWriterInner<'a> {
+    ZstdEncoder(Encoder<'a, File>),
+    Plain(BufWriter<File>),
+}
+
+impl<'a> ArchiveFileWriter<'a> {
+    /// Write the contents of an iterator of [`TaskCacheItem`] to this archive file.
+    /// The individual items are encoded as JSON followed by a newline.
+    fn write_tasks(&mut self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
+        if let Some(writer) = self.writer.as_mut() {
+            let writer = match writer {
+                ArchiveFileWriterInner::ZstdEncoder(encoder) => encoder as &mut dyn Write,
+                ArchiveFileWriterInner::Plain(buf_writer) => buf_writer as &mut dyn Write,
+            };
+
+            for task in tasks {
+                serde_json::to_writer(&mut *writer, &task)?;
+                writeln!(writer)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Finish the inner writer, fsync the temporary file and rename it over the original
+    /// archive file. If any of these steps fails, the temporary file is removed again.
+    fn finalize_impl(&mut self) -> Result<(), Error> {
+        let res = self.finish_and_rename();
+        if res.is_err() {
+            self.discard();
+        }
+        res
+    }
+
+    /// Take and finish the inner writer, then move the temporary file into place.
+    fn finish_and_rename(&mut self) -> Result<(), Error> {
+        let file = match self.writer.take() {
+            Some(ArchiveFileWriterInner::ZstdEncoder(encoder)) => encoder.finish()?,
+            Some(ArchiveFileWriterInner::Plain(buf_writer)) => buf_writer.into_inner()?,
+            None => return Ok(()),
+        };
+        file.sync_all()
+            .with_context(|| format!("could not fsync {}", self.path.display()))?;
+
+        std::fs::rename(&self.path, &self.archive_file.path).with_context(|| {
+            format!(
+                "failed to replace {} with {}",
+                self.archive_file.path.display(),
+                self.path.display()
+            )
+        })
+    }
+
+    /// Drop the inner writer without finishing it and remove the temporary file.
+    fn discard(&mut self) {
+        self.writer = None;
+        if let Err(err) = std::fs::remove_file(&self.path) {
+            log::error!(
+                "failed to clean up temporary file {path}: {err:#}",
+                path = self.path.display()
+            );
+        }
+    }
+
+    /// Finalize and drop the internal writer and replace the original archive file with the
+    /// new contents. This method consumes `self`; dropping the writer without calling it
+    /// discards everything that was written.
+    fn finalize_and_replace(mut self) -> Result<(), Error> {
+        self.finalize_impl()
+    }
+}
+
+impl<'a> Drop for ArchiveFileWriter<'a> {
+    fn drop(&mut self) {
+        // Unfinalized contents may be incomplete (e.g. early `?` return): discard, never commit.
+        if self.writer.is_some() {
+            self.discard();
+        }
+    }
+}
+
 /// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
 /// from both iterators sorted.
 /// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1715,4 +1762,32 @@ mod tests {
 
         assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0);
     }
+
+    #[test]
+    fn temporary_file_when_writer_lives_is_not_enumerated() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 2;
+
+        let cache = cache.write().unwrap();
+        cache.init(1000).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(files.len(), 2);
+
+        // Keep the writer (and thus its temporary file) alive while listing the archive files.
+        let _writer = files
+            .first()
+            .unwrap()
+            .writer(cache.cache.create_options)
+            .unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(
+            files.len(),
+            2,
+            "the tmp file created by the writer should not show up in the list of archive files"
+        );
+    }
 }
-- 
2.47.3





  parent reply	other threads:[~2026-07-02  9:23 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` Lukas Wagner [this message]
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260702092258.174740-11-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal