From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from gate001.proxmox.com (gate001.proxmox.com [45.144.208.40]) by lore.proxmox.com (Postfix) with ESMTPS id D46671FF135 for ; Thu, 02 Jul 2026 11:23:58 +0200 (CEST) Received: from gate001.proxmox.com (localhost.localdomain [127.0.0.1]) by gate001.proxmox.com (Proxmox) with ESMTP id 11A0D214B5; Thu, 02 Jul 2026 11:23:55 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Date: Thu, 2 Jul 2026 11:22:58 +0200 Message-ID: <20260702092258.174740-16-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com> References: <20260702092258.174740-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1782984191128 X-SPAM-LEVEL: Spam detection results: 0 DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment (newer systems) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: QZXRYTY537VRY6A5RAETDN3FK3R3ASE7 X-Message-ID-Hash: QZXRYTY537VRY6A5RAETDN3FK3R3ASE7 X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: `compress_archive_file` cannot be made fully atomic, so we must handle the case where both the old, uncompressed file and the new, compressed file exist in the archive directory. 
This is handled by ignoring duplicate files when reading from or writing to the task archive, and by actively cleaning up the duplicates during `rotate`. Signed-off-by: Lukas Wagner --- server/src/remote_tasks/task_cache.rs | 97 +++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 4 deletions(-) diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs index 23238cca..2110037b 100644 --- a/server/src/remote_tasks/task_cache.rs +++ b/server/src/remote_tasks/task_cache.rs @@ -289,13 +289,34 @@ impl<'a> WritableTaskCache<'a> { Ok(file) } - /// Rotate task archive if the the newest archive file is older than `rotate_after`. + /// Rotate task archive if the newest archive file is older than `rotate_after`. /// /// The oldest archive files are removed if the total number of archive files exceeds /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds). + /// + /// If there are any duplicate archive files (can happen if `compress` is interrupted at the + /// wrong time), the uncompressed version is deleted. pub fn rotate(&self, now: i64) -> Result { let mut did_rotate = false; - let mut archive_files = self.cache.archive_files(&self.lock)?; + let archive_files_with_dupes = self.cache.archive_files_with_dupes(&self.lock)?; + + let mut deduped: Vec = Vec::new(); + + for file in archive_files_with_dupes { + if deduped.iter().any(|e| e.starttime == file.starttime) { + // Found dupe, remove it. 
+ if let Err(err) = std::fs::remove_file(&file.path) { + log::error!( + "could not clean up duplicate archive file '{path}': {err}", + path = file.path.display() + ) + } + } else { + deduped.push(file); + } + } + + let mut archive_files = deduped; let mut start_new_file = |files: &mut Vec| -> Result<(), Error> { let new_file = self.new_file(now, self.cache.uncompressed_files == 0)?; @@ -848,6 +869,11 @@ impl<'a> WritableTaskCache<'a> { return Err(err); } + // If we crash here or if `remove_file` fails, we might end up with the original, + // uncompressed file as well as the new, compressed file. `archive_files` filters + // duplicates out, so we should never read from the duplicated file. Any leftover + // duplicates are cleaned up during `rotate`. + std::fs::remove_file(&file.path).context("failed to remove uncompressed archive file")?; Ok(file_state) @@ -970,7 +996,7 @@ impl TaskCache { /// cut-off timestamp. The result is sorted ascending by cut-off timestamp (most recent one /// first). /// The task archive should be locked for reading when calling this function. - fn archive_files(&self, _lock: &TaskCacheLock) -> Result, Error> { + fn archive_files_with_dupes(&self, _lock: &TaskCacheLock) -> Result, Error> { let mut names = Vec::new(); for entry in std::fs::read_dir(&self.base_path)? { @@ -983,7 +1009,23 @@ impl TaskCache { } } - names.sort_by_key(|e| -e.starttime); + names.sort_by(|a, b| { + b.starttime + .cmp(&a.starttime) + .then(b.compressed.cmp(&a.compressed)) + }); + + Ok(names) + } + + /// Returns a list of existing archive files, together with their respective + /// cut-off timestamp. The result is sorted descending by cut-off timestamp (most recent one + /// first). Any duplicates (equal starttime but different compression status) are removed. + /// + /// The task archive should be locked for reading when calling this function. 
+ fn archive_files(&self, lock: &TaskCacheLock) -> Result, Error> { + let mut names = self.archive_files_with_dupes(lock)?; + names.dedup_by_key(|e| e.starttime); Ok(names) } @@ -2104,4 +2146,51 @@ mod tests { "cutoff timestamp should be reset to the lower bound of the corrupted archive file" ); } + + /// Ensure that if for any reason there exist multiple files with the same start time (only + /// possible if one of them is compressed and the other one uncompressed), we only return one + /// of them. Also verify that compressed files are preferred. + #[test] + fn dedup_duplicate_archive_files() { + let (_tmp_dir, mut cache) = make_cache().unwrap(); + cache.rotate_after = 100; + cache.uncompressed_files = 1; + cache.max_files = 3; + + let cache = cache.write().unwrap(); + + cache.new_file(1000, false).unwrap(); + cache.new_file(1000, true).unwrap(); + cache.new_file(2000, true).unwrap(); + cache.new_file(2000, false).unwrap(); + + let files = cache.cache.archive_files(&cache.lock).unwrap(); + + assert_eq!(files.len(), 2); + let first = files.get(0).unwrap(); + let second = files.get(1).unwrap(); + + assert!(first.compressed); + assert_eq!(first.starttime, 2000); + + assert!(second.compressed); + assert_eq!(second.starttime, 1000); + + let files = cache.cache.archive_files_with_dupes(&cache.lock).unwrap(); + assert_eq!(files.len(), 4); + + cache.rotate(2050).unwrap(); + + let files = cache.cache.archive_files_with_dupes(&cache.lock).unwrap(); + assert_eq!(files.len(), 2); + + let first = files.get(0).unwrap(); + let second = files.get(1).unwrap(); + + assert!(first.compressed); + assert_eq!(first.starttime, 2000); + + assert!(second.compressed); + assert_eq!(second.starttime, 1000); + } } -- 2.47.3