public inbox for pdm-devel@lists.proxmox.com
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses
Date: Thu,  2 Jul 2026 11:22:57 +0200
Message-ID: <20260702092258.174740-15-l.wagner@proxmox.com>
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>

While iterating over tasks, we now record any archive files that
produced read errors. If there are any, we reset the cut-off timestamps
to the lower bounds of the affected archive files, so that the next
regular update cycle re-fetches the affected tasks and repairs the
archive.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/mod.rs        |  15 +++
 server/src/remote_tasks/task_cache.rs | 167 +++++++++++++++++++++-----
 2 files changed, 155 insertions(+), 27 deletions(-)
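
Editorial note (not part of the patch): the mechanism described in the
commit message can be sketched roughly as follows. This is a minimal,
self-contained illustration under stated assumptions, not the code from
this series. `FileInfo`, `RecordingIter`, `reset_cutoff` and `demo` are
hypothetical names, and lowering the cutoff to the smallest affected
lower bound is an assumption about what `request_repair` does, based on
the test case in this patch (cutoff 1010 becomes 900).

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for an archive file; only the lower bound matters here.
#[derive(Clone, Debug, PartialEq)]
struct FileInfo {
    starttime: i64,
}

/// Shared list of files that failed to read, analogous to `CorruptedArchiveFiles`.
type Corrupted = Rc<RefCell<Vec<FileInfo>>>;

/// Yields one result per file and records failing files as a side effect.
struct RecordingIter {
    files: Vec<(FileInfo, Result<u32, String>)>,
    corrupted: Corrupted,
}

impl Iterator for RecordingIter {
    type Item = Result<u32, String>;

    fn next(&mut self) -> Option<Self::Item> {
        let (file, res) = self.files.pop()?;
        // Skip the active file (starttime 0), mirroring the filter in the patch.
        if res.is_err() && file.starttime != 0 {
            self.corrupted.borrow_mut().push(file);
        }
        Some(res)
    }
}

/// Assumed repair policy: lower the cutoff to the smallest affected lower bound.
fn reset_cutoff(cutoff: i64, corrupted: &[FileInfo]) -> i64 {
    corrupted.iter().map(|f| f.starttime).fold(cutoff, i64::min)
}

/// Runs the read-then-repair sequence; returns (tasks read, corrupted files, new cutoff).
fn demo() -> (usize, Vec<FileInfo>, i64) {
    let corrupted: Corrupted = Default::default();
    let iter = RecordingIter {
        files: vec![
            (FileInfo { starttime: 800 }, Ok(810)),
            (FileInfo { starttime: 900 }, Err("truncated".to_string())),
            (FileInfo { starttime: 0 }, Ok(1010)),
        ],
        corrupted: Rc::clone(&corrupted),
    };
    // Errors are filtered out, but the failing file was recorded while iterating.
    let read = iter.filter_map(Result::ok).count();
    // `RefCell::take` leaves an empty Vec behind, like `take_corrupted_files`.
    let found = corrupted.take();
    let cutoff = reset_cutoff(1010, &found);
    (read, found, cutoff)
}

fn main() {
    let (read, found, cutoff) = demo();
    println!("read {read} tasks, {} corrupted file(s), cutoff {cutoff}", found.len());
}
```

The shared `Rc<RefCell<...>>` lets the iterator report corruption without
changing its item type. The list is only complete once the iterator has
been consumed, which is why the patch asks callers to query it afterwards.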

diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 6a340dba..5fba867d 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -157,6 +157,21 @@ pub async fn get_tasks(
             .take(limit)
             .collect();
 
+        let corrupted = cache.take_corrupted_files();
+        drop(cache);
+
+        if !corrupted.is_empty() {
+            // If we noticed corrupted archive files while iterating, drop the read lock,
+            // acquire the write lock and reset the fetch cutoff so the affected files are
+            // repaired on the next fetch cycle.
+            if let Err(err) = get_cache()
+                .write()
+                .and_then(|cache| cache.request_repair(&corrupted))
+            {
+                log::error!("failed to request repair of corrupted task archive file: {err:#}");
+            }
+        }
+
         Ok(returned_tasks)
     })
     .await?
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index dbe71dc9..23238cca 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -1,5 +1,6 @@
 //! Task cache implementation, based on rotating files.
 use std::{
+    cell::RefCell,
     cmp::Ordering,
     collections::{HashMap, HashSet},
     fs::{File, OpenOptions},
@@ -7,6 +8,7 @@ use std::{
     iter::Peekable,
     os::unix::fs::MetadataExt,
     path::{Path, PathBuf},
+    rc::Rc,
     time::{Duration, Instant},
 };
 
@@ -185,6 +187,10 @@ pub struct WritableTaskCache<'a> {
 pub struct ReadableTaskCache<'a> {
     cache: &'a TaskCache,
     lock: TaskCacheLock,
+    /// Archive files found to be corrupted while iterating over tasks. Populated
+    /// as tasks are read (see [`InnerTaskArchiveIterator`]); query it via
+    /// [`Self::take_corrupted_files`] once iteration is done to trigger a repair.
+    corrupted: CorruptedArchiveFiles,
 }
 
 /// Lock for the cache.
@@ -226,11 +232,26 @@ impl NodeFetchSuccessMap {
 
 impl<'a> ReadableTaskCache<'a> {
     /// Iterate over cached tasks.
+    ///
+    /// Archive file corruption encountered while iterating is recorded and can be
+    /// queried via [`Self::take_corrupted_files`] once iteration is done, so the
+    /// caller can trigger a repair.
     pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
         self.cache
-            .get_tasks_impl(mode, &self.lock)
+            .get_tasks_impl(mode, &self.lock, self.corrupted.clone())
             .context("failed to create task archive iterator")
     }
+
+    /// Take the list of archive files that were found to be corrupted while
+    /// iterating over tasks returned by [`Self::get_tasks`].
+    ///
+    /// Corruption is detected lazily while iterating, so this only reflects the
+    /// files that were actually read; call it after the iterator has been consumed.
+    /// Pass the result to [`WritableTaskCache::request_repair`] to reset the
+    /// fetch cutoff and repair the archive on the next fetch cycle.
+    pub fn take_corrupted_files(&self) -> Vec<CorruptedArchiveFile> {
+        self.corrupted.take()
+    }
 }
 
 impl<'a> WritableTaskCache<'a> {
@@ -326,9 +347,12 @@ impl<'a> WritableTaskCache<'a> {
     }
 
     /// Iterate over cached tasks.
+    ///
+    /// Corruption is not tracked here: the writable cache already repairs the
+    /// archive via its write paths (`apply_journal`, `rotate`).
     pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
         self.cache
-            .get_tasks_impl(mode, &self.lock)
+            .get_tasks_impl(mode, &self.lock, Default::default())
             .context("failed to create task archive iterator")
     }
 
@@ -868,7 +892,11 @@ impl TaskCache {
     pub fn read(&self) -> Result<ReadableTaskCache<'_>, Error> {
         let lock = self.lock_impl(false)?;
 
-        Ok(ReadableTaskCache { cache: self, lock })
+        Ok(ReadableTaskCache {
+            cache: self,
+            lock,
+            corrupted: Default::default(),
+        })
     }
 
     /// Lock the cache for writing.
@@ -911,6 +939,7 @@ impl TaskCache {
         &self,
         mode: GetTasks,
         lock: &'a TaskCacheLock,
+        corrupted: CorruptedArchiveFiles,
     ) -> Result<TaskArchiveIterator<'a>, Error> {
         let journal_file = self.journal_path();
 
@@ -920,19 +949,19 @@ impl TaskCache {
                 archive_files.reverse();
                 archive_files.push(self.active_file());
 
-                TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock, corrupted)
             }
             GetTasks::Active => {
                 let archive_files = vec![self.active_file()];
 
-                TaskArchiveIterator::new(None, archive_files, lock)
+                TaskArchiveIterator::new(None, archive_files, lock, corrupted)
             }
             #[cfg(test)]
             GetTasks::Archived => {
                 let mut files = self.archive_files(lock)?;
                 files.reverse();
 
-                TaskArchiveIterator::new(Some(journal_file.into()), files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), files, lock, corrupted)
             }
         }
     }
@@ -1060,8 +1089,9 @@ impl<'a> TaskArchiveIterator<'a> {
         journal: Option<PathBuf>,
         files: Vec<ArchiveFile>,
         lock: &'a TaskCacheLock,
+        corrupted: CorruptedArchiveFiles,
     ) -> Result<Self, Error> {
-        let inner = InnerTaskArchiveIterator::new(files)
+        let inner = InnerTaskArchiveIterator::new(files, corrupted)
             .filter_map(|res| match res {
                 Ok(task) => Some(task),
                 Err(err) => {
@@ -1115,15 +1145,18 @@ struct InnerTaskArchiveIterator {
     current: Option<ArchiveIterator>,
     /// Archive file that is currently being iterated over, if any.
     current_file: Option<ArchiveFile>,
+    /// Archive files found to be corrupted while iterating.
+    corrupted: CorruptedArchiveFiles,
 }
 
 impl InnerTaskArchiveIterator {
     /// Create a new task archive iterator.
-    pub fn new(files: Vec<ArchiveFile>) -> Self {
+    pub fn new(files: Vec<ArchiveFile>, corrupted: CorruptedArchiveFiles) -> Self {
         Self {
             files,
             current: None,
             current_file: None,
+            corrupted,
         }
     }
 }
@@ -1134,27 +1167,41 @@ impl Iterator for InnerTaskArchiveIterator {
     fn next(&mut self) -> Option<Self::Item> {
         loop {
             match &mut self.current {
-                Some(current) => {
-                    let next = current.next();
-                    if next.is_some() {
-                        return next.map(|res| {
-                            res.with_context(|| {
-                                format!(
-                                    "failed to read from {file}",
-                                    file = self
-                                        .current_file
-                                        .as_ref()
-                                        .expect("self.current and self.current_file are both Some")
-                                        .path
-                                        .display()
-                                )
-                            })
-                        });
-                    } else {
+                Some(current) => match current.next() {
+                    Some(res) => {
+                        if res.is_err() {
+                            // The archive file is corrupted. Record it so a repair can be
+                            // triggered by resetting the fetch cutoff to the file's lower
+                            // bound. The active file (starttime 0) is rewritten on every
+                            // update and needs no cutoff-based repair, so skip it.
+                            if let Some(file) = self
+                                .current_file
+                                .as_ref()
+                                .filter(|file| file.starttime != 0)
+                            {
+                                self.corrupted
+                                    .borrow_mut()
+                                    .push(CorruptedArchiveFile(file.clone()));
+                            }
+                        }
+
+                        return Some(res.with_context(|| {
+                            format!(
+                                "failed to read from {file}",
+                                file = self
+                                    .current_file
+                                    .as_ref()
+                                    .expect("self.current and self.current_file are both Some")
+                                    .path
+                                    .display()
+                            )
+                        }));
+                    }
+                    None => {
                         self.current = None;
                         self.current_file = None;
                     }
-                }
+                },
                 None => 'inner: loop {
                     // Returns `None` if no more files are available, stopping iteration.
                     let next_file = self.files.pop()?;
@@ -1362,9 +1409,14 @@ enum ArchiveFileState {
 
 /// Newtype wrapper to make the return type of [`WritableTaskCache::merge_tasks_into_archive`]
 /// more expressive.
-///
 pub struct CorruptedArchiveFile(ArchiveFile);
 
+/// Shared collection of archive files found to be corrupted while iterating.
+///
+/// This is populated during read access (see [`InnerTaskArchiveIterator`]) and shared with the
+/// [`ReadableTaskCache`]; callers collect them via [`ReadableTaskCache::take_corrupted_files`].
+type CorruptedArchiveFiles = Rc<RefCell<Vec<CorruptedArchiveFile>>>;
+
 /// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
 /// from both iterators sorted.
 /// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1963,6 +2015,67 @@ mod tests {
         }
     }
 
+    #[test]
+    fn reset_cutoff_after_corruption_during_read() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+
+        // Populate the archive and corrupt one of the compressed files. Scope the
+        // writable cache so its exclusive lock is released before we read.
+        {
+            let writable = cache.write().unwrap();
+            writable.init(1000).unwrap();
+
+            add_tasks(
+                &writable,
+                vec![
+                    task(810, Some(811)),
+                    task(910, Some(911)),
+                    task(1010, Some(1011)),
+                ],
+            )
+            .unwrap();
+            writable.apply_journal().unwrap();
+
+            assert_eq!(get_cutoff(&writable), 1010);
+
+            let files = writable.cache.archive_files(&writable.lock).unwrap();
+            let file = files.get(1).expect("there is a second archive file");
+            assert_eq!(file.starttime, 900);
+
+            // truncate existing compressed file, corrupting the zstd file header
+            truncate_archive_file(file);
+        }
+
+        // A read access should notice the corruption while iterating. The caller can
+        // then query the corrupted files and, after releasing the read lock, reset the
+        // cutoff timestamp to the lower bound of the corrupted file so the next fetch
+        // cycle repairs it.
+        let corrupted = {
+            let readable = cache.read().unwrap();
+            let count = readable.get_tasks(GetTasks::All).unwrap().count();
+
+            // The task in the corrupted file (starttime 910) is skipped.
+            assert_eq!(count, 2);
+
+            readable.take_corrupted_files()
+        };
+
+        assert_eq!(corrupted.len(), 1);
+
+        {
+            let writable = cache.write().unwrap();
+
+            writable.request_repair(&corrupted).unwrap();
+
+            assert_eq!(
+                get_cutoff(&writable),
+                900,
+                "cutoff should be reset to the corrupted file's lower bound after a read"
+            );
+        }
+    }
+
     #[test]
     fn reset_cutoff_after_compressing_corrupted() {
         let (_tmp_dir, mut cache) = make_cache().unwrap();
-- 
2.47.3






Thread overview: 17+ messages
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02  9:22 ` Lukas Wagner [this message]
2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
