public inbox for pdm-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: "Lukas Wagner" <l.wagner@proxmox.com>
To: "Thomas Lamprecht" <t.lamprecht@proxmox.com>,
	<pdm-devel@lists.proxmox.com>
Subject: Re: [PATCH datacenter-manager v2 4/8] subscription: add key pool and node status API endpoints
Date: Tue, 12 May 2026 14:21:50 +0200	[thread overview]
Message-ID: <DIGP0A3X4LHK.1YIGFB0N5143O@proxmox.com> (raw)
In-Reply-To: <20260507082943.2749725-5-t.lamprecht@proxmox.com>

On Thu May 7, 2026 at 10:26 AM CEST, Thomas Lamprecht wrote:
[...]
> +/// Add one or more subscription keys to the pool.
> +///
> +/// The key prefix determines the product type via [`ProductType::from_key`]. The schema regex
> +/// rejects anything that isn't a PVE or PBS key today; widen [`PRODUCT_KEY_REGEX`] in lockstep
> +/// with `from_key` and `push_key_to_remote` when PMG/POM remote support lands.
> +///
> +/// All-or-nothing: every key is validated for prefix and uniqueness (against the existing pool
> +/// and within the input list) before any change is persisted. A single bad key fails the
> +/// request and leaves the pool untouched.
> +fn add_keys(keys: Vec<String>, digest: Option<ConfigDigest>) -> Result<(), Error> {
> +    if keys.is_empty() {
> +        http_bail!(BAD_REQUEST, "no keys provided");
> +    }
> +
> +    let mut entries: Vec<SubscriptionKeyEntry> = Vec::with_capacity(keys.len());
> +    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
> +    for key in &keys {
> +        if !seen.insert(key.as_str()) {
> +            http_bail!(BAD_REQUEST, "duplicate key in input: '{key}'");
> +        }
> +        let product_type = ProductType::from_key(key).ok_or_else(|| {
> +            warn!("rejecting unrecognised key prefix '{key}', possibly a new product line");
> +            format_err!("unrecognised key format: {key}")
> +        })?;
> +        entries.push(SubscriptionKeyEntry {
> +            key: key.clone(),
> +            product_type,
> +            level: SubscriptionLevel::from_key(Some(key)),
> +            source: SubscriptionKeySource::Manual,
> +            ..Default::default()
> +        });
> +    }
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;
> +    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
> +    config_digest.detect_modification(digest.as_ref())?;
> +
> +    for entry in &entries {
> +        if config.contains_key(&entry.key) {
> +            http_bail!(CONFLICT, "key '{}' already exists in pool", entry.key);
> +        }
> +    }
> +
> +    for entry in entries {
> +        config.insert(entry.key.clone(), entry);
> +    }

FWIW, you could combine both loops, since you bail out if a duplicate
entry exists and thus never save the config. In theory, you could also
check the return value of .insert() and use if to check of a duplicate,
e.g.

    for entry in entries {
        if let Some(existing_key) = config.insert(entry.key.clone(), entry) {
            http_bail!(
                CONFLICT,
                "key '{}' already exists in pool",
                existing_key.key
            );
        }
    }

But not sure if that is actually easier to understand than an explicity
`contains_key`.

Nevertheless, I'd still at least combine both loops.

> +
> +    pdm_config::subscriptions::save_config(&config)
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
> +        },
> +    },
> +    returns: { type: SubscriptionKeyEntry },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
> +    },
> +)]
> +/// Get details for a single key.
> +fn get_key(key: String, rpcenv: &mut dyn RpcEnvironment) -> Result<SubscriptionKeyEntry, Error> {
> +    let (config, digest) = pdm_config::subscriptions::config()?;
> +    rpcenv["digest"] = digest.to_hex().into();
> +    let mut entry = config
> +        .get(&key)
> +        .cloned()
> +        .ok_or_else(|| http_err!(NOT_FOUND, "key '{key}' not found in pool"))?;
> +    entry.level = SubscriptionLevel::from_key(Some(&entry.key));
> +    Ok(entry)
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
> +            digest: {
> +                type: ConfigDigest,
> +                optional: true,
> +            },
> +        },
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
> +    },
> +)]
> +/// Remove a key from the pool.
> +///
> +/// If the key is currently assigned to a remote node, the caller must also have
> +/// `PRIV_RESOURCE_MODIFY` on that remote, so an audit-only operator cannot release a key
> +/// another admin had pinned. Refuses if the key is currently the live active key on its bound
> +/// node, since dropping the pool entry would orphan that subscription on the remote: the
> +/// operator must Reissue Key first.
> +async fn delete_key(
> +    key: String,
> +    digest: Option<ConfigDigest>,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<(), Error> {
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    // Live fetch must happen before the lock since the lock cannot span an .await; the
> +    // post-lock check below only refuses if the binding still matches what we observed.
> +    let synced_block = check_synced_assignment_for_unassign(&key).await?;
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;

This is potentially blocking the async handler, should be in a
spawn_blocking block (maybe together with the other config reading).

> +    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
> +    config_digest.detect_modification(digest.as_ref())?;
> +    let mut shadow = pdm_config::subscriptions::shadow_config()?;
> +
> +    let Some(entry) = config.get(&key) else {
> +        http_bail!(NOT_FOUND, "key '{key}' not found in pool");
> +    };
> +
> +    if let Some(assigned_remote) = entry.remote.as_deref() {
> +        user_info.check_privs(
> +            &auth_id,
> +            &["resource", assigned_remote],
> +            PRIV_RESOURCE_MODIFY,
> +            false,
> +        )?;
> +    }
> +
> +    if let Some((blocking_remote, blocking_node)) = synced_block {
> +        if entry.remote.as_deref() == Some(blocking_remote.as_str())
> +            && entry.node.as_deref() == Some(blocking_node.as_str())
> +        {
> +            http_bail!(
> +                BAD_REQUEST,
> +                "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
> +                 use Reissue Key to remove it from the remote first"
> +            );
> +        }
> +    }
> +
> +    config.remove(&key);
> +    // Cascade the shadow drop. Order mirrors `pdm-config/src/remotes.rs`: shadow first, then
> +    // main, so a partial failure cannot leave a key entry that lost its signed blob.
> +    shadow.remove(&key);
> +    pdm_config::subscriptions::save_shadow(&shadow)?;
> +    pdm_config::subscriptions::save_config(&config)?;
> +
> +    Ok(())
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
> +            remote: {
> +                schema: REMOTE_ID_SCHEMA,
> +                optional: true,
> +            },
> +            node: {
> +                // NODE_SCHEMA rejects path-traversal input before it ends up interpolated into
> +                // the remote URL `/api2/extjs/nodes/{node}/subscription`.
> +                schema: NODE_SCHEMA,
> +                optional: true,
> +            },
> +            digest: {
> +                type: ConfigDigest,
> +                optional: true,
> +            },
> +        },
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
> +    },
> +)]
> +/// Assign a key to a remote node, or unassign it (omit remote and node).
> +///
> +/// `PRIV_SYS_MODIFY` lets the caller touch the pool config; per-remote `PRIV_RESOURCE_MODIFY`
> +/// is enforced inside this handler so an operator cannot push a key to a remote they have no
> +/// other authority on.
> +async fn assign_key(
> +    key: String,
> +    remote: Option<String>,
> +    node: Option<String>,
> +    digest: Option<ConfigDigest>,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<(), Error> {
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    // Best-effort orphan-prevention for the unassign path: if the entry is currently bound and
> +    // synced (the assigned key is the live current_key on its remote), refuse and direct the
> +    // operator to Reissue Key. The live fetch must happen before the lock since that lock cannot
> +    // span an .await; we re-check the binding under the lock and only refuse if it still
> +    // matches what we observed live.
> +    let synced_block = if remote.is_none() && node.is_none() {
> +        check_synced_assignment_for_unassign(&key).await?
> +    } else {
> +        None
> +    };
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;

same here with regards to potentially blocking an async API handler.

> +    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
> +    config_digest.detect_modification(digest.as_ref())?;
> +
> +    let Some(stored_entry) = config.get(&key).cloned() else {
> +        http_bail!(NOT_FOUND, "key '{key}' not found in pool");
> +    };
> +    let product_type = stored_entry.product_type;
> +
> +    match (&remote, &node) {
> +        (Some(remote_name), Some(node_name)) => {
> +            user_info.check_privs(
> +                &auth_id,
> +                &["resource", remote_name],
> +                PRIV_RESOURCE_MODIFY,
> +                false,
> +            )?;
> +
> +            // Reassigning away from a previous remote requires modify on that remote too,
> +            // otherwise an audit-only-on-A operator could effectively pull a key off A by
> +            // re-binding it to a remote B they can modify and applying the push (which makes
> +            // the shop reissue the serverid to B and invalidates A).
> +            if let Some(prev_remote) = stored_entry.remote.as_deref() {
> +                if prev_remote != remote_name {
> +                    user_info.check_privs(
> +                        &auth_id,
> +                        &["resource", prev_remote],
> +                        PRIV_RESOURCE_MODIFY,
> +                        false,
> +                    )?;
> +                }
> +            }
> +
> +            let (remotes_config, _) = pdm_config::remotes::config()?;
> +            let remote_entry = remotes_config
> +                .get(remote_name)
> +                .ok_or_else(|| http_err!(NOT_FOUND, "remote '{remote_name}' not found"))?;
> +
> +            if !product_type.matches_remote_type(remote_entry.ty) {
> +                http_bail!(
> +                    BAD_REQUEST,
> +                    "key type '{product_type}' does not match remote type '{}'",
> +                    remote_entry.ty
> +                );
> +            }
> +
> +            for (_id, other) in config.iter() {
> +                if other.key != key
> +                    && other.remote.as_deref() == Some(remote_name.as_str())
> +                    && other.node.as_deref() == Some(node_name.as_str())
> +                {
> +                    http_bail!(
> +                        CONFLICT,
> +                        "key '{}' is already assigned to {remote_name}/{node_name}",
> +                        other.key
> +                    );
> +                }
> +            }
> +
> +            let entry = config.get_mut(&key).unwrap();

A short comment (or message in .expect()) would be nice to highlight
why the unwrap is okay here

> +            entry.remote = Some(remote_name.clone());
> +            entry.node = Some(node_name.clone());
> +        }
> +        (None, None) => {
> +            // Unassign also requires modify on the previously-pinned remote, so an audit-only
> +            // operator cannot rip a key off a node they cannot otherwise touch.
> +            if let Some(prev_remote) = stored_entry.remote.as_deref() {
> +                user_info.check_privs(
> +                    &auth_id,
> +                    &["resource", prev_remote],
> +                    PRIV_RESOURCE_MODIFY,
> +                    false,
> +                )?;
> +            }
> +            // Honour the pre-lock synced check only if the binding still matches what we
> +            // observed; if the binding moved between the live fetch and lock, the orphan check
> +            // is moot and we let the unassign through.
> +            if let Some((blocking_remote, blocking_node)) = synced_block {
> +                if stored_entry.remote.as_deref() == Some(blocking_remote.as_str())
> +                    && stored_entry.node.as_deref() == Some(blocking_node.as_str())
> +                {
> +                    http_bail!(
> +                        BAD_REQUEST,
> +                        "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
> +                         use Reissue Key to remove it from the remote first"
> +                    );
> +                }
> +            }
> +            let entry = config.get_mut(&key).unwrap();
> +            entry.remote = None;
> +            entry.node = None;
> +        }
> +        _ => {
> +            http_bail!(
> +                BAD_REQUEST,
> +                "both 'remote' and 'node' must be provided, or neither"
> +            );
> +        }
> +    }
> +
> +    pdm_config::subscriptions::save_config(&config)?;
> +
> +    Ok(())
> +}
> +
> +/// Pre-lock check for [`assign_key`]'s unassign path: returns the (remote, node) the entry is
> +/// currently active on, if any, so the lock-protected branch can refuse the unassign and prompt
> +/// the operator to Reissue Key instead. Returns `None` for entries with no binding, no live
> +/// subscription, or a live subscription whose key does not match the entry.
> +async fn check_synced_assignment_for_unassign(
> +    key: &str,
> +) -> Result<Option<(String, String)>, Error> {
> +    let (config, _) = pdm_config::subscriptions::config()?;
> +    let Some(entry) = config.get(key) else {
> +        return Ok(None);
> +    };
> +    let (Some(prev_remote), Some(prev_node)) = (entry.remote.clone(), entry.node.clone()) else {
> +        return Ok(None);
> +    };
> +    let (remotes_config, _) = pdm_config::remotes::config()?;
> +    let Some(remote_entry) = remotes_config.get(&prev_remote) else {
> +        return Ok(None);
> +    };
> +    let live = match get_subscription_info_for_remote(remote_entry, FRESH_NODE_STATUS_MAX_AGE).await
> +    {
> +        Ok(v) => v,
> +        Err(_) => return Ok(None),
> +    };
> +    let synced = live
> +        .get(&prev_node)
> +        .and_then(|info| info.as_ref())
> +        .map(|info| {
> +            info.status == proxmox_subscription::SubscriptionStatus::Active
> +                && info.key.as_deref() == Some(key)
> +        })
> +        .unwrap_or(false);
> +    Ok(synced.then_some((prev_remote, prev_node)))
> +}
> +
> +/// Push a single key to its assigned remote node. Operates on a borrowed `Remote` so the
> +/// caller can fetch the remotes-config once and reuse it.
> +async fn push_key_to_remote(remote: &Remote, key: &str, node_name: &str) -> Result<(), Error> {
> +    let product_type =
> +        ProductType::from_key(key).ok_or_else(|| format_err!("unrecognised key format: {key}"))?;
> +
> +    // PVE and PBS share `proxmox_client::Client`, so `make_pbs_client_and_login` works for both;
> +    // only the PUT path differs.
> +    let path = match product_type {
> +        ProductType::Pve => format!("/api2/extjs/nodes/{node_name}/subscription"),
> +        ProductType::Pbs => "/api2/extjs/nodes/localhost/subscription".to_string(),
> +        ProductType::Pmg | ProductType::Pom => {
> +            bail!("PDM cannot push '{product_type}' keys: no remote support yet");
> +        }
> +    };
> +
> +    let client = crate::connection::make_pbs_client_and_login(remote).await?;

Any reason why you went with the _and_login variant here? Usually we
only use the regular `make_{pbs,pve}_client` variants, as PDM always
should have a token for accessing remotes, thus not requiring a ticket.

> +
> +    client
> +        .0
> +        .put(&path, &serde_json::json!({ "key": key }))
> +        .await?;
> +    client.0.post(&path, &serde_json::json!({})).await?;
> +
> +    info!("pushed key '{key}' to {}/{node_name}", remote.id);
> +    Ok(())
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            "max-age": {
> +                type: u64,
> +                optional: true,
> +                description: "Override the cache freshness window in seconds. \
> +                              Default 300 for panel views; pass 0 to force a fresh query.",
> +            },
> +        },
> +    },
> +    returns: {
> +        type: Array,
> +        description: "Subscription status of all remote nodes the user can audit.",
> +        items: { type: RemoteNodeStatus },
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
> +    },
> +)]
> +/// Get the subscription status of every remote node the caller can audit, combined with key pool
> +/// assignment information.
> +///
> +/// Per-remote `PRIV_RESOURCE_AUDIT` is enforced inside the handler so users only see remotes
> +/// they may audit.
> +async fn node_status(
> +    max_age: Option<u64>,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<RemoteNodeStatus>, Error> {
> +    collect_node_status(max_age.unwrap_or(PANEL_NODE_STATUS_MAX_AGE), rpcenv).await
> +}
> +
> +/// Shared helper: fan out subscription queries to all remotes the caller has audit privilege on,
> +/// in parallel, reusing the per-remote `SUBSCRIPTION_CACHE` via `get_subscription_info_for_remote`.
> +/// Joins the results with the key-pool assignment table.
> +async fn collect_node_status(
> +    max_age: u64,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<RemoteNodeStatus>, Error> {
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    let visible_remotes: Vec<(String, Remote)> = crate::api::remotes::RemoteIterator::new()?
> +        .any_privs(&user_info, &auth_id, PRIV_RESOURCE_AUDIT)
> +        .into_iter()
> +        .collect();
> +
> +    let (keys_config, _) = pdm_config::subscriptions::config()?;
> +
> +    // `get_subscription_info_for_remote` re-uses the per-remote `SUBSCRIPTION_CACHE` so this
> +    // fan-out is safe to run concurrently.
> +    let fetch = visible_remotes.iter().map(|(name, remote)| async move {
> +        let res = get_subscription_info_for_remote(remote, max_age).await;
> +        (name.clone(), remote.ty, res)
> +    });
> +    let results = join_all(fetch).await;

I *think* you could use the ParallelFetcher helper here, so that way you
get sane limits on concurrency for free.

> +
> +    let mut out = Vec::new();
> +    for (remote_name, remote_ty, result) in results {
> +        let node_infos = match result {
> +            Ok(info) => info,
> +            Err(err) => {
> +                warn!("failed to query subscription for remote {remote_name}: {err}");
> +                continue;
> +            }
> +        };
> +
> +        for (node_name, node_info) in &node_infos {
> +            let (status, level, sockets, current_key) = match node_info {
> +                Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
> +                None => (
> +                    proxmox_subscription::SubscriptionStatus::NotFound,
> +                    SubscriptionLevel::None,
> +                    None,
> +                    None,
> +                ),
> +            };
> +
> +            let assigned_key = keys_config
> +                .iter()
> +                .find(|(_id, entry)| {
> +                    entry.remote.as_deref() == Some(remote_name.as_str())
> +                        && entry.node.as_deref() == Some(node_name.as_str())
> +                })
> +                .map(|(_id, entry)| entry.key.clone());
> +
> +            out.push(RemoteNodeStatus {
> +                remote: remote_name.clone(),
> +                ty: remote_ty,
> +                node: node_name.to_string(),
> +                sockets,
> +                status,
> +                level,
> +                assigned_key,
> +                current_key,
> +            });
> +        }
> +    }
> +
> +    out.sort_by(|a, b| (&a.remote, &a.node).cmp(&(&b.remote, &b.node)));
> +    Ok(out)
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            apply: {
> +                type: bool,
> +                optional: true,
> +                default: false,
> +                description: "Actually apply the proposed assignments. Without this, only a preview is returned.",
> +            },
> +        },
> +    },
> +    returns: {
> +        type: Array,
> +        description: "List of proposed or applied assignments.",
> +        items: { type: ProposedAssignment },
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
> +    },

This does not mention the required Modify/Audit permissions on the
remote itself.

> +)]
> +/// Propose or apply automatic key-to-node assignments.
> +///
> +/// Matches unused pool keys to remote nodes that do not yet have a pool-assigned key, picking
> +/// the smallest PVE key that covers each node's socket count. When `apply=true`, the live node
> +/// statuses are fetched first (without holding the config lock - sync locks must not span
> +/// awaits), then proposals are computed and persisted under the lock with a per-key re-check
> +/// against the now-current pool state, so a parallel admin edit between fetch and apply does
> +/// not get silently overwritten.
> +async fn auto_assign(
> +    apply: Option<bool>,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<ProposedAssignment>, Error> {
> +    let apply = apply.unwrap_or(false);
> +
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
> +
> +    if !apply {
> +        let (config, _digest) = pdm_config::subscriptions::config()?;
> +        return Ok(compute_proposals(&config, &node_statuses));
> +    }
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;

Same thing here with regards potentially locking the async handler

> +    let (mut config, _digest) = pdm_config::subscriptions::config()?;
> +    let mut proposals = compute_proposals(&config, &node_statuses);
> +
> +    // Audit-only callers may see a remote in the preview but must not be able to stage a write
> +    // for it that another admin would later push on their behalf.
> +    proposals.retain(|p| {
> +        user_info.lookup_privs(&auth_id, &["resource", &p.remote]) & PRIV_RESOURCE_MODIFY != 0
> +    });
> +
> +    for p in &proposals {
> +        if let Some(entry) = config.get_mut(&p.key) {
> +            // Skip keys that another writer assigned between the preview and the lock.
> +            if entry.remote.is_none() {
> +                entry.remote = Some(p.remote.clone());
> +                entry.node = Some(p.node.clone());
> +            }
> +        }
> +    }
> +    pdm_config::subscriptions::save_config(&config)?;
> +
> +    Ok(proposals)
> +}
> +
> +fn compute_proposals(
> +    config: &SectionConfigData<SubscriptionKeyEntry>,
> +    node_statuses: &[RemoteNodeStatus],
> +) -> Vec<ProposedAssignment> {
> +    let mut target_nodes: Vec<&RemoteNodeStatus> = node_statuses
> +        .iter()
> +        .filter(|n| {
> +            n.assigned_key.is_none() && n.status != proxmox_subscription::SubscriptionStatus::Active
> +        })
> +        .collect();
> +
> +    // Sort PVE nodes by socket count descending so large nodes get keys first.
> +    target_nodes.sort_by(|a, b| b.sockets.unwrap_or(0).cmp(&a.sockets.unwrap_or(0)));
> +
> +    let mut proposals: Vec<ProposedAssignment> = Vec::new();
> +    let mut taken: std::collections::HashSet<String> = std::collections::HashSet::new();

I'd just import HashSet :)

> +
> +    for node in &target_nodes {
> +        let remote_type = node.ty;
> +
> +        let candidates = config.iter().filter(|(id, entry)| {
> +            entry.remote.is_none()
> +                && !taken.contains(*id)
> +                && entry.product_type.matches_remote_type(remote_type)
> +        });
> +
> +        let best_key = if remote_type == pdm_api_types::remotes::RemoteType::Pve {
> +            let node_sockets = node.sockets.unwrap_or(1) as u32;
> +            pick_best_pve_socket_key(
> +                node_sockets,
> +                candidates.map(|(id, entry)| (id.to_string(), entry.key.as_str())),
> +            )
> +        } else {
> +            candidates.map(|(id, _)| id.to_string()).next()
> +        };
> +
> +        if let Some(key_id) = best_key {
> +            let ks = config
> +                .get(&key_id)
> +                .and_then(|e| socket_count_from_key(&e.key));
> +            taken.insert(key_id.clone());
> +            proposals.push(ProposedAssignment {
> +                key: key_id,
> +                remote: node.remote.clone(),
> +                node: node.node.clone(),
> +                key_sockets: ks,
> +                node_sockets: node.sockets,
> +            });
> +        }
> +    }
> +
> +    proposals
> +}

^ In general, this function would benefit from a couple of test cases --
should be easy enough, since its a pure function.


[...]


> +)]
> +/// Clear every pending assignment in one bulk transaction.
> +///
> +/// Pending = pool key bound to a remote node whose live state does not confirm the assignment
> +/// (status not Active, a different `current_key`, or no row returned at all because the remote is
> +/// unreachable / the node is gone). Clears only those entries the caller has
> +/// `PRIV_RESOURCE_MODIFY` on; remotes the caller may only audit are skipped. Mirrors
> +/// `apply-pending` but drops the assignments instead of pushing them, so an operator can disown
> +/// stuck assignments without first having to bring the target back online.
> +async fn clear_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<ClearPendingResult, Error> {
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
> +    let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
> +
> +    if pending.is_empty() {
> +        return Ok(ClearPendingResult { cleared: 0 });
> +    }
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;

Same thing here with regards to blocking in an async fn

> +    let (mut config, _digest) = pdm_config::subscriptions::config()?;
> +
> +    let mut cleared: u32 = 0;
> +    for entry in &pending {
> +        // Re-check inside the lock so a concurrent reassign is not silently overwritten.
> +        if let Some(stored) = config.get_mut(&entry.key) {
> +            if stored.remote.as_deref() == Some(entry.remote.as_str())
> +                && stored.node.as_deref() == Some(entry.node.as_str())
> +            {
> +                stored.remote = None;
> +                stored.node = None;
> +                cleared += 1;
> +            }
> +        }
> +    }
> +
> +    if cleared > 0 {
> +        pdm_config::subscriptions::save_config(&config)?;
> +    }
> +
> +    Ok(ClearPendingResult { cleared })
> +}
> +

[...]




  parent reply	other threads:[~2026-05-12 12:22 UTC|newest]

Thread overview: 21+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-05-07  8:26 [PATCH datacenter-manager v2 0/8] subscription: add central key pool registry with reissue support Thomas Lamprecht
2026-05-07  8:26 ` [PATCH datacenter-manager v2 1/8] api: subscription cache: ensure max_age=0 forces a fresh fetch Thomas Lamprecht
2026-05-07 13:23   ` Lukas Wagner
2026-05-08 12:43   ` applied: " Lukas Wagner
2026-05-07  8:26 ` [PATCH datacenter-manager v2 2/8] api types: subscription level: render full names Thomas Lamprecht
2026-05-07 13:23   ` Lukas Wagner
2026-05-07  8:26 ` [PATCH datacenter-manager v2 3/8] subscription: add key pool data model and config layer Thomas Lamprecht
2026-05-12  9:51   ` Lukas Wagner
2026-05-07  8:26 ` [PATCH datacenter-manager v2 4/8] subscription: add key pool and node status API endpoints Thomas Lamprecht
2026-05-07 13:23   ` Lukas Wagner
2026-05-12 12:21   ` Lukas Wagner [this message]
2026-05-07  8:26 ` [PATCH datacenter-manager v2 5/8] ui: add subscription registry with key pool and node status Thomas Lamprecht
2026-05-12 14:45   ` Lukas Wagner
2026-05-07  8:26 ` [PATCH datacenter-manager v2 6/8] cli: add subscription key pool management subcommands Thomas Lamprecht
2026-05-12 12:56   ` Lukas Wagner
2026-05-07  8:26 ` [PATCH datacenter-manager v2 7/8] docs: add subscription registry chapter Thomas Lamprecht
2026-05-07  8:26 ` [PATCH datacenter-manager v2 8/8] subscription: add Reissue Key action with pending-reissue queue Thomas Lamprecht
2026-05-12 13:57   ` Lukas Wagner
2026-05-07  8:34 ` [PATCH datacenter-manager v2 9/9] fixup! ui: add subscription registry with key pool and node status Thomas Lamprecht
2026-05-07 13:23 ` [PATCH datacenter-manager v2 0/8] subscription: add central key pool registry with reissue support Lukas Wagner
2026-05-15  7:48 ` superseded: " Thomas Lamprecht

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=DIGP0A3X4LHK.1YIGFB0N5143O@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    --cc=t.lamprecht@proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal