From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from gate001.proxmox.com (gate001.proxmox.com [IPv6:2a0f:8001:1:32::40]) by lore.proxmox.com (Postfix) with ESMTPS id DFDC91FF135 for ; Thu, 02 Jul 2026 11:23:34 +0200 (CEST) Received: from gate001.proxmox.com (localhost.localdomain [127.0.0.1]) by gate001.proxmox.com (Proxmox) with ESMTP id 89E9321475; Thu, 02 Jul 2026 11:23:29 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 10/15] task cache: introduce ArchiveFileWriter Date: Thu, 2 Jul 2026 11:22:53 +0200 Message-ID: <20260702092258.174740-11-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com> References: <20260702092258.174740-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1782984190484 X-SPAM-LEVEL: Spam detection results: 0 DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment (newer systems) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 3A5T35TXHTOPGE5TIXTKMOL26MRDNOHO X-Message-ID-Hash: 3A5T35TXHTOPGE5TIXTKMOL26MRDNOHO X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: This is nicely abstracts handling of compressed and uncompressed archive files behind a common interface, as well as atomically replacing any existing archive file by using the usual tmpfile + rename approach. The 'active' file has the same file structure as regular archive files, so we can use ArchiveFileWriter for these as well. Signed-off-by: Lukas Wagner --- server/src/remote_tasks/task_cache.rs | 251 +++++++++++++++++--------- 1 file changed, 163 insertions(+), 88 deletions(-) diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs index c7057786..969a8124 100644 --- a/server/src/remote_tasks/task_cache.rs +++ b/server/src/remote_tasks/task_cache.rs @@ -12,6 +12,7 @@ use std::{ use anyhow::{Context, Error}; use serde::{Deserialize, Serialize}; +use zstd::Encoder; use proxmox_sys::fs::CreateOptions; @@ -665,36 +666,13 @@ impl<'a> WritableTaskCache<'a> { /// The tasks are first written to a temporary file, which is then used /// to atomically replace the original. fn write_active_tasks(&self, tasks: impl Iterator) -> Result<(), Error> { - let target = self.cache.active_path(); + let file = self.cache.active_file(); + let mut writer = file.writer(self.cache.create_options)?; - let (fd, path) = proxmox_sys::fs::make_tmp_file(target, self.cache.create_options)?; - let mut fd = BufWriter::new(fd); - - Self::write_tasks(&mut fd, tasks)?; - - if let Err(err) = fd.flush() { - log::error!("could not flush 'active' file: {err:#}"); - } - drop(fd); - - let res = std::fs::rename(&path, target).with_context(|| { - format!( - "failed to replace {} with {}", - target.display(), - path.display(), - ) - }); - - if let Err(err) = res { - if let Err(err) = std::fs::remove_file(&path) { - log::error!( - "failed to cleanup temporary file {}: {err:#}", - path.display() - ); - } - - return Err(err); - } + writer.write_tasks(tasks)?; + writer + .finalize_and_replace() + .context("could not finalize 'active' file")?; Ok(()) } @@ -712,17 +690,7 @@ impl<'a> WritableTaskCache<'a> { return Ok(()); } - // TODO: Might be nice to also move this to ArchiveFile - let (temp_file, temp_file_path) = - proxmox_sys::fs::make_tmp_file(&file.path, self.cache.create_options)?; - let mut writer = if file.compressed { - let encoder = - zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)? - .auto_finish(); - Box::new(BufWriter::new(encoder)) as Box - } else { - Box::new(BufWriter::new(temp_file)) as Box - }; + let mut writer = file.writer(self.cache.create_options)?; let archive_iter = file .iter()? @@ -742,43 +710,8 @@ impl<'a> WritableTaskCache<'a> { .peekable(); let task_iter = tasks.into_iter().peekable(); - Self::write_tasks(&mut writer, MergeTaskIterator::new(archive_iter, task_iter))?; - - if let Err(err) = writer.flush() { - log::error!("could not flush BufWriter for {file:?}: {err:#}"); - } - drop(writer); - - if let Err(err) = std::fs::rename(&temp_file_path, &file.path).with_context(|| { - format!( - "failed to replace {} with {}", - file.path.display(), - temp_file_path.display() - ) - }) { - if let Err(err) = std::fs::remove_file(&temp_file_path) { - log::error!( - "failed to clean up temporary file {}: {err:#}", - temp_file_path.display() - ); - } - - return Err(err); - } - - Ok(()) - } - - /// Write an iterator of [`TaskCacheItem`] to a something that implements [`Write`]. - /// The individual items are encoded as JSON followed by a newline. - fn write_tasks( - writer: &mut impl Write, - tasks: impl Iterator, - ) -> Result<(), Error> { - for task in tasks { - serde_json::to_writer(&mut *writer, &task)?; - writeln!(writer)?; - } + writer.write_tasks(MergeTaskIterator::new(archive_iter, task_iter))?; + writer.finalize_and_replace()?; Ok(()) } @@ -872,21 +805,12 @@ impl TaskCache { GetTasks::All => { let mut archive_files = self.archive_files(lock)?; archive_files.reverse(); - - archive_files.push(ArchiveFile { - path: self.active_path().into(), - compressed: false, - starttime: 0, - }); + archive_files.push(self.active_file()); TaskArchiveIterator::new(Some(journal_file.into()), archive_files, lock) } GetTasks::Active => { - let archive_files = vec![ArchiveFile { - path: self.active_path().into(), - compressed: false, - starttime: 0, - }]; + let archive_files = vec![self.active_file()]; TaskArchiveIterator::new(None, archive_files, lock) } @@ -977,6 +901,15 @@ impl TaskCache { self.base_path .join(format!("{ARCHIVE_FILENAME_PREFIX}{starttime}{suffix}")) } + + /// Return the [`ArchiveFile`] instance for the `active` file. + fn active_file(&self) -> ArchiveFile { + ArchiveFile { + path: self.active_path().into(), + compressed: false, + starttime: 0, + } + } } /// Comparison function for sorting tasks. @@ -1174,6 +1107,28 @@ impl ArchiveFile { Ok(Some(iter)) } + /// Create an [`ArchiveFileWriter`] for this archive file. + fn writer(&self, create_options: CreateOptions) -> Result, Error> { + let (temp_file, temp_file_path) = + proxmox_sys::fs::make_tmp_file(&self.path, create_options)?; + + if self.compressed { + let encoder = + zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?; + Ok(ArchiveFileWriter { + archive_file: self, + writer: Some(ArchiveFileWriterInner::ZstdEncoder(encoder)), + path: temp_file_path, + }) + } else { + Ok(ArchiveFileWriter { + archive_file: self, + writer: Some(ArchiveFileWriterInner::Plain(BufWriter::new(temp_file))), + path: temp_file_path, + }) + } + } + fn compress(&mut self, options: CreateOptions) -> Result<(), Error> { let uncompressed_file_path = &self.path; @@ -1207,6 +1162,98 @@ impl ArchiveFile { } } +/// Writer for an [`ArchiveFile`]. +/// +/// Instantiate this via [`ArchiveFile::writer`]. When calling [`Self::finalize_and_replace`], the +/// original archive file is replaced with the contents that were written to the writer. +struct ArchiveFileWriter<'a> { + archive_file: &'a ArchiveFile, + // writer: Option>, + writer: Option>, + path: PathBuf, +} + +enum ArchiveFileWriterInner<'a> { + ZstdEncoder(Encoder<'a, File>), + Plain(BufWriter), +} + +impl<'a> ArchiveFileWriter<'a> { + /// Write the contents of an iterator of [`TaskCacheItem`] to this archive file. + /// The individual items are encoded as JSON followed by a newline. + fn write_tasks(&mut self, tasks: impl Iterator) -> Result<(), Error> { + if let Some(writer) = self.writer.as_mut() { + let writer = match writer { + ArchiveFileWriterInner::ZstdEncoder(encoder) => encoder as &mut dyn Write, + ArchiveFileWriterInner::Plain(buf_writer) => buf_writer as &mut dyn Write, + }; + + for task in tasks { + serde_json::to_writer(&mut *writer, &task)?; + writeln!(writer)?; + } + } + + Ok(()) + } + + /// Call [`ArchiveFileWriter::finalize_and_replace`] or just drop `self`, if you don't need to + /// handle the error. + fn finalize_impl(&mut self) -> Result<(), Error> { + if let Some(writer) = self.writer.take() { + let file = match writer { + ArchiveFileWriterInner::ZstdEncoder(mut encoder) => { + encoder.flush()?; + encoder.finish()? + } + ArchiveFileWriterInner::Plain(mut buf_writer) => { + buf_writer.flush()?; + buf_writer.into_inner()? + } + }; + + file.sync_all() + .with_context(|| format!("could not fsync {}", self.path.display()))?; + + if let Err(err) = + std::fs::rename(&self.path, &self.archive_file.path).with_context(|| { + format!( + "failed to replace {} with {}", + self.path.display(), + self.archive_file.path.display() + ) + }) + { + if let Err(err) = std::fs::remove_file(&self.path) { + log::error!( + "failed to clean up temporary file {path}: {err:#}", + path = self.path.display() + ); + } + + return Err(err); + } + } + + Ok(()) + } + + /// Finalize and drop the internal writer and replace the original + /// archive file with the new contents. This method consumes `self`. + /// + /// [`ArchiveFileWriter::drop`] also finalizes the writer properly, but does not + /// allow to catch errors. + fn finalize_and_replace(mut self) -> Result<(), Error> { + self.finalize_impl() + } +} + +impl<'a> Drop for ArchiveFileWriter<'a> { + fn drop(&mut self) { + let _ = self.finalize_impl(); + } +} + /// Iterator that merges two _sorted_ `Iterator`, returning the items /// from both iterators sorted. /// The two iterators are expected to be sorted descendingly based on the task's starttime and @@ -1715,4 +1762,32 @@ mod tests { assert_eq!(cache.get_tasks(GetTasks::Archived).unwrap().count(), 0); } + + #[test] + fn temporary_file_when_writer_lives_is_not_enumerated() { + let (_tmp_dir, mut cache) = make_cache().unwrap(); + cache.rotate_after = 100; + cache.uncompressed_files = 1; + cache.max_files = 2; + + let cache = cache.write().unwrap(); + cache.init(1000).unwrap(); + + let files = cache.cache.archive_files(&cache.lock).unwrap(); + + assert_eq!(files.len(), 2); + + let _writer = files + .get(0) + .unwrap() + .writer(cache.cache.create_options) + .unwrap(); + + let files = cache.cache.archive_files(&cache.lock).unwrap(); + assert_eq!( + files.len(), + 2, + "the tmp file created by the writer should not show up in the list of archive files" + ); + } } -- 2.47.3