From: Lukas Wagner <l.wagner@proxmox.com>
To: Proxmox Datacenter Manager development discussion
<pdm-devel@lists.proxmox.com>,
Dominik Csapak <d.csapak@proxmox.com>
Subject: Re: [pdm-devel] [PATCH proxmox-datacenter-manager v5 2/6] remote tasks: add background task for task polling, use new task cache
Date: Thu, 3 Jul 2025 10:05:47 +0200
Message-ID: <4226966c-dd23-4bcf-9ecf-2f129bf1a07b@proxmox.com>
In-Reply-To: <112a7d67-79c3-4d23-bee7-8b1d02140104@proxmox.com>

On 2025-05-14 17:27, Dominik Csapak wrote:
>> +
>> +/// Tick interval for the remote task fetching task.
>> +/// This is also the rate at which we check on tracked tasks.
>> +const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
>> +
>> +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
>> +/// task for this remote).
>> +const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
>
> imho those two are confusingly documented: if 'REGULAR_..' is used
> for the normal interval, why does 'TASK_REFRE..' say it's used for that too?
>
I'll make sure to phrase it a bit more clearly. Thanks!
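
Maybe something along these lines (wording not final):

/// Base tick interval of the remote task fetching task. Tracked tasks are polled on
/// every tick; everything else (regular refresh, rotation check) only happens every
/// couple of ticks, see REGULAR_REFRESH_CYCLES / CHECK_ROTATE_CYCLES below.
const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);

/// Interval at which to fetch the full task list from a remote, even if it currently
/// has no tracked task.
const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
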
>> +/// Number of cycles until a regular refresh.
>> +const REGULAR_REFRESH_CYCLES: u64 =
>> + REGULAR_REFRESH_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
>> +
>> +/// Check if we want to rotate once every hour.
>
> commas are important, one can read this sentence in two ways:
>
> Check if we want to rotate, once every hour
> or
> Check if we want to (rotate once every hour) (brackets for clarity)
>
> IMHO a better way to write that is:
>
> Check once every hour if we want to rotate.
Ack
>
>> +const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
>> +/// Number of cycles before we want to check if we should rotate the task archives.
>> +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
>> +
>> +/// Rotate once the most recent archive file is at least 24 hours old.
>> +const ROTATE_AFTER: Duration = Duration::from_secs(24 * 3600);
>> +
>> +/// Keep 7 days worth of tasks.
>> +const KEEP_OLD_FILES: u64 = 7;
>> +
>> +/// Maximum number of concurrent connections per remote.
>> +const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
>> +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
>> +/// consideration when accounting for the total number of connections.
>> +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
>> +/// to 4 PVE remotes in parallel.
>> +const MAX_CONNECTIONS: usize = 20;
>> +
>> +/// Maximum number of tasks to fetch from a single remote in one API call.
>> +const MAX_TASKS_TO_FETCH: u64 = 5000;
>> +
>> +/// Start the remote task fetching task
>> +pub fn start_task() -> Result<(), Error> {
>> + let api_uid = pdm_config::api_user()?.uid;
>> + let api_gid = pdm_config::api_group()?.gid;
>> + let file_options = CreateOptions::new()
>> + .owner(api_uid)
>> + .group(api_gid)
>> + .perm(Mode::from_bits_truncate(0o0750));
>> + proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
>
> this should probably use the proxmox_product_config crate
>
Ack, I'll check that out
>> +
>> + tokio::spawn(async move {
>> + let task_scheduler = std::pin::pin!(remote_task_fetching_task());
>> + let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
>> + futures::future::select(task_scheduler, abort_future).await;
>> + });
>> +
>> + Ok(())
>> +}
>> +
>> +/// Task which handles fetching remote tasks and task archive rotation.
>> +/// This function never returns.
>> +async fn remote_task_fetching_task() -> ! {
>> + let mut cycle = 0u64;
>> + let mut interval = tokio::time::interval(TASK_REFRESH_INTERVAL);
>> + interval.reset_at(task_utils::next_aligned_instant(TASK_REFRESH_INTERVAL.as_secs()).into());
>> +
>> + // We don't really care about catching up to missed ticks, we just want
>> + // a steady tick rate.
>> + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
>> +
>> + if let Err(err) = init_cache().await {
>> + log::error!("error when initializing task cache: {err}");
>> + }
>> +
>> + loop {
>> + interval.tick().await;
>> + if let Err(err) = do_tick(cycle).await {
>> + log::error!("error when fetching remote tasks: {err}");
>> + }
>> +
>> + // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
>> + // better be safe and use .wrapping_add(1) :)
>> + cycle = cycle.wrapping_add(1);
>> + }
>
> you do the cycle check here manually, but this can be bad, e.g.
>
> if one cycle takes a long time (say 1 minute instead of 10 seconds) and that happens
> regularly, you fetch the remotes not every 10 minutes (as the comment above would indicate)
> but only every hour.
>
> I guess you wanted to be on the safe side and not be overly aggressive with the polling,
> but having the fetch/rotation interval depend that much on the cycle duration doesn't
> seem very good to me either.
Yeah, the idea was to combine the regular polling and the tracked task polling into a single task.
But I agree, it'd probably be better to use timestamps instead of cycle counts for keeping
track of what should be done when.
Thanks
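
Rough sketch of what I have in mind (untested; the Schedule struct and the new
parameters of do_tick are made up for illustration):

struct Schedule {
    next_regular_refresh: i64,
    next_rotate_check: i64,
}

// inside remote_task_fetching_task():
let mut schedule = Schedule {
    next_regular_refresh: 0,
    next_rotate_check: 0,
};

loop {
    interval.tick().await;
    let now = proxmox_time::epoch_i64();

    // Tracked tasks are polled on every tick; the other two actions become due based
    // on wall-clock time, so a slow cycle can no longer stretch the effective interval.
    let regular_refresh = now >= schedule.next_regular_refresh;
    if regular_refresh {
        schedule.next_regular_refresh = now + REGULAR_REFRESH_INTERVAL.as_secs() as i64;
    }

    let check_rotate = now >= schedule.next_rotate_check;
    if check_rotate {
        schedule.next_rotate_check = now + CHECK_ROTATE_INTERVAL.as_secs() as i64;
    }

    if let Err(err) = do_tick(regular_refresh, check_rotate).await {
        log::error!("error when fetching remote tasks: {err}");
    }
}
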
>
>> +}
>> +
>> +/// Handle a single timer tick.
>> +/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
>> +async fn do_tick(cycle: u64) -> Result<(), Error> {
>> + let cache = remote_tasks::get_cache()?;
>> +
>> + if should_check_for_cache_rotation(cycle) {
>> + log::debug!("checking if remote task archive should be rotated");
>> + if rotate_cache(cache.clone()).await? {
>> + log::info!("rotated remote task archive");
>> + }
>> + }
>> +
>> + let state = cache.read_state();
>> +
>> + let mut all_tasks = HashMap::new();
>> +
>> + let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
>> + let mut join_set = JoinSet::new();
>> +
>> + // Get a list of remotes that we should poll in this cycle.
>> + let remotes = remotes_to_check(cycle, &state).await?;
>> + for remote in remotes {
>> + let since = get_cutoff_timestamp(&remote, &state);
>> +
>> + let permit = if remote.ty == RemoteType::Pve {
>> + // Acquire multiple permits, for PVE remotes we want
>> + // to connect to multiple nodes in parallel.
>> + //
>> + // `.unwrap()` is safe, we never close the semaphore.
>> + Arc::clone(&total_connections_semaphore)
>> + .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
>> + .await
>> + .unwrap()
>
> would it be possible to acquire the connection semaphores dynamically inside the
> `fetch_tasks` call up to the maximum?
>
> that way, we could e.g. connect to 20 remotes with one host each in parallel
> instead of always having a maximum of 4?
> (not sure about the tokio semaphore possibilities here)
>
> I'd still limit it to CONNECTIONS_PER_PVE_REMOTE for each remote,
> but in case one remote has fewer nodes, we could utilize the connection count
> for more remotes, doing more work in parallel.
IIRC there was some problem with allocating these on demand; I think there was some potential
for a deadlock, though I can't come up with the 'why' right now. I'll check again and
add a comment if I remember the reason.
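
It might have been the classic issue with acquiring permits incrementally while already
holding some, roughly (hypothetical on-demand variant, not the code above; node_count is
just a stand-in for the remote's node count):

// Each fetch task collects its permits one at a time from the shared semaphore
// and keeps them until it is done with the whole remote:
let mut permits = Vec::new();
for _ in 0..CONNECTIONS_PER_PVE_REMOTE.min(node_count) {
    permits.push(
        Arc::clone(&total_connections_semaphore)
            .acquire_owned()
            .await
            .unwrap(),
    );
}
// With enough remotes fetched concurrently, all MAX_CONNECTIONS permits can end up
// spread across tasks that are each still waiting for one more permit here; since
// none of them releases anything before finishing, nothing makes progress.
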
>
>> + } else {
>> + // For PBS remotes we only have a single outgoing connection
>> + //
>> + // `.unwrap()` is safe, we never close the semaphore.
>> + Arc::clone(&total_connections_semaphore)
>> + .acquire_owned()
>> + .await
>> + .unwrap()
>> + };
>> +
>> + join_set.spawn(async move {
>> + log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
>> + let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
>> + format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
>> + });
>> +
>> + drop(permit);
>> + tasks
>> + });
>> + }
>> +
>> +
>> +/// Return list of remotes that are to be polled in this cycle.
>> +///
>> +/// If `cycle` is a multiple of `REGULAR_REFRESH_CYCLES`, the function will
>> +/// return all remotes from the remote config. This ensures that
>> +/// all remotes are polled at regular intervals.
>> +/// In any other case we only return remotes which currently have a tracked
>> +/// task.
>> +/// On daemon startup (when cycle is 0) we return all remotes to ensure
>> +/// that we get an up-to-date task list from all remotes.
>> +async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
>> + let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
>> +
>> + let all = cycle % REGULAR_REFRESH_CYCLES == 0;
>> +
>> + if all {
>> + Ok(config.into_iter().map(|(_, section)| section).collect())
>> + } else {
>> + Ok(config
>> + .into_iter()
>> + .filter_map(|(name, remote)| {
>> + if let Some(tracked) = state.tracked_tasks.get(&name) {
>> + if !tracked.is_empty() {
>> + Some(remote)
>> + } else {
>> + None
>> + }
>> + } else {
>> + None
>> + }
>> + })
>
> I think this could be written more succinctly as:
>
> state
> .tracked_tasks
> .get(&name)
> .and_then(|tracked| (!tracked.is_empty()).then_some(remote))
>
Thanks, will do that.
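
For reference, the whole filter_map closure then reads:

.filter_map(|(name, remote)| {
    state
        .tracked_tasks
        .get(&name)
        .and_then(|tracked| (!tracked.is_empty()).then_some(remote))
})
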
>> + .collect())
>> + }
>> +}
>> +
>> +/// Get the timestamp from which on we should fetch tasks for a given remote.
>> +/// The returned timestamp is a UNIX timestamp (in seconds).
>> +fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
>> + let oldest_active = state.oldest_active_task.get(&remote.id).copied();
>> + let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
>> +
>> + match (oldest_active, youngest_archived) {
>> + (None, None) => 0,
>> + (None, Some(youngest_archived)) => youngest_archived,
>> + (Some(oldest_active), None) => oldest_active,
>> + (Some(oldest_active), Some(youngest_active)) => oldest_active.min(youngest_active),
>> + }
>> +}
>> +
>> +/// Rotate the task cache if necessary.
>> +///
>> +/// Returns Ok(true) if the cache's files were rotated.
>> +async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
>> + tokio::task::spawn_blocking(move || {
>> + cache.rotate(
>> + proxmox_time::epoch_i64(),
>> + ROTATE_AFTER.as_secs(),
>> + KEEP_OLD_FILES,
>> + )
>> + })
>> + .await?
>> +}
>
> in pbs, we start a worker task for the log rotation, maybe we want that here too?
>
Hmmm, do we have any guidelines for when something should be a worker task and when not?
I'm not opposed to it, but I'm just curious where to draw the line.
Also, being a worker task implies that it can be cancelled, right? Does that make sense for
something like this?
>> +
>> +/// Fetch tasks (active and finished) from a remote
>> +/// `since` is a UNIX timestamp (seconds).
>> +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
>> + let mut tasks = Vec::new();
>> +
>> + let mut all_successful = true;
>> +
>> + match remote.ty {
>> + RemoteType::Pve => {
>> + let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
>> + let mut join_set = JoinSet::new();
--
- Lukas