From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal
Date: Thu,  2 Jul 2026 11:22:55 +0200
Message-ID: <20260702092258.174740-13-l.wagner@proxmox.com>
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>

Merging entries from the journal into the individual task archive files
involves reading all existing entries from each affected archive file. If
we encounter any error during that process, we now lower the cutoff
timestamps of all remotes (and their nodes) to the start time of the
oldest corrupted archive file, which triggers a rebuild of that file the
next time tasks are fetched.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
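The cutoff reset described in the commit message boils down to taking the minimum start time over all corrupted archive files and clamping every node's cutoff to it. The following is a minimal, std-only sketch of that logic, not the actual implementation; `State`, `RemoteState`, `NodeState` and `reset_cutoffs` are simplified, hypothetical stand-ins for the types in task_cache.rs.

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the cache's state file contents; the
// real types in task_cache.rs carry more fields and may be shaped differently.
#[derive(Debug)]
struct NodeState {
    cutoff: i64,
}

#[derive(Debug)]
struct RemoteState {
    node_state: HashMap<String, NodeState>,
}

#[derive(Debug)]
struct State {
    remote_state: HashMap<String, RemoteState>,
}

/// Lower every node's cutoff to the start time of the oldest corrupted archive
/// file. Cutoffs that are already older stay untouched. Returns the applied
/// cutoff, or `None` if no corrupted files were passed in.
fn reset_cutoffs(state: &mut State, corrupted_starttimes: &[i64]) -> Option<i64> {
    // The oldest corrupted file bounds the range that must be re-fetched.
    let reset_cutoff = corrupted_starttimes.iter().copied().min()?;

    for remote in state.remote_state.values_mut() {
        for node in remote.node_state.values_mut() {
            // `min` ensures a repair request never moves a cutoff forward.
            node.cutoff = node.cutoff.min(reset_cutoff);
        }
    }

    Some(reset_cutoff)
}
```

For example, with corrupted files starting at 1100 and 900, a node at cutoff 1000 is lowered to 900, while a node already at 850 keeps its cutoff, so a repair request can only widen the range that gets re-fetched.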
 server/src/remote_tasks/task_cache.rs | 172 +++++++++++++++++++++++---
 1 file changed, 153 insertions(+), 19 deletions(-)
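`merge_single_archive_file` detects corruption by setting an `ArchiveFileState` flag from inside the `flat_map` closure that drops unreadable items. The sketch below shows that pattern in isolation, assuming plain `Result<u32, String>` items in place of `TaskCacheItem`; `merge_valid_items` is a hypothetical name, and `collect` stands in for `writer.write_tasks(...)`. The flag is only read after the iterator has been fully consumed, which also ends the closure's mutable borrow.

```rust
/// Mirrors the marker enum added in this patch.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ArchiveFileState {
    Valid,
    Corrupted,
}

/// Collect all readable items, skipping unreadable ones but remembering that
/// at least one read failed (hypothetical helper for illustration).
fn merge_valid_items<I>(items: I) -> (Vec<u32>, ArchiveFileState)
where
    I: IntoIterator<Item = Result<u32, String>>,
{
    let mut file_state = ArchiveFileState::Valid;

    // The closure borrows `file_state` mutably for as long as `valid` lives.
    let valid = items.into_iter().flat_map(|item| match item {
        Ok(item) => Some(item),
        Err(err) => {
            file_state = ArchiveFileState::Corrupted;
            eprintln!("could not read item while merging: {err}");
            None
        }
    });

    // Consuming the iterator moves the closure away, so the flag can be read.
    let merged: Vec<u32> = valid.collect();

    (merged, file_state)
}
```

Note that the flag only reflects items that were actually pulled through the iterator; a consumer that stopped early would not observe later read errors.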

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 3677de2c..c6a6c777 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -531,7 +531,7 @@ impl<'a> WritableTaskCache<'a> {
 
         let count = tasks.len();
 
-        self.merge_tasks_into_archive(tasks)?;
+        let corrupted_archive_files = self.merge_tasks_into_archive(tasks)?;
 
         // truncate the journal file
         OpenOptions::new()
@@ -540,6 +540,10 @@ impl<'a> WritableTaskCache<'a> {
             .open(journal_path)
             .context("failed to truncate journal file")?;
 
+        if !corrupted_archive_files.is_empty() {
+            self.request_repair(&corrupted_archive_files)?;
+        }
+
         log::info!(
             "committed {count} tasks in {:.3}.s to task cache archive",
             start.elapsed().as_secs_f32()
@@ -551,7 +555,10 @@ impl<'a> WritableTaskCache<'a> {
     /// Merge a list of *finished* tasks into the remote task archive files.
     /// The list of task in `tasks` *must* be sorted by their timestamp and UPID (descending by
     /// timestamp, ascending by UPID).
-    fn merge_tasks_into_archive(&self, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
+    fn merge_tasks_into_archive(
+        &self,
+        tasks: Vec<TaskCacheItem>,
+    ) -> Result<Vec<CorruptedArchiveFile>, Error> {
         debug_assert!(
             tasks
                 .iter()
@@ -568,6 +575,8 @@ impl<'a> WritableTaskCache<'a> {
         let mut current = files.next();
         let mut next = files.peek();
 
+        let mut corrupted = Vec::new();
+
         let mut tasks_for_current_file = Vec::new();
 
         // Tasks are sorted youngest to oldest (biggest start time first)
@@ -581,13 +590,18 @@ impl<'a> WritableTaskCache<'a> {
                     // The next entry's cut-off is larger then the task's start time, that means
                     // we want to finalized the current file by merging all tasks that
                     // should be stored in it...
-                    self.merge_single_archive_file(
-                        std::mem::take(&mut tasks_for_current_file),
-                        current,
-                    )
-                    .with_context(|| {
-                        format!("failed to merge archive file {}", current.path.display())
-                    })?;
+                    if self
+                        .merge_single_archive_file(
+                            std::mem::take(&mut tasks_for_current_file),
+                            current,
+                        )
+                        .with_context(|| {
+                            format!("failed to merge archive file {}", current.path.display())
+                        })?
+                        == ArchiveFileState::Corrupted
+                    {
+                        corrupted.push(CorruptedArchiveFile(current.clone()));
+                    }
                 }
 
                 // ... and the `current` file to the next entry.
@@ -605,10 +619,52 @@ impl<'a> WritableTaskCache<'a> {
 
         // Merge tasks for the last file.
         if let Some(current) = current {
-            self.merge_single_archive_file(tasks_for_current_file, current)
+            if self
+                .merge_single_archive_file(tasks_for_current_file, current)
                 .with_context(|| {
                     format!("failed to merge archive file {}", current.path.display())
-                })?;
+                })?
+                == ArchiveFileState::Corrupted
+            {
+                corrupted.push(CorruptedArchiveFile(current.clone()));
+            }
+        }
+
+        Ok(corrupted)
+    }
+
+    /// Request a repair of archive files that were detected as corrupted when accessed.
+    ///
+    /// This resets the task fetching cutoff timestamps to the lower bound of the
+    /// oldest corrupted file, so its contents are re-fetched from the remotes on
+    /// the next fetch cycle.
+    pub fn request_repair(
+        &self,
+        corrupted_archive_files: &[CorruptedArchiveFile],
+    ) -> Result<(), Error> {
+        let reset_cutoff = corrupted_archive_files.iter().map(|c| {
+            log::warn!(
+                "{} seems to be corrupted, attempting recovery by resetting task cutoff timestamps",
+                c.0.path.display()
+            );
+            c.0.starttime
+        }).min();
+
+        if let Some(reset_cutoff) = reset_cutoff {
+            log::warn!(
+                "resetting task cutoff timestamp in state file to {reset_cutoff} to recover task archive"
+            );
+            let mut state = self.read_state();
+
+            for remote in state.remote_state.values_mut() {
+                for node in remote.node_state.values_mut() {
+                    node.cutoff = node.cutoff.min(reset_cutoff);
+                }
+            }
+
+            self.write_state(state).context(
+                "failed to write state when resetting cutoff after archive file corruption event",
+            )?;
         }
 
         Ok(())
@@ -682,9 +738,11 @@ impl<'a> WritableTaskCache<'a> {
         &self,
         tasks: Vec<TaskCacheItem>,
         file: &ArchiveFile,
-    ) -> Result<(), Error> {
+    ) -> Result<ArchiveFileState, Error> {
+        let mut file_state = ArchiveFileState::Valid;
+
         if tasks.is_empty() {
-            return Ok(());
+            return Ok(file_state);
         }
 
         let mut writer = file.writer(self.cache.create_options)?;
@@ -700,6 +758,7 @@ impl<'a> WritableTaskCache<'a> {
             .flat_map(|item| match item {
                 Ok(item) => Some(item),
                 Err(err) => {
+                    file_state = ArchiveFileState::Corrupted;
                     log::error!("could not read task cache item while merging: {err:#}");
                     None
                 }
@@ -710,7 +769,7 @@ impl<'a> WritableTaskCache<'a> {
         writer.write_tasks(MergeTaskIterator::new(archive_iter, task_iter))?;
         writer.finalize_and_replace()?;
 
-        Ok(())
+        Ok(file_state)
     }
 }
 
@@ -1251,6 +1310,20 @@ impl<'a> Drop for ArchiveFileWriter<'a> {
     }
 }
 
+/// Marker to signal the state of an [`ArchiveFile`].
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum ArchiveFileState {
+    /// The archive file is valid and contains no errors.
+    Valid,
+    /// The archive file is corrupted.
+    Corrupted,
+}
+
+/// Newtype wrapper around an [`ArchiveFile`] that was detected as corrupted, making
+/// the return type of [`WritableTaskCache::merge_tasks_into_archive`] more
+/// expressive.
+pub struct CorruptedArchiveFile(ArchiveFile);
+
 /// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
 /// from both iterators sorted.
 /// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1478,6 +1551,14 @@ mod tests {
         Ok((tmp_dir, cache))
     }
 
+    fn truncate_archive_file(file: &ArchiveFile) {
+        OpenOptions::new()
+            .write(true)
+            .truncate(true)
+            .open(&file.path)
+            .expect("file truncated");
+    }
+
     #[test]
     fn test_add_tasks() -> Result<(), Error> {
         let (_tmp_dir, cache) = make_cache().unwrap();
@@ -1751,11 +1832,7 @@ mod tests {
         let file = files.get(0).expect("there is one archive file");
 
         // truncate existing compressed file, corrupting the zstd file header
-        let _file = OpenOptions::new()
-            .write(true)
-            .truncate(true)
-            .open(&file.path)
-            .expect("file truncated");
+        truncate_archive_file(file);
 
         assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0);
     }
@@ -1787,4 +1864,61 @@ mod tests {
             "the tmp file created by the writer should not show up in the list of archive files"
         );
     }
+
+    #[test]
+    fn reset_cutoff_timestamp_after_corruption() {
+        let testcases = [
+            (
+                "corruption of task file with lower bound of 900 determines the new cutoff",
+                vec![
+                    task(810, Some(811)),
+                    task(910, Some(911)),
+                    task(1010, Some(1011)),
+                ],
+                900,
+            ),
+            (
+                "older active task determines cutoff",
+                vec![
+                    task(810, None),
+                    task(910, Some(920)),
+                    task(1010, Some(1011)),
+                ],
+                810,
+            ),
+        ];
+
+        for (description, initial_tasks, expected_cutoff) in testcases {
+            let (_tmp_dir, mut cache) = make_cache().unwrap();
+            cache.rotate_after = 100;
+            let cache = cache.write().unwrap();
+
+            cache.init(1000).unwrap();
+
+            add_tasks(&cache, initial_tasks).unwrap();
+
+            cache.apply_journal().unwrap();
+
+            let files = cache.cache.archive_files(&cache.lock).unwrap();
+            let file = files.get(1).expect("there are at least two archive files");
+
+            assert_eq!(file.starttime, 900);
+            assert!(
+                file.compressed,
+                "the file we are about to corrupt should be compressed"
+            );
+
+            // truncate existing compressed file, corrupting the zstd file header
+            truncate_archive_file(file);
+
+            add_tasks(&cache, vec![task(920, Some(930))]).unwrap();
+
+            // When applying the journal and writing to the corrupted file, the cache
+            // should notice that something is broken and reset the cutoff timestamp to the
+            // start of the affected archive file, resulting in a repair in the
+            // next fetch cycle
+            cache.apply_journal().unwrap();
+            assert_eq!(get_cutoff(&cache), expected_cutoff, "{}", description);
+        }
+    }
 }
-- 
2.47.3





Thread overview: 17+ messages
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing " Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
