From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from gate001.proxmox.com (gate001.proxmox.com [IPv6:2a0f:8001:1:32::40]) by lore.proxmox.com (Postfix) with ESMTPS id B3F611FF135 for ; Thu, 02 Jul 2026 11:23:31 +0200 (CEST) Received: from gate001.proxmox.com (localhost.localdomain [127.0.0.1]) by gate001.proxmox.com (Proxmox) with ESMTP id A0D792146B; Thu, 02 Jul 2026 11:23:28 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Date: Thu, 2 Jul 2026 11:22:55 +0200 Message-ID: <20260702092258.174740-13-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com> References: <20260702092258.174740-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1782984190739 X-SPAM-LEVEL: Spam detection results: 0 DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment (newer systems) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 2RWTEKABKJLFB4LXAYGAG4ICVYLIR6IL X-Message-ID-Hash: 2RWTEKABKJLFB4LXAYGAG4ICVYLIR6IL X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Merging entries from the journal into the individual task archive files involves reading all existing entries from the archive file. 
If reading any of the existing entries fails during that process, we now treat the archive file as corrupted and lower the cutoff timestamps of all remotes' nodes to at most the start time of that archive file (the oldest one, if several files are affected). This causes the affected tasks to be fetched again, and thus the file to be rebuilt, the next time tasks are fetched. Signed-off-by: Lukas Wagner --- server/src/remote_tasks/task_cache.rs | 172 +++++++++++++++++++++++--- 1 file changed, 153 insertions(+), 19 deletions(-) diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs index 3677de2c..c6a6c777 100644 --- a/server/src/remote_tasks/task_cache.rs +++ b/server/src/remote_tasks/task_cache.rs @@ -531,7 +531,7 @@ impl<'a> WritableTaskCache<'a> { let count = tasks.len(); - self.merge_tasks_into_archive(tasks)?; + let corrupted_archive_files = self.merge_tasks_into_archive(tasks)?; // truncate the journal file OpenOptions::new() @@ -540,6 +540,10 @@ impl<'a> WritableTaskCache<'a> { .open(journal_path) .context("failed to truncate journal file")?; + if !corrupted_archive_files.is_empty() { + self.request_repair(&corrupted_archive_files)?; + } + log::info!( "committed {count} tasks in {:.3}.s to task cache archive", start.elapsed().as_secs_f32() @@ -551,7 +555,10 @@ impl<'a> WritableTaskCache<'a> { /// Merge a list of *finished* tasks into the remote task archive files. /// The list of task in `tasks` *must* be sorted by their timestamp and UPID (descending by /// timestamp, ascending by UPID).
- fn merge_tasks_into_archive(&self, tasks: Vec) -> Result<(), Error> { + fn merge_tasks_into_archive( + &self, + tasks: Vec, + ) -> Result, Error> { debug_assert!( tasks .iter() @@ -568,6 +575,8 @@ impl<'a> WritableTaskCache<'a> { let mut current = files.next(); let mut next = files.peek(); + let mut corrupted = Vec::new(); + let mut tasks_for_current_file = Vec::new(); // Tasks are sorted youngest to oldest (biggest start time first) @@ -581,13 +590,18 @@ impl<'a> WritableTaskCache<'a> { // The next entry's cut-off is larger then the task's start time, that means // we want to finalized the current file by merging all tasks that // should be stored in it... - self.merge_single_archive_file( - std::mem::take(&mut tasks_for_current_file), - current, - ) - .with_context(|| { - format!("failed to merge archive file {}", current.path.display()) - })?; + if self + .merge_single_archive_file( + std::mem::take(&mut tasks_for_current_file), + current, + ) + .with_context(|| { + format!("failed to merge archive file {}", current.path.display()) + })? + == ArchiveFileState::Corrupted + { + corrupted.push(CorruptedArchiveFile(current.clone())) + }; } // ... and the `current` file to the next entry. @@ -605,10 +619,52 @@ impl<'a> WritableTaskCache<'a> { // Merge tasks for the last file. if let Some(current) = current { - self.merge_single_archive_file(tasks_for_current_file, current) + if self + .merge_single_archive_file(tasks_for_current_file, current) .with_context(|| { format!("failed to merge archive file {}", current.path.display()) - })?; + })? + == ArchiveFileState::Corrupted + { + corrupted.push(CorruptedArchiveFile(current.clone())); + } + } + + Ok(corrupted) + } + + /// Repair archive files that were detected as corrupted when accessed. + /// + /// This resets the task fetching cutoff timestamps to the lower bound of the + /// oldest corrupted file, so its contents are re-fetched from the remotes on + /// the next fetch cycle. 
+ pub fn request_repair( + &self, + corrupted_archive_files: &[CorruptedArchiveFile], + ) -> Result<(), Error> { + let reset_cutoff = corrupted_archive_files.iter().map(|c| { + log::warn!( + "{} seems to be corrupted, attempting recovery by resetting task cutoff timestamps", + c.0.path.display() + ); + c.0.starttime + }).min(); + + if let Some(reset_cutoff) = reset_cutoff { + log::warn!( + "resetting task cutoff timestamp in state file to {reset_cutoff} to recover task archive" + ); + let mut state = self.read_state(); + + for remote in state.remote_state.values_mut() { + for node in remote.node_state.values_mut() { + node.cutoff = node.cutoff.min(reset_cutoff); + } + } + + self.write_state(state).context( + "failed to write state when resetting cutoff after archive file corruption event", + )?; } Ok(()) @@ -682,9 +738,11 @@ impl<'a> WritableTaskCache<'a> { &self, tasks: Vec, file: &ArchiveFile, - ) -> Result<(), Error> { + ) -> Result { + let mut file_state = ArchiveFileState::Valid; + if tasks.is_empty() { - return Ok(()); + return Ok(file_state); } let mut writer = file.writer(self.cache.create_options)?; @@ -700,6 +758,7 @@ impl<'a> WritableTaskCache<'a> { .flat_map(|item| match item { Ok(item) => Some(item), Err(err) => { + file_state = ArchiveFileState::Corrupted; log::error!("could not read task cache item while merging: {err:#}"); None } @@ -710,7 +769,7 @@ impl<'a> WritableTaskCache<'a> { writer.write_tasks(MergeTaskIterator::new(archive_iter, task_iter))?; writer.finalize_and_replace()?; - Ok(()) + Ok(file_state) } } @@ -1251,6 +1310,20 @@ impl<'a> Drop for ArchiveFileWriter<'a> { } } +/// Marker to signal the state of an [`ArchiveFile`]. +#[derive(Debug, Clone, PartialEq, Eq)] +enum ArchiveFileState { + /// The archive file is valid and contains no errors. + Valid, + /// The archive file is corrupted. + Corrupted, +} + +/// Newtype wrapper to make the return type of [`WritableTaskCache::merge_tasks_into_archive`] +/// more expressive. 
+/// +pub struct CorruptedArchiveFile(ArchiveFile); + /// Iterator that merges two _sorted_ `Iterator`, returning the items /// from both iterators sorted. /// The two iterators are expected to be sorted descendingly based on the task's starttime and @@ -1478,6 +1551,14 @@ mod tests { Ok((tmp_dir, cache)) } + fn truncate_archive_file(file: &ArchiveFile) { + OpenOptions::new() + .write(true) + .truncate(true) + .open(&file.path) + .expect("file truncated"); + } + #[test] fn test_add_tasks() -> Result<(), Error> { let (_tmp_dir, cache) = make_cache().unwrap(); @@ -1751,11 +1832,7 @@ mod tests { let file = files.get(0).expect("there is one archive file"); // truncate existing compressed file, corrupting the zstd file header - let _file = OpenOptions::new() - .write(true) - .truncate(true) - .open(&file.path) - .expect("file truncated"); + truncate_archive_file(file); assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0); } @@ -1787,4 +1864,61 @@ mod tests { "the tmp file created by the writer should not show up in the list of archive files" ); } + + #[test] + fn reset_cutoff_timestamp_after_corruption() { + let testcases = [ + ( + "corruption of task file with lower bound of 900 determines the new cutoff", + vec![ + task(810, Some(811)), + task(910, Some(911)), + task(1010, Some(1011)), + ], + 900, + ), + ( + "older active task determines cutoff", + vec![ + task(810, None), + task(910, Some(920)), + task(1010, Some(1011)), + ], + 810, + ), + ]; + + for (description, initial_tasks, expected_cutoff) in testcases { + let (_tmp_dir, mut cache) = make_cache().unwrap(); + cache.rotate_after = 100; + let cache = cache.write().unwrap(); + + cache.init(1000).unwrap(); + + add_tasks(&cache, initial_tasks).unwrap(); + + cache.apply_journal().unwrap(); + + let files = cache.cache.archive_files(&cache.lock).unwrap(); + let file = files.get(1).expect("there is one archive file"); + + assert_eq!(file.starttime, 900); + assert!( + file.compressed, + "the file we are 
about to corrupt should be compressed" + ); + + // truncate existing compressed file, corrupting the zstd file header + truncate_archive_file(file); + + add_tasks(&cache, vec![task(920, Some(930))]).unwrap(); + + // When applying the journal and writing to the corrupted file, the cache + // should notice that something is broken and reset the cutoff timestamp to the + // start of the affected archive file, resulting in a repair in the + // next fetch cycle + cache.apply_journal().unwrap(); + assert_eq!(get_cutoff(&cache), expected_cutoff, "{}", description); + } + } } -- 2.47.3