From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from gate001.proxmox.com (gate001.proxmox.com [IPv6:2a0f:8001:1:32::40]) by lore.proxmox.com (Postfix) with ESMTPS id C478E1FF135 for ; Thu, 02 Jul 2026 11:23:30 +0200 (CEST) Received: from gate001.proxmox.com (localhost.localdomain [127.0.0.1]) by gate001.proxmox.com (Proxmox) with ESMTP id 6C4842144B; Thu, 02 Jul 2026 11:23:28 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Date: Thu, 2 Jul 2026 11:22:56 +0200 Message-ID: <20260702092258.174740-14-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com> References: <20260702092258.174740-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1782984190871 X-SPAM-LEVEL: Spam detection results: 0 DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment (newer systems) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: GD5QM25N6WEXN7JVJDY44R3JFS3NTW5H X-Message-ID-Hash: GD5QM25N6WEXN7JVJDY44R3JFS3NTW5H X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: As we already do when applying the journal, use the opportunity to check archive files for corruption when compressing them. Instead of compressing the raw file contents as-is, every task record is now read from the uncompressed archive file and written to the new compressed file. Records that cannot be read are logged and skipped. If any such errors occurred, a repair is requested, which resets the cutoff timestamps to the lower bound of the corrupted archive file.
Signed-off-by: Lukas Wagner --- server/src/remote_tasks/task_cache.rs | 132 ++++++++++++++++++++------ 1 file changed, 101 insertions(+), 31 deletions(-) diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs index c6a6c777..dbe71dc9 100644 --- a/server/src/remote_tasks/task_cache.rs +++ b/server/src/remote_tasks/task_cache.rs @@ -302,16 +302,26 @@ impl<'a> WritableTaskCache<'a> { .with_context(|| format!("failed to remove {}", to_remove.path.display()))?; } + let mut corrupted = Vec::new(); + for file in archive_files .iter_mut() .skip(self.cache.uncompressed_files as usize) { - if !file.compressed { - file.compress(self.cache.create_options) - .with_context(|| format!("failed to compress {}", file.path.display()))?; + if !file.compressed + && self + .compress_archive_file(file) + .with_context(|| format!("failed to compress {}", file.path.display()))? + == ArchiveFileState::Corrupted + { + corrupted.push(CorruptedArchiveFile(file.clone())); } } + if !corrupted.is_empty() { + self.request_repair(&corrupted)?; + } + Ok(did_rotate) } @@ -771,6 +781,53 @@ impl<'a> WritableTaskCache<'a> { Ok(file_state) } + + /// Compress an existing [`ArchiveFile`]. + /// + /// This will return `Ok(ArchiveFileState::Corrupted)` if any JSON record could not be + /// deserialized. This can be used by the caller to handle corruption properly, e.g. + /// by triggering a rebuild of the affected archive file at the next opportunity. + fn compress_archive_file(&self, file: &ArchiveFile) -> Result { + let mut new_file = file.clone(); + new_file.set_compressed(true); + + let mut file_state = ArchiveFileState::Valid; + + let archive_iter = file + .iter()? + .with_context(|| { + format!( + "task archive file '{}' disappeared while merging tasks", + file.path.display() + ) + })? 
+ .flat_map(|item| match item { + Ok(item) => Some(item), + Err(err) => { + file_state = ArchiveFileState::Corrupted; + log::error!( + "could not read task cache item while compressing '{path}': {err:#}", + path = file.path.display() + ); + None + } + }); + + let mut writer = new_file.writer(self.cache.create_options)?; + + writer.write_tasks(archive_iter)?; + + if let Err(err) = writer.finalize_and_replace() { + log::error!("could not replace {file:?}: {err:#}"); + + // don't remove old, uncompressed file if we were not able to put the new file in place + return Err(err); + } + + std::fs::remove_file(&file.path).context("failed to remove uncompressed archive file")?; + + Ok(file_state) + } } impl TaskCache { @@ -1185,36 +1242,20 @@ impl ArchiveFile { } } - fn compress(&mut self, options: CreateOptions) -> Result<(), Error> { - let uncompressed_file_path = &self.path; + /// Set the `compressed` flag and update the path. + /// + /// This will *not* compress the existing file contents. + fn set_compressed(&mut self, compressed: bool) { + let suffix = if compressed { + ZSTD_EXTENSION_WITH_DOT + } else { + "" + }; - let (temp_file, temp_file_path) = - proxmox_sys::fs::make_tmp_file(uncompressed_file_path, options) - .context("failed to create temporary file")?; + self.compressed = compressed; - let uncompressed_file = - File::open(uncompressed_file_path).context("failed to open uncompressed file")?; - - zstd::stream::copy_encode( - uncompressed_file, - temp_file, - zstd::DEFAULT_COMPRESSION_LEVEL, - ) - .context("zstd::stream::copy_encode failed")?; - - let mut new_path_for_compressed = uncompressed_file_path.clone(); - new_path_for_compressed - .set_extension(format!("{}{ZSTD_EXTENSION_WITH_DOT}", self.starttime)); - - std::fs::rename(&temp_file_path, &new_path_for_compressed) - .context("failed to move compressed task archive file")?; - std::fs::remove_file(uncompressed_file_path) - .context("failed to remove uncompressed archive file")?; - - self.path = 
new_path_for_compressed; - self.compressed = true; - - Ok(()) + self.path + .set_extension(format!("{}{suffix}", self.starttime)); } } @@ -1921,4 +1962,33 @@ mod tests { assert_eq!(get_cutoff(&cache), expected_cutoff, "{}", description); } } + + #[test] + fn reset_cutoff_after_compressing_corrupted() { + let (_tmp_dir, mut cache) = make_cache().unwrap(); + cache.rotate_after = 100; + cache.uncompressed_files = 1; + cache.max_files = 2; + + let cache = cache.write().unwrap(); + cache.init(1000).unwrap(); + + add_tasks(&cache, vec![task(1010, Some(1011)), task(1020, Some(1021))]).unwrap(); + cache.apply_journal().unwrap(); + + assert_eq!(get_cutoff(&cache), 1020); + + let files = cache.cache.archive_files(&cache.lock).unwrap(); + let first = files.get(0).unwrap(); + + std::fs::write(&first.path, "some invalid\nthis is no valid json").unwrap(); + + cache.rotate(1100).unwrap(); + + assert_eq!( + get_cutoff(&cache), + 1000, + "cutoff timestamp should be reset to the lower bound of the corrupted archive file" + ); + } } -- 2.47.3