From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses
Date: Thu, 2 Jul 2026 11:22:57 +0200 [thread overview]
Message-ID: <20260702092258.174740-15-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>
While iterating over tasks, we now record any archive files that produced
read errors. If there are any, we reset the cut-off timestamps to the
lower bounds of the affected archive files, which should repair the
archive during the next regular update cycle. The active file is not
recorded, since it is rewritten on every update anyway.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/remote_tasks/mod.rs | 15 +++
server/src/remote_tasks/task_cache.rs | 167 +++++++++++++++++++++-----
2 files changed, 155 insertions(+), 27 deletions(-)
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 6a340dba..5fba867d 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -157,6 +157,21 @@ pub async fn get_tasks(
.take(limit)
.collect();
+ // Collect the corruption recorded by the iterators above, then release the
+ // read lock before the write lock is acquired below.
+ let corrupted = cache.take_corrupted_files();
+ drop(cache);
+
+ if !corrupted.is_empty() {
+ // We noticed corrupted archive files while iterating. With the read lock
+ // already released, acquire the write lock and reset the fetch cutoff so the
+ // affected files are repaired on the next fetch cycle. A failure here is only
+ // logged: the tasks have already been read, so the request itself still succeeds.
+ // NOTE(review): `write()` may have to wait for a concurrent writer (e.g. a
+ // running fetch cycle) — confirm that delay is acceptable on this API path.
+ if let Err(err) = get_cache()
+ .write()
+ .and_then(|cache| cache.request_repair(&corrupted))
+ {
+ log::error!("failed to request repair of corrupted task archive file: {err:#}");
+ }
+ }
+
Ok(returned_tasks)
})
.await?
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index dbe71dc9..23238cca 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -1,5 +1,6 @@
//! Task cache implementation, based on rotating files.
use std::{
+ cell::RefCell,
cmp::Ordering,
collections::{HashMap, HashSet},
fs::{File, OpenOptions},
@@ -7,6 +8,7 @@ use std::{
iter::Peekable,
os::unix::fs::MetadataExt,
path::{Path, PathBuf},
+ rc::Rc,
time::{Duration, Instant},
};
@@ -185,6 +187,10 @@ pub struct WritableTaskCache<'a> {
pub struct ReadableTaskCache<'a> {
cache: &'a TaskCache,
lock: TaskCacheLock,
+ /// Archive files found to be corrupted while iterating over tasks.
+ ///
+ /// Every iterator returned by [`Self::get_tasks`] holds a clone of this shared
+ /// collection and appends to it as tasks are read (see [`InnerTaskArchiveIterator`]).
+ /// Query it via [`Self::take_corrupted_files`] once iteration is done to trigger a repair.
+ corrupted: CorruptedArchiveFiles,
}
/// Lock for the cache.
@@ -226,11 +232,26 @@ impl NodeFetchSuccessMap {
impl<'a> ReadableTaskCache<'a> {
/// Iterate over cached tasks.
+ ///
+ /// Archive file corruption encountered while iterating is recorded in a list
+ /// shared by all iterators returned from this method, and can be queried via
+ /// [`Self::take_corrupted_files`] once iteration is done, so the caller can
+ /// trigger a repair.
pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
self.cache
- .get_tasks_impl(mode, &self.lock)
+ .get_tasks_impl(mode, &self.lock, self.corrupted.clone())
.context("failed to create task archive iterator")
}
+
+ /// Take the list of archive files that were found to be corrupted while
+ /// iterating over tasks returned by [`Self::get_tasks`].
+ ///
+ /// Corruption is detected lazily while iterating, so this only reflects the
+ /// files that were actually read; call it after the iterator has been consumed.
+ /// The recorded list is emptied by this call. Files with a `starttime` of 0
+ /// (the active file) are never reported. A file may be listed more than once
+ /// if it was read by several iterators.
+ ///
+ /// Drop this [`ReadableTaskCache`] (releasing its read lock), then pass the result
+ /// to [`WritableTaskCache::request_repair`] to reset the fetch cutoff and repair
+ /// the archive on the next fetch cycle.
+ pub fn take_corrupted_files(&self) -> Vec<CorruptedArchiveFile> {
+ self.corrupted.take()
+ }
}
impl<'a> WritableTaskCache<'a> {
@@ -326,9 +347,12 @@ impl<'a> WritableTaskCache<'a> {
}
/// Iterate over cached tasks.
+ ///
+ /// Corruption is not tracked here: the writable cache already repairs the
+ /// archive via its write paths (`apply_journal`, `rotate`). The iterator gets a
+ /// fresh, unshared collection that is dropped together with it, so files found
+ /// to be corrupted while reading through this method are not reported anywhere.
pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
self.cache
- .get_tasks_impl(mode, &self.lock)
+ .get_tasks_impl(mode, &self.lock, Default::default())
.context("failed to create task archive iterator")
}
@@ -868,7 +892,11 @@ impl TaskCache {
pub fn read(&self) -> Result<ReadableTaskCache<'_>, Error> {
let lock = self.lock_impl(false)?;
- Ok(ReadableTaskCache { cache: self, lock })
+ Ok(ReadableTaskCache {
+ cache: self,
+ lock,
+ // Empty until iterators created via `get_tasks` record corrupted files.
+ corrupted: Default::default(),
+ })
}
/// Lock the cache for writing.
@@ -911,6 +939,7 @@ impl TaskCache {
&self,
mode: GetTasks,
lock: &'a TaskCacheLock,
+ corrupted: CorruptedArchiveFiles,
) -> Result<TaskArchiveIterator<'a>, Error> {
let journal_file = self.journal_path();
@@ -920,19 +949,19 @@ impl TaskCache {
archive_files.reverse();
archive_files.push(self.active_file());
- TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
+ TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock, corrupted)
}
GetTasks::Active => {
let archive_files = vec![self.active_file()];
- TaskArchiveIterator::new(None, archive_files, lock)
+ TaskArchiveIterator::new(None, archive_files, lock, corrupted)
}
#[cfg(test)]
GetTasks::Archived => {
let mut files = self.archive_files(lock)?;
files.reverse();
- TaskArchiveIterator::new(Some(journal_file.into()), files, lock)
+ TaskArchiveIterator::new(Some(journal_file.into()), files, lock, corrupted)
}
}
}
@@ -1060,8 +1089,9 @@ impl<'a> TaskArchiveIterator<'a> {
journal: Option<PathBuf>,
files: Vec<ArchiveFile>,
lock: &'a TaskCacheLock,
+ corrupted: CorruptedArchiveFiles,
) -> Result<Self, Error> {
- let inner = InnerTaskArchiveIterator::new(files)
+ let inner = InnerTaskArchiveIterator::new(files, corrupted)
.filter_map(|res| match res {
Ok(task) => Some(task),
Err(err) => {
@@ -1115,15 +1145,18 @@ struct InnerTaskArchiveIterator {
current: Option<ArchiveIterator>,
/// Archive file that is currently being iterated over, if any.
current_file: Option<ArchiveFile>,
+ /// Archive files found to be corrupted while iterating.
+ corrupted: CorruptedArchiveFiles,
}
impl InnerTaskArchiveIterator {
/// Create a new task archive iterator.
+ ///
+ /// `files` are consumed from the back (via `pop`). Every archive file with a
+ /// non-zero `starttime` that yields a read error is appended to `corrupted`,
+ /// which may be shared with the caller to trigger a repair later.
- pub fn new(files: Vec<ArchiveFile>) -> Self {
+ pub fn new(files: Vec<ArchiveFile>, corrupted: CorruptedArchiveFiles) -> Self {
Self {
files,
current: None,
current_file: None,
+ corrupted,
}
}
}
@@ -1134,27 +1167,41 @@ impl Iterator for InnerTaskArchiveIterator {
fn next(&mut self) -> Option<Self::Item> {
loop {
match &mut self.current {
- Some(current) => {
- let next = current.next();
- if next.is_some() {
- return next.map(|res| {
- res.with_context(|| {
- format!(
- "failed to read from {file}",
- file = self
- .current_file
- .as_ref()
- .expect("self.current and self.current_file are both Some")
- .path
- .display()
- )
- })
- });
- } else {
+ Some(current) => match current.next() {
+ Some(res) => {
+ if res.is_err() {
+ // The archive file is corrupted. Record it so a repair can be
+ // triggered by resetting the fetch cutoff to the file's lower
+ // bound. The active file (starttime 0) is rewritten on every
+ // update and needs no cutoff-based repair, so skip it.
+ if let Some(file) = self
+ .current_file
+ .as_ref()
+ .filter(|file| file.starttime != 0)
+ {
+ self.corrupted
+ .borrow_mut()
+ .push(CorruptedArchiveFile(file.clone()));
+ }
+ }
+
+ return Some(res.with_context(|| {
+ format!(
+ "failed to read from {file}",
+ file = self
+ .current_file
+ .as_ref()
+ .expect("self.current and self.current_file are both Some")
+ .path
+ .display()
+ )
+ }));
+ }
+ None => {
self.current = None;
self.current_file = None;
}
- }
+ },
None => 'inner: loop {
// Returns `None` if no more files are available, stopping iteration.
let next_file = self.files.pop()?;
@@ -1362,9 +1409,14 @@ enum ArchiveFileState {
/// Newtype wrapper to make the return type of [`WritableTaskCache::merge_tasks_into_archive`]
/// more expressive.
-///
pub struct CorruptedArchiveFile(ArchiveFile);
+/// Shared collection of archive files found to be corrupted while iterating.
+///
+/// This is populated during read access (see [`InnerTaskArchiveIterator`]) and shared with the
+/// [`ReadableTaskCache`]. Nothing is repaired automatically: callers retrieve the collected files
+/// via [`ReadableTaskCache::take_corrupted_files`] and, after releasing the read lock, pass them
+/// to [`WritableTaskCache::request_repair`].
+///
+/// Since this is an [`Rc`], anything holding it (such as [`ReadableTaskCache`] and the task
+/// archive iterators) is neither `Send` nor `Sync`.
+type CorruptedArchiveFiles = Rc<RefCell<Vec<CorruptedArchiveFile>>>;
+
/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
/// from both iterators sorted.
/// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1963,6 +2015,67 @@ mod tests {
}
}
+ /// A read access that hits a corrupted archive file must report it, so that a
+ /// subsequent `request_repair` resets the cutoff to the file's lower bound.
+ #[test]
+ fn reset_cutoff_after_corruption_during_read() {
+ let (_tmp_dir, mut cache) = make_cache().unwrap();
+ cache.rotate_after = 100;
+
+ // Populate the archive and corrupt one of the compressed files. Scope the
+ // writable cache so its exclusive lock is released before we read.
+ {
+ let writable = cache.write().unwrap();
+ writable.init(1000).unwrap();
+
+ add_tasks(
+ &writable,
+ vec![
+ task(810, Some(811)),
+ task(910, Some(911)),
+ task(1010, Some(1011)),
+ ],
+ )
+ .unwrap();
+ writable.apply_journal().unwrap();
+
+ assert_eq!(get_cutoff(&writable), 1010);
+
+ let files = writable.cache.archive_files(&writable.lock).unwrap();
+ let file = files.get(1).expect("there is a second archive file");
+ assert_eq!(file.starttime, 900);
+
+ // truncate existing compressed file, corrupting the zstd file header
+ truncate_archive_file(file);
+ }
+
+ // A read access should notice the corruption while iterating. The caller can
+ // then query the corrupted files and, after releasing the read lock, reset the
+ // cutoff timestamp to the lower bound of the corrupted file so the next fetch
+ // cycle repairs it.
+ let corrupted = {
+ // `readable` is dropped at the end of this block, releasing the read lock
+ // before `cache.write()` is called below.
+ let readable = cache.read().unwrap();
+ let count = readable.get_tasks(GetTasks::All).unwrap().count();
+
+ // The task in the corrupted file (starttime 910) is skipped.
+ assert_eq!(count, 2);
+
+ readable.take_corrupted_files()
+ };
+
+ // Only the truncated archive file (starttime 900) is reported.
+ assert_eq!(corrupted.len(), 1);
+
+ {
+ let writable = cache.write().unwrap();
+
+ writable.request_repair(&corrupted).unwrap();
+
+ assert_eq!(
+ get_cutoff(&writable),
+ 900,
+ "cutoff should be reset to the corrupted file's lower bound after a read"
+ );
+ }
+ }
+
#[test]
fn reset_cutoff_after_compressing_corrupted() {
let (_tmp_dir, mut cache) = make_cache().unwrap();
--
2.47.3
next prev parent reply other threads:[~2026-07-02 9:23 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-07-02 9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02 9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02 9:29 ` Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02 9:22 ` Lukas Wagner [this message]
2026-07-02 9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260702092258.174740-15-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox