public inbox for pdm-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files
Date: Thu,  2 Jul 2026 11:22:56 +0200	[thread overview]
Message-ID: <20260702092258.174740-14-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>

Same as when we apply the journal, we use the opportunity to check for
any corruption and then reset the cut-off timestamps if any errors
occurred when reading from the archive file.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 132 ++++++++++++++++++++------
 1 file changed, 101 insertions(+), 31 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index c6a6c777..dbe71dc9 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -302,16 +302,26 @@ impl<'a> WritableTaskCache<'a> {
                 .with_context(|| format!("failed to remove {}", to_remove.path.display()))?;
         }
 
+        let mut corrupted = Vec::new();
+
         for file in archive_files
             .iter_mut()
             .skip(self.cache.uncompressed_files as usize)
         {
-            if !file.compressed {
-                file.compress(self.cache.create_options)
-                    .with_context(|| format!("failed to compress {}", file.path.display()))?;
+            if !file.compressed
+                && self
+                    .compress_archive_file(file)
+                    .with_context(|| format!("failed to compress {}", file.path.display()))?
+                    == ArchiveFileState::Corrupted
+            {
+                corrupted.push(CorruptedArchiveFile(file.clone()));
             }
         }
 
+        if !corrupted.is_empty() {
+            self.request_repair(&corrupted)?;
+        }
+
         Ok(did_rotate)
     }
 
@@ -771,6 +781,53 @@ impl<'a> WritableTaskCache<'a> {
 
         Ok(file_state)
     }
+
+    /// Compress an existing [`ArchiveFile`] and remove the uncompressed original.
+    ///
+    /// Items that cannot be read from `file` are logged and skipped; in that case
+    /// `Ok(ArchiveFileState::Corrupted)` is returned so that the caller can react, e.g. by
+    /// triggering a rebuild of the affected archive file at the next opportunity.
+    fn compress_archive_file(&self, file: &ArchiveFile) -> Result<ArchiveFileState, Error> {
+        let mut new_file = file.clone();
+        new_file.set_compressed(true);
+
+        let mut file_state = ArchiveFileState::Valid;
+        // keep all readable items, but remember if anything had to be dropped
+        let archive_iter = file
+            .iter()?
+            .with_context(|| {
+                format!(
+                    "task archive file '{}' disappeared while compressing",
+                    file.path.display()
+                )
+            })?
+            .filter_map(|item| match item {
+                Ok(item) => Some(item),
+                Err(err) => {
+                    file_state = ArchiveFileState::Corrupted;
+                    log::error!(
+                        "could not read task cache item while compressing '{path}': {err:#}",
+                        path = file.path.display()
+                    );
+                    None
+                }
+            });
+
+        let mut writer = new_file.writer(self.cache.create_options)?;
+
+        writer.write_tasks(archive_iter)?;
+
+        if let Err(err) = writer.finalize_and_replace() {
+            log::error!("could not replace {file:?}: {err:#}");
+
+            // don't remove old, uncompressed file if we were not able to put the new file in place
+            return Err(err);
+        }
+
+        std::fs::remove_file(&file.path).context("failed to remove uncompressed archive file")?;
+
+        Ok(file_state)
+    }
 }
 
 impl TaskCache {
@@ -1185,36 +1242,20 @@ impl ArchiveFile {
         }
     }
 
-    fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
-        let uncompressed_file_path = &self.path;
+    /// Set the `compressed` flag and update the path.
+    ///
+    /// This will *not* compress the existing file contents.
+    fn set_compressed(&mut self, compressed: bool) {
+        let suffix = if compressed {
+            ZSTD_EXTENSION_WITH_DOT
+        } else {
+            ""
+        };
 
-        let (temp_file, temp_file_path) =
-            proxmox_sys::fs::make_tmp_file(uncompressed_file_path, options)
-                .context("failed to create temporary file")?;
+        self.compressed = compressed;
 
-        let uncompressed_file =
-            File::open(uncompressed_file_path).context("failed to open uncompressed file")?;
-
-        zstd::stream::copy_encode(
-            uncompressed_file,
-            temp_file,
-            zstd::DEFAULT_COMPRESSION_LEVEL,
-        )
-        .context("zstd::stream::copy_encode failed")?;
-
-        let mut new_path_for_compressed = uncompressed_file_path.clone();
-        new_path_for_compressed
-            .set_extension(format!("{}{ZSTD_EXTENSION_WITH_DOT}", self.starttime));
-
-        std::fs::rename(&temp_file_path, &new_path_for_compressed)
-            .context("failed to move compressed task archive file")?;
-        std::fs::remove_file(uncompressed_file_path)
-            .context("failed to remove uncompressed archive file")?;
-
-        self.path = new_path_for_compressed;
-        self.compressed = true;
-
-        Ok(())
+        self.path
+            .set_extension(format!("{}{suffix}", self.starttime));
     }
 }
 
@@ -1921,4 +1962,33 @@ mod tests {
             assert_eq!(get_cutoff(&cache), expected_cutoff, "{}", description);
         }
     }
+
+    #[test]
+    fn reset_cutoff_after_compressing_corrupted() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 2;
+
+        let cache = cache.write().unwrap();
+        cache.init(1000).unwrap();
+
+        add_tasks(&cache, vec![task(1010, Some(1011)), task(1020, Some(1021))]).unwrap();
+        cache.apply_journal().unwrap();
+
+        assert_eq!(get_cutoff(&cache), 1020);
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        let first = files.get(0).unwrap();
+
+        std::fs::write(&first.path, "some invalid\nthis is no valid json").unwrap();
+
+        cache.rotate(1100).unwrap();
+
+        assert_eq!(
+            get_cutoff(&cache),
+            1000,
+            "cutoff timestamp should be reset to the lower bound of the corrupted archive file"
+        );
+    }
 }
-- 
2.47.3





  parent reply	other threads:[~2026-07-02  9:23 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` Lukas Wagner [this message]
2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260702092258.174740-14-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal