From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id B54151FF187 for ; Fri, 19 Dec 2025 14:50:06 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id E2472CDCA; Fri, 19 Dec 2025 14:50:55 +0100 (CET) Mime-Version: 1.0 Date: Fri, 19 Dec 2025 14:50:46 +0100 Message-Id: To: "Lukas Wagner" X-Mailer: aerc 0.20.0 References: <20251218142007.279631-1-l.wagner@proxmox.com> <20251218142007.279631-4-l.wagner@proxmox.com> In-Reply-To: <20251218142007.279631-4-l.wagner@proxmox.com> From: "Shannon Sterz" X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1766152233703 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.088 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[mod.rs] Subject: Re: [pdm-devel] [PATCH datacenter-manager v2 1/4] remote tasks: move implementation to server crate X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Cc: Proxmox Datacenter Manager development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" On Thu Dec 18, 2025 at 3:20 PM CET, Lukas Wagner wrote: > Most of the functions from the refresh task are needed for any kind of > manual refresh, so they have to moved to the shared server crate. > > No functional changes. Renamed the do_tick function for better clarity. > > Signed-off-by: Lukas Wagner > --- > .../tasks/remote_tasks.rs | 535 +----------------- > server/src/remote_tasks/mod.rs | 1 + > server/src/remote_tasks/refresh_task.rs | 530 +++++++++++++++++ > 3 files changed, 536 insertions(+), 530 deletions(-) > create mode 100644 server/src/remote_tasks/refresh_task.rs > > diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > index c71a0894..fd8823cb 100644 > --- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs > @@ -1,28 +1,10 @@ > -use std::{ > - collections::{HashMap, HashSet}, > - sync::Arc, > - time::{Duration, Instant}, > -}; > +use std::time::Duration; > > use anyhow::Error; > use nix::sys::stat::Mode; > -use tokio::{sync::Semaphore, task::JoinSet}; > - > -use pdm_api_types::{ > - remotes::{Remote, RemoteType}, > - RemoteUpid, > -}; > -use proxmox_section_config::typed::SectionConfigData; > > use server::{ > - connection, > - parallel_fetcher::{NodeResults, ParallelFetcher}, > - pbs_client, > - 
remote_tasks::{ > - self, > - task_cache::{GetTasks, NodeFetchSuccessMap, State, TaskCache, TaskCacheItem}, > - KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER, > - }, > + remote_tasks::{self, refresh_task, REMOTE_TASKS_DIR}, nit: could use module level imports here :) > task_utils, > }; > > @@ -30,105 +12,6 @@ use server::{ > /// This is also the rate at which we check on tracked tasks. > const POLL_INTERVAL: Duration = Duration::from_secs(10); > > -/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked > -/// task for this remote). > -const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600); > - > -/// Interval in seconds at which we poll active tasks. This only really affects 'foreign' (as in, > -/// not started by PDM) tasks. Tasks which were started by PDM are always 'tracked' and therefore > -/// polled at the interval set in [`POLL_INTERVAL`]. > -// NOTE: Since we at the moment never query active tasks from remotes, this is merely a safeguard > -// to clear stuck active tasks from a previous bug. If we at some point query active tasks, we > -// might lower this interval. > -const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600); > - > -/// Interval at which to check for task cache rotation. > -const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600); > - > -/// Interval at which the task cache journal should be applied. > -/// > -/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes. > -/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing > -/// it will be slower than the task archive itself, as the entire journal must be loaded into > -/// memory and then sorted by task starttime. Applying the journal more often might > -/// lead to more writes, but should yield better performance. 
> -const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600); > - > -/// Maximum number of concurrent connections per remote. > -const CONNECTIONS_PER_PVE_REMOTE: usize = 5; > - > -/// Maximum number of total concurrent connections. > -const MAX_CONNECTIONS: usize = 20; > - > -/// Maximum number of tasks to fetch from a single remote in one API call. > -const MAX_TASKS_TO_FETCH: u64 = 5000; > - > -/// (Ephemeral) Remote task fetching task state. > -struct TaskState { > - /// Time at which we last checked for archive rotation. > - last_rotate_check: Instant, > - /// Time at which we fetch tasks the last time. > - last_fetch: Instant, > - /// Time at which we last applied the journal. > - last_journal_apply: Instant, > - /// Time at which we polled active tasks. This is done to ensure that > - /// active tasks are never stuck in the 'active' state > - last_active_poll: Instant, > -} > - > -impl TaskState { > - fn new() -> Self { > - let now = Instant::now(); > - > - Self { > - last_rotate_check: now - CHECK_ROTATE_INTERVAL, > - last_fetch: now - TASK_FETCH_INTERVAL, > - last_journal_apply: now - APPLY_JOURNAL_INTERVAL, > - last_active_poll: now - POLL_ACTIVE_INTERVAL, > - } > - } > - > - /// Reset the task archive rotation timestamp. > - fn reset_rotate_check(&mut self) { > - self.last_rotate_check = Instant::now(); > - } > - > - /// Reset the task fetch timestamp. > - fn reset_fetch(&mut self) { > - self.last_fetch = Instant::now(); > - } > - > - /// Reset the journal apply timestamp. > - fn reset_journal_apply(&mut self) { > - self.last_journal_apply = Instant::now(); > - } > - > - /// Reset the journal apply timestamp. > - fn reset_active_poll(&mut self) { > - self.last_active_poll = Instant::now(); > - } > - > - /// Should we check for archive rotation? > - fn is_due_for_rotate_check(&self) -> bool { > - Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL > - } > - > - /// Should we fetch tasks? 
> - fn is_due_for_fetch(&self) -> bool { > - Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL > - } > - > - /// Should we apply the task archive's journal? > - fn is_due_for_journal_apply(&self) -> bool { > - Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL > - } > - > - /// Should we poll active tasks? > - fn is_due_for_active_poll(&self) -> bool { > - Instant::now().duration_since(self.last_active_poll) > POLL_ACTIVE_INTERVAL > - } > -} > - > /// Start the remote task fetching task > pub fn start_task() -> Result<(), Error> { > let dir_options = > @@ -148,7 +31,7 @@ pub fn start_task() -> Result<(), Error> { > /// Task which handles fetching remote tasks and task archive rotation. > /// This function never returns. > async fn remote_task_fetching_task() -> ! { > - let mut task_state = TaskState::new(); > + let mut task_state = refresh_task::TaskState::new(); > > let mut interval = tokio::time::interval(POLL_INTERVAL); > interval.reset_at(task_utils::next_aligned_instant(POLL_INTERVAL.as_secs()).into()); > @@ -157,422 +40,14 @@ async fn remote_task_fetching_task() -> ! { > // a steady tick rate. > interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); > > - if let Err(err) = init_cache().await { > + if let Err(err) = refresh_task::init_cache().await { > log::error!("error when initialized task cache: {err:#}"); > } > > loop { > interval.tick().await; > - if let Err(err) = do_tick(&mut task_state).await { > + if let Err(err) = remote_tasks::refresh_task::handle_timer_tick(&mut task_state).await { nit: you already import `refresh_task` above, so you can drop the `remote_tasks::` prefix here. That should also let you drop the `self` import above (assuming `remote_tasks::` isn't used elsewhere in this file). > log::error!("error when fetching remote tasks: {err:#}"); > } > } > } > - > -/// Handle a single timer tick. > -/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks.
> -async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> { > - let cache = remote_tasks::get_cache()?; > - > - if task_state.is_due_for_rotate_check() { > - log::debug!("checking if remote task archive should be rotated"); > - if rotate_cache(cache.clone()).await? { > - log::info!("rotated remote task archive"); > - } > - > - task_state.reset_rotate_check(); > - } > - > - if task_state.is_due_for_journal_apply() { > - apply_journal(cache.clone()).await?; > - task_state.reset_journal_apply(); > - } > - > - let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??; > - > - let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS)); > - > - let cache_state = cache.read_state(); > - > - let poll_results = if task_state.is_due_for_active_poll() { > - let mut tasks_to_poll: HashSet = > - HashSet::from_iter(cache_state.tracked_tasks().cloned()); > - > - let active_tasks = get_active_tasks(cache.clone()).await?; > - tasks_to_poll.extend(active_tasks.into_iter()); > - > - let poll_results = poll_tracked_tasks( > - &remote_config, > - tasks_to_poll.iter(), > - Arc::clone(&total_connections_semaphore), > - ) > - .await?; > - > - task_state.reset_active_poll(); > - > - poll_results > - } else { > - poll_tracked_tasks( > - &remote_config, > - cache_state.tracked_tasks(), > - Arc::clone(&total_connections_semaphore), > - ) > - .await? > - }; > - > - // Get a list of remotes that we should poll in this cycle. 
> - let remotes = if task_state.is_due_for_fetch() { > - task_state.reset_fetch(); > - get_all_remotes(&remote_config) > - } else { > - get_remotes_with_finished_tasks(&remote_config, &poll_results) > - }; > - > - let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await; > - > - if !all_tasks.is_empty() > - || poll_results > - .iter() > - .any(|(_, result)| matches!(result, PollResult::RemoteGone | PollResult::RequestError)) > - { > - update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?; > - } > - > - Ok(()) > -} > - > -/// Initialize the remote task cache with initial archive files, in case there are not > -/// any archive files yet. > -/// > -/// This allows us to immediately backfill remote task history when setting up a new PDM instance > -/// without any prior task archive rotation. > -async fn init_cache() -> Result<(), Error> { > - tokio::task::spawn_blocking(|| { > - let cache = remote_tasks::get_cache()?; > - cache.write()?.init(proxmox_time::epoch_i64())?; > - Ok(()) > - }) > - .await? > -} > - > -/// Fetch tasks from a list of remotes. > -/// > -/// Returns a list of tasks and a map that shows whether we want to update the > -/// cutoff timestamp in the statefile. We don't want to update the cutoff if > -/// the connection to one remote failed or if we could not reach all remotes in a cluster. 
> -async fn fetch_remotes( > - remotes: Vec, > - cache_state: Arc, > -) -> (Vec, NodeFetchSuccessMap) { > - let fetcher = ParallelFetcher { > - max_connections: MAX_CONNECTIONS, > - max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE, > - context: cache_state, > - }; > - > - let fetch_results = fetcher > - .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node) > - .await; > - > - let mut all_tasks = Vec::new(); > - let mut node_success_map = NodeFetchSuccessMap::default(); > - > - for (remote_name, result) in fetch_results.remote_results { > - match result { > - Ok(remote_result) => { > - for (node_name, node_result) in remote_result.node_results { > - match node_result { > - Ok(NodeResults { data, .. }) => { > - all_tasks.extend(data); > - node_success_map.set_node_success(remote_name.clone(), node_name); > - } > - Err(err) => { > - log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}"); > - } > - } > - } > - } > - Err(err) => { > - log::error!("could not fetch tasks from remote '{remote_name}': {err:#}"); > - } > - } > - } > - > - (all_tasks, node_success_map) > -} > - > -async fn fetch_tasks_from_single_node( > - context: Arc, > - remote: Remote, > - node: String, > -) -> Result, Error> { > - let since = context > - .cutoff_timestamp(&remote.id, &node) > - .unwrap_or_else(|| { > - proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64 > - }); > - > - match remote.ty { > - RemoteType::Pve => { > - let params = pve_api_types::ListTasks { > - source: Some(pve_api_types::ListTasksSource::Archive), > - since: Some(since), > - // If `limit` is not provided, we only receive 50 tasks > - limit: Some(MAX_TASKS_TO_FETCH), > - ..Default::default() > - }; > - > - let client = connection::make_pve_client(&remote)?; > - > - let task_list = client > - .get_task_list(&node, params) > - .await? 
> - .into_iter() > - .map(|task| map_pve_task(task, remote.id.clone())) > - .collect(); > - > - Ok(task_list) > - } > - RemoteType::Pbs => { > - let params = pbs_client::ListTasks { > - since: Some(since), > - // If `limit` is not provided, we only receive 50 tasks > - limit: Some(MAX_TASKS_TO_FETCH), > - }; > - > - let client = connection::make_pbs_client(&remote)?; > - > - let task_list = client > - .get_task_list(params) > - .await? > - .into_iter() > - .filter_map(|task| { > - if task.endtime.is_some() { > - // We only care about finished tasks. > - Some(map_pbs_task(task, remote.id.clone())) > - } else { > - None > - } > - }) > - .collect(); > - > - Ok(task_list) > - } > - } > -} > - > -/// Return all remotes from the given config. > -fn get_all_remotes(remote_config: &SectionConfigData) -> Vec { > - remote_config > - .into_iter() > - .map(|(_, section)| section) > - .cloned() > - .collect() > -} > - > -/// Return all remotes that correspond to a list of finished tasks. > -fn get_remotes_with_finished_tasks( > - remote_config: &SectionConfigData, > - poll_results: &HashMap, > -) -> Vec { > - let remotes_with_finished_tasks: HashSet<&str> = poll_results > - .iter() > - .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote())) > - .collect(); > - > - remote_config > - .into_iter() > - .filter_map(|(name, remote)| { > - remotes_with_finished_tasks > - .contains(&name) > - .then_some(remote) > - }) > - .cloned() > - .collect() > -} > - > -/// Rotate the task cache if necessary. > -/// > -/// Returns Ok(true) the cache's files were rotated. > -async fn rotate_cache(cache: TaskCache) -> Result { > - tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await? > -} > - > -/// Apply the task cache journal. > -async fn apply_journal(cache: TaskCache) -> Result<(), Error> { > - tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await? > -} > - > -/// Get a list of active tasks. 
> -async fn get_active_tasks(cache: TaskCache) -> Result, Error> { > - Ok(tokio::task::spawn_blocking(move || { > - let tasks: Vec = cache > - .read()? > - .get_tasks(GetTasks::Active)? > - .map(|t| t.upid) > - .collect(); > - > - Ok::, Error>(tasks) > - }) > - .await??) > -} > - > -#[derive(PartialEq, Debug)] > -/// Outcome from polling a tracked task. > -enum PollResult { > - /// Tasks is still running. > - Running, > - /// Task is finished, poll remote tasks to get final status/endtime. > - Finished, > - /// Should be dropped from the active file. > - RequestError, > - /// Remote does not exist any more -> remove immediately from tracked task list. > - RemoteGone, > -} > - > -/// Poll all tracked tasks. > -async fn poll_tracked_tasks( > - remote_config: &SectionConfigData, > - tracked_tasks: impl Iterator, > - total_connections_semaphore: Arc, > -) -> Result, Error> { > - let mut join_set = JoinSet::new(); > - > - for task in tracked_tasks.cloned() { > - let permit = Arc::clone(&total_connections_semaphore) > - .acquire_owned() > - .await > - .unwrap(); > - > - let remote = remote_config.get(task.remote()).cloned(); > - > - join_set.spawn(async move { > - // Move permit into this async block. > - let _permit = permit; > - > - match remote { > - Some(remote) => poll_single_tracked_task(remote, task).await, > - None => { > - log::info!( > - "remote {} does not exist any more, dropping tracked task", > - task.remote() > - ); > - (task, PollResult::RemoteGone) > - } > - } > - }); > - } > - > - let mut results = HashMap::new(); > - while let Some(task_result) = join_set.join_next().await { > - let (upid, result) = task_result?; > - results.insert(upid, result); > - } > - > - Ok(results) > -} > - > -/// Poll a single tracked task. 
> -async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) { > - match remote.ty { > - RemoteType::Pve => { > - log::debug!("polling tracked task {}", task); > - > - let status = match server::api::pve::tasks::get_task_status( > - remote.id.clone(), > - task.clone(), > - false, > - ) > - .await > - { > - Ok(status) => status, > - Err(err) => { > - log::error!("could not get status from remote: {err:#}"); > - return (task, PollResult::RequestError); > - } > - }; > - > - let result = if status.exitstatus.is_some() { > - PollResult::Finished > - } else { > - PollResult::Running > - }; > - > - (task, result) > - } > - RemoteType::Pbs => { > - let status = match server::api::pbs::tasks::get_task_status( > - remote.id.clone(), > - task.clone(), > - false, > - ) > - .await > - { > - Ok(status) => status, > - Err(err) => { > - log::error!("could not get status from remote: {err:#}"); > - return (task, PollResult::RequestError); > - } > - }; > - > - let result = if status.exitstatus.is_some() { > - PollResult::Finished > - } else { > - PollResult::Running > - }; > - > - (task, result) > - } > - } > -} > - > -/// Map a `pve_api_types::ListTasksResponse` to `TaskCacheItem` > -fn map_pve_task(task: pve_api_types::ListTasksResponse, remote: String) -> TaskCacheItem { > - let remote_upid = RemoteUpid::new(remote, RemoteType::Pve, task.upid); > - > - TaskCacheItem { > - upid: remote_upid, > - starttime: task.starttime, > - endtime: task.endtime, > - status: task.status, > - } > -} > - > -/// Map a `pbs_api_types::TaskListItem` to `TaskCacheItem` > -fn map_pbs_task(task: pbs_api_types::TaskListItem, remote: String) -> TaskCacheItem { > - let remote_upid = RemoteUpid::new(remote, RemoteType::Pbs, task.upid); > - > - TaskCacheItem { > - upid: remote_upid, > - starttime: task.starttime, > - endtime: task.endtime, > - status: task.status, > - } > -} > - > -/// Update task cache with results from tracked task polling & regular task fetching. 
> -async fn update_task_cache( > - cache: TaskCache, > - new_tasks: Vec, > - update_state_for_remote: NodeFetchSuccessMap, > - poll_results: HashMap, > -) -> Result<(), Error> { > - tokio::task::spawn_blocking(move || { > - let drop_tracked = poll_results > - .into_iter() > - .filter_map(|(upid, result)| match result { > - PollResult::Running => None, > - PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => { > - Some(upid) > - } > - }) > - .collect(); > - > - cache > - .write()? > - .update(new_tasks, &update_state_for_remote, drop_tracked)?; > - > - Ok(()) > - }) > - .await? > -} > diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs > index b080811f..50ac6708 100644 > --- a/server/src/remote_tasks/mod.rs > +++ b/server/src/remote_tasks/mod.rs > @@ -5,6 +5,7 @@ use anyhow::Error; > use pdm_api_types::{NativeUpid, RemoteUpid, TaskFilters, TaskListItem, TaskStateType}; > use pve_api_types::PveUpid; > > +pub mod refresh_task; > pub mod task_cache; > > use task_cache::{GetTasks, TaskCache, TaskCacheItem}; > diff --git a/server/src/remote_tasks/refresh_task.rs b/server/src/remote_tasks/refresh_task.rs > new file mode 100644 > index 00000000..0e8ed345 > --- /dev/null > +++ b/server/src/remote_tasks/refresh_task.rs > @@ -0,0 +1,530 @@ > +use std::{ > + collections::{HashMap, HashSet}, > + sync::Arc, > + time::{Duration, Instant}, > +}; > + > +use anyhow::Error; > +use pdm_api_types::{ > + remotes::{Remote, RemoteType}, > + RemoteUpid, > +}; > +use proxmox_section_config::typed::SectionConfigData; > +use tokio::{sync::Semaphore, task::JoinSet}; > + > +use crate::{ > + api, connection, > + parallel_fetcher::{NodeResults, ParallelFetcher}, > + pbs_client, > + remote_tasks::{ > + task_cache::{GetTasks, NodeFetchSuccessMap, State, TaskCache, TaskCacheItem}, > + KEEP_OLD_FILES, ROTATE_AFTER, > + }, > +}; > + > +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked > +/// task for this 
remote). > +const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600); > + > +/// Interval in seconds at which we poll active tasks. This only really affects 'foreign' (as in, > +/// not started by PDM) tasks. Tasks which were started by PDM are always 'tracked' and therefore > +/// polled at the interval set in [`POLL_INTERVAL`]. > +// NOTE: Since we at the moment never query active tasks from remotes, this is merely a safeguard > +// to clear stuck active tasks from a previous bug. If we at some point query active tasks, we > +// might lower this interval. > +const POLL_ACTIVE_INTERVAL: Duration = Duration::from_secs(600); > + > +/// Interval at which to check for task cache rotation. > +const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600); > + > +/// Interval at which the task cache journal should be applied. > +/// > +/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes. > +/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing > +/// it will be slower than the task archive itself, as the entire journal must be loaded into > +/// memory and then sorted by task starttime. Applying the journal more often might > +/// lead to more writes, but should yield better performance. > +const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600); > + > +/// Maximum number of concurrent connections per remote. > +const CONNECTIONS_PER_PVE_REMOTE: usize = 5; > + > +/// Maximum number of total concurrent connections. > +const MAX_CONNECTIONS: usize = 20; > + > +/// Maximum number of tasks to fetch from a single remote in one API call. > +const MAX_TASKS_TO_FETCH: u64 = 5000; > + > +/// (Ephemeral) Remote task fetching task state. > +pub struct TaskState { > + /// Time at which we last checked for archive rotation. > + last_rotate_check: Instant, > + /// Time at which we fetch tasks the last time. > + last_fetch: Instant, > + /// Time at which we last applied the journal. 
> + last_journal_apply: Instant, > + /// Time at which we polled active tasks. This is done to ensure that > + /// active tasks are never stuck in the 'active' state > + last_active_poll: Instant, > +} > + > +impl TaskState { > + pub fn new() -> Self { > + let now = Instant::now(); > + > + Self { > + last_rotate_check: now - CHECK_ROTATE_INTERVAL, > + last_fetch: now - TASK_FETCH_INTERVAL, > + last_journal_apply: now - APPLY_JOURNAL_INTERVAL, > + last_active_poll: now - POLL_ACTIVE_INTERVAL, > + } > + } > + > + /// Reset the task archive rotation timestamp. > + fn reset_rotate_check(&mut self) { > + self.last_rotate_check = Instant::now(); > + } > + > + /// Reset the task fetch timestamp. > + fn reset_fetch(&mut self) { > + self.last_fetch = Instant::now(); > + } > + > + /// Reset the journal apply timestamp. > + fn reset_journal_apply(&mut self) { > + self.last_journal_apply = Instant::now(); > + } > + > + /// Reset the journal apply timestamp. > + fn reset_active_poll(&mut self) { > + self.last_active_poll = Instant::now(); > + } > + > + /// Should we check for archive rotation? > + fn is_due_for_rotate_check(&self) -> bool { > + Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL > + } > + > + /// Should we fetch tasks? > + fn is_due_for_fetch(&self) -> bool { > + Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL > + } > + > + /// Should we apply the task archive's journal? > + fn is_due_for_journal_apply(&self) -> bool { > + Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL > + } > + > + /// Should we poll active tasks? > + fn is_due_for_active_poll(&self) -> bool { > + Instant::now().duration_since(self.last_active_poll) > POLL_ACTIVE_INTERVAL > + } > +} > + > +/// Handle a single timer tick. > +/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks. 
> +pub async fn handle_timer_tick(task_state: &mut TaskState) -> Result<(), Error> { > + let cache = super::get_cache()?; > + > + if task_state.is_due_for_rotate_check() { > + log::debug!("checking if remote task archive should be rotated"); > + if rotate_cache(cache.clone()).await? { > + log::info!("rotated remote task archive"); > + } > + > + task_state.reset_rotate_check(); > + } > + > + if task_state.is_due_for_journal_apply() { > + apply_journal(cache.clone()).await?; > + task_state.reset_journal_apply(); > + } > + > + let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??; > + > + let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS)); > + > + let cache_state = cache.read_state(); > + > + let poll_results = if task_state.is_due_for_active_poll() { > + let mut tasks_to_poll: HashSet = > + HashSet::from_iter(cache_state.tracked_tasks().cloned()); > + > + let active_tasks = get_active_tasks(cache.clone()).await?; > + tasks_to_poll.extend(active_tasks.into_iter()); > + > + let poll_results = poll_tracked_tasks( > + &remote_config, > + tasks_to_poll.iter(), > + Arc::clone(&total_connections_semaphore), > + ) > + .await?; > + > + task_state.reset_active_poll(); > + > + poll_results > + } else { > + poll_tracked_tasks( > + &remote_config, > + cache_state.tracked_tasks(), > + Arc::clone(&total_connections_semaphore), > + ) > + .await? > + }; > + > + // Get a list of remotes that we should poll in this cycle. 
> + let remotes = if task_state.is_due_for_fetch() { > + task_state.reset_fetch(); > + get_all_remotes(&remote_config) > + } else { > + get_remotes_with_finished_tasks(&remote_config, &poll_results) > + }; > + > + let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await; > + > + if !all_tasks.is_empty() > + || poll_results > + .iter() > + .any(|(_, result)| matches!(result, PollResult::RemoteGone | PollResult::RequestError)) > + { > + update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?; > + } > + > + Ok(()) > +} > + > +/// Initialize the remote task cache with initial archive files, in case there are not > +/// any archive files yet. > +/// > +/// This allows us to immediately backfill remote task history when setting up a new PDM instance > +/// without any prior task archive rotation. > +pub async fn init_cache() -> Result<(), Error> { > + tokio::task::spawn_blocking(|| { > + let cache = super::get_cache()?; > + cache.write()?.init(proxmox_time::epoch_i64())?; > + Ok(()) > + }) > + .await? > +} > + > +/// Fetch tasks from a list of remotes. > +/// > +/// Returns a list of tasks and a map that shows whether we want to update the > +/// cutoff timestamp in the statefile. We don't want to update the cutoff if > +/// the connection to one remote failed or if we could not reach all remotes in a cluster. 
> +async fn fetch_remotes( > + remotes: Vec, > + cache_state: Arc, > +) -> (Vec, NodeFetchSuccessMap) { > + let fetcher = ParallelFetcher { > + max_connections: MAX_CONNECTIONS, > + max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE, > + context: cache_state, > + }; > + > + let fetch_results = fetcher > + .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node) > + .await; > + > + let mut all_tasks = Vec::new(); > + let mut node_success_map = NodeFetchSuccessMap::default(); > + > + for (remote_name, result) in fetch_results.remote_results { > + match result { > + Ok(remote_result) => { > + for (node_name, node_result) in remote_result.node_results { > + match node_result { > + Ok(NodeResults { data, .. }) => { > + all_tasks.extend(data); > + node_success_map.set_node_success(remote_name.clone(), node_name); > + } > + Err(err) => { > + log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}"); > + } > + } > + } > + } > + Err(err) => { > + log::error!("could not fetch tasks from remote '{remote_name}': {err:#}"); > + } > + } > + } > + > + (all_tasks, node_success_map) > +} > + > +async fn fetch_tasks_from_single_node( > + context: Arc, > + remote: Remote, > + node: String, > +) -> Result, Error> { > + let since = context > + .cutoff_timestamp(&remote.id, &node) > + .unwrap_or_else(|| { > + proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64 > + }); > + > + match remote.ty { > + RemoteType::Pve => { > + let params = pve_api_types::ListTasks { > + source: Some(pve_api_types::ListTasksSource::Archive), > + since: Some(since), > + // If `limit` is not provided, we only receive 50 tasks > + limit: Some(MAX_TASKS_TO_FETCH), > + ..Default::default() > + }; > + > + let client = connection::make_pve_client(&remote)?; > + > + let task_list = client > + .get_task_list(&node, params) > + .await? 
> + .into_iter() > + .map(|task| map_pve_task(task, remote.id.clone())) > + .collect(); > + > + Ok(task_list) > + } > + RemoteType::Pbs => { > + let params = pbs_client::ListTasks { > + since: Some(since), > + // If `limit` is not provided, we only receive 50 tasks > + limit: Some(MAX_TASKS_TO_FETCH), > + }; > + > + let client = connection::make_pbs_client(&remote)?; > + > + let task_list = client > + .get_task_list(params) > + .await? > + .into_iter() > + .filter_map(|task| { > + if task.endtime.is_some() { > + // We only care about finished tasks. > + Some(map_pbs_task(task, remote.id.clone())) > + } else { > + None > + } > + }) > + .collect(); > + > + Ok(task_list) > + } > + } > +} > + > +/// Return all remotes from the given config. > +fn get_all_remotes(remote_config: &SectionConfigData) -> Vec { > + remote_config > + .into_iter() > + .map(|(_, section)| section) > + .cloned() > + .collect() > +} > + > +/// Return all remotes that correspond to a list of finished tasks. > +fn get_remotes_with_finished_tasks( > + remote_config: &SectionConfigData, > + poll_results: &HashMap, > +) -> Vec { > + let remotes_with_finished_tasks: HashSet<&str> = poll_results > + .iter() > + .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote())) > + .collect(); > + > + remote_config > + .into_iter() > + .filter_map(|(name, remote)| { > + remotes_with_finished_tasks > + .contains(&name) > + .then_some(remote) > + }) > + .cloned() > + .collect() > +} > + > +/// Rotate the task cache if necessary. > +/// > +/// Returns Ok(true) the cache's files were rotated. > +async fn rotate_cache(cache: TaskCache) -> Result { > + tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await? > +} > + > +/// Apply the task cache journal. > +async fn apply_journal(cache: TaskCache) -> Result<(), Error> { > + tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await? > +} > + > +/// Get a list of active tasks. 
> +async fn get_active_tasks(cache: TaskCache) -> Result, Error> {
> +    // Cache access is blocking, so it runs inside `spawn_blocking`.
> +    Ok(tokio::task::spawn_blocking(move || {
> +        let tasks: Vec = cache
> +            .read()?
> +            .get_tasks(GetTasks::Active)?
> +            .map(|t| t.upid)
> +            .collect();
> +
> +        Ok::, Error>(tasks)
> +    })
> +    // First `?`: join error of the blocking task; second `?`: error from the cache access.
> +    .await??)
> +}
> +
> +#[derive(PartialEq, Debug)]
> +/// Outcome from polling a tracked task.
> +enum PollResult {
> +    /// Task is still running.
> +    Running,
> +    /// Task is finished, poll remote tasks to get final status/endtime.
> +    Finished,
> +    /// Requesting the task status failed; the task should be dropped from the active file.
> +    RequestError,
> +    /// Remote does not exist any more -> remove immediately from tracked task list.
> +    RemoteGone,
> +}
> +
> +/// Poll the status of all tracked tasks concurrently.
> +///
> +/// Each poll holds a permit from `total_connections_semaphore` for its whole duration,
> +/// which bounds the number of concurrent requests. Tasks whose remote is no longer present
> +/// in `remote_config` are reported as `PollResult::RemoteGone` without any request.
> +///
> +/// Returns an error if one of the spawned poll tasks fails to join (e.g. it panicked).
> +async fn poll_tracked_tasks(
> +    remote_config: &SectionConfigData,
> +    tracked_tasks: impl Iterator,
> +    total_connections_semaphore: Arc,
> +) -> Result, Error> {
> +    let mut join_set = JoinSet::new();
> +
> +    for task in tracked_tasks.cloned() {
> +        // The permit is acquired before spawning, so this loop waits while all permits
> +        // are in use.
> +        // NOTE(review): `acquire_owned` only fails if the semaphore has been closed; this
> +        // `unwrap` assumes it never is.
> +        let permit = Arc::clone(&total_connections_semaphore)
> +            .acquire_owned()
> +            .await
> +            .unwrap();
> +
> +        let remote = remote_config.get(task.remote()).cloned();
> +
> +        join_set.spawn(async move {
> +            // Move permit into this async block.
> +            let _permit = permit;
> +
> +            match remote {
> +                Some(remote) => poll_single_tracked_task(remote, task).await,
> +                None => {
> +                    log::info!(
> +                        "remote {} does not exist any more, dropping tracked task",
> +                        task.remote()
> +                    );
> +                    (task, PollResult::RemoteGone)
> +                }
> +            }
> +        });
> +    }
> +
> +    let mut results = HashMap::new();
> +    while let Some(task_result) = join_set.join_next().await {
> +        let (upid, result) = task_result?;
> +        results.insert(upid, result);
> +    }
> +
> +    Ok(results)
> +}
> +
> +/// Poll the status of a single tracked task on `remote`.
> +async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) {
> +    // In both branches, a failed status request yields `PollResult::RequestError`; otherwise
> +    // the task counts as finished once its status carries an `exitstatus`.
> +    match remote.ty {
> +        RemoteType::Pve => {
> +            log::debug!("polling tracked task {}", task);
> +
> +            let status = match api::pve::tasks::get_task_status(
> +                remote.id.clone(),
> +                task.clone(),
> +                false,
> +            )
> +            .await
> +            {
> +                Ok(status) => status,
> +                Err(err) => {
> +                    log::error!("could not get status from remote: {err:#}");
> +                    return (task, PollResult::RequestError);
> +                }
> +            };
> +
> +            let result = if status.exitstatus.is_some() {
> +                PollResult::Finished
> +            } else {
> +                PollResult::Running
> +            };
> +
> +            (task, result)
> +        }
> +        RemoteType::Pbs => {
> +            // NOTE(review): this branch mirrors the PVE branch above but, unlike it, emits
> +            // no debug log before polling.
> +            let status = match api::pbs::tasks::get_task_status(
> +                remote.id.clone(),
> +                task.clone(),
> +                false,
> +            )
> +            .await
> +            {
> +                Ok(status) => status,
> +                Err(err) => {
> +                    log::error!("could not get status from remote: {err:#}");
> +                    return (task, PollResult::RequestError);
> +                }
> +            };
> +
> +            let result = if status.exitstatus.is_some() {
> +                PollResult::Finished
> +            } else {
> +                PollResult::Running
> +            };
> +
> +            (task, result)
> +        }
> +    }
> +}
> +
> +/// Map a `pve_api_types::ListTasksResponse` of `remote` to a `TaskCacheItem`.
> +fn map_pve_task(task: pve_api_types::ListTasksResponse, remote: String) -> TaskCacheItem {
> +    let remote_upid = RemoteUpid::new(remote, RemoteType::Pve, task.upid);
> +
> +    TaskCacheItem {
> +        upid: remote_upid,
> +        starttime: task.starttime,
> +        endtime: task.endtime,
> +        status: task.status,
> +    }
> +}
> +
> +/// Map a `pbs_api_types::TaskListItem` of `remote` to a `TaskCacheItem`.
> +fn map_pbs_task(task: pbs_api_types::TaskListItem, remote: String) -> TaskCacheItem {
> +    let remote_upid = RemoteUpid::new(remote, RemoteType::Pbs, task.upid);
> +
> +    TaskCacheItem {
> +        upid: remote_upid,
> +        starttime: task.starttime,
> +        endtime: task.endtime,
> +        status: task.status,
> +    }
> +}
> +
> +/// Update the task cache with the results of tracked-task polling and regular task fetching.
> +async fn update_task_cache(
> +    cache: TaskCache,
> +    new_tasks: Vec,
> +    update_state_for_remote: NodeFetchSuccessMap,
> +    poll_results: HashMap,
> +) -> Result<(), Error> {
> +    // Cache access is blocking, so it runs inside `spawn_blocking`.
> +    tokio::task::spawn_blocking(move || {
> +        // Stop tracking every task whose poll result is anything other than `Running`.
> +        let drop_tracked = poll_results
> +            .into_iter()
> +            .filter_map(|(upid, result)| match result {
> +                PollResult::Running => None,
> +                PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => {
> +                    Some(upid)
> +                }
> +            })
> +            .collect();
> +
> +        cache
> +            .write()?
> +            .update(new_tasks, &update_state_for_remote, drop_tracked)?;
> +
> +        Ok(())
> +    })
> +    .await?
> +}
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel