all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file'
Date: Thu,  2 Jul 2026 11:22:58 +0200	[thread overview]
Message-ID: <20260702092258.174740-16-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com>

`compress_archive_file` cannot be made fully atomic; therefore, we must
consider the case where both the old, uncompressed file and the new,
compressed file exist in the archive directory.

This is handled by ignoring duplicate files (preferring the compressed
one) when reading from/writing to the task archive, and by actively
cleaning up the duplicates during `rotate`.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 97 +++++++++++++++++++++++++--
 1 file changed, 93 insertions(+), 4 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 23238cca..2110037b 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -289,13 +289,34 @@ impl<'a> WritableTaskCache<'a> {
         Ok(file)
     }
 
-    /// Rotate task archive if the the newest archive file is older than `rotate_after`.
+    /// Rotate task archive if the newest archive file is older than `rotate_after`.
     ///
     /// The oldest archive files are removed if the total number of archive files exceeds
     /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
+    ///
+    /// If there are any duplicate archive files (which can happen if `compress_archive_file` is
+    /// interrupted at the wrong time), the uncompressed version is deleted.
     pub fn rotate(&self, now: i64) -> Result<bool, Error> {
         let mut did_rotate = false;
-        let mut archive_files = self.cache.archive_files(&self.lock)?;
+        let archive_files_with_dupes = self.cache.archive_files_with_dupes(&self.lock)?;
+
+        let mut deduped: Vec<ArchiveFile> = Vec::new();
+
+        for file in archive_files_with_dupes {
+            if deduped.iter().any(|e| e.starttime == file.starttime) {
+                // Found dupe, remove it.
+                if let Err(err) = std::fs::remove_file(&file.path) {
+                    log::error!(
+                        "could not clean up duplicate archive file '{path}': {err}",
+                        path = file.path.display()
+                    )
+                }
+            } else {
+                deduped.push(file);
+            }
+        }
+
+        let mut archive_files = deduped;
 
         let mut start_new_file = |files: &mut Vec<ArchiveFile>| -> Result<(), Error> {
             let new_file = self.new_file(now, self.cache.uncompressed_files == 0)?;
@@ -848,6 +869,11 @@ impl<'a> WritableTaskCache<'a> {
             return Err(err);
         }
 
+        // If we crash here or if `remove_file` fails, we might end up with the original,
+        // uncompressed file as well as the new, compressed file. `archive_files` filters
+        // duplicates out, so we should never read from the duplicated file. Any left-over
+        // duplicates are cleaned up during `rotate`.
+
         std::fs::remove_file(&file.path).context("failed to remove uncompressed archive file")?;
 
         Ok(file_state)
@@ -970,7 +996,7 @@ impl TaskCache {
     /// cut-off timestamp. The result is sorted ascending by cut-off timestamp (most recent one
     /// first).
     /// The task archive should be locked for reading when calling this function.
-    fn archive_files(&self, _lock: &TaskCacheLock) -> Result<Vec<ArchiveFile>, Error> {
+    fn archive_files_with_dupes(&self, _lock: &TaskCacheLock) -> Result<Vec<ArchiveFile>, Error> {
         let mut names = Vec::new();
 
         for entry in std::fs::read_dir(&self.base_path)? {
@@ -983,7 +1009,23 @@ impl TaskCache {
             }
         }
 
-        names.sort_by_key(|e| -e.starttime);
+        names.sort_by(|a, b| {
+            b.starttime
+                .cmp(&a.starttime)
+                .then(b.compressed.cmp(&a.compressed))
+        });
+
+        Ok(names)
+    }
+
+    /// Returns a list of existing archive files, together with their respective
+    /// cut-off timestamp. The result is sorted descending by cut-off timestamp (most recent one
+    /// first). Of any duplicates (same starttime), only the compressed file is returned.
+    ///
+    /// The task archive should be locked for reading when calling this function.
+    fn archive_files(&self, lock: &TaskCacheLock) -> Result<Vec<ArchiveFile>, Error> {
+        let mut names = self.archive_files_with_dupes(lock)?;
+        names.dedup_by_key(|e| e.starttime);
 
         Ok(names)
     }
@@ -2104,4 +2146,51 @@ mod tests {
             "cutoff timestamp should be reset to the lower bound of the corrupted archive file"
         );
     }
+
+    /// Ensure that if multiple files with the same start time exist (only possible if one is
+    /// compressed and the other uncompressed), only one of them is returned and that compressed
+    /// files are preferred. Also verify that `rotate` removes the uncompressed duplicates.
+    #[test]
+    fn dedup_duplicate_archive_files() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 3;
+
+        let cache = cache.write().unwrap();
+
+        cache.new_file(1000, false).unwrap();
+        cache.new_file(1000, true).unwrap();
+        cache.new_file(2000, true).unwrap();
+        cache.new_file(2000, false).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+
+        assert_eq!(files.len(), 2);
+        let first = files.get(0).unwrap();
+        let second = files.get(1).unwrap();
+
+        assert!(first.compressed);
+        assert_eq!(first.starttime, 2000);
+
+        assert!(second.compressed);
+        assert_eq!(second.starttime, 1000);
+
+        let files = cache.cache.archive_files_with_dupes(&cache.lock).unwrap();
+        assert_eq!(files.len(), 4);
+
+        cache.rotate(2050).unwrap();
+
+        let files = cache.cache.archive_files_with_dupes(&cache.lock).unwrap();
+        assert_eq!(files.len(), 2);
+
+        let first = files.get(0).unwrap();
+        let second = files.get(1).unwrap();
+
+        assert!(first.compressed);
+        assert_eq!(first.starttime, 2000);
+
+        assert!(second.compressed);
+        assert_eq!(second.starttime, 1000);
+    }
 }
-- 
2.47.3





      parent reply	other threads:[~2026-07-02  9:23 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02  9:22 ` Lukas Wagner [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260702092258.174740-16-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal