From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from gate001.proxmox.com (gate001.proxmox.com [45.144.208.40]) by lore.proxmox.com (Postfix) with ESMTPS id 527771FF135 for ; Thu, 02 Jul 2026 11:23:26 +0200 (CEST) Received: from gate001.proxmox.com (localhost.localdomain [127.0.0.1]) by gate001.proxmox.com (Proxmox) with ESMTP id 511A32144A; Thu, 02 Jul 2026 11:23:24 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 07/15] task cache: only initialize `TaskCache` struct once Date: Thu, 2 Jul 2026 11:22:50 +0200 Message-ID: <20260702092258.174740-8-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260702092258.174740-1-l.wagner@proxmox.com> References: <20260702092258.174740-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1782984190097 X-SPAM-LEVEL: Spam detection results: 0 DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment (newer systems) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: PJF7YR6UD5RU2SKWW46VQNBWX4SJE6NS X-Message-ID-Hash: PJF7YR6UD5RU2SKWW46VQNBWX4SJE6NS X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: It only really contains static parameters that remain constant after daemon startup, so there is really no need to initialize it over and over again. The WritableTaskCache and ReadableTaskCache guards now take a reference instead of owning the TaskCache struct. TaskCache::new was always infallible, so there is no need to return a Result from it. Signed-off-by: Lukas Wagner --- server/src/remote_tasks/mod.rs | 38 ++++++++++++------------- server/src/remote_tasks/refresh_task.rs | 25 ++++++++-------- server/src/remote_tasks/task_cache.rs | 25 ++++++++-------- 3 files changed, 42 insertions(+), 46 deletions(-) diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs index 50ac6708..6a340dba 100644 --- a/server/src/remote_tasks/mod.rs +++ b/server/src/remote_tasks/mod.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::sync::LazyLock; use anyhow::Error; @@ -38,7 +38,7 @@ pub async fn get_tasks( let view = views::get_optional_view(view.as_deref())?; tokio::task::spawn_blocking(move || { - let cache = get_cache()?.read()?; + let cache = get_cache().read()?; let which = if filters.running { GetTasks::Active @@ -171,7 +171,7 @@ pub async fn get_tasks( pub async fn track_running_pve_task(remote: String, upid: PveUpid) -> Result { tokio::task::spawn_blocking(move || { let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?; - let cache = get_cache()?.write()?; + let cache = get_cache().write()?; let task = TaskCacheItem { upid: remote_upid.clone(), @@ -198,7 +198,7 @@ pub async fn track_running_pbs_task( ) -> Result { tokio::task::spawn_blocking(move || { let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?; - let cache = get_cache()?.write()?; + let cache = get_cache().write()?; let task = TaskCacheItem { upid: remote_upid.clone(), @@ -213,22 +213,20 @@ pub async fn track_running_pbs_task( .await? } -/// Get a new [`TaskCache`] instance. -/// -/// No heavy-weight operations are done here, it's fine to call this regularly as part of the -/// update loop. -pub fn get_cache() -> Result { - let file_options = proxmox_product_config::default_create_options(); +/// Get a reference to the [`TaskCache`] instance. +pub fn get_cache() -> &'static TaskCache { + static CACHE: LazyLock = LazyLock::new(|| { + let file_options = proxmox_product_config::default_create_options(); - let cache_path = Path::new(REMOTE_TASKS_DIR); - let cache = TaskCache::new( - cache_path, - file_options, - KEEP_OLD_FILES, - NUMBER_OF_UNCOMPRESSED_FILES, - ROTATE_AFTER, - JOURNAL_MAX_SIZE, - )?; + TaskCache::new( + REMOTE_TASKS_DIR, + file_options, + KEEP_OLD_FILES, + NUMBER_OF_UNCOMPRESSED_FILES, + ROTATE_AFTER, + JOURNAL_MAX_SIZE, + ) + }); - Ok(cache) + &CACHE } diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs index 55d5694e..eb74cb93 100644 --- a/server/src/remote_tasks/refresh_task.rs +++ b/server/src/remote_tasks/refresh_task.rs @@ -122,11 +122,11 @@ impl Default for TaskState { /// Handle a single timer tick. /// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks. pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> { - let cache = super::get_cache()?; + let cache = super::get_cache(); if task_state.is_due_for_rotate_check() { log::debug!("checking if remote task archive should be rotated"); - if rotate_cache(cache.clone()).await? { + if rotate_cache(cache).await? { log::info!("rotated remote task archive"); // rotation always applies the journal as well @@ -137,7 +137,7 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> } if task_state.is_due_for_journal_apply() { - apply_journal(cache.clone()).await?; + apply_journal(cache).await?; task_state.reset_journal_apply(); } @@ -151,7 +151,7 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> let mut tasks_to_poll: HashSet = HashSet::from_iter(cache_state.tracked_tasks().cloned()); - let active_tasks = get_active_tasks(cache.clone()).await?; + let active_tasks = get_active_tasks(cache).await?; tasks_to_poll.extend(active_tasks.into_iter()); let poll_results = poll_tracked_tasks( @@ -188,7 +188,7 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> .iter() .any(|(_, result)| matches!(result, PollResult::RemoteGone | PollResult::RequestError)) { - update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?; + update_task_cache(all_tasks, update_state_for_remote, poll_results).await?; } Ok(()) @@ -196,13 +196,13 @@ pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> /// Manually trigger task collection from a list of remotes. pub async fn refresh_taskcache(remotes: Vec) -> Result<(), Error> { - let cache = super::get_cache()?; + let cache = super::get_cache(); let cache_state = cache.read_state(); let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await; if !all_tasks.is_empty() { - update_task_cache(cache, all_tasks, update_state_for_remote, HashMap::new()).await?; + update_task_cache(all_tasks, update_state_for_remote, HashMap::new()).await?; } Ok(()) @@ -215,7 +215,7 @@ pub async fn refresh_taskcache(remotes: Vec) -> Result<(), Error> { /// without any prior task archive rotation. pub async fn init_cache() -> Result<(), Error> { tokio::task::spawn_blocking(|| { - let cache = super::get_cache()?; + let cache = super::get_cache(); cache.write()?.init(proxmox_time::epoch_i64())?; Ok(()) }) @@ -359,7 +359,7 @@ fn get_remotes_with_finished_tasks( /// Rotate the task cache if necessary. /// /// Returns Ok(true) the cache's files were rotated. -async fn rotate_cache(cache: TaskCache) -> Result { +async fn rotate_cache(cache: &'static TaskCache) -> Result { tokio::task::spawn_blocking(move || { cache.write()?.rotate(align_timestamp( proxmox_time::epoch_i64(), @@ -370,12 +370,12 @@ async fn rotate_cache(cache: TaskCache) -> Result { } /// Apply the task cache journal. -async fn apply_journal(cache: TaskCache) -> Result<(), Error> { +async fn apply_journal(cache: &'static TaskCache) -> Result<(), Error> { tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await? } /// Get a list of active tasks. -async fn get_active_tasks(cache: TaskCache) -> Result, Error> { +async fn get_active_tasks(cache: &'static TaskCache) -> Result, Error> { tokio::task::spawn_blocking(move || { let tasks: Vec = cache .read()? @@ -523,7 +523,6 @@ fn map_pbs_task(task: pbs_api_types::TaskListItem, remote: String) -> TaskCacheI /// Update task cache with results from tracked task polling & regular task fetching. async fn update_task_cache( - cache: TaskCache, new_tasks: Vec, update_state_for_remote: NodeFetchSuccessMap, poll_results: HashMap, @@ -539,7 +538,7 @@ async fn update_task_cache( }) .collect(); - cache + super::get_cache() .write()? .update(new_tasks, &update_state_for_remote, drop_tracked)?; diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs index d97fa710..f0e3533a 100644 --- a/server/src/remote_tasks/task_cache.rs +++ b/server/src/remote_tasks/task_cache.rs @@ -175,14 +175,14 @@ pub struct TaskCache { } /// A [`TaskCache`] locked for writing. -pub struct WritableTaskCache { - cache: TaskCache, +pub struct WritableTaskCache<'a> { + cache: &'a TaskCache, lock: TaskCacheLock, } /// A [`TaskCache`] locked for reading. -pub struct ReadableTaskCache { - cache: TaskCache, +pub struct ReadableTaskCache<'a> { + cache: &'a TaskCache, lock: TaskCacheLock, } @@ -223,7 +223,7 @@ impl NodeFetchSuccessMap { } } -impl ReadableTaskCache { +impl<'a> ReadableTaskCache<'a> { /// Iterate over cached tasks. pub fn get_tasks(&self, mode: GetTasks) -> Result, Error> { self.cache @@ -232,7 +232,7 @@ impl ReadableTaskCache { } } -impl WritableTaskCache { +impl<'a> WritableTaskCache<'a> { /// Create initial task archive files that can be backfilled with the /// recent task history from a remote. /// @@ -796,7 +796,7 @@ impl TaskCache { uncompressed: u32, rotate_after: u64, journal_max_size: u64, - ) -> Result { + ) -> Self { let base_path = path.into(); let journal_path = base_path.join(WAL_FILENAME); @@ -804,7 +804,7 @@ impl TaskCache { let active_path = base_path.join(ACTIVE_FILENAME); let lock_path = base_path.join(LOCKFILE_FILENAME); - Ok(Self { + Self { base_path, journal_path, state_path, @@ -815,18 +815,18 @@ impl TaskCache { max_files, rotate_after, uncompressed_files: uncompressed, - }) + } } /// Lock the cache for reading. - pub fn read(self) -> Result { + pub fn read(&self) -> Result, Error> { let lock = self.lock_impl(false)?; Ok(ReadableTaskCache { cache: self, lock }) } /// Lock the cache for writing. - pub fn write(self) -> Result { + pub fn write(&self) -> Result, Error> { let lock = self.lock_impl(true)?; Ok(WritableTaskCache { cache: self, lock }) @@ -1400,8 +1400,7 @@ mod tests { 1, 0, DEFAULT_MAX_SIZE, - ) - .unwrap(); + ); Ok((tmp_dir, cache)) } -- 2.47.3