From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses
Date: Thu, 2 Jul 2026 11:22:57 +0200 [thread overview]
Message-ID: <20260702092258.174740-15-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>
While iterating over tasks, we now record any archive files that
produced errors. If there are any, the cut-off timestamps are reset to
the lower bounds of the affected archive files once the read lock has
been released (the reset needs the write lock). This should repair the
archive during the next regular update cycle.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/remote_tasks/mod.rs | 15 +++
server/src/remote_tasks/task_cache.rs | 167 +++++++++++++++++++++-----
2 files changed, 155 insertions(+), 27 deletions(-)
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 6a340dba..5fba867d 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -157,6 +157,21 @@ pub async fn get_tasks(
.take(limit)
.collect();
+ let corrupted = cache.take_corrupted_files();
+ drop(cache);
+
+ if !corrupted.is_empty() {
+ // If we noticed corrupted archive files while iterating, drop the read lock,
+ // acquire the write lock and reset the fetch cutoff so the affected files are
+ // repaired on the next fetch cycle.
+ if let Err(err) = get_cache()
+ .write()
+ .and_then(|cache| cache.request_repair(&corrupted))
+ {
+ log::error!("failed to request repair of corrupted task archive file: {err:#}");
+ }
+ }
+
Ok(returned_tasks)
})
.await?
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index dbe71dc9..23238cca 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -1,5 +1,6 @@
//! Task cache implementation, based on rotating files.
use std::{
+ cell::RefCell,
cmp::Ordering,
collections::{HashMap, HashSet},
fs::{File, OpenOptions},
@@ -7,6 +8,7 @@ use std::{
iter::Peekable,
os::unix::fs::MetadataExt,
path::{Path, PathBuf},
+ rc::Rc,
time::{Duration, Instant},
};
@@ -185,6 +187,10 @@ pub struct WritableTaskCache<'a> {
pub struct ReadableTaskCache<'a> {
cache: &'a TaskCache,
lock: TaskCacheLock,
+ /// Archive files found to be corrupted while iterating over tasks, shared with
+ /// every iterator returned by [`Self::get_tasks`]. Drain it via
+ /// [`Self::take_corrupted_files`] and hand the result to `WritableTaskCache::request_repair`.
+ corrupted: CorruptedArchiveFiles,
}
/// Lock for the cache.
@@ -226,11 +232,26 @@ impl NodeFetchSuccessMap {
impl<'a> ReadableTaskCache<'a> {
/// Iterate over cached tasks.
+ ///
+ /// Archive file corruption encountered while iterating is recorded and can be
+ /// queried via [`Self::take_corrupted_files`] once iteration is done, so the
+ /// caller can trigger a repair.
pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
self.cache
- .get_tasks_impl(mode, &self.lock)
+ .get_tasks_impl(mode, &self.lock, self.corrupted.clone())
.context("failed to create task archive iterator")
}
+
+ /// Take (and clear) the list of archive files found to be corrupted while
+ /// iterating over tasks returned by [`Self::get_tasks`].
+ ///
+ /// Corruption is detected lazily while iterating, so this only reflects the
+ /// files that were actually read; call it after the iterator has been consumed.
+ /// Pass the result to [`WritableTaskCache::request_repair`] to reset the
+ /// fetch cutoff and repair the archive on the next fetch cycle.
+ pub fn take_corrupted_files(&self) -> Vec<CorruptedArchiveFile> {
+ self.corrupted.take()
+ }
}
impl<'a> WritableTaskCache<'a> {
@@ -326,9 +347,12 @@ impl<'a> WritableTaskCache<'a> {
}
/// Iterate over cached tasks.
+ ///
+ /// Corruption is not recorded here (a fresh, unshared list is passed down); the
+ /// writable cache repairs the archive via its write paths (`apply_journal`, `rotate`).
pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
self.cache
- .get_tasks_impl(mode, &self.lock)
+ .get_tasks_impl(mode, &self.lock, Default::default())
.context("failed to create task archive iterator")
}
@@ -868,7 +892,11 @@ impl TaskCache {
pub fn read(&self) -> Result<ReadableTaskCache<'_>, Error> {
let lock = self.lock_impl(false)?;
- Ok(ReadableTaskCache { cache: self, lock })
+ Ok(ReadableTaskCache {
+ cache: self,
+ lock,
+ corrupted: Default::default(), // populated lazily while iterating via `get_tasks`
+ })
}
/// Lock the cache for writing.
@@ -911,6 +939,7 @@ impl TaskCache {
&self,
mode: GetTasks,
lock: &'a TaskCacheLock,
+ corrupted: CorruptedArchiveFiles,
) -> Result<TaskArchiveIterator<'a>, Error> {
let journal_file = self.journal_path();
@@ -920,19 +949,19 @@ impl TaskCache {
archive_files.reverse();
archive_files.push(self.active_file());
- TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
+ TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock, corrupted)
}
GetTasks::Active => {
let archive_files = vec![self.active_file()];
- TaskArchiveIterator::new(None, archive_files, lock)
+ TaskArchiveIterator::new(None, archive_files, lock, corrupted)
}
#[cfg(test)]
GetTasks::Archived => {
let mut files = self.archive_files(lock)?;
files.reverse();
- TaskArchiveIterator::new(Some(journal_file.into()), files, lock)
+ TaskArchiveIterator::new(Some(journal_file.into()), files, lock, corrupted)
}
}
}
@@ -1060,8 +1089,9 @@ impl<'a> TaskArchiveIterator<'a> {
journal: Option<PathBuf>,
files: Vec<ArchiveFile>,
lock: &'a TaskCacheLock,
+ corrupted: CorruptedArchiveFiles,
) -> Result<Self, Error> {
- let inner = InnerTaskArchiveIterator::new(files)
+ let inner = InnerTaskArchiveIterator::new(files, corrupted)
.filter_map(|res| match res {
Ok(task) => Some(task),
Err(err) => {
@@ -1115,15 +1145,18 @@ struct InnerTaskArchiveIterator {
current: Option<ArchiveIterator>,
/// Archive file that is currently being iterated over, if any.
current_file: Option<ArchiveFile>,
+ /// Archive files found to be corrupted while iterating.
+ corrupted: CorruptedArchiveFiles,
}
impl InnerTaskArchiveIterator {
/// Create a new task archive iterator.
- pub fn new(files: Vec<ArchiveFile>) -> Self {
+ pub fn new(files: Vec<ArchiveFile>, corrupted: CorruptedArchiveFiles) -> Self {
Self {
files,
current: None,
current_file: None,
+ corrupted, // appended to by `next` when reading a non-active archive file fails
}
}
}
@@ -1134,27 +1167,41 @@ impl Iterator for InnerTaskArchiveIterator {
fn next(&mut self) -> Option<Self::Item> {
loop {
match &mut self.current {
- Some(current) => {
- let next = current.next();
- if next.is_some() {
- return next.map(|res| {
- res.with_context(|| {
- format!(
- "failed to read from {file}",
- file = self
- .current_file
- .as_ref()
- .expect("self.current and self.current_file are both Some")
- .path
- .display()
- )
- })
- });
- } else {
+ Some(current) => match current.next() {
+ Some(res) => {
+ if res.is_err() {
+ // The archive file is corrupted. Record it so a repair can be
+ // triggered by resetting the fetch cutoff to the file's lower
+ // bound. The active file (starttime 0) is rewritten on every
+ // update and needs no cutoff-based repair, so skip it.
+ if let Some(file) = self
+ .current_file
+ .as_ref()
+ .filter(|file| file.starttime != 0)
+ {
+ self.corrupted
+ .borrow_mut()
+ .push(CorruptedArchiveFile(file.clone()));
+ }
+ }
+
+ return Some(res.with_context(|| {
+ format!(
+ "failed to read from {file}",
+ file = self
+ .current_file
+ .as_ref()
+ .expect("self.current and self.current_file are both Some")
+ .path
+ .display()
+ )
+ }));
+ }
+ None => {
self.current = None;
self.current_file = None;
}
- }
+ },
None => 'inner: loop {
// Returns `None` if no more files are available, stopping iteration.
let next_file = self.files.pop()?;
@@ -1362,9 +1409,14 @@ enum ArchiveFileState {
/// Newtype wrapper to make the return type of [`WritableTaskCache::merge_tasks_into_archive`]
/// more expressive.
-///
pub struct CorruptedArchiveFile(ArchiveFile);
+/// Shared collection of archive files found to be corrupted while iterating.
+///
+/// Populated during read access (see [`InnerTaskArchiveIterator`]); callers drain it via
+/// [`ReadableTaskCache::take_corrupted_files`] and request a repair explicitly.
+type CorruptedArchiveFiles = Rc<RefCell<Vec<CorruptedArchiveFile>>>;
+
/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
/// from both iterators sorted.
/// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1963,6 +2015,67 @@ mod tests {
}
}
+ #[test]
+ fn reset_cutoff_after_corruption_during_read() {
+ let (_tmp_dir, mut cache) = make_cache().unwrap();
+ cache.rotate_after = 100;
+
+ // Populate the archive and corrupt one of the compressed files. Scope the
+ // writable cache so its exclusive lock is released before we read.
+ {
+ let writable = cache.write().unwrap();
+ writable.init(1000).unwrap();
+
+ add_tasks(
+ &writable,
+ vec![
+ task(810, Some(811)),
+ task(910, Some(911)),
+ task(1010, Some(1011)),
+ ],
+ )
+ .unwrap();
+ writable.apply_journal().unwrap();
+
+ assert_eq!(get_cutoff(&writable), 1010);
+
+ let files = writable.cache.archive_files(&writable.lock).unwrap();
+ let file = files.get(1).expect("there is a second archive file");
+ assert_eq!(file.starttime, 900);
+
+ // Truncate the existing compressed file, corrupting its zstd header.
+ truncate_archive_file(file);
+ }
+
+ // A read access should notice the corruption while iterating. The caller can
+ // then query the corrupted files and, after releasing the read lock, reset the
+ // cutoff timestamp to the lower bound of the corrupted file so the next fetch
+ // cycle repairs it.
+ let corrupted = {
+ let readable = cache.read().unwrap();
+ let count = readable.get_tasks(GetTasks::All).unwrap().count();
+
+ // Only the task stored in the corrupted file (starttime 910) is skipped.
+ assert_eq!(count, 2);
+
+ readable.take_corrupted_files()
+ };
+
+ assert_eq!(corrupted.len(), 1);
+
+ {
+ let writable = cache.write().unwrap();
+
+ writable.request_repair(&corrupted).unwrap();
+
+ assert_eq!(
+ get_cutoff(&writable),
+ 900,
+ "cutoff should be reset to the corrupted file's lower bound after a read"
+ );
+ }
+ }
+
#[test]
fn reset_cutoff_after_compressing_corrupted() {
let (_tmp_dir, mut cache) = make_cache().unwrap();
--
2.47.3
next prev parent reply other threads:[~2026-07-02 9:23 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-07-02 9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02 9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02 9:29 ` Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02 9:22 ` Lukas Wagner [this message]
2026-07-02 9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260702092258.174740-15-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.