* [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling)
@ 2026-07-02  9:22 Lukas Wagner
  2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
                   ` (14 more replies)
  0 siblings, 15 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC
  To: pdm-devel

This series bundles a couple of improvements and some refactoring efforts for
the task cache implementation in PDM.

Highlights: 

- Fix an issue where PBS task fetch cutoffs were not recorded in the state
file, so the full task history was requested on every fetch

- Fail more gracefully if any of the zstd archive files are corrupted
(previously, reading tasks would end up in an endless loop because the iterator
did not make any progress)

- When creating a new zstd-compressed file, make sure to create the file
atomically. This avoids creating a .zst file without a proper zstd file header
if the process crashes at the wrong moment.

- Detect archive corruption during read/write access (when applying journal and when
compressing archive files). Attempt basic recovery in the next task fetching
cycle by resetting cutoff timestamps to the lower boundary of the corrupted
file.

- Improve logging by including the path to the affected archive file in error
messages where possible

- Pre-compute static paths during initialization and instantiate TaskCache only once
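
To illustrate the endless-loop item from the list above: a minimal sketch of an
iterator adapter that yields the first error and then stops. `StopAfterError`
is a hypothetical name used only for illustration; it is not the type used in
the series, and the actual archive iterator may be structured differently.

```rust
/// Hypothetical adapter: forwards items until the first `Err`, yields that
/// error once, and returns `None` on every later call.
struct StopAfterError<I> {
    inner: I,
    failed: bool,
}

impl<I> StopAfterError<I> {
    fn new(inner: I) -> Self {
        Self { inner, failed: false }
    }
}

impl<I, T, E> Iterator for StopAfterError<I>
where
    I: Iterator<Item = Result<T, E>>,
{
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.failed {
            // A previous read failed; do not touch the source again.
            return None;
        }

        let item = self.inner.next()?;
        if item.is_err() {
            self.failed = true;
        }
        Some(item)
    }
}

/// Simulates a source that keeps returning the same error forever, which is
/// what a reader stuck on a corrupted file might look like.
fn collect_demo() -> Vec<Result<u32, String>> {
    let head: Vec<Result<u32, String>> = vec![Ok(1), Ok(2)];
    let source = head
        .into_iter()
        .chain(std::iter::repeat(Err(String::from("corrupt archive"))));

    StopAfterError::new(source).collect()
}
```

Without the adapter, collecting from `source` would never terminate; with it,
the caller sees the two valid items followed by a single error.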

The series also contains a patch for proxmox-sys. It is not strictly required
for this series, but the opportunity for the improvement came up while working
on it.


proxmox:

Lukas Wagner (1):
  sys: fs: don't replace file extension make_tmp_file

 proxmox-sys/src/fs/file.rs | 34 +++++++++++++++++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)


proxmox-datacenter-manager:

Lukas Wagner (14):
  task cache: fix missing cutoff state for PBS remotes
  task cache: refresh task: don't apply journal if the archive was
    rotated
  task cache: rotate: align timestamp for new files to UTC midnight
  task cache: add test case for task cache rotation
  task cache: pre-compute static paths during initialization
  task cache: only initialize `TaskCache` struct once
  task cache: archive iterator: don't yield more items if reading from
    file failed
  task cache: include archive file path in error log messages
  task cache: introduce ArchiveFileWriter
  task cache: use ArchiveFileWriter when creating new archive files
  task cache: trigger repair of corruption when applying journal
  task cache: trigger repair of corruption when compressing archive
    files
  task cache: trigger repair of corruption after read-accesses
  task cache: handle potentially duplicated archive files after
    'compress_archive_file'

 server/src/remote_tasks/mod.rs          |   53 +-
 server/src/remote_tasks/refresh_task.rs |   51 +-
 server/src/remote_tasks/task_cache.rs   | 1055 ++++++++++++++++++-----
 3 files changed, 922 insertions(+), 237 deletions(-)


Summary over all repositories:
  4 files changed, 955 insertions(+), 238 deletions(-)

-- 
Generated by murpp 0.12.0





* [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:29   ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
                   ` (13 subsequent siblings)
  14 siblings, 1 reply; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC
  To: pdm-devel

Rather than creating the template path used by mkostemp by replacing the
existing file extension, use `add_extension` to append to any existing
one.

We often provide the path of some original file that is later to be
replaced by the temporary one, so keeping the full original filename
intact helps debuggability in case there are leftover temporary files.

For instance, the task archive in PDM uses the following file name
schema: `archive.{timestamp}[.zst]`. Archive files are atomically
replaced by using make_tmp_file and rename. However, because
`set_extension` was used before, the trailing component of
`{timestamp}[.zst]` was lost in the name of the temporary file.
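
As a rough illustration of the difference, the following sketch builds the
mkostemp template both ways. The appending variant is emulated by pushing onto
the raw `OsString`, so the sketch does not depend on `add_extension` being
available in the toolchain; the function names are made up for this example.

```rust
use std::ffi::OsString;
use std::path::{Path, PathBuf};

/// Old behaviour: `set_extension` replaces everything after the last dot.
fn template_replacing(path: &Path) -> PathBuf {
    let mut template = path.to_owned();
    template.set_extension("tmp_XXXXXX");
    template
}

/// Appending behaviour, emulated by pushing onto the raw path string.
/// This only approximates `add_extension` for ordinary file paths.
fn template_appending(path: &Path) -> PathBuf {
    let mut raw: OsString = path.as_os_str().to_owned();
    raw.push(".tmp_XXXXXX");
    PathBuf::from(raw)
}
```

With replacement, `/tmp/archive.1000` becomes `/tmp/archive.tmp_XXXXXX` and
`/tmp/archive.1000.zst` becomes `/tmp/archive.1000.tmp_XXXXXX`; with appending,
the original file name stays intact in front of the `.tmp_XXXXXX` suffix.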

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 proxmox-sys/src/fs/file.rs | 34 +++++++++++++++++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/proxmox-sys/src/fs/file.rs b/proxmox-sys/src/fs/file.rs
index f0939381..3336156b 100644
--- a/proxmox-sys/src/fs/file.rs
+++ b/proxmox-sys/src/fs/file.rs
@@ -149,7 +149,7 @@ pub fn make_tmp_file<P: AsRef<Path>>(
 
     // use mkstemp here, because it works with different processes, threads, even tokio tasks
     let mut template = path.to_owned();
-    template.set_extension("tmp_XXXXXX");
+    template.add_extension("tmp_XXXXXX");
     let (mut file, tmp_path) = match mkostemp(&template, OFlag::O_CLOEXEC) {
         Ok((fd, path)) => (unsafe { File::from_raw_fd(fd) }, path),
         Err(err) => bail!("mkstemp {:?} failed: {}", template, err),
@@ -466,3 +466,35 @@ pub fn file_get_non_comment_lines<P: AsRef<Path>>(
         Err(err) => Some(Err(err)),
     }))
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn make_tmp_file_does_not_replace_extension() {
+        let (_, path) = make_tmp_file("/tmp/proxmox-sys-test.json", CreateOptions::new()).unwrap();
+
+        assert!(
+            &path
+                .file_name()
+                .unwrap()
+                .to_string_lossy()
+                .contains("proxmox-sys-test.json")
+        );
+
+        let (_, path) = make_tmp_file(
+            "/tmp/proxmox-sys-test.archive.1000.zst",
+            CreateOptions::new(),
+        )
+        .unwrap();
+
+        assert!(
+            &path
+                .file_name()
+                .unwrap()
+                .to_string_lossy()
+                .contains("archive.1000.zst")
+        );
+    }
+}
-- 
2.47.3






* [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
  2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
                   ` (12 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC
  To: pdm-devel

The node success map, which is used to determine whether the cutoff in
the state file should be updated or not, uses 'localhost' as a nodename
for PBS remotes, since at the point where it is instantiated, we don't
know the real hostname of a PBS remote.

This means that we also have to use 'localhost' where we look up the
status in the node success map. Otherwise the state file will never
contain entries for PBS remotes, leading to the refresh task always
requesting the full task history.
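
The lookup rule can be sketched in isolation as follows. The `RemoteType` enum
and the `success_map_node` helper below are simplified stand-ins defined just
for this example, not the types from `pdm_api_types`.

```rust
/// Simplified stand-in for the remote type (assumption for this sketch).
#[derive(Clone, Copy, Debug, PartialEq)]
enum RemoteType {
    Pve,
    Pbs,
}

/// Pick the node name under which fetch success was recorded.
/// PVE remotes are tracked per node; PBS remotes are always recorded
/// as 'localhost', regardless of the node name inside the UPID.
fn success_map_node<'a>(remote_type: RemoteType, upid_node: &'a str) -> &'a str {
    match remote_type {
        RemoteType::Pve => upid_node,
        RemoteType::Pbs => "localhost",
    }
}
```

Using the UPID's node name for a PBS remote would never match the 'localhost'
entry, which is why the cutoff was never updated.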

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index b0b4a837..b5fc3b21 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
 
 use proxmox_sys::fs::CreateOptions;
 
-use pdm_api_types::RemoteUpid;
+use pdm_api_types::{RemoteUpid, remotes::RemoteType};
 
 /// Filename for the file containing running tasks.
 const ACTIVE_FILENAME: &str = "active";
@@ -424,7 +424,12 @@ impl WritableTaskCache {
                 }
             };
 
-            let node = native_upid.node();
+            let node = match task.upid.remote_type() {
+                RemoteType::Pve => native_upid.node(),
+                // The node success map uses 'localhost' as a node name for PBS remotes,
+                // not the one from the UPID.
+                RemoteType::Pbs => "localhost",
+            };
             let remote = task.upid.remote();
 
             if node_success_map.node_successful(remote, node) {
@@ -449,7 +454,12 @@ impl WritableTaskCache {
                 }
             };
 
-            let node = native_upid.node();
+            let node = match task.upid.remote_type() {
+                RemoteType::Pve => native_upid.node(),
+                // The node success map uses 'localhost' as a node name for PBS remotes,
+                // not the one from the UPID.
+                RemoteType::Pbs => "localhost",
+            };
             let remote = task.upid.remote();
 
             if node_success_map.node_successful(remote, node) {
-- 
2.47.3






* [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
  2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
                   ` (11 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC
  To: pdm-devel

Task cache rotation already applies the journal, so there is no need to
do this a second time. This also avoids duplicate notices about applied
task cache journal entries, such as

	applying task cache journal
	committed 470 tasks in 0.008.s to task cache archive
	rotated remote task archive
	applying task cache journal
	committed 0 tasks in 0.000.s to task cache archive

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/refresh_task.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs
index 632b7d52..68ef5a25 100644
--- a/server/src/remote_tasks/refresh_task.rs
+++ b/server/src/remote_tasks/refresh_task.rs
@@ -128,6 +128,9 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error>
         log::debug!("checking if remote task archive should be rotated");
         if rotate_cache(cache.clone()).await? {
             log::info!("rotated remote task archive");
+
+            // rotation always applies the journal as well
+            task_state.reset_journal_apply();
         }
 
         task_state.reset_rotate_check();
-- 
2.47.3






* [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (2 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
                   ` (10 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC
  To: pdm-devel

This also requires changing the rotation check to an inclusive >=,
otherwise we would only rotate after two days instead of every day.
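
A small worked example of why the comparison has to become inclusive once
timestamps are aligned. `DAY` stands in for the rotation interval and is
assumed to be 24 hours here; the two `rotation_due_*` helpers are written only
for this sketch and mirror the old and new condition.

```rust
/// Assumed rotation interval of one day, in seconds.
const DAY: i64 = 24 * 3600;

/// Align a UNIX timestamp down to a multiple of `interval`.
fn align_timestamp(now: i64, interval: i64) -> i64 {
    now - now.rem_euclid(interval)
}

/// Old check: strictly greater than the interval.
fn rotation_due_strict(now: i64, starttime: i64, rotate_after: i64) -> bool {
    now > starttime && now - starttime > rotate_after
}

/// New check: greater than or equal to the interval.
fn rotation_due_inclusive(now: i64, starttime: i64, rotate_after: i64) -> bool {
    now > starttime && now - starttime >= rotate_after
}
```

If the current file starts at midnight of day N and `now` is aligned to
midnight of day N+1, the difference is exactly `DAY`. The strict check is false
in that case and only becomes true at midnight of day N+2, while the inclusive
check rotates on day N+1 as intended.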

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/refresh_task.rs | 23 ++++++++++++++++++++++-
 server/src/remote_tasks/task_cache.rs   |  3 ++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs
index 68ef5a25..55d5694e 100644
--- a/server/src/remote_tasks/refresh_task.rs
+++ b/server/src/remote_tasks/refresh_task.rs
@@ -360,7 +360,13 @@ fn get_remotes_with_finished_tasks(
 ///
 /// Returns Ok(true) the cache's files were rotated.
 async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
-    tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await?
+    tokio::task::spawn_blocking(move || {
+        cache.write()?.rotate(align_timestamp(
+            proxmox_time::epoch_i64(),
+            super::ROTATE_AFTER as i64,
+        ))
+    })
+    .await?
 }
 
 /// Apply the task cache journal.
@@ -541,3 +547,18 @@ async fn update_task_cache(
     })
     .await?
 }
+
+/// Align a UNIX timestamp to the specified interval.
+fn align_timestamp(now: i64, interval: i64) -> i64 {
+    now - (now.rem_euclid(interval))
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::remote_tasks::refresh_task::align_timestamp;
+
+    #[test]
+    fn test_align_timestamp() {
+        assert_eq!(align_timestamp(1782898009, 24 * 3600), 1782864000);
+    }
+}
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index b5fc3b21..55aa8b70 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -285,7 +285,8 @@ impl WritableTaskCache {
 
         match archive_files.first() {
             Some(bound) => {
-                if now > bound.starttime && now - bound.starttime > self.cache.rotate_after as i64 {
+                if now > bound.starttime && now - bound.starttime >= self.cache.rotate_after as i64
+                {
                     start_new_file(&mut archive_files)?;
                 }
             }
-- 
2.47.3






* [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (3 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
                   ` (9 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC
  To: pdm-devel

This is to ensure that older files are indeed compressed and then later
removed.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 65 +++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 55aa8b70..aa92fcc8 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -1555,4 +1555,69 @@ mod tests {
         assert_eq!(cache.get_tasks(GetTasks::Active).unwrap().count(), 0);
         assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 3);
     }
+
+    #[test]
+    fn archive_file_rotation() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 2;
+
+        let cache = cache.write().unwrap();
+        cache.init(1000).unwrap();
+
+        add_tasks(&cache, vec![task(1010, Some(1011)), task(1020, Some(1021))]).unwrap();
+        cache.apply_journal().unwrap();
+
+        cache.rotate(1100).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(files.len(), 2);
+        let first = files.get(0).unwrap();
+        let second = files.get(1).unwrap();
+
+        assert_eq!(first.compressed, false);
+        assert_eq!(first.starttime, 1100);
+        assert_eq!(second.compressed, true);
+        assert_eq!(second.starttime, 1000);
+
+        assert_eq!(first.iter().unwrap().unwrap().count(), 0);
+        assert_eq!(second.iter().unwrap().unwrap().count(), 2);
+
+        add_tasks(&cache, vec![task(1110, Some(1111))]).unwrap();
+        cache.apply_journal().unwrap();
+
+        cache.rotate(1200).unwrap();
+
+        add_tasks(&cache, vec![task(1210, Some(1211))]).unwrap();
+        cache.apply_journal().unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(files.len(), 2);
+        let first = files.get(0).unwrap();
+        let second = files.get(1).unwrap();
+
+        assert_eq!(first.compressed, false);
+        assert_eq!(first.starttime, 1200);
+        assert_eq!(second.compressed, true);
+        assert_eq!(second.starttime, 1100);
+
+        assert_eq!(first.iter().unwrap().unwrap().count(), 1);
+        assert_eq!(second.iter().unwrap().unwrap().count(), 1);
+
+        cache.rotate(1300).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(files.len(), 2);
+        let first = files.get(0).unwrap();
+        let second = files.get(1).unwrap();
+
+        assert_eq!(first.compressed, false);
+        assert_eq!(first.starttime, 1300);
+        assert_eq!(second.compressed, true);
+        assert_eq!(second.starttime, 1200);
+
+        assert_eq!(first.iter().unwrap().unwrap().count(), 0);
+        assert_eq!(second.iter().unwrap().unwrap().count(), 1);
+    }
 }
-- 
2.47.3






* [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (4 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
                   ` (8 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC
  To: pdm-devel

Compute all static paths during initialization, store them as members of
the TaskCache struct, and add methods to access them.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 118 +++++++++++++++++---------
 1 file changed, 80 insertions(+), 38 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index aa92fcc8..d97fa710 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -143,6 +143,19 @@ impl State {
 pub struct TaskCache {
     /// Path where the cache's files should be placed.
     base_path: PathBuf,
+
+    /// Path to the journal (WAL).
+    journal_path: PathBuf,
+
+    /// Path to the state file.
+    state_path: PathBuf,
+
+    /// Path to the lock file.
+    lock_path: PathBuf,
+
+    /// Path to the file containing active tasks.
+    active_path: PathBuf,
+
     /// File permissions for the cache's files.
     create_options: CreateOptions,
 
@@ -240,19 +253,10 @@ impl WritableTaskCache {
     /// Start a new archive file with a given timestamp.
     /// `now` is supposed to be a UNIX timestamp (seconds).
     fn new_file(&self, now: i64, compress: bool) -> Result<ArchiveFile, Error> {
-        let suffix = if compress {
-            ZSTD_EXTENSION_WITH_DOT
-        } else {
-            ""
-        };
+        let path = self.cache.archive_path(now, compress);
 
-        let new_path = self
-            .cache
-            .base_path
-            .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}{suffix}"));
-
-        let mut file = File::create(&new_path)?;
-        self.cache.create_options.apply_to(&mut file, &new_path)?;
+        let mut file = File::create(&path)?;
+        self.cache.create_options.apply_to(&mut file, &path)?;
 
         if compress {
             let encoder = zstd::stream::write::Encoder::new(file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
@@ -260,7 +264,7 @@ impl WritableTaskCache {
         }
 
         Ok(ArchiveFile {
-            path: new_path,
+            path,
             compressed: compress,
             starttime: now,
         })
@@ -407,11 +411,10 @@ impl WritableTaskCache {
         node_success_map: &NodeFetchSuccessMap,
         state: &mut State,
     ) -> Result<(), Error> {
-        let filename = self.cache.base_path.join(WAL_FILENAME);
         let mut file = OpenOptions::new()
             .append(true)
             .create(true)
-            .open(filename)?;
+            .open(self.cache.journal_path())?;
 
         for task in finished_tasks {
             // Remove this finished task from our set of active tasks.
@@ -479,8 +482,7 @@ impl WritableTaskCache {
     fn journal_size(&self) -> Result<u64, Error> {
         let metadata = self
             .cache
-            .base_path
-            .join(WAL_FILENAME)
+            .journal_path()
             .metadata()
             .context("failed to read metadata of journal file")?;
 
@@ -504,9 +506,9 @@ impl WritableTaskCache {
     /// This will merge all tasks in the journal file into the task archive.
     pub fn apply_journal(&self) -> Result<(), Error> {
         let start = Instant::now();
-        let filename = self.cache.base_path.join(WAL_FILENAME);
+        let journal_path = self.cache.journal_path();
 
-        let file = match File::open(&filename) {
+        let file = match File::open(journal_path) {
             Ok(file) => Box::new(BufReader::new(file)),
             Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
             Err(err) => return Err(err.into()),
@@ -537,7 +539,7 @@ impl WritableTaskCache {
         OpenOptions::new()
             .write(true)
             .truncate(true)
-            .open(filename)
+            .open(journal_path)
             .context("failed to truncate journal file")?;
 
         log::info!(
@@ -649,7 +651,7 @@ impl WritableTaskCache {
 
     /// Write the state file.
     fn write_state(&self, state: State) -> Result<(), Error> {
-        let path = self.cache.base_path.join(STATE_FILENAME);
+        let path = self.cache.state_path();
 
         let data = serde_json::to_vec_pretty(&state)?;
 
@@ -663,10 +665,9 @@ impl WritableTaskCache {
     /// The tasks are first written to a temporary file, which is then used
     /// to atomically replace the original.
     fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
-        let (fd, path) = proxmox_sys::fs::make_tmp_file(
-            self.cache.base_path.join(ACTIVE_FILENAME),
-            self.cache.create_options,
-        )?;
+        let target = self.cache.active_path();
+
+        let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?;
         let mut fd = BufWriter::new(fd);
 
         Self::write_tasks(&mut fd, tasks)?;
@@ -676,9 +677,7 @@ impl WritableTaskCache {
         }
         drop(fd);
 
-        let target = self.cache.base_path.join(ACTIVE_FILENAME);
-
-        let res = std::fs::rename(&path, &target).with_context(|| {
+        let res = std::fs::rename(&path, target).with_context(|| {
             format!(
                 "failed to replace {} with {}",
                 target.display(),
@@ -790,7 +789,7 @@ impl TaskCache {
     ///
     /// Remember to call `init` or `new_file` on a locked, writable TaskCache
     /// to create the initial archive files.
-    pub fn new<P: AsRef<Path>>(
+    pub fn new<P: Into<PathBuf>>(
         path: P,
         create_options: CreateOptions,
         max_files: u32,
@@ -798,8 +797,19 @@ impl TaskCache {
         rotate_after: u64,
         journal_max_size: u64,
     ) -> Result<Self, Error> {
+        let base_path = path.into();
+
+        let journal_path = base_path.join(WAL_FILENAME);
+        let state_path = base_path.join(STATE_FILENAME);
+        let active_path = base_path.join(ACTIVE_FILENAME);
+        let lock_path = base_path.join(LOCKFILE_FILENAME);
+
         Ok(Self {
-            base_path: path.as_ref().into(),
+            base_path,
+            journal_path,
+            state_path,
+            active_path,
+            lock_path,
             create_options,
             journal_max_size,
             max_files,
@@ -823,7 +833,7 @@ impl TaskCache {
     }
 
     fn lock_impl(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
-        let lockfile = self.base_path.join(LOCKFILE_FILENAME);
+        let lockfile = self.lock_path();
 
         Ok(TaskCacheLock(proxmox_sys::fs::open_file_locked(
             lockfile,
@@ -845,8 +855,7 @@ impl TaskCache {
             }
         }
 
-        let path = self.base_path.join(STATE_FILENAME);
-        do_read_state(&path).unwrap_or_else(|err| {
+        do_read_state(self.state_path()).unwrap_or_else(|err| {
             log::error!("could not read state file: {err:#}");
             Default::default()
         })
@@ -857,7 +866,7 @@ impl TaskCache {
         mode: GetTasks,
         lock: &'a TaskCacheLock,
     ) -> Result<TaskArchiveIterator<'a>, Error> {
-        let journal_file = self.base_path.join(WAL_FILENAME);
+        let journal_file = self.journal_path();
 
         match mode {
             GetTasks::All => {
@@ -865,16 +874,16 @@ impl TaskCache {
                 archive_files.reverse();
 
                 archive_files.push(ArchiveFile {
-                    path: self.base_path.join(ACTIVE_FILENAME),
+                    path: self.active_path().into(),
                     compressed: false,
                     starttime: 0,
                 });
 
-                TaskArchiveIterator::new(Some(journal_file), archive_files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
             }
             GetTasks::Active => {
                 let archive_files = vec![ArchiveFile {
-                    path: self.base_path.join(ACTIVE_FILENAME),
+                    path: self.active_path().into(),
                     compressed: false,
                     starttime: 0,
                 }];
@@ -886,7 +895,7 @@ impl TaskCache {
                 let mut files = self.archive_files(lock)?;
                 files.reverse();
 
-                TaskArchiveIterator::new(Some(journal_file), files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), files, lock)
             }
         }
     }
@@ -935,6 +944,39 @@ impl TaskCache {
             })
         }
     }
+
+    /// Return path to the task cache journal (WAL).
+    fn journal_path(&self) -> &Path {
+        &self.journal_path
+    }
+
+    /// Return path to the task cache state file.
+    fn state_path(&self) -> &Path {
+        &self.state_path
+    }
+
+    /// Return path to the file of active tasks.
+    fn active_path(&self) -> &Path {
+        &self.active_path
+    }
+
+    /// Return path to the task cache lock file.
+    fn lock_path(&self) -> &Path {
+        &self.lock_path
+    }
+
+    /// Construct an archive file path, given its lower bound timestamp and its compression
+    /// status.
+    fn archive_path(&self, starttime: i64, compressed: bool) -> PathBuf {
+        let suffix = if compressed {
+            ZSTD_EXTENSION_WITH_DOT
+        } else {
+            ""
+        };
+
+        self.base_path
+            .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}"))
+    }
 }
 
 /// Comparison function for sorting tasks.
-- 
2.47.3






* [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (5 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
                   ` (7 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC
  To: pdm-devel

It only contains static parameters that remain constant after daemon
startup, so there is no need to initialize it over and over again. The
WritableTaskCache and ReadableTaskCache guards now take a reference
instead of owning the TaskCache struct.

TaskCache::new was always infallible, so there is no need to return a
Result from it.
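
The pattern can be sketched on its own with `std::sync::LazyLock`. The
`ExampleCache` struct, the directory, and the `journal` file name below are
placeholders chosen for this illustration and do not reflect the real
`TaskCache` fields or constants.

```rust
use std::path::{Path, PathBuf};
use std::sync::LazyLock;

/// Placeholder for a struct that only holds static configuration.
struct ExampleCache {
    journal_path: PathBuf,
}

impl ExampleCache {
    /// Infallible constructor: derived paths are computed exactly once here.
    fn new(base: impl Into<PathBuf>) -> Self {
        let base_path: PathBuf = base.into();
        // "journal" is a made-up file name for this sketch.
        let journal_path = base_path.join("journal");

        Self { journal_path }
    }

    fn journal_path(&self) -> &Path {
        &self.journal_path
    }
}

/// Return a reference to the single, lazily initialized instance.
fn get_cache() -> &'static ExampleCache {
    static CACHE: LazyLock<ExampleCache> =
        LazyLock::new(|| ExampleCache::new("/tmp/example-task-cache"));

    &CACHE
}
```

Every call to `get_cache()` returns the same `&'static` reference, so callers
no longer need to clone the struct or handle a `Result` when obtaining it.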

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/mod.rs          | 38 ++++++++++++-------------
 server/src/remote_tasks/refresh_task.rs | 25 ++++++++--------
 server/src/remote_tasks/task_cache.rs   | 25 ++++++++--------
 3 files changed, 42 insertions(+), 46 deletions(-)

diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 50ac6708..6a340dba 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,4 +1,4 @@
-use std::path::Path;
+use std::sync::LazyLock;
 
 use anyhow::Error;
 
@@ -38,7 +38,7 @@ pub async fn get_tasks(
     let view = views::get_optional_view(view.as_deref())?;
 
     tokio::task::spawn_blocking(move || {
-        let cache = get_cache()?.read()?;
+        let cache = get_cache().read()?;
 
         let which = if filters.running {
             GetTasks::Active
@@ -171,7 +171,7 @@ pub async fn get_tasks(
 pub async fn track_running_pve_task(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
     tokio::task::spawn_blocking(move || {
         let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
-        let cache = get_cache()?.write()?;
+        let cache = get_cache().write()?;
 
         let task = TaskCacheItem {
             upid: remote_upid.clone(),
@@ -198,7 +198,7 @@ pub async fn track_running_pbs_task(
 ) -> Result<RemoteUpid, Error> {
     tokio::task::spawn_blocking(move || {
         let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
-        let cache = get_cache()?.write()?;
+        let cache = get_cache().write()?;
 
         let task = TaskCacheItem {
             upid: remote_upid.clone(),
@@ -213,22 +213,20 @@ pub async fn track_running_pbs_task(
     .await?
 }
 
-/// Get a new [`TaskCache`] instance.
-///
-/// No heavy-weight operations are done here, it's fine to call this regularly as part of the
-/// update loop.
-pub fn get_cache() -> Result<TaskCache, Error> {
-    let file_options = proxmox_product_config::default_create_options();
+/// Get a reference to the [`TaskCache`] instance.
+pub fn get_cache() -> &'static TaskCache {
+    static CACHE: LazyLock<TaskCache> = LazyLock::new(|| {
+        let file_options = proxmox_product_config::default_create_options();
 
-    let cache_path = Path::new(REMOTE_TASKS_DIR);
-    let cache = TaskCache::new(
-        cache_path,
-        file_options,
-        KEEP_OLD_FILES,
-        NUMBER_OF_UNCOMPRESSED_FILES,
-        ROTATE_AFTER,
-        JOURNAL_MAX_SIZE,
-    )?;
+        TaskCache::new(
+            REMOTE_TASKS_DIR,
+            file_options,
+            KEEP_OLD_FILES,
+            NUMBER_OF_UNCOMPRESSED_FILES,
+            ROTATE_AFTER,
+            JOURNAL_MAX_SIZE,
+        )
+    });
 
-    Ok(cache)
+    &CACHE
 }
diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs
index 55d5694e..eb74cb93 100644
--- a/server/src/remote_tasks/refresh_task.rs
+++ b/server/src/remote_tasks/refresh_task.rs
@@ -122,11 +122,11 @@ impl Default for TaskState {
 /// Handle a single timer tick.
 /// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks.
 pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> {
-    let cache = super::get_cache()?;
+    let cache = super::get_cache();
 
     if task_state.is_due_for_rotate_check() {
         log::debug!("checking if remote task archive should be rotated");
-        if rotate_cache(cache.clone()).await? {
+        if rotate_cache(cache).await? {
             log::info!("rotated remote task archive");
 
             // rotation always applies the journal as well
@@ -137,7 +137,7 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error>
     }
 
     if task_state.is_due_for_journal_apply() {
-        apply_journal(cache.clone()).await?;
+        apply_journal(cache).await?;
         task_state.reset_journal_apply();
     }
 
@@ -151,7 +151,7 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error>
         let mut tasks_to_poll: HashSet<RemoteUpid> =
             HashSet::from_iter(cache_state.tracked_tasks().cloned());
 
-        let active_tasks = get_active_tasks(cache.clone()).await?;
+        let active_tasks = get_active_tasks(cache).await?;
         tasks_to_poll.extend(active_tasks.into_iter());
 
         let poll_results = poll_tracked_tasks(
@@ -188,7 +188,7 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error>
             .iter()
             .any(|(_, result)| matches!(result, PollResult::RemoteGone | PollResult::RequestError))
     {
-        update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
+        update_task_cache(all_tasks, update_state_for_remote, poll_results).await?;
     }
 
     Ok(())
@@ -196,13 +196,13 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error>
 
 /// Manually trigger task collection from a list of remotes.
 pub async fn refresh_taskcache(remotes: Vec<Remote>) -> Result<(), Error> {
-    let cache = super::get_cache()?;
+    let cache = super::get_cache();
     let cache_state = cache.read_state();
 
     let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
 
     if !all_tasks.is_empty() {
-        update_task_cache(cache, all_tasks, update_state_for_remote, HashMap::new()).await?;
+        update_task_cache(all_tasks, update_state_for_remote, HashMap::new()).await?;
     }
 
     Ok(())
@@ -215,7 +215,7 @@ pub async fn refresh_taskcache(remotes: Vec<Remote>) -> Result<(), Error> {
 /// without any prior task archive rotation.
 pub async fn init_cache() -> Result<(), Error> {
     tokio::task::spawn_blocking(|| {
-        let cache = super::get_cache()?;
+        let cache = super::get_cache();
         cache.write()?.init(proxmox_time::epoch_i64())?;
         Ok(())
     })
@@ -359,7 +359,7 @@ fn get_remotes_with_finished_tasks(
 /// Rotate the task cache if necessary.
 ///
 /// Returns Ok(true) the cache's files were rotated.
-async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+async fn rotate_cache(cache: &'static TaskCache) -> Result<bool, Error> {
     tokio::task::spawn_blocking(move || {
         cache.write()?.rotate(align_timestamp(
             proxmox_time::epoch_i64(),
@@ -370,12 +370,12 @@ async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
 }
 
 /// Apply the task cache journal.
-async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
+async fn apply_journal(cache: &'static TaskCache) -> Result<(), Error> {
     tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
 }
 
 /// Get a list of active tasks.
-async fn get_active_tasks(cache: TaskCache) -> Result<Vec<RemoteUpid>, Error> {
+async fn get_active_tasks(cache: &'static TaskCache) -> Result<Vec<RemoteUpid>, Error> {
     tokio::task::spawn_blocking(move || {
         let tasks: Vec<RemoteUpid> = cache
             .read()?
@@ -523,7 +523,6 @@ fn map_pbs_task(task: pbs_api_types::TaskListItem, remote: String) -> TaskCacheI
 
 /// Update task cache with results from tracked task polling & regular task fetching.
 async fn update_task_cache(
-    cache: TaskCache,
     new_tasks: Vec<TaskCacheItem>,
     update_state_for_remote: NodeFetchSuccessMap,
     poll_results: HashMap<RemoteUpid, PollResult>,
@@ -539,7 +538,7 @@ async fn update_task_cache(
             })
             .collect();
 
-        cache
+        super::get_cache()
             .write()?
             .update(new_tasks, &update_state_for_remote, drop_tracked)?;
 
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index d97fa710..f0e3533a 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -175,14 +175,14 @@ pub struct TaskCache {
 }
 
 /// A [`TaskCache`] locked for writing.
-pub struct WritableTaskCache {
-    cache: TaskCache,
+pub struct WritableTaskCache<'a> {
+    cache: &'a TaskCache,
     lock: TaskCacheLock,
 }
 
 /// A [`TaskCache`] locked for reading.
-pub struct ReadableTaskCache {
-    cache: TaskCache,
+pub struct ReadableTaskCache<'a> {
+    cache: &'a TaskCache,
     lock: TaskCacheLock,
 }
 
@@ -223,7 +223,7 @@ impl NodeFetchSuccessMap {
     }
 }
 
-impl ReadableTaskCache {
+impl<'a> ReadableTaskCache<'a> {
     /// Iterate over cached tasks.
     pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
         self.cache
@@ -232,7 +232,7 @@ impl ReadableTaskCache {
     }
 }
 
-impl WritableTaskCache {
+impl<'a> WritableTaskCache<'a> {
     /// Create initial task archive files that can be backfilled with the
     /// recent task history from a remote.
     ///
@@ -796,7 +796,7 @@ impl TaskCache {
         uncompressed: u32,
         rotate_after: u64,
         journal_max_size: u64,
-    ) -> Result<Self, Error> {
+    ) -> Self {
         let base_path = path.into();
 
         let journal_path = base_path.join(WAL_FILENAME);
@@ -804,7 +804,7 @@ impl TaskCache {
         let active_path = base_path.join(ACTIVE_FILENAME);
         let lock_path = base_path.join(LOCKFILE_FILENAME);
 
-        Ok(Self {
+        Self {
             base_path,
             journal_path,
             state_path,
@@ -815,18 +815,18 @@ impl TaskCache {
             max_files,
             rotate_after,
             uncompressed_files: uncompressed,
-        })
+        }
     }
 
     /// Lock the cache for reading.
-    pub fn read(self) -> Result<ReadableTaskCache, Error> {
+    pub fn read(&self) -> Result<ReadableTaskCache<'_>, Error> {
         let lock = self.lock_impl(false)?;
 
         Ok(ReadableTaskCache { cache: self, lock })
     }
 
     /// Lock the cache for writing.
-    pub fn write(self) -> Result<WritableTaskCache, Error> {
+    pub fn write(&self) -> Result<WritableTaskCache<'_>, Error> {
         let lock = self.lock_impl(true)?;
 
         Ok(WritableTaskCache { cache: self, lock })
@@ -1400,8 +1400,7 @@ mod tests {
             1,
             0,
             DEFAULT_MAX_SIZE,
-        )
-        .unwrap();
+        );
 
         Ok((tmp_dir, cache))
     }
-- 
2.47.3






* [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (6 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
                   ` (6 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC (permalink / raw)
  To: pdm-devel

Otherwise `get_tasks` runs into an endless loop if any of the archive
files is corrupted.

Also add a test case that verifies the absence of the issue.
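
For illustration, the idea can be sketched with the standard library alone.
This is not the patch's code: `BrokenReader` and `FusedLines` are
hypothetical names, and a reader that fails on every call stands in for a
corrupted archive. Once `Lines` has returned a read error, it may keep
returning the same error forever, so the wrapper remembers the failure and
ends the iteration after reporting it once.

```rust
use std::io::{self, BufRead, BufReader, Lines, Read};

/// Hypothetical reader that fails on every read, standing in for a corrupted archive file.
struct BrokenReader;

impl Read for BrokenReader {
    fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::InvalidData, "corrupted"))
    }
}

/// Convenience constructor for a buffered reader that always fails.
fn broken_reader() -> BufReader<BrokenReader> {
    BufReader::new(BrokenReader)
}

/// Line iterator that yields a read error exactly once and then ends.
struct FusedLines<B: BufRead> {
    iter: Lines<B>,
    failed: bool,
}

impl<B: BufRead> FusedLines<B> {
    fn new(reader: B) -> Self {
        Self {
            iter: reader.lines(),
            failed: false,
        }
    }
}

impl<B: BufRead> Iterator for FusedLines<B> {
    type Item = io::Result<String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.failed {
            // A previous read failed, do not touch the reader again.
            return None;
        }

        let item = self.iter.next()?;
        if item.is_err() {
            self.failed = true;
        }
        Some(item)
    }
}
```

Without the `failed` flag, a consumer that skips `Err` items and waits for
`None` would never terminate when reading from `broken_reader()`.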

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 48 +++++++++++++++++++++++----
 1 file changed, 42 insertions(+), 6 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index f0e3533a..02be9ca1 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -1244,6 +1244,7 @@ where
 /// tasks are read line by line, without leading the entire archive file into memory.
 struct ArchiveIterator {
     iter: Lines<Box<dyn BufRead>>,
+    failed: bool,
 }
 
 impl ArchiveIterator {
@@ -1251,7 +1252,10 @@ impl ArchiveIterator {
     pub fn new(reader: Box<dyn BufRead>) -> Self {
         let lines = reader.lines();
 
-        Self { iter: lines }
+        Self {
+            iter: lines,
+            failed: false,
+        }
     }
 }
 
@@ -1259,11 +1263,18 @@ impl Iterator for ArchiveIterator {
     type Item = Result<TaskCacheItem, Error>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.iter.next().map(|result| {
-            result
-                .and_then(|line| Ok(serde_json::from_str(&line)?))
-                .map_err(Into::into)
-        })
+        if self.failed {
+            // Don't return any more items if we have failed reading a line once
+            return None;
+        }
+
+        self.iter.next().map(|line| match line {
+            Ok(line) => Some(serde_json::from_str(&line).context("failed to decode JSON")),
+            Err(err) => {
+                self.failed = true;
+                Some(Err(err).context("failed to read line"))
+            }
+        })?
     }
 }
 
@@ -1661,4 +1672,29 @@ mod tests {
         assert_eq!(first.iter().unwrap().unwrap().count(), 0);
         assert_eq!(second.iter().unwrap().unwrap().count(), 1);
     }
+
+    #[test]
+    fn corrupted_archive_file_does_not_lead_to_endless_loop() {
+        let (_tmp_dir, cache) = make_cache().unwrap();
+        let cache = cache.write().unwrap();
+
+        // Create compressed file
+        cache.new_file(1000, true).unwrap();
+        add_tasks(&cache, vec![task(1100, Some(1110))]).unwrap();
+        cache.apply_journal().unwrap();
+
+        assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 1);
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        let file = files.get(0).expect("there is one archive file");
+
+        // truncate existing compressed file, corrupting the zstd file header
+        let _file = OpenOptions::new()
+            .write(true)
+            .truncate(true)
+            .open(&file.path)
+            .expect("file truncated");
+
+        assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0);
+    }
 }
-- 
2.47.3






* [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (7 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
                   ` (5 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC (permalink / raw)
  To: pdm-devel

This should make any kind of troubleshooting much simpler.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 26 ++++++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 02be9ca1..c7057786 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -1065,8 +1065,10 @@ impl Iterator for TaskArchiveIterator<'_> {
 struct InnerTaskArchiveIterator {
     /// Archive files to read.
     files: Vec<ArchiveFile>,
-    /// Archive iterator we are currently using, if any
+    /// Archive iterator we are currently using, if any.
     current: Option<ArchiveIterator>,
+    /// Archive file that is currently being iterated over, if any.
+    current_file: Option<ArchiveFile>,
 }
 
 impl InnerTaskArchiveIterator {
@@ -1075,6 +1077,7 @@ impl InnerTaskArchiveIterator {
         Self {
             files,
             current: None,
+            current_file: None,
         }
     }
 }
@@ -1088,9 +1091,22 @@ impl Iterator for InnerTaskArchiveIterator {
                 Some(current) => {
                     let next = current.next();
                     if next.is_some() {
-                        return next;
+                        return next.map(|res| {
+                            res.with_context(|| {
+                                format!(
+                                    "failed to read from {file}",
+                                    file = self
+                                        .current_file
+                                        .as_ref()
+                                        .expect("self.current and self.current_file are both Some")
+                                        .path
+                                        .display()
+                                )
+                            })
+                        });
                     } else {
                         self.current = None;
+                        self.current_file = None;
                     }
                 }
                 None => 'inner: loop {
@@ -1100,6 +1116,7 @@ impl Iterator for InnerTaskArchiveIterator {
                     match next_file.iter() {
                         Ok(Some(iter)) => {
                             self.current = Some(iter);
+                            self.current_file = Some(next_file);
                             break 'inner;
                         }
                         Ok(None) => {
@@ -1107,7 +1124,8 @@ impl Iterator for InnerTaskArchiveIterator {
                         }
                         Err(err) => {
                             log::error!(
-                                "could not create archive iterator while iteration over task archive files, skipping: {err:#}"
+                                "could not create task archive iterator for {file}, skipping: {err:#}",
+                                file = next_file.path.display()
                             )
                         }
                     }
@@ -1292,7 +1310,7 @@ impl JournalIterator {
             .flat_map(|task| match task {
                 Ok(task) => Some(task),
                 Err(err) => {
-                    log::error!("could not read task while iterating over archive file: {err:#}");
+                    log::error!("could not read task while iterating over journal file: {err:#}");
                     None
                 }
             })
-- 
2.47.3






* [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (8 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
                   ` (4 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC (permalink / raw)
  To: pdm-devel

This nicely abstracts the handling of compressed and uncompressed archive
files behind a common interface and atomically replaces any existing
archive file using the usual tmpfile + rename approach.

The 'active' file has the same file structure as regular archive files,
so we can use ArchiveFileWriter for these as well.
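
As a rough, std-only sketch of the tmpfile + rename approach (not the code
from this patch): `replace_file_atomically` and `tmp_path_for` are
hypothetical helpers, and the fixed `.tmp` suffix is an assumption made for
brevity. The actual patch obtains its temporary file from
`proxmox_sys::fs::make_tmp_file` instead.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical helper: temporary path next to `target`, with `.tmp` appended.
fn tmp_path_for(target: &Path) -> PathBuf {
    let mut name = target.as_os_str().to_owned();
    name.push(".tmp");
    PathBuf::from(name)
}

/// Write `contents` to a temporary file and rename it over `target`.
///
/// Readers see either the old or the new contents, never a partially written file,
/// as long as both paths are on the same file system.
fn replace_file_atomically(target: &Path, contents: &[u8]) -> io::Result<()> {
    let tmp = tmp_path_for(target);

    let result = (|| -> io::Result<()> {
        let mut file = File::create(&tmp)?;
        file.write_all(contents)?;
        // Make sure the data is on disk before the rename makes it visible.
        file.sync_all()?;
        fs::rename(&tmp, target)
    })();

    if result.is_err() {
        // Best-effort cleanup, the original error is the one worth reporting.
        let _ = fs::remove_file(&tmp);
    }

    result
}
```

Keeping the temporary file in the same directory as the target matters,
because a rename across file systems is not atomic.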

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 251 +++++++++++++++++---------
 1 file changed, 163 insertions(+), 88 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index c7057786..969a8124 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -12,6 +12,7 @@ use std::{
 
 use anyhow::{Context, Error};
 use serde::{Deserialize, Serialize};
+use zstd::Encoder;
 
 use proxmox_sys::fs::CreateOptions;
 
@@ -665,36 +666,13 @@ impl<'a> WritableTaskCache<'a> {
     /// The tasks are first written to a temporary file, which is then used
     /// to atomically replace the original.
     fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
-        let target = self.cache.active_path();
+        let file = self.cache.active_file();
+        let mut writer = file.writer(self.cache.create_options)?;
 
-        let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?;
-        let mut fd = BufWriter::new(fd);
-
-        Self::write_tasks(&mut fd, tasks)?;
-
-        if let Err(err) = fd.flush() {
-            log::error!("could not flush 'active' file: {err:#}");
-        }
-        drop(fd);
-
-        let res = std::fs::rename(&path, target).with_context(|| {
-            format!(
-                "failed to replace {} with {}",
-                target.display(),
-                path.display(),
-            )
-        });
-
-        if let Err(err) = res {
-            if let Err(err) = std::fs::remove_file(&path) {
-                log::error!(
-                    "failed to cleanup temporary file {}: {err:#}",
-                    path.display()
-                );
-            }
-
-            return Err(err);
-        }
+        writer.write_tasks(tasks)?;
+        writer
+            .finalize_and_replace()
+            .context("could not finalize 'active' file")?;
 
         Ok(())
     }
@@ -712,17 +690,7 @@ impl<'a> WritableTaskCache<'a> {
             return Ok(());
         }
 
-        // TODO: Might be nice to also move this to ArchiveFile
-        let (temp_file, temp_file_path) =
-            proxmox_sys::fs::make_tmp_file(&file.path, self.cache.create_options)?;
-        let mut writer = if file.compressed {
-            let encoder =
-                zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?
-                    .auto_finish();
-            Box::new(BufWriter::new(encoder)) as Box<dyn Write>
-        } else {
-            Box::new(BufWriter::new(temp_file)) as Box<dyn Write>
-        };
+        let mut writer = file.writer(self.cache.create_options)?;
 
         let archive_iter = file
             .iter()?
@@ -742,43 +710,8 @@ impl<'a> WritableTaskCache<'a> {
             .peekable();
         let task_iter = tasks.into_iter().peekable();
 
-        Self::write_tasks(&mut writer, MergeTaskIterator::new(archive_iter, task_iter))?;
-
-        if let Err(err) = writer.flush() {
-            log::error!("could not flush BufWriter for {file:?}: {err:#}");
-        }
-        drop(writer);
-
-        if let Err(err) = std::fs::rename(&temp_file_path, &file.path).with_context(|| {
-            format!(
-                "failed to replace {} with {}",
-                file.path.display(),
-                temp_file_path.display()
-            )
-        }) {
-            if let Err(err) = std::fs::remove_file(&temp_file_path) {
-                log::error!(
-                    "failed to clean up temporary file {}: {err:#}",
-                    temp_file_path.display()
-                );
-            }
-
-            return Err(err);
-        }
-
-        Ok(())
-    }
-
-    /// Write an iterator of [`TaskCacheItem`] to a something that implements [`Write`].
-    /// The individual items are encoded as JSON followed by a newline.
-    fn write_tasks(
-        writer: &mut impl Write,
-        tasks: impl Iterator<Item = TaskCacheItem>,
-    ) -> Result<(), Error> {
-        for task in tasks {
-            serde_json::to_writer(&mut *writer, &task)?;
-            writeln!(writer)?;
-        }
+        writer.write_tasks(MergeTaskIterator::new(archive_iter, task_iter))?;
+        writer.finalize_and_replace()?;
 
         Ok(())
     }
@@ -872,21 +805,12 @@ impl TaskCache {
             GetTasks::All => {
                 let mut archive_files = self.archive_files(lock)?;
                 archive_files.reverse();
-
-                archive_files.push(ArchiveFile {
-                    path: self.active_path().into(),
-                    compressed: false,
-                    starttime: 0,
-                });
+                archive_files.push(self.active_file());
 
                 TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
             }
             GetTasks::Active => {
-                let archive_files = vec![ArchiveFile {
-                    path: self.active_path().into(),
-                    compressed: false,
-                    starttime: 0,
-                }];
+                let archive_files = vec![self.active_file()];
 
                 TaskArchiveIterator::new(None, archive_files, lock)
             }
@@ -977,6 +901,15 @@ impl TaskCache {
         self.base_path
             .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}"))
     }
+
+    /// Return the [`ArchiveFile`] instance for the `active` file.
+    fn active_file(&self) -> ArchiveFile {
+        ArchiveFile {
+            path: self.active_path().into(),
+            compressed: false,
+            starttime: 0,
+        }
+    }
 }
 
 /// Comparison function for sorting tasks.
@@ -1174,6 +1107,28 @@ impl ArchiveFile {
         Ok(Some(iter))
     }
 
+    /// Create an [`ArchiveFileWriter`] for this archive file.
+    fn writer(&self, create_options: CreateOptions) -> Result<ArchiveFileWriter<'_>, Error> {
+        let (temp_file, temp_file_path) =
+            proxmox_sys::fs::make_tmp_file(&self.path, create_options)?;
+
+        if self.compressed {
+            let encoder =
+                zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
+            Ok(ArchiveFileWriter {
+                archive_file: self,
+                writer: Some(ArchiveFileWriterInner::ZstdEncoder(encoder)),
+                path: temp_file_path,
+            })
+        } else {
+            Ok(ArchiveFileWriter {
+                archive_file: self,
+                writer: Some(ArchiveFileWriterInner::Plain(BufWriter::new(temp_file))),
+                path: temp_file_path,
+            })
+        }
+    }
+
     fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
         let uncompressed_file_path = &self.path;
 
@@ -1207,6 +1162,98 @@ impl ArchiveFile {
     }
 }
 
+/// Writer for an [`ArchiveFile`].
+///
+/// Instantiate this via [`ArchiveFile::writer`]. When calling [`Self::finalize_and_replace`], the
+/// original archive file is replaced with the contents that were written to the writer.
+struct ArchiveFileWriter<'a> {
+    archive_file: &'a ArchiveFile,
+    // writer: Option<Box<dyn Write>>,
+    writer: Option<ArchiveFileWriterInner<'a>>,
+    path: PathBuf,
+}
+
+enum ArchiveFileWriterInner<'a> {
+    ZstdEncoder(Encoder<'a, File>),
+    Plain(BufWriter<File>),
+}
+
+impl<'a> ArchiveFileWriter<'a> {
+    /// Write the contents of an iterator of [`TaskCacheItem`] to this archive file.
+    /// The individual items are encoded as JSON followed by a newline.
+    fn write_tasks(&mut self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
+        if let Some(writer) = self.writer.as_mut() {
+            let writer = match writer {
+                ArchiveFileWriterInner::ZstdEncoder(encoder) => encoder as &mut dyn Write,
+                ArchiveFileWriterInner::Plain(buf_writer) => buf_writer as &mut dyn Write,
+            };
+
+            for task in tasks {
+                serde_json::to_writer(&mut *writer, &task)?;
+                writeln!(writer)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Shared implementation of [`ArchiveFileWriter::finalize_and_replace`] and `Drop`. Callers
+    /// should use `finalize_and_replace`, or just drop `self` if the error can be ignored.
+    fn finalize_impl(&mut self) -> Result<(), Error> {
+        if let Some(writer) = self.writer.take() {
+            let file = match writer {
+                ArchiveFileWriterInner::ZstdEncoder(mut encoder) => {
+                    encoder.flush()?;
+                    encoder.finish()?
+                }
+                ArchiveFileWriterInner::Plain(mut buf_writer) => {
+                    buf_writer.flush()?;
+                    buf_writer.into_inner()?
+                }
+            };
+
+            file.sync_all()
+                .with_context(|| format!("could not fsync {}", self.path.display()))?;
+
+            if let Err(err) =
+                std::fs::rename(&self.path, &self.archive_file.path).with_context(|| {
+                    format!(
+                        "failed to replace {} with {}",
+                        self.archive_file.path.display(),
+                        self.path.display()
+                    )
+                })
+            {
+                if let Err(err) = std::fs::remove_file(&self.path) {
+                    log::error!(
+                        "failed to clean up temporary file {path}: {err:#}",
+                        path = self.path.display()
+                    );
+                }
+
+                return Err(err);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Finalize and drop the internal writer and replace the original
+    /// archive file with the new contents. This method consumes `self`.
+    ///
+    /// [`ArchiveFileWriter::drop`] also finalizes the writer properly, but does not
+    /// allow errors to be handled.
+    fn finalize_and_replace(mut self) -> Result<(), Error> {
+        self.finalize_impl()
+    }
+}
+
+impl<'a> Drop for ArchiveFileWriter<'a> {
+    fn drop(&mut self) {
+        let _ = self.finalize_impl();
+    }
+}
+
 /// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
 /// from both iterators sorted.
 /// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1715,4 +1762,32 @@ mod tests {
 
         assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0);
     }
+
+    #[test]
+    fn temporary_file_when_writer_lives_is_not_enumerated() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 2;
+
+        let cache = cache.write().unwrap();
+        cache.init(1000).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+
+        assert_eq!(files.len(), 2);
+
+        let _writer = files
+            .get(0)
+            .unwrap()
+            .writer(cache.cache.create_options)
+            .unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        assert_eq!(
+            files.len(),
+            2,
+            "the tmp file created by the writer should not show up in the list of archive files"
+        );
+    }
 }
-- 
2.47.3






* [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (9 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
                   ` (3 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC (permalink / raw)
  To: pdm-devel

As a nice side-effect this ensures that any zstd-compressed files are
initialized atomically, making it impossible to have a missing zstd
header if the process is terminated during the `new_file` call.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 969a8124..3677de2c 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -256,19 +256,16 @@ impl<'a> WritableTaskCache<'a> {
     fn new_file(&self, now: i64, compress: bool) -> Result<ArchiveFile, Error> {
         let path = self.cache.archive_path(now, compress);
 
-        let mut file = File::create(&path)?;
-        self.cache.create_options.apply_to(&mut file, &path)?;
-
-        if compress {
-            let encoder = zstd::stream::write::Encoder::new(file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
-            encoder.finish()?;
-        }
-
-        Ok(ArchiveFile {
+        let file = ArchiveFile {
             path,
             compressed: compress,
             starttime: now,
-        })
+        };
+
+        let writer = file.writer(self.cache.create_options)?;
+        writer.finalize_and_replace()?;
+
+        Ok(file)
     }
 
     /// Rotate task archive if the the newest archive file is older than `rotate_after`.
-- 
2.47.3






* [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (10 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
                   ` (2 subsequent siblings)
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC (permalink / raw)
  To: pdm-devel

Merging entries from the journal into the individual task archive files
involves reading all existing entries from the archive file. If we
encounter any error during that process, we now reset the cutoff
timestamps for all remotes to the lower cutoff for the archive file,
triggering a rebuild of this file the next time tasks are fetched.
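
To make the recovery idea concrete, here is a minimal sketch under stated
assumptions. It does not reproduce `request_repair` from this patch: the
`Cutoffs` map and `reset_cutoffs` are hypothetical, the per-remote state is
reduced to a single timestamp, and using `min` so that a cutoff is never
moved forward is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the per-remote fetch state: remote name -> cutoff timestamp.
type Cutoffs = HashMap<String, i64>;

/// Lower every remote's cutoff to the start time of the oldest corrupted archive file,
/// so that the next fetch requests the affected time range again.
fn reset_cutoffs(cutoffs: &mut Cutoffs, corrupted_file_starttimes: &[i64]) {
    let Some(lower_bound) = corrupted_file_starttimes.iter().copied().min() else {
        // Nothing is corrupted, leave the state untouched.
        return;
    };

    for cutoff in cutoffs.values_mut() {
        // Only ever move a cutoff backwards (assumption of this sketch).
        *cutoff = (*cutoff).min(lower_bound);
    }
}
```

A remote whose cutoff is already older than the corrupted file keeps its
value, while all others are pulled back to the file's lower boundary.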

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
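The cutoff reset described above can be sketched in isolation. This is a
minimal model, not the code from this patch: `State`, `reset_cutoffs` and
`example_state` are hypothetical stand-ins for the state file handling, and
the remote and node names are made up. It only illustrates that every
cutoff is pulled back to the lower bound of the oldest corrupted file and
is never moved forward.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for the contents of the state file:
/// remote name -> node name -> cutoff timestamp.
#[derive(Debug, Default)]
struct State {
    cutoffs: HashMap<String, HashMap<String, i64>>,
}

/// Lower every node's cutoff to the lower bound of the oldest corrupted file.
///
/// Returns the cutoff that was applied, or `None` if no file was corrupted.
fn reset_cutoffs(state: &mut State, corrupted_starttimes: &[i64]) -> Option<i64> {
    // The oldest corrupted file determines how far back we have to re-fetch.
    let reset_cutoff = corrupted_starttimes.iter().copied().min()?;

    for nodes in state.cutoffs.values_mut() {
        for cutoff in nodes.values_mut() {
            // Only pull a cutoff back; a cutoff that is already older stays as it is.
            *cutoff = (*cutoff).min(reset_cutoff);
        }
    }

    Some(reset_cutoff)
}

/// Example state with two made-up remotes.
fn example_state() -> State {
    let mut state = State::default();
    state
        .cutoffs
        .entry("pve".to_string())
        .or_default()
        .insert("node1".to_string(), 1010);
    state
        .cutoffs
        .entry("pbs".to_string())
        .or_default()
        .insert("localhost".to_string(), 810);
    state
}
```

Taking the minimum per node is what makes the second test case in this
patch pass: a cutoff that is already below the corrupted file's lower
bound (there because of an older active task) is left untouched.
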
 server/src/remote_tasks/task_cache.rs | 172 +++++++++++++++++++++++---
 1 file changed, 153 insertions(+), 19 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 3677de2c..c6a6c777 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -531,7 +531,7 @@ impl<'a> WritableTaskCache<'a> {
 
         let count = tasks.len();
 
-        self.merge_tasks_into_archive(tasks)?;
+        let corrupted_archive_files = self.merge_tasks_into_archive(tasks)?;
 
         // truncate the journal file
         OpenOptions::new()
@@ -540,6 +540,10 @@ impl<'a> WritableTaskCache<'a> {
             .open(journal_path)
             .context("failed to truncate journal file")?;
 
+        if !corrupted_archive_files.is_empty() {
+            self.request_repair(&corrupted_archive_files)?;
+        }
+
         log::info!(
             "committed {count} tasks in {:.3}.s to task cache archive",
             start.elapsed().as_secs_f32()
@@ -551,7 +555,10 @@ impl<'a> WritableTaskCache<'a> {
     /// Merge a list of *finished* tasks into the remote task archive files.
     /// The list of task in `tasks` *must* be sorted by their timestamp and UPID (descending by
     /// timestamp, ascending by UPID).
-    fn merge_tasks_into_archive(&self, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
+    fn merge_tasks_into_archive(
+        &self,
+        tasks: Vec<TaskCacheItem>,
+    ) -> Result<Vec<CorruptedArchiveFile>, Error> {
         debug_assert!(
             tasks
                 .iter()
@@ -568,6 +575,8 @@ impl<'a> WritableTaskCache<'a> {
         let mut current = files.next();
         let mut next = files.peek();
 
+        let mut corrupted = Vec::new();
+
         let mut tasks_for_current_file = Vec::new();
 
         // Tasks are sorted youngest to oldest (biggest start time first)
@@ -581,13 +590,18 @@ impl<'a> WritableTaskCache<'a> {
                     // The next entry's cut-off is larger then the task's start time, that means
                     // we want to finalized the current file by merging all tasks that
                     // should be stored in it...
-                    self.merge_single_archive_file(
-                        std::mem::take(&mut tasks_for_current_file),
-                        current,
-                    )
-                    .with_context(|| {
-                        format!("failed to merge archive file {}", current.path.display())
-                    })?;
+                    if self
+                        .merge_single_archive_file(
+                            std::mem::take(&mut tasks_for_current_file),
+                            current,
+                        )
+                        .with_context(|| {
+                            format!("failed to merge archive file {}", current.path.display())
+                        })?
+                        == ArchiveFileState::Corrupted
+                    {
+                        corrupted.push(CorruptedArchiveFile(current.clone()))
+                    };
                 }
 
                 // ... and the `current` file to the next entry.
@@ -605,10 +619,52 @@ impl<'a> WritableTaskCache<'a> {
 
         // Merge tasks for the last file.
         if let Some(current) = current {
-            self.merge_single_archive_file(tasks_for_current_file, current)
+            if self
+                .merge_single_archive_file(tasks_for_current_file, current)
                 .with_context(|| {
                     format!("failed to merge archive file {}", current.path.display())
-                })?;
+                })?
+                == ArchiveFileState::Corrupted
+            {
+                corrupted.push(CorruptedArchiveFile(current.clone()));
+            }
+        }
+
+        Ok(corrupted)
+    }
+
+    /// Repair archive files that were detected as corrupted when accessed.
+    ///
+    /// This resets the task fetching cutoff timestamps to the lower bound of the
+    /// oldest corrupted file, so its contents are re-fetched from the remotes on
+    /// the next fetch cycle.
+    pub fn request_repair(
+        &self,
+        corrupted_archive_files: &[CorruptedArchiveFile],
+    ) -> Result<(), Error> {
+        let reset_cutoff = corrupted_archive_files.iter().map(|c| {
+            log::warn!(
+                "{} seems to be corrupted, attempting recovery by resetting task cutoff timestamps",
+                c.0.path.display()
+            );
+            c.0.starttime
+        }).min();
+
+        if let Some(reset_cutoff) = reset_cutoff {
+            log::warn!(
+                "resetting task cutoff timestamp in state file to {reset_cutoff} to recover task archive"
+            );
+            let mut state = self.read_state();
+
+            for remote in state.remote_state.values_mut() {
+                for node in remote.node_state.values_mut() {
+                    node.cutoff = node.cutoff.min(reset_cutoff);
+                }
+            }
+
+            self.write_state(state).context(
+                "failed to write state when resetting cutoff after archive file corruption event",
+            )?;
         }
 
         Ok(())
@@ -682,9 +738,11 @@ impl<'a> WritableTaskCache<'a> {
         &self,
         tasks: Vec<TaskCacheItem>,
         file: &ArchiveFile,
-    ) -> Result<(), Error> {
+    ) -> Result<ArchiveFileState, Error> {
+        let mut file_state = ArchiveFileState::Valid;
+
         if tasks.is_empty() {
-            return Ok(());
+            return Ok(file_state);
         }
 
         let mut writer = file.writer(self.cache.create_options)?;
@@ -700,6 +758,7 @@ impl<'a> WritableTaskCache<'a> {
             .flat_map(|item| match item {
                 Ok(item) => Some(item),
                 Err(err) => {
+                    file_state = ArchiveFileState::Corrupted;
                     log::error!("could not read task cache item while merging: {err:#}");
                     None
                 }
@@ -710,7 +769,7 @@ impl<'a> WritableTaskCache<'a> {
         writer.write_tasks(MergeTaskIterator::new(archive_iter, task_iter))?;
         writer.finalize_and_replace()?;
 
-        Ok(())
+        Ok(file_state)
     }
 }
 
@@ -1251,6 +1310,20 @@ impl<'a> Drop for ArchiveFileWriter<'a> {
     }
 }
 
+/// Marker to signal the state of an [`ArchiveFile`].
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum ArchiveFileState {
+    /// The archive file is valid and contains no errors.
+    Valid,
+    /// The archive file is corrupted.
+    Corrupted,
+}
+
+/// Newtype wrapper to make the return type of [`WritableTaskCache::merge_tasks_into_archive`]
+/// more expressive.
+///
+pub struct CorruptedArchiveFile(ArchiveFile);
+
 /// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
 /// from both iterators sorted.
 /// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1478,6 +1551,14 @@ mod tests {
         Ok((tmp_dir, cache))
     }
 
+    fn truncate_archive_file(file: &ArchiveFile) {
+        OpenOptions::new()
+            .write(true)
+            .truncate(true)
+            .open(&file.path)
+            .expect("file truncated");
+    }
+
     #[test]
     fn test_add_tasks() -> Result<(), Error> {
         let (_tmp_dir, cache) = make_cache().unwrap();
@@ -1751,11 +1832,7 @@ mod tests {
         let file = files.get(0).expect("there is one archive file");
 
         // truncate existing compressed file, corrupting the zstd file header
-        let _file = OpenOptions::new()
-            .write(true)
-            .truncate(true)
-            .open(&file.path)
-            .expect("file truncated");
+        truncate_archive_file(file);
 
         assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0);
     }
@@ -1787,4 +1864,61 @@ mod tests {
             "the tmp file created by the writer should not show up in the list of archive files"
         );
     }
+
+    #[test]
+    fn reset_cutoff_timestamp_after_corruption() {
+        let testcases = [
+            (
+                "corruption of task file with lower bound of 900 determines the new cutoff",
+                vec![
+                    task(810, Some(811)),
+                    task(910, Some(911)),
+                    task(1010, Some(1011)),
+                ],
+                900,
+            ),
+            (
+                "older active task determines cutoff",
+                vec![
+                    task(810, None),
+                    task(910, Some(920)),
+                    task(1010, Some(1011)),
+                ],
+                810,
+            ),
+        ];
+
+        for (description, initial_tasks, expected_cutoff) in testcases {
+            let (_tmp_dir, mut cache) = make_cache().unwrap();
+            cache.rotate_after = 100;
+            let cache = cache.write().unwrap();
+
+            cache.init(1000).unwrap();
+
+            add_tasks(&cache, initial_tasks).unwrap();
+
+            cache.apply_journal().unwrap();
+
+            let files = cache.cache.archive_files(&cache.lock).unwrap();
+            let file = files.get(1).expect("there is a second archive file");
+
+            assert_eq!(file.starttime, 900);
+            assert!(
+                file.compressed,
+                "the file we are about to corrupt should be compressed"
+            );
+
+            // truncate existing compressed file, corrupting the zstd file header
+            truncate_archive_file(file);
+
+            add_tasks(&cache, vec![task(920, Some(930))]).unwrap();
+
+            // When applying the journal and writing to the corrupted file, the cache
+            // should notice that something is broken and reset the cutoff timestamp to the
+            // start of the affected archive file, resulting in a repair in the
+            // next fetch cycle
+            cache.apply_journal().unwrap();
+            assert_eq!(get_cutoff(&cache), expected_cutoff, "{}", description);
+        }
+    }
 }
-- 
2.47.3

* [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (11 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC (permalink / raw)
  To: pdm-devel

As when applying the journal, we use the opportunity to check for
corruption while compressing an archive file, and reset the cut-off
timestamps if any errors occurred when reading from it.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
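The detection pattern used while compressing can be sketched on its own.
This is a simplified illustration, not the code from this patch:
`FileState` and `copy_valid_records` are hypothetical names, and plain
integers parsed from text lines stand in for the JSON task records.
Readable records are carried over, unreadable ones are skipped, and a flag
records that the source was damaged.

```rust
/// Hypothetical stand-in for the `ArchiveFileState` marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FileState {
    Valid,
    Corrupted,
}

/// Copy all parseable records and report whether any record was unreadable.
///
/// Each line is expected to hold one integer; this stands in for one
/// serialized task per line.
fn copy_valid_records(input: &str) -> (Vec<i64>, FileState) {
    let mut state = FileState::Valid;

    let records: Vec<i64> = input
        .lines()
        .filter_map(|line| match line.trim().parse::<i64>() {
            Ok(record) => Some(record),
            Err(_) => {
                // Remember the failure, but keep going so that the
                // remaining valid records are not lost.
                state = FileState::Corrupted;
                None
            }
        })
        .collect();

    // The iterator is lazy, so `state` is only meaningful after it has been
    // fully consumed (here: by `collect`).
    (records, state)
}
```

The same ordering matters in the real code: the state flag may only be
read once the iterator has been consumed by the writer.
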
 server/src/remote_tasks/task_cache.rs | 132 ++++++++++++++++++++------
 1 file changed, 101 insertions(+), 31 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index c6a6c777..dbe71dc9 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -302,16 +302,26 @@ impl<'a> WritableTaskCache<'a> {
                 .with_context(|| format!("failed to remove {}", to_remove.path.display()))?;
         }
 
+        let mut corrupted = Vec::new();
+
         for file in archive_files
             .iter_mut()
             .skip(self.cache.uncompressed_files as usize)
         {
-            if !file.compressed {
-                file.compress(self.cache.create_options)
-                    .with_context(|| format!("failed to compress {}", file.path.display()))?;
+            if !file.compressed
+                && self
+                    .compress_archive_file(file)
+                    .with_context(|| format!("failed to compress {}", file.path.display()))?
+                    == ArchiveFileState::Corrupted
+            {
+                corrupted.push(CorruptedArchiveFile(file.clone()));
             }
         }
 
+        if !corrupted.is_empty() {
+            self.request_repair(&corrupted)?;
+        }
+
         Ok(did_rotate)
     }
 
@@ -771,6 +781,53 @@ impl<'a> WritableTaskCache<'a> {
 
         Ok(file_state)
     }
+
+    /// Compress an existing [`ArchiveFile`].
+    ///
+    /// This will return `Ok(ArchiveFileState::Corrupted)` if any JSON record could not be
+    /// deserialized. This can be used by the caller to handle corruption properly, e.g.
+    /// by triggering a rebuild of the affected archive file at the next opportunity.
+    fn compress_archive_file(&self, file: &ArchiveFile) -> Result<ArchiveFileState, Error> {
+        let mut new_file = file.clone();
+        new_file.set_compressed(true);
+
+        let mut file_state = ArchiveFileState::Valid;
+
+        let archive_iter = file
+            .iter()?
+            .with_context(|| {
+                format!(
+                    "task archive file '{}' disappeared while compressing",
+                    file.path.display()
+                )
+            })?
+            .flat_map(|item| match item {
+                Ok(item) => Some(item),
+                Err(err) => {
+                    file_state = ArchiveFileState::Corrupted;
+                    log::error!(
+                        "could not read task cache item while compressing '{path}': {err:#}",
+                        path = file.path.display()
+                    );
+                    None
+                }
+            });
+
+        let mut writer = new_file.writer(self.cache.create_options)?;
+
+        writer.write_tasks(archive_iter)?;
+
+        if let Err(err) = writer.finalize_and_replace() {
+            log::error!("could not replace {file:?}: {err:#}");
+
+            // don't remove old, uncompressed file if we were not able to put the new file in place
+            return Err(err);
+        }
+
+        std::fs::remove_file(&file.path).context("failed to remove uncompressed archive file")?;
+
+        Ok(file_state)
+    }
 }
 
 impl TaskCache {
@@ -1185,36 +1242,20 @@ impl ArchiveFile {
         }
     }
 
-    fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
-        let uncompressed_file_path = &self.path;
+    /// Set the `compressed` flag and update the path.
+    ///
+    /// This will *not* compress the existing file contents.
+    fn set_compressed(&mut self, compressed: bool) {
+        let suffix = if compressed {
+            ZSTD_EXTENSION_WITH_DOT
+        } else {
+            ""
+        };
 
-        let (temp_file, temp_file_path) =
-            proxmox_sys::fs::make_tmp_file(uncompressed_file_path, options)
-                .context("failed to create temporary file")?;
+        self.compressed = compressed;
 
-        let uncompressed_file =
-            File::open(uncompressed_file_path).context("failed to open uncompressed file")?;
-
-        zstd::stream::copy_encode(
-            uncompressed_file,
-            temp_file,
-            zstd::DEFAULT_COMPRESSION_LEVEL,
-        )
-        .context("zstd::stream::copy_encode failed")?;
-
-        let mut new_path_for_compressed = uncompressed_file_path.clone();
-        new_path_for_compressed
-            .set_extension(format!("{}{ZSTD_EXTENSION_WITH_DOT}", self.starttime));
-
-        std::fs::rename(&temp_file_path, &new_path_for_compressed)
-            .context("failed to move compressed task archive file")?;
-        std::fs::remove_file(uncompressed_file_path)
-            .context("failed to remove uncompressed archive file")?;
-
-        self.path = new_path_for_compressed;
-        self.compressed = true;
-
-        Ok(())
+        self.path
+            .set_extension(format!("{}{suffix}", self.starttime));
     }
 }
 
@@ -1921,4 +1962,33 @@ mod tests {
             assert_eq!(get_cutoff(&cache), expected_cutoff, "{}", description);
         }
     }
+
+    #[test]
+    fn reset_cutoff_after_compressing_corrupted() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 2;
+
+        let cache = cache.write().unwrap();
+        cache.init(1000).unwrap();
+
+        add_tasks(&cache, vec![task(1010, Some(1011)), task(1020, Some(1021))]).unwrap();
+        cache.apply_journal().unwrap();
+
+        assert_eq!(get_cutoff(&cache), 1020);
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+        let first = files.get(0).unwrap();
+
+        std::fs::write(&first.path, "some invalid\nthis is no valid json").unwrap();
+
+        cache.rotate(1100).unwrap();
+
+        assert_eq!(
+            get_cutoff(&cache),
+            1000,
+            "cutoff timestamp should be reset to the lower bound of the corrupted archive file"
+        );
+    }
 }
-- 
2.47.3

* [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (12 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC (permalink / raw)
  To: pdm-devel

While iterating over tasks, we now record any files that produced
errors. If there are any, the caller resets the cut-off timestamps to
the lowest lower bound of the affected archive files. This should
repair the archive during the next regular update cycle.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
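The sharing scheme between the reader and its iterator can be sketched as
follows. This is a reduced model under stated assumptions, not the code
from this patch: `Reader`, `RecordIter` and the `(file name, raw record)`
pairs are hypothetical and replace the real archive file handling. The
point is only that the iterator pushes into a shared
`Rc<RefCell<Vec<_>>>` while it is consumed, and the owner drains that list
afterwards.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared list of file names that produced read errors.
type Corrupted = Rc<RefCell<Vec<String>>>;

/// Hypothetical reader that hands out iterators and collects corruption reports.
struct Reader {
    corrupted: Corrupted,
}

/// Iterator over `(file name, raw record)` pairs, a stand-in for reading
/// records from archive files.
struct RecordIter {
    records: std::vec::IntoIter<(String, String)>,
    corrupted: Corrupted,
}

impl Reader {
    fn new() -> Self {
        Reader {
            corrupted: Corrupted::default(),
        }
    }

    /// Create an iterator that shares the corruption list with this reader.
    fn iter(&self, records: Vec<(String, String)>) -> RecordIter {
        RecordIter {
            records: records.into_iter(),
            corrupted: Rc::clone(&self.corrupted),
        }
    }

    /// Drain the list of corrupted files; call this after iteration is done.
    fn take_corrupted(&self) -> Vec<String> {
        self.corrupted.take()
    }
}

impl Iterator for RecordIter {
    type Item = Result<i64, String>;

    fn next(&mut self) -> Option<Self::Item> {
        let (file, raw) = self.records.next()?;
        match raw.parse::<i64>() {
            Ok(record) => Some(Ok(record)),
            Err(err) => {
                // Record the affected file, then still surface the error.
                self.corrupted.borrow_mut().push(file.clone());
                Some(Err(format!("failed to read from {file}: {err}")))
            }
        }
    }
}
```

Because detection is lazy, the list only covers files that were actually
read. `Rc<RefCell<_>>` is sufficient here since the reader and its
iterator are used from a single thread.
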
 server/src/remote_tasks/mod.rs        |  15 +++
 server/src/remote_tasks/task_cache.rs | 167 +++++++++++++++++++++-----
 2 files changed, 155 insertions(+), 27 deletions(-)

diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 6a340dba..5fba867d 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -157,6 +157,21 @@ pub async fn get_tasks(
             .take(limit)
             .collect();
 
+        let corrupted = cache.take_corrupted_files();
+        drop(cache);
+
+        if !corrupted.is_empty() {
+            // If we noticed corrupted archive files while iterating, drop the read lock,
+            // acquire the write lock and reset the fetch cutoff so the affected files are
+            // repaired on the next fetch cycle.
+            if let Err(err) = get_cache()
+                .write()
+                .and_then(|cache| cache.request_repair(&corrupted))
+            {
+                log::error!("failed to request repair of corrupted task archive file: {err:#}");
+            }
+        }
+
         Ok(returned_tasks)
     })
     .await?
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index dbe71dc9..23238cca 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -1,5 +1,6 @@
 //! Task cache implementation, based on rotating files.
 use std::{
+    cell::RefCell,
     cmp::Ordering,
     collections::{HashMap, HashSet},
     fs::{File, OpenOptions},
@@ -7,6 +8,7 @@ use std::{
     iter::Peekable,
     os::unix::fs::MetadataExt,
     path::{Path, PathBuf},
+    rc::Rc,
     time::{Duration, Instant},
 };
 
@@ -185,6 +187,10 @@ pub struct WritableTaskCache<'a> {
 pub struct ReadableTaskCache<'a> {
     cache: &'a TaskCache,
     lock: TaskCacheLock,
+    /// Archive files found to be corrupted while iterating over tasks. Populated
+    /// as tasks are read (see [`InnerTaskArchiveIterator`]); query it via
+    /// [`Self::take_corrupted_files`] once iteration is done to trigger a repair.
+    corrupted: CorruptedArchiveFiles,
 }
 
 /// Lock for the cache.
@@ -226,11 +232,26 @@ impl NodeFetchSuccessMap {
 
 impl<'a> ReadableTaskCache<'a> {
     /// Iterate over cached tasks.
+    ///
+    /// Archive file corruption encountered while iterating is recorded and can be
+    /// queried via [`Self::take_corrupted_files`] once iteration is done, so the
+    /// caller can trigger a repair.
     pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
         self.cache
-            .get_tasks_impl(mode, &self.lock)
+            .get_tasks_impl(mode, &self.lock, self.corrupted.clone())
             .context("failed to create task archive iterator")
     }
+
+    /// Take the list of archive files that were found to be corrupted while
+    /// iterating over tasks returned by [`Self::get_tasks`].
+    ///
+    /// Corruption is detected lazily while iterating, so this only reflects the
+    /// files that were actually read; call it after the iterator has been consumed.
+    /// Pass the result to [`WritableTaskCache::request_repair`] to reset the
+    /// fetch cutoff and repair the archive on the next fetch cycle.
+    pub fn take_corrupted_files(&self) -> Vec<CorruptedArchiveFile> {
+        self.corrupted.take()
+    }
 }
 
 impl<'a> WritableTaskCache<'a> {
@@ -326,9 +347,12 @@ impl<'a> WritableTaskCache<'a> {
     }
 
     /// Iterate over cached tasks.
+    ///
+    /// Corruption is not tracked here: the writable cache already repairs the
+    /// archive via its write paths (`apply_journal`, `rotate`).
     pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
         self.cache
-            .get_tasks_impl(mode, &self.lock)
+            .get_tasks_impl(mode, &self.lock, Default::default())
             .context("failed to create task archive iterator")
     }
 
@@ -868,7 +892,11 @@ impl TaskCache {
     pub fn read(&self) -> Result<ReadableTaskCache<'_>, Error> {
         let lock = self.lock_impl(false)?;
 
-        Ok(ReadableTaskCache { cache: self, lock })
+        Ok(ReadableTaskCache {
+            cache: self,
+            lock,
+            corrupted: Default::default(),
+        })
     }
 
     /// Lock the cache for writing.
@@ -911,6 +939,7 @@ impl TaskCache {
         &self,
         mode: GetTasks,
         lock: &'a TaskCacheLock,
+        corrupted: CorruptedArchiveFiles,
     ) -> Result<TaskArchiveIterator<'a>, Error> {
         let journal_file = self.journal_path();
 
@@ -920,19 +949,19 @@ impl TaskCache {
                 archive_files.reverse();
                 archive_files.push(self.active_file());
 
-                TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock, corrupted)
             }
             GetTasks::Active => {
                 let archive_files = vec![self.active_file()];
 
-                TaskArchiveIterator::new(None, archive_files, lock)
+                TaskArchiveIterator::new(None, archive_files, lock, corrupted)
             }
             #[cfg(test)]
             GetTasks::Archived => {
                 let mut files = self.archive_files(lock)?;
                 files.reverse();
 
-                TaskArchiveIterator::new(Some(journal_file.into()), files, lock)
+                TaskArchiveIterator::new(Some(journal_file.into()), files, lock, corrupted)
             }
         }
     }
@@ -1060,8 +1089,9 @@ impl<'a> TaskArchiveIterator<'a> {
         journal: Option<PathBuf>,
         files: Vec<ArchiveFile>,
         lock: &'a TaskCacheLock,
+        corrupted: CorruptedArchiveFiles,
     ) -> Result<Self, Error> {
-        let inner = InnerTaskArchiveIterator::new(files)
+        let inner = InnerTaskArchiveIterator::new(files, corrupted)
             .filter_map(|res| match res {
                 Ok(task) => Some(task),
                 Err(err) => {
@@ -1115,15 +1145,18 @@ struct InnerTaskArchiveIterator {
     current: Option<ArchiveIterator>,
     /// Archive file that is currently being iterated over, if any.
     current_file: Option<ArchiveFile>,
+    /// Archive files found to be corrupted while iterating.
+    corrupted: CorruptedArchiveFiles,
 }
 
 impl InnerTaskArchiveIterator {
     /// Create a new task archive iterator.
-    pub fn new(files: Vec<ArchiveFile>) -> Self {
+    pub fn new(files: Vec<ArchiveFile>, corrupted: CorruptedArchiveFiles) -> Self {
         Self {
             files,
             current: None,
             current_file: None,
+            corrupted,
         }
     }
 }
@@ -1134,27 +1167,41 @@ impl Iterator for InnerTaskArchiveIterator {
     fn next(&mut self) -> Option<Self::Item> {
         loop {
             match &mut self.current {
-                Some(current) => {
-                    let next = current.next();
-                    if next.is_some() {
-                        return next.map(|res| {
-                            res.with_context(|| {
-                                format!(
-                                    "failed to read from {file}",
-                                    file = self
-                                        .current_file
-                                        .as_ref()
-                                        .expect("self.current and self.current_file are both Some")
-                                        .path
-                                        .display()
-                                )
-                            })
-                        });
-                    } else {
+                Some(current) => match current.next() {
+                    Some(res) => {
+                        if res.is_err() {
+                            // The archive file is corrupted. Record it so a repair can be
+                            // triggered by resetting the fetch cutoff to the file's lower
+                            // bound. The active file (starttime 0) is rewritten on every
+                            // update and needs no cutoff-based repair, so skip it.
+                            if let Some(file) = self
+                                .current_file
+                                .as_ref()
+                                .filter(|file| file.starttime != 0)
+                            {
+                                self.corrupted
+                                    .borrow_mut()
+                                    .push(CorruptedArchiveFile(file.clone()));
+                            }
+                        }
+
+                        return Some(res.with_context(|| {
+                            format!(
+                                "failed to read from {file}",
+                                file = self
+                                    .current_file
+                                    .as_ref()
+                                    .expect("self.current and self.current_file are both Some")
+                                    .path
+                                    .display()
+                            )
+                        }));
+                    }
+                    None => {
                         self.current = None;
                         self.current_file = None;
                     }
-                }
+                },
                 None => 'inner: loop {
                     // Returns `None` if no more files are available, stopping iteration.
                     let next_file = self.files.pop()?;
@@ -1362,9 +1409,14 @@ enum ArchiveFileState {
 
 /// Newtype wrapper to make the return type of [`WritableTaskCache::merge_tasks_into_archive`]
 /// more expressive.
-///
 pub struct CorruptedArchiveFile(ArchiveFile);
 
+/// Shared collection of archive files found to be corrupted while iterating.
+///
+/// This is populated during read access (see [`InnerTaskArchiveIterator`]) and shared with the
+/// [`ReadableTaskCache`], which hands it out via [`ReadableTaskCache::take_corrupted_files`].
+type CorruptedArchiveFiles = Rc<RefCell<Vec<CorruptedArchiveFile>>>;
+
 /// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
 /// from both iterators sorted.
 /// The two iterators are expected to be sorted descendingly based on the task's starttime and
@@ -1963,6 +2015,67 @@ mod tests {
         }
     }
 
+    #[test]
+    fn reset_cutoff_after_corruption_during_read() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+
+        // Populate the archive and corrupt one of the compressed files. Scope the
+        // writable cache so its exclusive lock is released before we read.
+        {
+            let writable = cache.write().unwrap();
+            writable.init(1000).unwrap();
+
+            add_tasks(
+                &writable,
+                vec![
+                    task(810, Some(811)),
+                    task(910, Some(911)),
+                    task(1010, Some(1011)),
+                ],
+            )
+            .unwrap();
+            writable.apply_journal().unwrap();
+
+            assert_eq!(get_cutoff(&writable), 1010);
+
+            let files = writable.cache.archive_files(&writable.lock).unwrap();
+            let file = files.get(1).expect("there is a second archive file");
+            assert_eq!(file.starttime, 900);
+
+            // truncate existing compressed file, corrupting the zstd file header
+            truncate_archive_file(file);
+        }
+
+        // A read access should notice the corruption while iterating. The caller can
+        // then query the corrupted files and, after releasing the read lock, reset the
+        // cutoff timestamp to the lower bound of the corrupted file so the next fetch
+        // cycle repairs it.
+        let corrupted = {
+            let readable = cache.read().unwrap();
+            let count = readable.get_tasks(GetTasks::All).unwrap().count();
+
+            // The task in the corrupted file (starttime 910) is skipped.
+            assert_eq!(count, 2);
+
+            readable.take_corrupted_files()
+        };
+
+        assert_eq!(corrupted.len(), 1);
+
+        {
+            let writable = cache.write().unwrap();
+
+            writable.request_repair(&corrupted).unwrap();
+
+            assert_eq!(
+                get_cutoff(&writable),
+                900,
+                "cutoff should be reset to the corrupted file's lower bound after a read"
+            );
+        }
+    }
+
     #[test]
     fn reset_cutoff_after_compressing_corrupted() {
         let (_tmp_dir, mut cache) = make_cache().unwrap();
-- 
2.47.3

* [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file'
  2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
                   ` (13 preceding siblings ...)
  2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
@ 2026-07-02  9:22 ` Lukas Wagner
  14 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:22 UTC (permalink / raw)
  To: pdm-devel

`compress_archive_file` cannot be made fully atomic, so we must consider
the case where both the old, uncompressed file and the new, compressed
file exist in the archive directory.

This is handled by ignoring duplicate files when reading from or writing
to the task archive, and by actively cleaning up the duplicates during
`rotate`.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/remote_tasks/task_cache.rs | 97 +++++++++++++++++++++++++--
 1 file changed, 93 insertions(+), 4 deletions(-)

diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 23238cca..2110037b 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -289,13 +289,34 @@ impl<'a> WritableTaskCache<'a> {
         Ok(file)
     }
 
-    /// Rotate task archive if the the newest archive file is older than `rotate_after`.
+    /// Rotate task archive if the newest archive file is older than `rotate_after`.
     ///
     /// The oldest archive files are removed if the total number of archive files exceeds
     /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
+    ///
+    /// If there are any duplicate archive files (can happen if `compress_archive_file` is
+    /// interrupted at the wrong time), the uncompressed version is deleted.
     pub fn rotate(&self, now: i64) -> Result<bool, Error> {
         let mut did_rotate = false;
-        let mut archive_files = self.cache.archive_files(&self.lock)?;
+        let archive_files_with_dupes = self.cache.archive_files_with_dupes(&self.lock)?;
+
+        let mut deduped: Vec<ArchiveFile> = Vec::new();
+
+        for file in archive_files_with_dupes {
+            if deduped.iter().any(|e| e.starttime == file.starttime) {
+                // Found dupe, remove it.
+                if let Err(err) = std::fs::remove_file(&file.path) {
+                    log::error!(
+                        "could not clean up duplicate archive file '{path}': {err}",
+                        path = file.path.display()
+                    )
+                }
+            } else {
+                deduped.push(file);
+            }
+        }
+
+        let mut archive_files = deduped;
 
         let mut start_new_file = |files: &mut Vec<ArchiveFile>| -> Result<(), Error> {
             let new_file = self.new_file(now, self.cache.uncompressed_files == 0)?;
@@ -848,6 +869,11 @@ impl<'a> WritableTaskCache<'a> {
             return Err(err);
         }
 
+        // If we crash here or if `remove_file` fails, we might end up with the original,
+        // uncompressed file as well as the new, compressed file. `archive_files` filters
+        // duplicates out, so we should never read from the duplicated file. Any left-over
+        // duplicates are cleaned during `rotate`.
+
         std::fs::remove_file(&file.path).context("failed to remove uncompressed archive file")?;
 
         Ok(file_state)
@@ -970,7 +996,7 @@ impl TaskCache {
     /// cut-off timestamp. The result is sorted ascending by cut-off timestamp (most recent one
     /// first).
     /// The task archive should be locked for reading when calling this function.
-    fn archive_files(&self, _lock: &TaskCacheLock) -> Result<Vec<ArchiveFile>, Error> {
+    fn archive_files_with_dupes(&self, _lock: &TaskCacheLock) -> Result<Vec<ArchiveFile>, Error> {
         let mut names = Vec::new();
 
         for entry in std::fs::read_dir(&self.base_path)? {
@@ -983,7 +1009,23 @@ impl TaskCache {
             }
         }
 
-        names.sort_by_key(|e| -e.starttime);
+        names.sort_by(|a, b| {
+            b.starttime
+                .cmp(&a.starttime)
+                .then(b.compressed.cmp(&a.compressed))
+        });
+
+        Ok(names)
+    }
+
+    /// Returns a list of existing archive files, together with their respective
+    /// cut-off timestamp. The result is sorted descending by cut-off timestamp (most recent one
+    /// first). Any duplicates (equal starttime but different compression status) are removed.
+    ///
+    /// The task archive should be locked for reading when calling this function.
+    fn archive_files(&self, lock: &TaskCacheLock) -> Result<Vec<ArchiveFile>, Error> {
+        let mut names = self.archive_files_with_dupes(lock)?;
+        names.dedup_by_key(|e| e.starttime);
 
         Ok(names)
     }
@@ -2104,4 +2146,51 @@ mod tests {
             "cutoff timestamp should be reset to the lower bound of the corrupted archive file"
         );
     }
+
+    /// Ensure that if for any reason there exist multiple files with the same start time (only
+    /// possible if one of them is compressed and the other one uncompressed), we only return one
+    /// of them. Also verify that compressed files are preferred.
+    #[test]
+    fn dedup_duplicate_archive_files() {
+        let (_tmp_dir, mut cache) = make_cache().unwrap();
+        cache.rotate_after = 100;
+        cache.uncompressed_files = 1;
+        cache.max_files = 3;
+
+        let cache = cache.write().unwrap();
+
+        cache.new_file(1000, false).unwrap();
+        cache.new_file(1000, true).unwrap();
+        cache.new_file(2000, true).unwrap();
+        cache.new_file(2000, false).unwrap();
+
+        let files = cache.cache.archive_files(&cache.lock).unwrap();
+
+        assert_eq!(files.len(), 2);
+        let first = files.get(0).unwrap();
+        let second = files.get(1).unwrap();
+
+        assert!(first.compressed);
+        assert_eq!(first.starttime, 2000);
+
+        assert!(second.compressed);
+        assert_eq!(second.starttime, 1000);
+
+        let files = cache.cache.archive_files_with_dupes(&cache.lock).unwrap();
+        assert_eq!(files.len(), 4);
+
+        cache.rotate(2050).unwrap();
+
+        let files = cache.cache.archive_files_with_dupes(&cache.lock).unwrap();
+        assert_eq!(files.len(), 2);
+
+        let first = files.get(0).unwrap();
+        let second = files.get(1).unwrap();
+
+        assert!(first.compressed);
+        assert_eq!(first.starttime, 2000);
+
+        assert!(second.compressed);
+        assert_eq!(second.starttime, 1000);
+    }
 }
-- 
2.47.3
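
The ordering and deduplication used in this patch can be tried out in isolation. The following is a minimal sketch, not the code from `task_cache.rs`: the `ArchiveFile` struct is a stand-in with only the fields the diff uses, and the helper names (`sort_newest_first`, `split_duplicates`, `file`) as well as the `archive.<starttime>[.zst]` file names are hypothetical. Only the comparator is taken from the diff. Because `false < true` for `bool`, comparing `b.compressed` against `a.compressed` places the compressed file first among entries with equal start time, so the first entry of each group is the one to keep and the rest are the uncompressed leftovers.

```rust
use std::path::PathBuf;

/// Hypothetical stand-in for the patch's `ArchiveFile`; only the fields used here.
#[derive(Debug, Clone, PartialEq)]
struct ArchiveFile {
    path: PathBuf,
    starttime: i64,
    compressed: bool,
}

/// Sort newest first; for equal start times, compressed files come first.
fn sort_newest_first(files: &mut [ArchiveFile]) {
    files.sort_by(|a, b| {
        b.starttime
            .cmp(&a.starttime)
            .then(b.compressed.cmp(&a.compressed))
    });
}

/// Split a sorted list into the files to keep and the duplicates to delete.
fn split_duplicates(sorted: Vec<ArchiveFile>) -> (Vec<ArchiveFile>, Vec<ArchiveFile>) {
    let mut keep: Vec<ArchiveFile> = Vec::new();
    let mut dupes = Vec::new();
    for file in sorted {
        // The list is sorted, so a duplicate always directly follows the kept entry.
        if keep.last().is_some_and(|k| k.starttime == file.starttime) {
            dupes.push(file);
        } else {
            keep.push(file);
        }
    }
    (keep, dupes)
}

/// Build a test entry; the file name scheme is made up for this sketch.
fn file(starttime: i64, compressed: bool) -> ArchiveFile {
    let ext = if compressed { ".zst" } else { "" };
    ArchiveFile {
        path: PathBuf::from(format!("archive.{starttime}{ext}")),
        starttime,
        compressed,
    }
}
```

Feeding the four files from the `dedup_duplicate_archive_files` test (start times 1000 and 2000, each once compressed and once uncompressed) through `sort_newest_first` and `split_duplicates` should leave the two compressed files in `keep` and the two uncompressed ones in `dupes`, which matches what the test expects from `archive_files` and from `rotate`.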






* Re: [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file
  2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
@ 2026-07-02  9:29   ` Lukas Wagner
  0 siblings, 0 replies; 17+ messages in thread
From: Lukas Wagner @ 2026-07-02  9:29 UTC (permalink / raw)
  To: Lukas Wagner, pdm-devel

Missing 'in' in the commit subject, sorry





Thread overview: 17+ messages
2026-07-02  9:22 [PATCH datacenter-manager/proxmox 00/15] task cache improvements (archive corruption handling, error handling) Lukas Wagner
2026-07-02  9:22 ` [PATCH proxmox 01/15] sys: fs: don't replace file extension make_tmp_file Lukas Wagner
2026-07-02  9:29   ` Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 02/15] task cache: fix missing cutoff state for PBS remotes Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 03/15] task cache: refresh task: don't apply journal if the archive was rotated Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 04/15] task cache: rotate: align timestamp for new files to UTC midnight Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 05/15] task cache: add test case for task cache rotation Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 06/15] task cache: pre-compute static paths during initialization Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 08/15] task cache: archive iterator: don't yield more items if reading from file failed Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 09/15] task cache: include archive file path in error log messages Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 11/15] task cache: use ArchiveFileWriter when creating new archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 12/15] task cache: trigger repair of corruption when applying journal Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 13/15] task cache: trigger repair of corruption when compressing archive files Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 14/15] task cache: trigger repair of corruption after read-accesses Lukas Wagner
2026-07-02  9:22 ` [PATCH datacenter-manager 15/15] task cache: handle potentially duplicated archive files after 'compress_archive_file' Lukas Wagner
