From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files
Date: Thu, 2 Jul 2026 11:22:56 +0200 [thread overview]
Message-ID: <20260702092258.174740-14-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>
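As when applying the journal, use the opportunity to check for
corruption while compressing archive files. Instead of compressing the
raw file contents, every record is now read from the uncompressed file
and written to the new compressed file. If any record could not be read,
the cut-off timestamps are reset so that the affected data is fetched
again.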
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/remote_tasks/task_cache.rs | 132 ++++++++++++++++++++------
1 file changed, 101 insertions(+), 31 deletions(-)
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index c6a6c777..dbe71dc9 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -302,16 +302,33 @@ impl<'a> WritableTaskCache<'a> {
                 .with_context(|| format!("failed to remove {}", to_remove.path.display()))?;
         }
 
+        let mut corrupted = Vec::new();
+
         for file in archive_files
             .iter_mut()
             .skip(self.cache.uncompressed_files as usize)
         {
-            if !file.compressed {
-                file.compress(self.cache.create_options)
-                    .with_context(|| format!("failed to compress {}", file.path.display()))?;
+            if file.compressed {
+                continue;
+            }
+
+            let state = self
+                .compress_archive_file(file)
+                .with_context(|| format!("failed to compress {}", file.path.display()))?;
+
+            // The uncompressed file has been replaced by its compressed counterpart; keep the
+            // in-memory entry in sync so that it refers to the file that actually exists now.
+            file.set_compressed(true);
+
+            if state == ArchiveFileState::Corrupted {
+                corrupted.push(CorruptedArchiveFile(file.clone()));
             }
         }
 
+        if !corrupted.is_empty() {
+            self.request_repair(&corrupted)?;
+        }
+
         Ok(did_rotate)
     }
 
@@ -771,6 +781,60 @@ impl<'a> WritableTaskCache<'a> {
 
         Ok(file_state)
     }
+
+    /// Compress an existing, uncompressed [`ArchiveFile`].
+    ///
+    /// All records that can be read from `file` are written to a new, compressed archive
+    /// file, which is then put in place. Afterwards, the uncompressed file is removed.
+    ///
+    /// This will return `Ok(ArchiveFileState::Corrupted)` if any JSON record could not be
+    /// deserialized. This can be used by the caller to handle corruption properly, e.g.
+    /// by triggering a rebuild of the affected archive file at the next opportunity.
+    ///
+    /// If the compressed file was put in place but the uncompressed file could not be removed,
+    /// an error is returned and both files are left on disk.
+    fn compress_archive_file(&self, file: &ArchiveFile) -> Result<ArchiveFileState, Error> {
+        let mut new_file = file.clone();
+        new_file.set_compressed(true);
+
+        let mut file_state = ArchiveFileState::Valid;
+
+        let archive_iter = file
+            .iter()?
+            .with_context(|| {
+                format!(
+                    "task archive file '{}' disappeared while compressing it",
+                    file.path.display()
+                )
+            })?
+            .filter_map(|item| match item {
+                Ok(item) => Some(item),
+                Err(err) => {
+                    // Drop records that could not be read, but remember the corruption.
+                    file_state = ArchiveFileState::Corrupted;
+                    log::error!(
+                        "could not read task cache item while compressing '{path}': {err:#}",
+                        path = file.path.display()
+                    );
+                    None
+                }
+            });
+
+        let mut writer = new_file.writer(self.cache.create_options)?;
+
+        writer.write_tasks(archive_iter)?;
+
+        if let Err(err) = writer.finalize_and_replace() {
+            log::error!("could not replace {file:?}: {err:#}");
+
+            // don't remove old, uncompressed file if we were not able to put the new file in place
+            return Err(err);
+        }
+
+        std::fs::remove_file(&file.path).context("failed to remove uncompressed archive file")?;
+
+        Ok(file_state)
+    }
 }
 
 impl TaskCache {
@@ -1185,36 +1242,25 @@ impl ArchiveFile {
         }
     }
 
-    fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
-        let uncompressed_file_path = &self.path;
-
-        let (temp_file, temp_file_path) =
-            proxmox_sys::fs::make_tmp_file(uncompressed_file_path, options)
-                .context("failed to create temporary file")?;
-
-        let uncompressed_file =
-            File::open(uncompressed_file_path).context("failed to open uncompressed file")?;
-
-        zstd::stream::copy_encode(
-            uncompressed_file,
-            temp_file,
-            zstd::DEFAULT_COMPRESSION_LEVEL,
-        )
-        .context("zstd::stream::copy_encode failed")?;
-
-        let mut new_path_for_compressed = uncompressed_file_path.clone();
-        new_path_for_compressed
-            .set_extension(format!("{}{ZSTD_EXTENSION_WITH_DOT}", self.starttime));
-
-        std::fs::rename(&temp_file_path, &new_path_for_compressed)
-            .context("failed to move compressed task archive file")?;
-        std::fs::remove_file(uncompressed_file_path)
-            .context("failed to remove uncompressed archive file")?;
-
-        self.path = new_path_for_compressed;
-        self.compressed = true;
-
-        Ok(())
+    /// Set the `compressed` flag and update the path accordingly.
+    ///
+    /// This will *not* compress or decompress the existing file contents.
+    fn set_compressed(&mut self, compressed: bool) {
+        if self.compressed {
+            // `set_extension` only replaces the last extension, so the zstd extension has to be
+            // stripped first. Otherwise it would be replaced by the start time instead.
+            self.path.set_extension("");
+        }
+
+        let suffix = if compressed {
+            ZSTD_EXTENSION_WITH_DOT
+        } else {
+            ""
+        };
+
+        self.compressed = compressed;
+        self.path
+            .set_extension(format!("{}{suffix}", self.starttime));
     }
 }
 
@@ -1921,4 +1962,35 @@ mod tests {
             assert_eq!(get_cutoff(&cache), expected_cutoff, "{}", description);
         }
     }
+
+    #[test]
+    fn reset_cutoff_after_compressing_corrupted() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 2;
+
+        let cache = cache.write().unwrap();
+        cache.init(1000).unwrap();
+
+        add_tasks(&cache, vec![task(1010, Some(1011)), task(1020, Some(1021))]).unwrap();
+        cache.apply_journal().unwrap();
+
+        assert_eq!(get_cutoff(&cache), 1020);
+
+        // Overwrite the archive file with garbage. Only one archive file is kept uncompressed,
+        // so the next rotation compresses this file, which should detect the corruption.
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        let first = files.first().unwrap();
+
+        std::fs::write(&first.path, "some invalid\nthis is no valid json").unwrap();
+
+        cache.rotate(1100).unwrap();
+
+        assert_eq!(
+            get_cutoff(&cache),
+            1000,
+            "cutoff timestamp should be reset to the lower bound of the corrupted archive file"
+        );
+    }
 }
--
2.47.3
next prev parent reply other threads:[~2026-07-02 9:23 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-07-02 9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02 9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02 9:29 ` Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02 9:22 ` Lukas Wagner [this message]
2026-07-02 9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02 9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260702092258.174740-14-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.