From mboxrd@z Thu Jan 1 00:00:00 1970
Message-ID: <06dbc04f-402d-4fc5-b1fa-c5beb3300f08@proxmox.com>
Date: Wed, 20 Aug 2025 10:25:29 +0200
From: Dominik Csapak
To: Proxmox Datacenter Manager development discussion, Lukas Wagner
References: <20250814075622.51786-1-l.wagner@proxmox.com> <20250814075622.51786-3-l.wagner@proxmox.com>
In-Reply-To: <20250814075622.51786-3-l.wagner@proxmox.com>
Subject: Re: [pdm-devel] [PATCH proxmox-datacenter-manager v6 2/6] remote tasks: add background task for task polling, use new task cache
List-Id: Proxmox Datacenter Manager development discussion

one small comment inline, aside from that LGTM

On 8/14/25 09:56, Lukas Wagner wrote:
> This commit changes the remote task module as follows:
>
> - Add a new background task for regular polling of task data
>   Instead of triggering fetching of task data from the `get_tasks` function,
>   which is usually called by an API handler, we move the fetching to a
>   new background task. The task fetches the latest tasks from all remotes
>   and stores them in the task cache at regular intervals (10 minutes).
>   The `get_tasks` function itself only reads from the cache.
>   The main rationale for this change is that for large setups, fetching
>   tasks from all remotes can take a *long* time (e.g.
> hundreds of remotes, each with a >100ms connection - adds up to minutes quickly).
> If we do this from within `get_tasks`, the API handler calling the
> function is also blocked for the entire time.
> The `get_tasks` API is called every couple of seconds by the UI to get
> a list of running remote tasks, so this *must* be quick.
>
> - Tracked tasks are also polled in the same background task, but with
>   a short polling delay (10 seconds). If a tracked task finishes,
>   an out-of-order fetch of tasks for a remote is performed to update
>   the cache with all task data from the finished task.
>
> - Only finished tasks are requested from the remotes. This avoids a
>   foreign (as in, not started by PDM) running task appearing stuck in
>   the running state until the next regular task cache refresh.
>   The tracked task polling could be extended to also poll running foreign
>   tasks, but this is an easy addition for the future.
>
> - Tasks are now stored in the new improved task cache implementation.
>   This should make retrieving tasks much quicker and avoid
>   unneeded disk IO.
>
> Signed-off-by: Lukas Wagner
> ---
>
> Notes:
>     Changes since v5:
>       - Incorporate review feedback from @Dominik (thx!)
>       - Change task tracking approach:
>         - Instead of using the oldest running task as a cutoff and
>           switching to a lower fetching interval if there is a tracked task,
>           we poll tracked tasks directly with a 10 second interval.
>           Once a tracked task finishes, we do a regular task fetch once
>           to get full task data (endtime, status).
>           This is a nicer approach for long running tasks, since we do
>           not repeatedly request the same tasks over and over again.
>       - Use proxmox_product_config to get CreateOptions where
>         it makes sense.
>       - Use timestamps instead of cycle counts to keep track
>         of when we want to rotate the task archive or do a full
>         task fetch
>       - Be more clever about how we request the semaphores. Instead
>         of requesting all semaphores that we could potentially need to poll
>         multiple nodes of a remote in parallel, request them
>         on demand.
>       - Keep track of per-node failures while fetching tasks and
>         feed this information to the cache implementation
>         so that it can maintain the per-node cutoff timestamp.
>       - Make documentation of public constants a bit easier
>         to understand.
>
>     Changes since v4:
>       - Rebase onto latest master, adapting to changes in
>         the section config type
>
>     Changes since v2:
>       - Adapt to new locking approach (only drops a `mut`)
>
>     Changes since v1:
>
>       - use const Duration instead of u64s for durations, using
>         Duration::as_secs() where needed
>       - Move the remote_task fetching task functions to
>         src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>       - remote_tasks::get_tasks: wrap function body in a
>         tokio::task::spawn_blocking.
using the TaskCache::get_tasks > iterator does disk IO and could block the executor > - Added some doc strings to make the purpose/workings of > some functions clearer > - Couple of variables have been renamed for more clarity > > server/src/api/pve/lxc.rs | 10 +- > server/src/api/pve/mod.rs | 4 +- > server/src/api/pve/qemu.rs | 6 +- > server/src/api/remote_tasks.rs | 11 +- > server/src/bin/proxmox-datacenter-api/main.rs | 1 + > .../bin/proxmox-datacenter-api/tasks/mod.rs | 1 + > .../tasks/remote_tasks.rs | 559 ++++++++++++++++ > server/src/remote_tasks/mod.rs | 625 ++++-------------- > 8 files changed, 706 insertions(+), 511 deletions(-) > create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > > diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs > index f1c31425..83f9f4aa 100644 > --- a/server/src/api/pve/lxc.rs > +++ b/server/src/api/pve/lxc.rs > @@ -209,7 +209,7 @@ pub async fn lxc_start( > > let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -242,7 +242,7 @@ pub async fn lxc_stop( > > let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -277,7 +277,7 @@ pub async fn lxc_shutdown( > .shutdown_lxc_async(&node, vmid, Default::default()) > .await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -357,7 +357,7 @@ pub async fn lxc_migrate( > }; > let upid = pve.migrate_lxc(&node, vmid, params).await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate( > log::info!("migrating vm {vmid} of node {node:?}"); > let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?; > > - new_remote_upid(source, upid) > + new_remote_upid(source, upid).await > } > diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs > index dd7cf382..d472cf58 100644 > --- a/server/src/api/pve/mod.rs > +++ b/server/src/api/pve/mod.rs > @@ -76,9 +76,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES > const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS); > > // converts a remote + PveUpid into a RemoteUpid and starts tracking it > -fn new_remote_upid(remote: String, upid: PveUpid) -> Result { > +async fn new_remote_upid(remote: String, upid: PveUpid) -> Result { > let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?; > - remote_tasks::track_running_task(remote_upid.clone()); > + remote_tasks::track_running_task(remote_upid.clone()).await?; > Ok(remote_upid) > } > > diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs > index 5a41a69e..54ede112 100644 > --- a/server/src/api/pve/qemu.rs > +++ b/server/src/api/pve/qemu.rs > @@ -216,7 +216,7 @@ pub async fn qemu_start( > .start_qemu_async(&node, vmid, Default::default()) > .await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -377,7 +377,7 @@ pub async fn qemu_migrate( > }; > let upid = pve.migrate_qemu(&node, vmid, params).await?; > > - new_remote_upid(remote, upid) > + new_remote_upid(remote, upid).await > } > > #[api( > @@ -564,5 +564,5 @@ pub async fn qemu_remote_migrate( > log::info!("migrating vm {vmid} of node {node:?}"); > let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?; > > - 
new_remote_upid(source, upid) > + new_remote_upid(source, upid).await > } > diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs > index e629000c..05ce3666 100644 > --- a/server/src/api/remote_tasks.rs > +++ b/server/src/api/remote_tasks.rs > @@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS > }, > input: { > properties: { > - "max-age": { > - type: Integer, > - optional: true, > - // TODO: sensible default max-age > - default: 300, > - description: "Maximum age of cached task data", > - }, > filters: { > type: TaskFilters, > flatten: true, > @@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS > }, > )] > /// Get the list of tasks for all remotes. > -async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result, Error> { > - let tasks = remote_tasks::get_tasks(max_age, filters).await?; > +async fn list_tasks(filters: TaskFilters) -> Result, Error> { > + let tasks = remote_tasks::get_tasks(filters).await?; > > Ok(tasks) > } > diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs > index db6b2585..42bc0e1e 100644 > --- a/server/src/bin/proxmox-datacenter-api/main.rs > +++ b/server/src/bin/proxmox-datacenter-api/main.rs > @@ -376,6 +376,7 @@ async fn run(debug: bool) -> Result<(), Error> { > metric_collection::start_task(); > tasks::remote_node_mapping::start_task(); > resource_cache::start_task(); > + tasks::remote_tasks::start_task()?; > > server.await?; > log::info!("server shutting down, waiting for active workers to complete"); > diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs > index e6ead882..a6b1f439 100644 > --- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs > +++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs > @@ -1 +1,2 @@ > pub mod remote_node_mapping; > +pub mod remote_tasks; > diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > new file mode 100644 > index 00000000..4701a935 > --- /dev/null > +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > @@ -0,0 +1,559 @@ > +use std::{ > + collections::{HashMap, HashSet}, > + sync::Arc, > + time::{Duration, Instant}, > +}; > + > +use anyhow::{format_err, Error}; > +use nix::sys::stat::Mode; > +use tokio::{sync::Semaphore, task::JoinSet}; > + > +use pdm_api_types::{ > + remotes::{Remote, RemoteType}, > + RemoteUpid, > +}; > +use proxmox_section_config::typed::SectionConfigData; > +use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource}; > + > +use server::{ > + api::pve, > + remote_tasks::{ > + self, > + task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem}, > + KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER, > + }, > + task_utils, > +}; > + > +/// Tick interval for the remote task fetching task. > +/// This is also the rate at which we check on tracked tasks. > +const POLL_INTERVAL: Duration = Duration::from_secs(10); > + > +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked > +/// task for this remote). > +const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600); > + > +/// Interval at which to check for task cache rotation. > +const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600); > + > +/// Interval at which the task cache journal should be applied. 
> +/// > +/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes. > +/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing > +/// it will be slower than the task archive itself, as the entire journal must be loaded into > +/// memory and then sorted by task starttime. Applying the journal more often might > +/// lead to more writes, but should yield better performance. > +const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600); > + > +/// Maximum number of concurrent connections per remote. > +const CONNECTIONS_PER_PVE_REMOTE: usize = 5; > + > +/// Maximum number of total concurrent connections. > +const MAX_CONNECTIONS: usize = 20; > + > +/// Maximum number of tasks to fetch from a single remote in one API call. > +const MAX_TASKS_TO_FETCH: u64 = 5000; > + > +/// (Ephemeral) Remote task fetching task state. > +struct TaskState { > + /// Time at which we last checked for archive rotation. > + last_rotate_check: Instant, > + /// Time at which we fetch tasks the last time. > + last_fetch: Instant, > + /// Time at which we last applied the journal. > + last_journal_apply: Instant, > +} > + > +impl TaskState { > + fn new() -> Self { > + let now = Instant::now(); > + > + Self { > + last_rotate_check: now - CHECK_ROTATE_INTERVAL, > + last_fetch: now - TASK_FETCH_INTERVAL, > + last_journal_apply: now - APPLY_JOURNAL_INTERVAL, > + } > + } > + > + /// Reset the task archive rotation timestamp. > + fn reset_rotate_check(&mut self) { > + self.last_rotate_check = Instant::now(); > + } > + > + /// Reset the task fetch timestamp. > + fn reset_fetch(&mut self) { > + self.last_fetch = Instant::now(); > + } > + > + /// Reset the journal apply timestamp. > + fn reset_journal_apply(&mut self) { > + self.last_journal_apply = Instant::now(); > + } > + > + /// Should we check for archive rotation? > + fn is_due_for_rotate_check(&self) -> bool { > + Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL > + } > + > + /// Should we fetch tasks? > + fn is_due_for_fetch(&self) -> bool { > + Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL > + } > + > + /// Should we apply the task archive's journal? > + fn is_due_for_journal_apply(&self) -> bool { > + Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL > + } > +} > + > +/// Start the remote task fetching task > +pub fn start_task() -> Result<(), Error> { > + let dir_options = > + proxmox_product_config::default_create_options().perm(Mode::from_bits_truncate(0o0750)); > + > + proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(dir_options))?; > + > + tokio::spawn(async move { > + let task_scheduler = std::pin::pin!(remote_task_fetching_task()); > + let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future()); > + futures::future::select(task_scheduler, abort_future).await; > + }); > + > + Ok(()) > +} > + > +/// Task which handles fetching remote tasks and task archive rotation. > +/// This function never returns. > +async fn remote_task_fetching_task() -> ! { > + let mut task_state = TaskState::new(); > + > + let mut interval = tokio::time::interval(POLL_INTERVAL); > + interval.reset_at(task_utils::next_aligned_instant(POLL_INTERVAL.as_secs()).into()); > + > + // We don't really care about catching up to missed tick, we just want > + // a steady tick rate. 
> + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); > + > + if let Err(err) = init_cache().await { > + log::error!("error when initialized task cache: {err:#}"); > + } > + > + loop { > + interval.tick().await; > + if let Err(err) = do_tick(&mut task_state).await { > + log::error!("error when fetching remote tasks: {err:#}"); > + } > + } > +} > + > +/// Handle a single timer tick. > +/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks. > +async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> { > + let cache = remote_tasks::get_cache()?; > + > + if task_state.is_due_for_rotate_check() { > + log::debug!("checking if remote task archive should be rotated"); > + if rotate_cache(cache.clone()).await? { > + log::info!("rotated remote task archive"); > + } > + > + task_state.reset_rotate_check(); > + } > + > + if task_state.is_due_for_journal_apply() { > + apply_journal(cache.clone()).await?; > + task_state.reset_journal_apply(); > + } > + > + let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??; > + > + let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS)); > + > + let cache_state = cache.read_state(); > + let poll_results = poll_tracked_tasks( > + &remote_config, > + cache_state.tracked_tasks(), > + Arc::clone(&total_connections_semaphore), > + ) > + .await?; > + > + // Get a list of remotes that we should poll in this cycle. > + let remotes = if task_state.is_due_for_fetch() { > + task_state.reset_fetch(); > + get_all_remotes(&remote_config) > + } else { > + get_remotes_with_finished_tasks(&remote_config, &poll_results) > + }; > + > + let (all_tasks, update_state_for_remote) = fetch_remotes( > + remotes, > + Arc::new(cache_state), > + Arc::clone(&total_connections_semaphore), > + ) > + .await; > + > + if !all_tasks.is_empty() { > + update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?; > + } > + > + Ok(()) > +} > + > +/// Initialize the remote task cache with initial archive files, in case there are not > +/// any archive files yet. > +/// > +/// This allows us to immediately backfill remote task history when setting up a new PDM instance > +/// without any prior task archive rotation. > +async fn init_cache() -> Result<(), Error> { > + tokio::task::spawn_blocking(|| { > + let cache = remote_tasks::get_cache()?; > + cache.write()?.init(proxmox_time::epoch_i64())?; > + Ok(()) > + }) > + .await? > +} > + > +/// Fetch tasks from a list of remotes. > +/// > +/// Returns a list of tasks and a map that shows whether we want to update the > +/// cutoff timestamp in the statefile. We don't want to update the cutoff if > +/// the connection to one remote failed or if we could not reach all remotes in a cluster. 
> +async fn fetch_remotes( > + remotes: Vec, > + cache_state: Arc, > + total_connections_semaphore: Arc, > +) -> (Vec, NodeFetchSuccessMap) { > + let mut join_set = JoinSet::new(); > + > + for remote in remotes { > + let semaphore = Arc::clone(&total_connections_semaphore); > + let state_clone = Arc::clone(&cache_state); > + > + join_set.spawn(async move { > + log::debug!("fetching remote tasks for '{}'", remote.id); > + fetch_tasks(&remote, state_clone, semaphore) > + .await > + .map_err(|err| { > + format_err!("could not fetch tasks from remote '{}': {err}", remote.id) > + }) > + }); > + } > + > + let mut all_tasks = Vec::new(); > + let mut update_state_for_remote = NodeFetchSuccessMap::default(); > + > + while let Some(res) = join_set.join_next().await { > + match res { > + Ok(Ok(FetchedTasks { > + tasks, > + node_results, > + })) => { > + all_tasks.extend(tasks); > + update_state_for_remote.merge(node_results); > + } > + Ok(Err(err)) => log::error!("{err:#}"), > + Err(err) => log::error!("could not join task fetching future: {err:#}"), > + } > + } > + > + (all_tasks, update_state_for_remote) > +} > + > +/// Return all remotes from the given config. > +fn get_all_remotes(remote_config: &SectionConfigData) -> Vec { > + remote_config > + .into_iter() > + .map(|(_, section)| section) > + .cloned() > + .collect() > +} > + > +/// Return all remotes that correspond to a list of finished tasks. > +fn get_remotes_with_finished_tasks( > + remote_config: &SectionConfigData, > + poll_results: &HashMap, > +) -> Vec { > + let remotes_with_finished_tasks: HashSet<&str> = poll_results > + .iter() > + .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote())) > + .collect(); > + > + remote_config > + .into_iter() > + .filter_map(|(name, remote)| { > + remotes_with_finished_tasks > + .contains(&name) > + .then_some(remote) > + }) > + .cloned() > + .collect() > +} > + > +/// Rotate the task cache if necessary. > +/// > +/// Returns Ok(true) the cache's files were rotated. > +async fn rotate_cache(cache: TaskCache) -> Result { > + tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await? > +} > + > +/// Apply the task cache journal. > +async fn apply_journal(cache: TaskCache) -> Result<(), Error> { > + tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await? > +} > + > +/// Fetched tasks from a single remote. > +struct FetchedTasks { > + /// List of tasks. > + tasks: Vec, > + /// Contains whether a cluster node was fetched successfully. > + node_results: NodeFetchSuccessMap, > +} > + > +/// Fetch tasks (active and finished) from a remote. > +async fn fetch_tasks( > + remote: &Remote, > + state: Arc, > + total_connections_semaphore: Arc, > +) -> Result { > + let mut tasks = Vec::new(); > + > + let mut node_results = NodeFetchSuccessMap::default(); > + > + match remote.ty { > + RemoteType::Pve => { > + let client = pve::connect(remote)?; > + > + let nodes = { > + // This permit *must* be dropped before we acquire the permits for the > + // per-node connections - otherwise we risk a deadlock. > + let _permit = total_connections_semaphore.acquire().await.unwrap(); > + client.list_nodes().await? > + }; > + > + // This second semaphore is used to limit the number of concurrent connections > + // *per remote*, not in total. 
> + let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE)); > + let mut join_set = JoinSet::new(); > + > + for node in nodes { > + let node_name = node.node.to_string(); > + > + let since = state > + .cutoff_timestamp(&remote.id, &node_name) > + .unwrap_or_else(|| { > + proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64 > + }); > + > + let params = ListTasks { > + source: Some(ListTasksSource::Archive), > + since: Some(since), > + // If `limit` is not provided, we only receive 50 tasks > + limit: Some(MAX_TASKS_TO_FETCH), > + ..Default::default() > + }; > + > + let per_remote_permit = Arc::clone(&per_remote_semaphore) > + .acquire_owned() > + .await > + .unwrap(); > + > + let total_connections_permit = Arc::clone(&total_connections_semaphore) > + .acquire_owned() > + .await > + .unwrap(); > + > + let remote_clone = remote.clone(); > + > + join_set.spawn(async move { > + let res = async { > + let client = pve::connect(&remote_clone)?; > + let task_list = > + client > + .get_task_list(&node.node, params) > + .await > + .map_err(|err| { > + format_err!( > + "remote '{}', node '{}': {err}", > + remote_clone.id, > + node.node > + ) > + })?; > + Ok::<Vec<ListTasksResponse>, Error>(task_list) > + } > + .await; > + > + drop(total_connections_permit); > + drop(per_remote_permit); > + > + (node_name, res) > + }); > + } > + > + while let Some(result) = join_set.join_next().await { > + match result { > + Ok((node_name, result)) => match result { > + Ok(task_list) => { > + let mapped = > + task_list.into_iter().filter_map(|task| { > + match map_pve_task(task, &remote.id) { > + Ok(task) => Some(task), > + Err(err) => { > + log::error!( > + "could not map task data, skipping: {err:#}" > + ); > + None > + } > + } > + }); > + > + tasks.extend(mapped); > + node_results.set_node_success(remote.id.clone(), node_name); > + } > + Err(error) => { > + log::error!("could not fetch tasks: {error:#}"); > + node_results.set_node_failure(remote.id.clone(), node_name); > + } > + }, > + Err(err) => return Err(err.into()), > + }

two things here:

* are we sure we want to cancel everything when one task fetching task failed?
  in other words, we maybe want

      Err(err) => log::error!("{err:#}"),

  instead of `return Err(err.into())`? should not happen, but if there is some
  panic we still want to log and continue maybe?

* if not, we could do

      while let Some(result) = join_set.join_next().await {
          let (node_name, result) = result?;
          match result {
              Ok(task_list) => { /* ... */ }
              Err(err) => { /* ... */ }
          }
      }

  or maybe even better:

      while let Some(Ok((node_name, result))) = join_set.join_next().await {
          match result {
              // ...
          }
      }

  can't do those of course if we want to log & continue

> + } > + } > + RemoteType::Pbs => { > + // TODO: Add code for PBS > + } > + } > + > + Ok(FetchedTasks { > + tasks, > + node_results, > + }) > +} > + > +#[derive(PartialEq, Debug)] > +/// Outcome from polling a tracked task. > +enum PollResult { > + /// Task is still running. > + Running, > + /// Task is finished, poll remote tasks to get final status/endtime. > + Finished, > + /// Should be dropped from the active file. > + RequestError, > + /// Remote does not exist any more -> remove immediately from tracked task list. > + RemoteGone, > +} > + > +/// Poll all tracked tasks. 
> +async fn poll_tracked_tasks( > + remote_config: &SectionConfigData, > + tracked_tasks: impl Iterator, > + total_connections_semaphore: Arc, > +) -> Result, Error> { > + let mut join_set = JoinSet::new(); > + > + for task in tracked_tasks.cloned() { > + let permit = Arc::clone(&total_connections_semaphore) > + .acquire_owned() > + .await > + .unwrap(); > + > + let remote = remote_config.get(task.remote()).cloned(); > + > + join_set.spawn(async move { > + // Move permit into this async block. > + let _permit = permit; > + > + match remote { > + Some(remote) => poll_single_tracked_task(remote, task).await, > + None => { > + log::info!( > + "remote {} does not exist any more, dropping tracked task", > + task.remote() > + ); > + (task, PollResult::RemoteGone) > + } > + } > + }); > + } > + > + let mut results = HashMap::new(); > + while let Some(task_result) = join_set.join_next().await { > + let (upid, result) = task_result?; > + results.insert(upid, result); > + } > + > + Ok(results) > +} > + > +/// Poll a single tracked task. > +async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) { > + match remote.ty { > + RemoteType::Pve => { > + log::debug!("polling tracked task {}", task); > + > + let status = match server::api::pve::tasks::get_task_status( > + remote.id.clone(), > + task.clone(), > + false, > + ) > + .await > + { > + Ok(status) => status, > + Err(err) => { > + log::error!("could not get status from remote: {err:#}"); > + return (task, PollResult::RequestError); > + } > + }; > + > + let result = if status.exitstatus.is_some() { > + PollResult::Finished > + } else { > + PollResult::Running > + }; > + > + (task, result) > + } > + RemoteType::Pbs => { > + // TODO: Implement for PBS > + (task, PollResult::RequestError) > + } > + } > +} > + > +/// Map a `ListTasksResponse` to `TaskCacheItem` > +fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result { > + let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?; > + > + Ok(TaskCacheItem { > + upid: remote_upid, > + starttime: task.starttime, > + endtime: task.endtime, > + status: task.status, > + }) > +} > + > +/// Update task cache with results from tracked task polling & regular task fetching. > +async fn update_task_cache( > + cache: TaskCache, > + new_tasks: Vec, > + update_state_for_remote: NodeFetchSuccessMap, > + poll_results: HashMap, > +) -> Result<(), Error> { > + tokio::task::spawn_blocking(move || { > + let drop_tracked = poll_results > + .into_iter() > + .filter_map(|(upid, result)| match result { > + PollResult::Running => None, > + PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => { > + Some(upid) > + } > + }) > + .collect(); > + > + cache > + .write()? > + .update(new_tasks, &update_state_for_remote, drop_tracked)?; > + > + Ok(()) > + }) > + .await? 
> +} > diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs > index 7c8e31ef..cec2cc1e 100644 > --- a/server/src/remote_tasks/mod.rs > +++ b/server/src/remote_tasks/mod.rs > @@ -1,515 +1,156 @@ > -use std::{ > - collections::{HashMap, HashSet}, > - fs::File, > - path::{Path, PathBuf}, > - sync::{LazyLock, RwLock}, > - time::Duration, > -}; > +use std::path::Path; > > use anyhow::Error; > -use pdm_api_types::{ > - remotes::{Remote, RemoteType}, > - RemoteUpid, TaskFilters, TaskListItem, TaskStateType, > -}; > -use proxmox_sys::fs::CreateOptions; > -use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid}; > -use serde::{Deserialize, Serialize}; > -use tokio::task::JoinHandle; > > -use crate::{api::pve, task_utils}; > +use pdm_api_types::{RemoteUpid, TaskFilters, TaskListItem, TaskStateType}; > +use pve_api_types::PveUpid; > > -mod task_cache; > +pub mod task_cache; > + > +use task_cache::{GetTasks, TaskCache, TaskCacheItem}; > + > +/// Base directory for the remote task cache. > +pub const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks"); > + > +/// Maximum size at which the journal will applied early when adding new tasks. > +const JOURNAL_MAX_SIZE: u64 = 5 * 1024 * 1024; > + > +/// Rotate once the most recent archive file is at least 24 hour old. > +pub const ROTATE_AFTER: u64 = 24 * 3600; > + > +/// Keep 7 days worth of tasks. > +pub const KEEP_OLD_FILES: u32 = 7; > + > +/// Number of uncompressed archive files. These will be be the most recent ones. > +const NUMBER_OF_UNCOMPRESSED_FILES: u32 = 2; > > /// Get tasks for all remotes > // FIXME: filter for privileges > -pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result, Error> { > - let (remotes, _) = pdm_config::remotes::config()?; > +pub async fn get_tasks(filters: TaskFilters) -> Result, Error> { > + tokio::task::spawn_blocking(move || { > + let cache = get_cache()?.read()?; > > - let mut all_tasks = Vec::new(); > - > - let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json"); > - let mut cache = TaskCache::new(cache_path)?; > - > - // Force a refresh for all tasks of a remote if a task is finished. > - // Not super nice, but saves us from persisting finished tasks. Also, > - // the /nodes//tasks//status endpoint does not return > - // a task's endtime, which is only returned by > - // /nodes//tasks... > - // Room for improvements in the future. > - invalidate_cache_for_finished_tasks(&mut cache); > - > - for (remote_name, remote) in remotes.iter() { > - let now = proxmox_time::epoch_i64(); > - > - if let Some(tasks) = cache.get_tasks(remote_name, now, max_age) { > - // Data in cache is recent enough and has not been invalidated. > - all_tasks.extend(tasks); > + let which = if filters.running { > + GetTasks::Active > } else { > - let tasks = match fetch_tasks(remote).await { > - Ok(tasks) => tasks, > - Err(err) => { > - // ignore errors for not reachable remotes > - continue; > + GetTasks::All > + }; > + > + let returned_tasks = cache > + .get_tasks(which)? 
> + .skip(filters.start as usize) > + .take(filters.limit as usize) > + .filter_map(|task| { > + // TODO: Handle PBS tasks > + let pve_upid: Result = task.upid.upid.parse(); > + match pve_upid { > + Ok(pve_upid) => Some(TaskListItem { > + upid: task.upid.to_string(), > + node: pve_upid.node, > + pid: pve_upid.pid as i64, > + pstart: pve_upid.pstart, > + starttime: pve_upid.starttime, > + worker_type: pve_upid.worker_type, > + worker_id: None, > + user: pve_upid.auth_id, > + endtime: task.endtime, > + status: task.status, > + }), > + Err(err) => { > + log::error!("could not parse UPID: {err:#}"); > + None > + } > } > - }; > - cache.set_tasks(remote_name, tasks.clone(), now); > - > - all_tasks.extend(tasks); > - } > - } > - > - let mut returned_tasks = add_running_tasks(all_tasks)?; > - returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime)); > - let returned_tasks = returned_tasks > - .into_iter() > - .filter(|item| { > - if filters.running && item.endtime.is_some() { > - return false; > - } > - > - if let Some(until) = filters.until { > - if item.starttime > until { > + }) > + .filter(|item| { > + if filters.running && item.endtime.is_some() { > return false; > } > - } > > - if let Some(since) = filters.since { > - if item.starttime < since { > - return false; > - } > - } > - > - if let Some(needle) = &filters.userfilter { > - if !item.user.contains(needle) { > - return false; > - } > - } > - > - if let Some(typefilter) = &filters.typefilter { > - if !item.worker_type.contains(typefilter) { > - return false; > - } > - } > - > - let state = item.status.as_ref().map(|status| tasktype(status)); > - > - match (state, &filters.statusfilter) { > - (Some(TaskStateType::OK), _) if filters.errors => return false, > - (Some(state), Some(filters)) => { > - if !filters.contains(&state) { > + if let Some(until) = filters.until { > + if item.starttime > until { > return false; > } > } > - (None, Some(_)) => return false, > - _ => {} > - } > > - true > - }) > - .skip(filters.start as usize) > - .take(filters.limit as usize) > - .collect(); > - > - // We don't need to wait for this task to finish > - tokio::task::spawn_blocking(move || { > - if let Err(e) = cache.save() { > - log::error!("could not save task cache: {e}"); > - } > - }); > - > - Ok(returned_tasks) > -} > - > -/// Fetch tasks (active and finished) from a remote > -async fn fetch_tasks(remote: &Remote) -> Result, Error> { > - let mut tasks = Vec::new(); > - > - match remote.ty { > - RemoteType::Pve => { > - let client = pve::connect(remote)?; > - > - // N+1 requests - we could use /cluster/tasks, but that one > - // only gives a limited task history > - for node in client.list_nodes().await? { > - let params = ListTasks { > - // Include running tasks > - source: Some(ListTasksSource::All), > - // TODO: How much task history do we want? Right now we just hard-code it > - // to 7 days. 
> - since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60), > - ..Default::default() > - }; > - > - let list = client.get_task_list(&node.node, params).await?; > - let mapped = map_tasks(list, &remote.id)?; > - > - tasks.extend(mapped); > - } > - } > - RemoteType::Pbs => { > - // TODO: Add code for PBS > - } > - } > - > - Ok(tasks) > -} > - > -/// Convert a `Vec` to `Vec` > -fn map_tasks(tasks: Vec, remote: &str) -> Result, Error> { > - let mut mapped = Vec::new(); > - > - for task in tasks { > - let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?; > - > - mapped.push(TaskListItem { > - upid: remote_upid.to_string(), > - node: task.node, > - pid: task.pid, > - pstart: task.pstart as u64, > - starttime: task.starttime, > - worker_type: task.ty, > - worker_id: Some(task.id), > - user: task.user, > - endtime: task.endtime, > - status: task.status, > - }) > - } > - > - Ok(mapped) > -} > - > -/// Drops the cached task list of a remote for all finished tasks. > -/// > -/// We use this to force a refresh so that we get the full task > -/// info (including `endtime`) in the next API call. > -fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) { > - let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned"); > - > - // If a task is finished, we force a refresh for the remote - otherwise > - // we don't get the 'endtime' for the task. > - for task in finished.drain() { > - cache.invalidate_cache_for_remote(task.remote()); > - } > -} > - > -/// Supplement the list of tasks that we received from the remote with > -/// the tasks that were started by PDM and are currently running. > -fn add_running_tasks(cached_tasks: Vec) -> Result, Error> { > - let mut returned_tasks = Vec::new(); > - > - let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned"); > - for task in cached_tasks { > - let remote_upid = task.upid.parse()?; > - > - if running_tasks.contains(&remote_upid) { > - if task.endtime.is_some() { > - // Task is finished but we still think it is running -> > - // Drop it from RUNNING_FOREIGN_TASKS > - running_tasks.remove(&remote_upid); > - > - // No need to put it in FINISHED_TASKS, since we already > - // got its state recently enough (we know the status and endtime) > - } > - } else { > - returned_tasks.push(task); > - } > - } > - > - for task in running_tasks.iter() { > - let upid: PveUpid = task.upid.parse()?; > - returned_tasks.push(TaskListItem { > - upid: task.to_string(), > - node: upid.node, > - pid: upid.pid as i64, > - pstart: upid.pstart, > - starttime: upid.starttime, > - worker_type: upid.worker_type, > - worker_id: upid.worker_id, > - user: upid.auth_id, > - endtime: None, > - status: None, > - }); > - } > - > - Ok(returned_tasks) > -} > - > -/// A cache for fetched remote tasks. > -struct TaskCache { > - /// Cache entries > - content: TaskCacheContent, > - > - /// Entries that were added or updated - these will be persistet > - /// when `save` is called. > - new_or_updated: TaskCacheContent, > - > - /// Cache entries were changed/removed. > - dirty: bool, > - > - /// File-location at which the cached tasks are stored. > - cachefile_path: PathBuf, > -} > - > -impl TaskCache { > - /// Create a new tasks cache instance by loading > - /// the cache from disk. > - fn new(cachefile_path: PathBuf) -> Result { > - Ok(Self { > - content: Self::load_content()?, > - new_or_updated: Default::default(), > - dirty: false, > - cachefile_path, > - }) > - } > - > - /// Load the task cache contents from disk. 
> - fn load_content() -> Result { > - let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json"); > - let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?; > - > - let content = if let Some(content) = content { > - serde_json::from_str(&content)? > - } else { > - Default::default() > - }; > - > - Ok(content) > - } > - > - /// Get path for the cache's lockfile. > - fn lockfile_path(&self) -> PathBuf { > - let mut path = self.cachefile_path.clone(); > - path.set_extension("lock"); > - path > - } > - > - /// Persist the task cache > - /// > - /// This method requests an exclusive lock for the task cache lockfile. > - fn save(&mut self) -> Result<(), Error> { > - // if we have not updated anything, we don't have to update the cache file > - if !self.dirty { > - return Ok(()); > - } > - > - let _guard = self.lock(Duration::from_secs(5))?; > - > - // Read content again, in case somebody has changed it in the meanwhile > - let mut content = Self::load_content()?; > - > - for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() { > - if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) { > - // Only update entry if nobody else has updated it in the meanwhile > - if existing_entry.timestamp < entry.timestamp { > - *existing_entry = entry; > - } > - } else { > - content.remote_tasks.insert(remote_name, entry); > - } > - } > - > - let bytes = serde_json::to_vec_pretty(&content)?; > - > - let api_uid = pdm_config::api_user()?.uid; > - let api_gid = pdm_config::api_group()?.gid; > - > - let file_options = CreateOptions::new().owner(api_uid).group(api_gid); > - > - proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?; > - > - self.dirty = false; > - > - Ok(()) > - } > - > - // Update task data for a given remote. > - fn set_tasks(&mut self, remote: &str, tasks: Vec, timestamp: i64) { > - self.dirty = true; > - self.new_or_updated > - .remote_tasks > - .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks }); > - } > - > - // Get task data for a given remote. > - fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option> { > - if let Some(entry) = self.content.remote_tasks.get(remote) { > - if (entry.timestamp + max_age) < now { > - return None; > - } > - > - Some(entry.tasks.clone()) > - } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) { > - if (entry.timestamp + max_age) < now { > - return None; > - } > - Some(entry.tasks.clone()) > - } else { > - None > - } > - } > - > - // Invalidate cache for a given remote. > - fn invalidate_cache_for_remote(&mut self, remote: &str) { > - self.dirty = true; > - self.content.remote_tasks.remove(remote); > - } > - > - // Lock the cache for modification. > - // > - // While the cache is locked, other users can still read the cache > - // without a lock, since the cache file is replaced atomically > - // when updating. > - fn lock(&self, duration: Duration) -> Result { > - let api_uid = pdm_config::api_user()?.uid; > - let api_gid = pdm_config::api_group()?.gid; > - > - let file_options = CreateOptions::new().owner(api_uid).group(api_gid); > - proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options) > - } > -} > - > -#[derive(Serialize, Deserialize)] > -/// Per-remote entry in the task cache. > -struct TaskCacheEntry { > - timestamp: i64, > - tasks: Vec, > -} > - > -#[derive(Default, Serialize, Deserialize)] > -/// Content of the task cache file. 
> -struct TaskCacheContent { > - remote_tasks: HashMap, > -} > - > -/// Interval at which tracked tasks are polled > -const RUNNING_CHECK_INTERVAL_S: u64 = 10; > - > -/// Tasks which were started by PDM and are still running > -static RUNNING_FOREIGN_TASKS: LazyLock>> = LazyLock::new(init); > -/// Tasks which were started by PDM and w > -static FINISHED_FOREIGN_TASKS: LazyLock>> = LazyLock::new(init); > - > -fn init() -> RwLock> { > - RwLock::new(HashSet::new()) > -} > - > -/// Insert a remote UPID into the running list > -/// > -/// If it is the first entry in the list, a background task is started to track its state > -/// > -/// Returns the [`JoinHandle`] if a task was started. > -/// > -/// panics on a poisoned mutex > -pub fn track_running_task(task: RemoteUpid) -> Option> { > - let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap(); > - > - // the call inserting the first task in the list needs to start the checking task > - let need_start_task = tasks.is_empty(); > - tasks.insert(task); > - > - if !need_start_task { > - return None; > - } > - drop(tasks); > - > - Some(tokio::spawn(async move { > - loop { > - let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S); > - tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await; > - > - let finished_tasks = get_finished_tasks().await; > - > - // skip iteration if we still have tasks, just not finished ones > - if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() { > - continue; > - } > - > - let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap(); > - // we either have finished tasks, or the running task list was empty > - let mut set = RUNNING_FOREIGN_TASKS.write().unwrap(); > - > - for (upid, _status) in finished_tasks { > - if set.remove(&upid) { > - finished.insert(upid); > - } else { > - // someone else removed & persisted the task in the meantime > - } > - } > - > - // if no task remains, end the current task > - // it will be restarted by the next caller that inserts one > - if set.is_empty() { > - return; > - } > - } > - })) > -} > - > -/// Get a list of running foreign tasks > -/// > -/// panics on a poisoned mutex > -pub fn get_running_tasks() -> Vec { > - RUNNING_FOREIGN_TASKS > - .read() > - .unwrap() > - .iter() > - .cloned() > - .collect() > -} > - > -/// Checks all current saved UPIDs if they're still running, and if not, > -/// returns their upids + status > -/// > -/// panics on a poisoned mutex > -pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> { > - let mut finished = Vec::new(); > - let config = match pdm_config::remotes::config() { > - Ok((config, _)) => config, > - Err(err) => { > - log::error!("could not open remotes config: {err}"); > - return Vec::new(); > - } > - }; > - for task in get_running_tasks() { > - match config.get(task.remote()) { > - Some(remote) => match remote.ty { > - RemoteType::Pve => { > - let status = match crate::api::pve::tasks::get_task_status( > - remote.id.clone(), > - task.clone(), > - false, > - ) > - .await > - { > - Ok(status) => status, > - Err(err) => { > - log::error!("could not get status from remote: {err}"); > - finished.push((task, "could not get status".to_string())); > - continue; > - } > - }; > - if let Some(status) = status.exitstatus { > - finished.push((task, status.to_string())); > + if let Some(since) = filters.since { > + if item.starttime < since { > + return false; > } > } > - RemoteType::Pbs => { > - let _client = match crate::pbs_client::connect(remote) { > - Ok(client) => 
client, > - Err(err) => { > - log::error!("could not get status from remote: {err}"); > - finished.push((task, "could not get status".to_string())); > - continue; > - } > - }; > - // FIXME implement get task status > - finished.push((task, "unknown state".to_string())); > - } > - }, > - None => finished.push((task, "unknown remote".to_string())), > - } > - } > > - finished > + if let Some(needle) = &filters.userfilter { > + if !item.user.contains(needle) { > + return false; > + } > + } > + > + if let Some(typefilter) = &filters.typefilter { > + if !item.worker_type.contains(typefilter) { > + return false; > + } > + } > + > + let state = item.status.as_ref().map(|status| tasktype(status)); > + > + match (state, &filters.statusfilter) { > + (Some(TaskStateType::OK), _) if filters.errors => return false, > + (Some(state), Some(filters)) => { > + if !filters.contains(&state) { > + return false; > + } > + } > + (None, Some(_)) => return false, > + _ => {} > + } > + > + true > + }) > + .collect(); > + > + Ok(returned_tasks) > + }) > + .await? > +} > + > +/// Insert a newly created tasks into the list of tracked tasks. > +/// > +/// Any tracked task will be polled with a short interval until the task > +/// has finished. > +pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> { > + tokio::task::spawn_blocking(move || { > + let cache = get_cache()?.write()?; > + // TODO:: Handle PBS tasks correctly. > + let pve_upid: pve_api_types::PveUpid = task.upid.parse()?; > + let task = TaskCacheItem { > + upid: task.clone(), > + starttime: pve_upid.starttime, > + status: None, > + endtime: None, > + }; > + cache.add_tracked_task(task) > + }) > + .await? > +} > + > +/// Get a new [`TaskCache`] instance. > +/// > +/// No heavy-weight operations are done here, it's fine to call this regularly as part of the > +/// update loop. > +pub fn get_cache() -> Result { > + let file_options = proxmox_product_config::default_create_options(); > + > + let cache_path = Path::new(REMOTE_TASKS_DIR); > + let cache = TaskCache::new( > + cache_path, > + file_options, > + KEEP_OLD_FILES, > + NUMBER_OF_UNCOMPRESSED_FILES, > + ROTATE_AFTER, > + JOURNAL_MAX_SIZE, > + )?; > + > + Ok(cache) > } > > /// Parses a task status string into a TaskStateType _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel