From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files
Date: Thu, 2 Jul 2026 11:22:56 +0200 [thread overview]
Message-ID: <20260702092258.174740-14-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>
As is already done when applying the journal, use the opportunity to
check for corruption while compressing an archive file, and reset the
cut-off timestamps if any errors occurred while reading from it.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/remote_tasks/task_cache.rs | 132 ++++++++++++++++++++------
1 file changed, 101 insertions(+), 31 deletions(-)
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index c6a6c777..dbe71dc9 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -302,16 +302,26 @@ impl<'a> WritableTaskCache<'a> {
.with_context(|| format!("failed to remove {}", to_remove.path.display()))?;
}
+ // archive files in which unreadable records were found while compressing them
+ let mut corrupted = Vec::new();
+
for file in archive_files
.iter_mut()
.skip(self.cache.uncompressed_files as usize)
{
- if !file.compressed {
- file.compress(self.cache.create_options)
- .with_context(|| format!("failed to compress {}", file.path.display()))?;
+ if file.compressed {
+ continue;
}
+
+ let state = self
+ .compress_archive_file(file)
+ .with_context(|| format!("failed to compress {}", file.path.display()))?;
+
+ // `compress_archive_file` replaced the file on disk but does not touch `file`, so
+ // keep the in-memory entry in sync. Otherwise a repair request would refer to the
+ // uncompressed file that was just removed.
+ file.set_compressed(true);
+
+ if state == ArchiveFileState::Corrupted {
+ corrupted.push(CorruptedArchiveFile(file.clone()));
+ }
}
+ if !corrupted.is_empty() {
+ // trigger a repair of the corrupted archive files (resets the cut-off timestamps)
+ self.request_repair(&corrupted)?;
+ }
+
Ok(did_rotate)
}
@@ -771,6 +781,53 @@ impl<'a> WritableTaskCache<'a> {
Ok(file_state)
}
+
+ /// Compress an existing, uncompressed [`ArchiveFile`].
+ ///
+ /// All readable records of `file` are streamed into a new, compressed archive file. Once
+ /// that file has been put in place, the old, uncompressed file is removed. If the new file
+ /// could not be put in place, the old file is kept and the error is returned.
+ ///
+ /// This will return `Ok(ArchiveFileState::Corrupted)` if any JSON record could not be
+ /// deserialized; such records are logged and skipped. This can be used by the caller to
+ /// handle corruption properly, e.g. by triggering a rebuild of the affected archive file
+ /// at the next opportunity.
+ ///
+ /// `file` itself is not modified. Callers that keep using it must update its path and
+ /// `compressed` flag themselves (e.g. via `set_compressed(true)`).
+ fn compress_archive_file(&self, file: &ArchiveFile) -> Result<ArchiveFileState, Error> {
+ let mut new_file = file.clone();
+ new_file.set_compressed(true);
+
+ let mut file_state = ArchiveFileState::Valid;
+
+ let archive_iter = file
+ .iter()?
+ .with_context(|| {
+ format!(
+ "task archive file '{}' disappeared while compressing it",
+ file.path.display()
+ )
+ })?
+ .filter_map(|item| match item {
+ Ok(item) => Some(item),
+ Err(err) => {
+ // remember the corruption, but keep compressing the readable records
+ file_state = ArchiveFileState::Corrupted;
+ log::error!(
+ "could not read task cache item while compressing '{path}': {err:#}",
+ path = file.path.display()
+ );
+ None
+ }
+ });
+
+ let mut writer = new_file.writer(self.cache.create_options)?;
+
+ writer.write_tasks(archive_iter)?;
+
+ if let Err(err) = writer.finalize_and_replace() {
+ log::error!("could not replace {file:?}: {err:#}");
+
+ // don't remove old, uncompressed file if we were not able to put the new file in place
+ return Err(err);
+ }
+
+ std::fs::remove_file(&file.path).with_context(|| {
+ format!(
+ "failed to remove uncompressed archive file '{}'",
+ file.path.display()
+ )
+ })?;
+
+ Ok(file_state)
+ }
}
impl TaskCache {
@@ -1185,36 +1242,20 @@ impl ArchiveFile {
}
}
- fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
- let uncompressed_file_path = &self.path;
+ /// Set the `compressed` flag and update the path accordingly.
+ ///
+ /// This will *not* compress or decompress the existing file contents; it only changes
+ /// the in-memory representation. When compressing, the path's extension is replaced by
+ /// `<starttime>` followed by `ZSTD_EXTENSION_WITH_DOT`. When decompressing, that suffix
+ /// is stripped from the file name again. Setting the current state again is a no-op.
+ fn set_compressed(&mut self, compressed: bool) {
+ if self.compressed == compressed {
+ // re-applying the rename would mangle the path, e.g. into `<name>.<ts>.<ts>`
+ return;
+ }
- let (temp_file, temp_file_path) =
- proxmox_sys::fs::make_tmp_file(uncompressed_file_path, options)
- .context("failed to create temporary file")?;
+
+ self.compressed = compressed;
- let uncompressed_file =
- File::open(uncompressed_file_path).context("failed to open uncompressed file")?;
-
- zstd::stream::copy_encode(
- uncompressed_file,
- temp_file,
- zstd::DEFAULT_COMPRESSION_LEVEL,
- )
- .context("zstd::stream::copy_encode failed")?;
-
- let mut new_path_for_compressed = uncompressed_file_path.clone();
- new_path_for_compressed
- .set_extension(format!("{}{ZSTD_EXTENSION_WITH_DOT}", self.starttime));
-
- std::fs::rename(&temp_file_path, &new_path_for_compressed)
- .context("failed to move compressed task archive file")?;
- std::fs::remove_file(uncompressed_file_path)
- .context("failed to remove uncompressed archive file")?;
-
- self.path = new_path_for_compressed;
- self.compressed = true;
-
- Ok(())
+
+ if compressed {
+ self.path
+ .set_extension(format!("{}{ZSTD_EXTENSION_WITH_DOT}", self.starttime));
+ } else {
+ // `set_extension` only replaces the last extension, which would leave the
+ // timestamp duplicated; strip the whole compression suffix instead.
+ let uncompressed_name = self
+ .path
+ .file_name()
+ .and_then(|name| name.to_str())
+ .and_then(|name| name.strip_suffix(ZSTD_EXTENSION_WITH_DOT))
+ .map(str::to_owned);
+
+ if let Some(name) = uncompressed_name {
+ self.path.set_file_name(name);
+ }
+ }
}
}
@@ -1921,4 +1962,33 @@ mod tests {
assert_eq!(get_cutoff(&cache), expected_cutoff, "{}", description);
}
}
+
+ /// Compressing an archive file that contains unreadable records during rotation must
+ /// trigger a repair, which resets the cut-off timestamp to the lower bound of the
+ /// corrupted archive file.
+ #[test]
+ fn reset_cutoff_after_compressing_corrupted() {
+ let (_tmp_dir, mut cache) = make_cache().unwrap();
+ cache.rotate_after = 100;
+ // keep only a single uncompressed file, so that the rotation below compresses the
+ // existing archive file
+ cache.uncompressed_files = 1;
+ cache.max_files = 2;
+
+ let cache = cache.write().unwrap();
+ cache.init(1000).unwrap();
+
+ add_tasks(&cache, vec![task(1010, Some(1011)), task(1020, Some(1021))]).unwrap();
+ cache.apply_journal().unwrap();
+
+ assert_eq!(get_cutoff(&cache), 1020);
+
+ let files = cache.cache.archive_files(&cache.lock).unwrap();
+ let first = files.first().unwrap();
+
+ // corrupt the archive file by replacing its contents with non-JSON lines
+ std::fs::write(&first.path, "some invalid\nthis is no valid json").unwrap();
+
+ cache.rotate(1100).unwrap();
+
+ assert_eq!(
+ get_cutoff(&cache),
+ 1000,
+ "cutoff timestamp should be reset to the lower bound of the corrupted archive file"
+ );
+ }
}
--
2.47.3
next prev parent reply other threads:[~2026-07-02 9:23 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-07-02 9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02 9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02 9:29 ` Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02 9:22 ` Lukas Wagner [this message]
2026-07-02 9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260702092258.174740-14-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox