public inbox for pdm-devel@lists.proxmox.com
From: Lukas Wagner <l.wagner@proxmox.com>
To: Proxmox Datacenter Manager development discussion
	<pdm-devel@lists.proxmox.com>,
	Dominik Csapak <d.csapak@proxmox.com>
Subject: Re: [pdm-devel] [PATCH proxmox-datacenter-manager v5 2/6] remote tasks: add background task for task polling, use new task cache
Date: Thu, 3 Jul 2025 10:05:47 +0200	[thread overview]
Message-ID: <4226966c-dd23-4bcf-9ecf-2f129bf1a07b@proxmox.com> (raw)
In-Reply-To: <112a7d67-79c3-4d23-bee7-8b1d02140104@proxmox.com>

On 2025-05-14 17:27, Dominik Csapak wrote:
>> +
>> +/// Tick interval for the remote task fetching task.
>> +/// This is also the rate at which we check on tracked tasks.
>> +const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
>> +
>> +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
>> +/// task for this remote).
>> +const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
> 
> imho those two are confusingly documented: if 'REGULAR_..' is used
> for the normal interval, why does 'TASK_REFRE..' say it's used for that too?
> 

I'll make sure to phrase it a bit more clearly. Thanks!

>> +/// Number of cycles until a regular refresh.
>> +const REGULAR_REFRESH_CYCLES: u64 =
>> +    REGULAR_REFRESH_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
>> +
>> +/// Check if we want to rotate once every hour.
> 
> commas are important, one can read this sentence in two ways:
> 
> Check if we want to rotate, once every hour
> or
> Check if we want to (rotate once every hour) (brackets for clarity)
> 
> IMHO a better way to write that is:
> 
> Check once every hour if we want to rotate.

Ack

> 
>> +const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
>> +/// Number of cycles before we want to check if we should rotate the task archives.
>> +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
>> +
>> +/// Rotate once the most recent archive file is at least 24 hours old.
>> +const ROTATE_AFTER: Duration = Duration::from_secs(24 * 3600);
>> +
>> +/// Keep 7 days worth of tasks.
>> +const KEEP_OLD_FILES: u64 = 7;
>> +
>> +/// Maximum number of concurrent connections per remote.
>> +const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
>> +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
>> +/// consideration when accounting for the total number of connections.
>> +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
>> +/// to 4 PVE remotes in parallel.
>> +const MAX_CONNECTIONS: usize = 20;
>> +
>> +/// Maximum number of tasks to fetch from a single remote in one API call.
>> +const MAX_TASKS_TO_FETCH: u64 = 5000;
>> +
>> +/// Start the remote task fetching task
>> +pub fn start_task() -> Result<(), Error> {
>> +    let api_uid = pdm_config::api_user()?.uid;
>> +    let api_gid = pdm_config::api_group()?.gid;
>> +    let file_options = CreateOptions::new()
>> +        .owner(api_uid)
>> +        .group(api_gid)
>> +        .perm(Mode::from_bits_truncate(0o0750));
>> +    proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
> 
> this should probably use the proxmox_product_config crate
> 

Ack, I'll check that out

>> +
>> +    tokio::spawn(async move {
>> +        let task_scheduler = std::pin::pin!(remote_task_fetching_task());
>> +        let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
>> +        futures::future::select(task_scheduler, abort_future).await;
>> +    });
>> +
>> +    Ok(())
>> +}
>> +
>> +/// Task which handles fetching remote tasks and task archive rotation.
>> +/// This function never returns.
>> +async fn remote_task_fetching_task() -> ! {
>> +    let mut cycle = 0u64;
>> +    let mut interval = tokio::time::interval(TASK_REFRESH_INTERVAL);
>> +    interval.reset_at(task_utils::next_aligned_instant(TASK_REFRESH_INTERVAL.as_secs()).into());
>> +
>> +    // We don't really care about catching up on missed ticks, we just want
>> +    // a steady tick rate.
>> +    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
>> +
>> +    if let Err(err) = init_cache().await {
>> +        log::error!("error when initializing task cache: {err}");
>> +    }
>> +
>> +    loop {
>> +        interval.tick().await;
>> +        if let Err(err) = do_tick(cycle).await {
>> +            log::error!("error when fetching remote tasks: {err}");
>> +        }
>> +
>> +        // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
>> +        // better be safe and use .wrapping_add(1) :)
>> +        cycle = cycle.wrapping_add(1);
>> +    }
> 
> you do the cycle check here manually, but this can be bad, e.g.
> 
> if one cycle regularly takes a long time (say 1 minute instead of 10 seconds),
> you fetch the remotes not every 10 minutes (as the comment above would indicate)
> but only every hour
> 
> I guess you wanted to be on the safe side and not be overly aggressive with the polling,
> but having the fetch/rotation interval depend that much on the cycle duration also does
> not seem very good to me.

Yeah, the idea was to combine the regular polling and the tracked task polling into a single task.
But I agree, it would probably be better to use timestamps instead of cycle counts for keeping
track of what should be done when.

Thanks
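
To illustrate the difference, here is a minimal, std-only sketch that compares
cycle counting with timestamp tracking when ticks are slower than planned. The
`Periodic` helper and the two simulation functions are hypothetical names made
up for this example, they are not part of the patch, and the real
implementation may well look different.

```rust
use std::time::{Duration, Instant};

const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);

/// Hypothetical helper: remembers when a periodic job last ran.
struct Periodic {
    interval: Duration,
    last_run: Option<Instant>,
}

impl Periodic {
    fn new(interval: Duration) -> Self {
        Self { interval, last_run: None }
    }

    /// Returns true if the job never ran or `interval` has elapsed since the
    /// last run. In that case `now` is recorded as the new last run.
    fn is_due(&mut self, now: Instant) -> bool {
        let due = match self.last_run {
            None => true,
            Some(last) => now.duration_since(last) >= self.interval,
        };
        if due {
            self.last_run = Some(now);
        }
        due
    }
}

/// Regular refreshes within `total_secs` if we count cycles, as in the patch.
/// `tick_secs` is the real time between two ticks and must be non-zero.
fn cycle_based_refreshes(tick_secs: u64, total_secs: u64) -> u32 {
    let cycles_per_refresh =
        REGULAR_REFRESH_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
    (0..=total_secs / tick_secs)
        .filter(|cycle| cycle % cycles_per_refresh == 0)
        .count() as u32
}

/// Regular refreshes within `total_secs` if we compare timestamps instead.
/// `tick_secs` must be non-zero.
fn timestamp_based_refreshes(tick_secs: u64, total_secs: u64) -> u32 {
    let mut job = Periodic::new(REGULAR_REFRESH_INTERVAL);
    let start = Instant::now();
    let mut count = 0;
    let mut elapsed = 0;
    while elapsed <= total_secs {
        if job.is_due(start + Duration::from_secs(elapsed)) {
            count += 1;
        }
        elapsed += tick_secs;
    }
    count
}
```

With ticks that take 60 seconds each, `cycle_based_refreshes(60, 3600)` yields
2 refreshes per hour (startup plus cycle 60), whereas
`timestamp_based_refreshes(60, 3600)` yields 7, which matches the intended
10 minute interval.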

> 
>> +}
>> +
>> +/// Handle a single timer tick.
>> +/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
>> +async fn do_tick(cycle: u64) -> Result<(), Error> {
>> +    let cache = remote_tasks::get_cache()?;
>> +
>> +    if should_check_for_cache_rotation(cycle) {
>> +        log::debug!("checking if remote task archive should be rotated");
>> +        if rotate_cache(cache.clone()).await? {
>> +            log::info!("rotated remote task archive");
>> +        }
>> +    }
>> +
>> +    let state = cache.read_state();
>> +
>> +    let mut all_tasks = HashMap::new();
>> +
>> +    let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
>> +    let mut join_set = JoinSet::new();
>> +
>> +    // Get a list of remotes that we should poll in this cycle.
>> +    let remotes = remotes_to_check(cycle, &state).await?;
>> +    for remote in remotes {
>> +        let since = get_cutoff_timestamp(&remote, &state);
>> +
>> +        let permit = if remote.ty == RemoteType::Pve {
>> +            // Acquire multiple permits, for PVE remotes we want
>> +            // to connect to multiple nodes in parallel.
>> +            //
>> +            // `.unwrap()` is safe, we never close the semaphore.
>> +            Arc::clone(&total_connections_semaphore)
>> +                .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
>> +                .await
>> +                .unwrap()
> 
> would it be possible to acquire the connection semaphores dynamically inside the
> `fetch_tasks` call up to the maximum?
> 
> that way, we could e.g. connect to 20 remotes with one host in parallel
> instead of always having a maximum of 4?
> (not sure about the tokio semaphore possibilities here)
> 
> I'd still limit it to CONNECTIONS_PER_PVE_REMOTE for each remote,
> but in case one remote has fewer nodes, we could utilize the connection count
> for more remotes, doing more work in parallel.

IIRC there was some problem with allocating these on demand, I think there was some potential
for a deadlock - though I can't come up with the 'why' right now. I'll check again and
add some comment if I remember the reason again.
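
For reference, the accounting difference between the two approaches can be
sketched with plain arithmetic. This is only a model of the permit budget, it
does not touch tokio and says nothing about the possible deadlock. The function
names are hypothetical, and the sketch assumes that every remote has at least
one node and that remotes are admitted in list order.

```rust
const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
const MAX_CONNECTIONS: usize = 20;

/// Permits taken by a PVE remote when a fixed block is reserved up front,
/// as the patch does with `acquire_many_owned`.
fn fixed_permits(_nodes: usize) -> usize {
    CONNECTIONS_PER_PVE_REMOTE
}

/// Permits taken by a PVE remote if they were acquired per node, capped at
/// `CONNECTIONS_PER_PVE_REMOTE` (assumption: at least one node per remote).
fn dynamic_permits(nodes: usize) -> usize {
    nodes.clamp(1, CONNECTIONS_PER_PVE_REMOTE)
}

/// Number of remotes, taken in order from `node_counts`, that fit into the
/// `MAX_CONNECTIONS` budget at the same time under the given permit policy.
fn remotes_in_parallel(node_counts: &[usize], permits: fn(usize) -> usize) -> usize {
    let mut used = 0;
    let mut count = 0;
    for &nodes in node_counts {
        let need = permits(nodes);
        if used + need > MAX_CONNECTIONS {
            break;
        }
        used += need;
        count += 1;
    }
    count
}
```

For 30 single-node remotes, the fixed policy admits 4 remotes at once, while
the per-node policy admits 20, which is the case described in the review
comment.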

> 
>> +        } else {
>> +            // For PBS remotes we only have a single outgoing connection
>> +            //
>> +            // `.unwrap()` is safe, we never close the semaphore.
>> +            Arc::clone(&total_connections_semaphore)
>> +                .acquire_owned()
>> +                .await
>> +                .unwrap()
>> +        };
>> +
>> +        join_set.spawn(async move {
>> +            log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
>> +            let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
>> +                format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
>> +            });
>> +
>> +            drop(permit);
>> +            tasks
>> +        });
>> +    }
>> +
>> +
>> +/// Return list of remotes that are to be polled in this cycle.
>> +///
>> +/// If `cycle` is a multiple of `REGULAR_REFRESH_CYCLES`, the function will
>> +/// return all remotes from the remote config. This ensures that
>> +/// all remotes are polled at regular intervals.
>> +/// In any other case we only return remotes which currently have a tracked
>> +/// task.
>> +/// On daemon startup (when cycle is 0) we return all remotes to ensure
>> +/// that we get an up-to-date task list from all remotes.
>> +async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
>> +    let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
>> +
>> +    let all = cycle % REGULAR_REFRESH_CYCLES == 0;
>> +
>> +    if all {
>> +        Ok(config.into_iter().map(|(_, section)| section).collect())
>> +    } else {
>> +        Ok(config
>> +            .into_iter()
>> +            .filter_map(|(name, remote)| {
>> +                if let Some(tracked) = state.tracked_tasks.get(&name) {
>> +                    if !tracked.is_empty() {
>> +                        Some(remote)
>> +                    } else {
>> +                        None
>> +                    }
>> +                } else {
>> +                    None
>> +                }
>> +            })
> 
> i think this could be more succinctly written as:
> 
> state
>     .tracked_tasks
>     .get(&name)
>     .and_then(|tracked| (!tracked.is_empty()).then_some(remote))
> 

Thanks, will do that.
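
As a quick sanity check that both forms select the same remotes, here is a
small self-contained sketch. It uses `String` and `Vec<String>` as stand-ins
for the real `Remote` and tracked-task types, so the function signatures are
assumptions made for the example only.

```rust
use std::collections::HashMap;

/// The nested `if let` / `if` form from the patch, on stand-in types.
fn verbose(
    config: Vec<(String, String)>,
    tracked_tasks: &HashMap<String, Vec<String>>,
) -> Vec<String> {
    config
        .into_iter()
        .filter_map(|(name, remote)| {
            if let Some(tracked) = tracked_tasks.get(&name) {
                if !tracked.is_empty() {
                    Some(remote)
                } else {
                    None
                }
            } else {
                None
            }
        })
        .collect()
}

/// The suggested form using `Option::and_then` and `bool::then_some`.
fn succinct(
    config: Vec<(String, String)>,
    tracked_tasks: &HashMap<String, Vec<String>>,
) -> Vec<String> {
    config
        .into_iter()
        .filter_map(|(name, remote)| {
            tracked_tasks
                .get(&name)
                .and_then(|tracked| (!tracked.is_empty()).then_some(remote))
        })
        .collect()
}
```

Both functions keep a remote only if it has an entry in `tracked_tasks` and
that entry is not empty.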

>> +            .collect())
>> +    }
>> +}
>> +
>> +/// Get the timestamp from which on we should fetch tasks for a given remote.
>> +/// The returned timestamp is a UNIX timestamp (in seconds).
>> +fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
>> +    let oldest_active = state.oldest_active_task.get(&remote.id).copied();
>> +    let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
>> +
>> +    match (oldest_active, youngest_archived) {
>> +        (None, None) => 0,
>> +        (None, Some(youngest_archived)) => youngest_archived,
>> +        (Some(oldest_active), None) => oldest_active,
>> +        (Some(oldest_active), Some(youngest_archived)) => oldest_active.min(youngest_archived),
>> +    }
>> +}
>> +
>> +/// Rotate the task cache if necessary.
>> +///
>> +/// Returns Ok(true) if the cache's files were rotated.
>> +async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
>> +    tokio::task::spawn_blocking(move || {
>> +        cache.rotate(
>> +            proxmox_time::epoch_i64(),
>> +            ROTATE_AFTER.as_secs(),
>> +            KEEP_OLD_FILES,
>> +        )
>> +    })
>> +    .await?
>> +}
> 
> in pbs, we start a worker task for the log rotation, maybe we want that here too?
> 

Hmmm, do we have any guidelines on when something should be a worker task and when not?
I'm not opposed to it, I'm just curious where to draw the line.
Also, being a worker task implies that it can be cancelled, right? Does that make sense for
something like this?

>> +
>> +/// Fetch tasks (active and finished) from a remote
>> +/// `since` is a UNIX timestamp (seconds).
>> +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
>> +    let mut tasks = Vec::new();
>> +
>> +    let mut all_successful = true;
>> +
>> +    match remote.ty {
>> +        RemoteType::Pve => {
>> +            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
>> +            let mut join_set = JoinSet::new();


-- 
- Lukas



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel

Thread overview: 19+ messages
2025-05-12 11:41 [pdm-devel] [PATCH proxmox-datacenter-manager v5 0/6] remote task cache fetching task / better cache backend Lukas Wagner
2025-05-12 11:41 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 1/6] remote tasks: implement improved cache for remote tasks Lukas Wagner
2025-05-14 14:08   ` Dominik Csapak
2025-07-01 10:02     ` Lukas Wagner
2025-07-03  8:05     ` Lukas Wagner
2025-05-12 11:41 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 2/6] remote tasks: add background task for task polling, use new task cache Lukas Wagner
2025-05-14 15:27   ` Dominik Csapak
2025-07-03  8:05     ` Lukas Wagner [this message]
2025-07-03 11:25       ` Dominik Csapak
2025-07-09 11:22       ` Lukas Wagner
2025-07-09 11:35         ` Dominik Csapak
2025-07-09 12:25           ` Lukas Wagner
2025-05-12 11:41 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 3/6] remote tasks: improve locking for task archive iterator Lukas Wagner
2025-05-12 11:41 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 4/6] pdm-api-types: remote tasks: add new_from_str constructor for TaskStateType Lukas Wagner
2025-05-15  6:56   ` Dominik Csapak
2025-05-12 11:41 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 5/6] fake remote: make the fake_remote feature compile again Lukas Wagner
2025-05-15  6:55   ` Dominik Csapak
2025-05-12 11:41 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 6/6] fake remote: clippy fixes Lukas Wagner
2025-05-15  7:05   ` Dominik Csapak
