From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from gate001.proxmox.com (gate001.proxmox.com [IPv6:2a0f:8001:1:32::40]) by lore.proxmox.com (Postfix) with ESMTPS id 3FBAB1FF135 for ; Thu, 02 Jul 2026 11:23:36 +0200 (CEST) Received: from gate001.proxmox.com (localhost.localdomain [127.0.0.1]) by gate001.proxmox.com (Proxmox) with ESMTP id 9A16321480; Thu, 02 Jul 2026 11:23:30 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Date: Thu, 2 Jul 2026 11:22:57 +0200 Message-ID: <20260702092258.174740-15-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com> References: <20260702092258.174740-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1782984190998 X-SPAM-LEVEL: Spam detection results: 0 DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment (newer systems) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: ZHGWKZ43J6ZHTPVM3G6DG3VHUHGPQV4W X-Message-ID-Hash: ZHGWKZ43J6ZHTPVM3G6DG3VHUHGPQV4W X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: While iterating over tasks, we now memorize any files that produced errors, and if there are any, we reset the cut-off timestamps to the lower bounds of the affected archive files. 
This should repair the archive during the next regular update cycle. Signed-off-by: Lukas Wagner --- server/src/remote_tasks/mod.rs | 15 +++ server/src/remote_tasks/task_cache.rs | 167 +++++++++++++++++++++----- 2 files changed, 155 insertions(+), 27 deletions(-) diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs index 6a340dba..5fba867d 100644 --- a/server/src/remote_tasks/mod.rs +++ b/server/src/remote_tasks/mod.rs @@ -157,6 +157,21 @@ pub async fn get_tasks( .take(limit) .collect(); + let corrupted = cache.take_corrupted_files(); + drop(cache); + + if !corrupted.is_empty() { + // If we noticed corrupted archive files while iterating, drop the read lock, + // acquire the write lock and reset the fetch cutoff so the affected files are + // repaired on the next fetch cycle. + if let Err(err) = get_cache() + .write() + .and_then(|cache| cache.request_repair(&corrupted)) + { + log::error!("failed to request repair of corrupted task archive file: {err:#}"); + } + } + Ok(returned_tasks) }) .await? diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs index dbe71dc9..23238cca 100644 --- a/server/src/remote_tasks/task_cache.rs +++ b/server/src/remote_tasks/task_cache.rs @@ -1,5 +1,6 @@ //! Task cache implementation, based on rotating files. use std::{ + cell::RefCell, cmp::Ordering, collections::{HashMap, HashSet}, fs::{File, OpenOptions}, @@ -7,6 +8,7 @@ use std::{ iter::Peekable, os::unix::fs::MetadataExt, path::{Path, PathBuf}, + rc::Rc, time::{Duration, Instant}, }; @@ -185,6 +187,10 @@ pub struct WritableTaskCache<'a> { pub struct ReadableTaskCache<'a> { cache: &'a TaskCache, lock: TaskCacheLock, + /// Archive files found to be corrupted while iterating over tasks. Populated + /// as tasks are read (see [`InnerTaskArchiveIterator`]); query it via + /// [`Self::take_corrupted_files`] once iteration is done to trigger a repair. + corrupted: CorruptedArchiveFiles, } /// Lock for the cache. 
@@ -226,11 +232,26 @@ impl NodeFetchSuccessMap { impl<'a> ReadableTaskCache<'a> { /// Iterate over cached tasks. + /// + /// Archive file corruption encountered while iterating is recorded and can be + /// queried via [`Self::take_corrupted_files`] once iteration is done, so the + /// caller can trigger a repair. pub fn get_tasks(&self, mode: GetTasks) -> Result, Error> { self.cache - .get_tasks_impl(mode, &self.lock) + .get_tasks_impl(mode, &self.lock, self.corrupted.clone()) .context("failed to create task archive iterator") } + + /// Take the list of archive files that were found to be corrupted while + /// iterating over tasks returned by [`Self::get_tasks`]. + /// + /// Corruption is detected lazily while iterating, so this only reflects the + /// files that were actually read; call it after the iterator has been consumed. + /// Pass the result to [`WritableTaskCache::request_repair`] to reset the + /// fetch cutoff and repair the archive on the next fetch cycle. + pub fn take_corrupted_files(&self) -> Vec<CorruptedArchiveFile> { + self.corrupted.take() + } } impl<'a> WritableTaskCache<'a> { @@ -326,9 +347,12 @@ impl<'a> WritableTaskCache<'a> { } /// Iterate over cached tasks. + /// + /// Corruption is not tracked here: the writable cache already repairs the + /// archive via its write paths (`apply_journal`, `rotate`). pub fn get_tasks(&self, mode: GetTasks) -> Result, Error> { self.cache - .get_tasks_impl(mode, &self.lock) + .get_tasks_impl(mode, &self.lock, Default::default()) .context("failed to create task archive iterator") } @@ -868,7 +892,11 @@ impl TaskCache { pub fn read(&self) -> Result, Error> { let lock = self.lock_impl(false)?; - Ok(ReadableTaskCache { cache: self, lock }) + Ok(ReadableTaskCache { + cache: self, + lock, + corrupted: Default::default(), + }) } /// Lock the cache for writing.
@@ -911,6 +939,7 @@ impl TaskCache { &self, mode: GetTasks, lock: &'a TaskCacheLock, + corrupted: CorruptedArchiveFiles, ) -> Result, Error> { let journal_file = self.journal_path(); @@ -920,19 +949,19 @@ impl TaskCache { archive_files.reverse(); archive_files.push(self.active_file()); - TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock) + TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock, corrupted) } GetTasks::Active => { let archive_files = vec![self.active_file()]; - TaskArchiveIterator::new(None, archive_files, lock) + TaskArchiveIterator::new(None, archive_files, lock, corrupted) } #[cfg(test)] GetTasks::Archived => { let mut files = self.archive_files(lock)?; files.reverse(); - TaskArchiveIterator::new(Some(journal_file.into()), files, lock) + TaskArchiveIterator::new(Some(journal_file.into()), files, lock, corrupted) } } } @@ -1060,8 +1089,9 @@ impl<'a> TaskArchiveIterator<'a> { journal: Option, files: Vec, lock: &'a TaskCacheLock, + corrupted: CorruptedArchiveFiles, ) -> Result { - let inner = InnerTaskArchiveIterator::new(files) + let inner = InnerTaskArchiveIterator::new(files, corrupted) .filter_map(|res| match res { Ok(task) => Some(task), Err(err) => { @@ -1115,15 +1145,18 @@ struct InnerTaskArchiveIterator { current: Option, /// Archive file that is currently being iterated over, if any. current_file: Option, + /// Archive files found to be corrupted while iterating. + corrupted: CorruptedArchiveFiles, } impl InnerTaskArchiveIterator { /// Create a new task archive iterator. 
- pub fn new(files: Vec) -> Self { + pub fn new(files: Vec, corrupted: CorruptedArchiveFiles) -> Self { Self { files, current: None, current_file: None, + corrupted, } } } @@ -1134,27 +1167,41 @@ impl Iterator for InnerTaskArchiveIterator { fn next(&mut self) -> Option { loop { match &mut self.current { - Some(current) => { - let next = current.next(); - if next.is_some() { - return next.map(|res| { - res.with_context(|| { - format!( - "failed to read from {file}", - file = self - .current_file - .as_ref() - .expect("self.current and self.current_file are both Some") - .path - .display() - ) - }) - }); - } else { + Some(current) => match current.next() { + Some(res) => { + if res.is_err() { + // The archive file is corrupted. Record it so a repair can be + // triggered by resetting the fetch cutoff to the file's lower + // bound. The active file (starttime 0) is rewritten on every + // update and needs no cutoff-based repair, so skip it. + if let Some(file) = self + .current_file + .as_ref() + .filter(|file| file.starttime != 0) + { + self.corrupted + .borrow_mut() + .push(CorruptedArchiveFile(file.clone())); + } + } + + return Some(res.with_context(|| { + format!( + "failed to read from {file}", + file = self + .current_file + .as_ref() + .expect("self.current and self.current_file are both Some") + .path + .display() + ) + })); + } + None => { self.current = None; self.current_file = None; } - } + }, None => 'inner: loop { // Returns `None` if no more files are available, stopping iteration. let next_file = self.files.pop()?; @@ -1362,9 +1409,14 @@ enum ArchiveFileState { /// Newtype wrapper to make the return type of [`WritableTaskCache::merge_tasks_into_archive`] /// more expressive. -/// pub struct CorruptedArchiveFile(ArchiveFile); +/// Shared collection of archive files found to be corrupted while iterating. 
+/// +/// This is populated during read access (see [`InnerTaskArchiveIterator`]) and shared with the +/// [`ReadableTaskCache`], which hands the collected files to the caller via +/// [`ReadableTaskCache::take_corrupted_files`] so that a repair can be requested afterwards. +type CorruptedArchiveFiles = Rc<RefCell<Vec<CorruptedArchiveFile>>>; + /// Iterator that merges two _sorted_ `Iterator`, returning the items /// from both iterators sorted. /// The two iterators are expected to be sorted descendingly based on the task's starttime and @@ -1963,6 +2015,67 @@ mod tests { } } + #[test] + fn reset_cutoff_after_corruption_during_read() { + let (_tmp_dir, mut cache) = make_cache().unwrap(); + cache.rotate_after = 100; + + // Populate the archive and corrupt one of the compressed files. Scope the + // writable cache so its exclusive lock is released before we read. + { + let writable = cache.write().unwrap(); + writable.init(1000).unwrap(); + + add_tasks( + &writable, + vec![ + task(810, Some(811)), + task(910, Some(911)), + task(1010, Some(1011)), + ], + ) + .unwrap(); + writable.apply_journal().unwrap(); + + assert_eq!(get_cutoff(&writable), 1010); + + let files = writable.cache.archive_files(&writable.lock).unwrap(); + let file = files.get(1).expect("there is a second archive file"); + assert_eq!(file.starttime, 900); + + // truncate existing compressed file, corrupting the zstd file header + truncate_archive_file(file); + } + + // A read access should notice the corruption while iterating. The caller can + // then query the corrupted files and, after releasing the read lock, reset the + // cutoff timestamp to the lower bound of the corrupted file so the next fetch + // cycle repairs it. + let corrupted = { + let readable = cache.read().unwrap(); + let count = readable.get_tasks(GetTasks::All).unwrap().count(); + + // The task in the corrupted file (starttime 910) is skipped.
+ assert_eq!(count, 2); + + readable.take_corrupted_files() + }; + + assert_eq!(corrupted.len(), 1); + + { + let writable = cache.write().unwrap(); + + writable.request_repair(&corrupted).unwrap(); + + assert_eq!( + get_cutoff(&writable), + 900, + "cutoff should be reset to the corrupted file's lower bound after a read" + ); + } + } + #[test] fn reset_cutoff_after_compressing_corrupted() { let (_tmp_dir, mut cache) = make_cache().unwrap(); -- 2.47.3