From mboxrd@z Thu Jan  1 00:00:00 1970
From: Thomas Lamprecht <t.lamprecht@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager v2 4/8] subscription: add key pool and node status API endpoints
Date: Thu, 7 May 2026 10:26:45 +0200
Message-ID: <20260507082943.2749725-5-t.lamprecht@proxmox.com>
X-Mailer: git-send-email 2.47.3
In-Reply-To: <20260507082943.2749725-1-t.lamprecht@proxmox.com>
References: <20260507082943.2749725-1-t.lamprecht@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id: Proxmox Datacenter Manager development discussion

Add REST endpoints under /subscriptions for the key pool, the combined
remote-vs-pool node-status view, and the bulk paths (auto-assign,
apply-pending, clear-pending).

Endpoints touching a specific remote require the matching resource
privilege on that remote in addition to the system-scope bit, so an
operator with global system access alone cannot push keys to remotes
they have no other authority on. Audit-only callers see only the
remotes they may audit on read paths; pending operations skip remotes
they may not modify.

The add endpoint takes an all-or-nothing list of keys. Apply-pending
runs in a worker that re-reads its plan when it fires (honouring a
re-assign between the API call and the worker firing) and bails on the
first push failure so the rest stays pending for retry. Mutating
endpoints accept an optional ConfigDigest; reads expose the current
digest in the response.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
---
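(Not part of the commit, an illustration for reviewers.) The digest handshake
mentioned in the last paragraph follows the usual read-modify-write pattern:
the read handlers expose the current digest, mutating handlers take the config
lock, re-read the pool together with its digest, and compare against whatever
the client echoed back before saving. A minimal sketch reusing the same
pdm_config::subscriptions helpers the patch calls; the helper name and the
anyhow/pdm_api_types import paths are assumptions:

use anyhow::Error;
use pdm_api_types::ConfigDigest;

// Hypothetical helper, for illustration only: a digest-guarded removal from the pool.
fn remove_key_guarded(key: String, expected: Option<ConfigDigest>) -> Result<(), Error> {
    // Hold the config lock so no other writer can interleave with this cycle.
    let _lock = pdm_config::subscriptions::lock_config()?;
    // Re-read the pool together with its current digest under the lock.
    let (mut config, current) = pdm_config::subscriptions::config()?;
    // Fails when `expected` (taken from an earlier read) no longer matches the on-disk state.
    current.detect_modification(expected.as_ref())?;

    config.remove(&key);
    pdm_config::subscriptions::save_config(&config)
}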
 lib/pdm-api-types/src/subscription.rs |   9 +
 server/src/api/mod.rs                 |   2 +
 server/src/api/resources.rs           |   8 +
 server/src/api/subscriptions/mod.rs   | 967 ++++++++++++++++++++++++++
 4 files changed, 986 insertions(+)
 create mode 100644 server/src/api/subscriptions/mod.rs

diff --git a/lib/pdm-api-types/src/subscription.rs b/lib/pdm-api-types/src/subscription.rs
index 26ecfba..ead3c1b 100644
--- a/lib/pdm-api-types/src/subscription.rs
+++ b/lib/pdm-api-types/src/subscription.rs
@@ -529,6 +529,15 @@ pub struct RemoteNodeStatus {
     pub current_key: Option<String>,
 }
 
+#[api]
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "kebab-case")]
+/// Result of the bulk clear-pending API endpoint.
+pub struct ClearPendingResult {
+    /// Number of pool entries whose pending push or reissue was cleared.
+    pub cleared: u32,
+}
+
 #[api]
 #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "kebab-case")]
diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs
index 110191b..9680edc 100644
--- a/server/src/api/mod.rs
+++ b/server/src/api/mod.rs
@@ -18,6 +18,7 @@ pub mod remotes;
 pub mod resources;
 mod rrd_common;
 pub mod sdn;
+pub mod subscriptions;
 
 #[sortable]
 const SUBDIRS: SubdirMap = &sorted!([
@@ -31,6 +32,7 @@ const SUBDIRS: SubdirMap = &sorted!([
     ("resources", &resources::ROUTER),
     ("nodes", &nodes::ROUTER),
     ("sdn", &sdn::ROUTER),
+    ("subscriptions", &subscriptions::ROUTER),
     ("version", &Router::new().get(&API_METHOD_VERSION)),
 ]);
diff --git a/server/src/api/resources.rs b/server/src/api/resources.rs
index 50315b1..49c3974 100644
--- a/server/src/api/resources.rs
+++ b/server/src/api/resources.rs
@@ -848,6 +848,14 @@ fn get_cached_subscription_info(remote: &str, max_age: u64) -> Option
Result, Error> {
+    let (config, digest) = pdm_config::subscriptions::config()?;
+    rpcenv["digest"] = digest.to_hex().into();
+    Ok(config
+        .into_iter()
+        .map(|(_id, mut entry)| {
+            entry.level = SubscriptionLevel::from_key(Some(&entry.key));
+            entry
+        })
+        .collect())
+}
+
+#[api(
+    input: {
+        properties: {
+            keys: {
+                type: Array,
+                description: "Subscription keys to add to the pool.",
+                items: { schema: SUBSCRIPTION_KEY_SCHEMA },
+            },
+            digest: {
+                type: ConfigDigest,
+                optional: true,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Add one or more subscription keys to the pool.
+///
+/// The key prefix determines the product type via [`ProductType::from_key`]. The schema regex
+/// rejects anything that isn't a PVE or PBS key today; widen [`PRODUCT_KEY_REGEX`] in lockstep
+/// with `from_key` and `push_key_to_remote` when PMG/POM remote support lands.
+///
+/// All-or-nothing: every key is validated for prefix and uniqueness (against the existing pool
+/// and within the input list) before any change is persisted. A single bad key fails the
+/// request and leaves the pool untouched.
+fn add_keys(keys: Vec<String>, digest: Option<ConfigDigest>) -> Result<(), Error> {
+    if keys.is_empty() {
+        http_bail!(BAD_REQUEST, "no keys provided");
+    }
+
+    let mut entries: Vec<SubscriptionKeyEntry> = Vec::with_capacity(keys.len());
+    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
+    for key in &keys {
+        if !seen.insert(key.as_str()) {
+            http_bail!(BAD_REQUEST, "duplicate key in input: '{key}'");
+        }
+        let product_type = ProductType::from_key(key).ok_or_else(|| {
+            warn!("rejecting unrecognised key prefix '{key}', possibly a new product line");
+            format_err!("unrecognised key format: {key}")
+        })?;
+        entries.push(SubscriptionKeyEntry {
+            key: key.clone(),
+            product_type,
+            level: SubscriptionLevel::from_key(Some(key)),
+            source: SubscriptionKeySource::Manual,
+            ..Default::default()
+        });
+    }
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+    config_digest.detect_modification(digest.as_ref())?;
+
+    for entry in &entries {
+        if config.contains_key(&entry.key) {
+            http_bail!(CONFLICT, "key '{}' already exists in pool", entry.key);
+        }
+    }
+
+    for entry in entries {
+        config.insert(entry.key.clone(), entry);
+    }
+
+    pdm_config::subscriptions::save_config(&config)
+}
+
+#[api(
+    input: {
+        properties: {
+            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+        },
+    },
+    returns: { type: SubscriptionKeyEntry },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// Get details for a single key.
+fn get_key(key: String, rpcenv: &mut dyn RpcEnvironment) -> Result<SubscriptionKeyEntry, Error> {
+    let (config, digest) = pdm_config::subscriptions::config()?;
+    rpcenv["digest"] = digest.to_hex().into();
+    let mut entry = config
+        .get(&key)
+        .cloned()
+        .ok_or_else(|| http_err!(NOT_FOUND, "key '{key}' not found in pool"))?;
+    entry.level = SubscriptionLevel::from_key(Some(&entry.key));
+    Ok(entry)
+}
+
+#[api(
+    input: {
+        properties: {
+            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+            digest: {
+                type: ConfigDigest,
+                optional: true,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Remove a key from the pool.
+///
+/// If the key is currently assigned to a remote node, the caller must also have
+/// `PRIV_RESOURCE_MODIFY` on that remote, so an audit-only operator cannot release a key
+/// another admin had pinned. Refuses if the key is currently the live active key on its bound
+/// node, since dropping the pool entry would orphan that subscription on the remote: the
+/// operator must Reissue Key first.
+async fn delete_key(
+    key: String,
+    digest: Option<ConfigDigest>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    // Live fetch must happen before the lock since the lock cannot span an .await; the
+    // post-lock check below only refuses if the binding still matches what we observed.
+
+    let synced_block = check_synced_assignment_for_unassign(&key).await?;
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+    config_digest.detect_modification(digest.as_ref())?;
+    let mut shadow = pdm_config::subscriptions::shadow_config()?;
+
+    let Some(entry) = config.get(&key) else {
+        http_bail!(NOT_FOUND, "key '{key}' not found in pool");
+    };
+
+    if let Some(assigned_remote) = entry.remote.as_deref() {
+        user_info.check_privs(
+            &auth_id,
+            &["resource", assigned_remote],
+            PRIV_RESOURCE_MODIFY,
+            false,
+        )?;
+    }
+
+    if let Some((blocking_remote, blocking_node)) = synced_block {
+        if entry.remote.as_deref() == Some(blocking_remote.as_str())
+            && entry.node.as_deref() == Some(blocking_node.as_str())
+        {
+            http_bail!(
+                BAD_REQUEST,
+                "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
+                use Reissue Key to remove it from the remote first"
+            );
+        }
+    }
+
+    config.remove(&key);
+    // Cascade the shadow drop. Order mirrors `pdm-config/src/remotes.rs`: shadow first, then
+    // main, so a partial failure cannot leave a key entry that lost its signed blob.
+    shadow.remove(&key);
+    pdm_config::subscriptions::save_shadow(&shadow)?;
+    pdm_config::subscriptions::save_config(&config)?;
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+            remote: {
+                schema: REMOTE_ID_SCHEMA,
+                optional: true,
+            },
+            node: {
+                // NODE_SCHEMA rejects path-traversal input before it ends up interpolated into
+                // the remote URL `/api2/extjs/nodes/{node}/subscription`.
+                schema: NODE_SCHEMA,
+                optional: true,
+            },
+            digest: {
+                type: ConfigDigest,
+                optional: true,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Assign a key to a remote node, or unassign it (omit remote and node).
+///
+/// `PRIV_SYS_MODIFY` lets the caller touch the pool config; per-remote `PRIV_RESOURCE_MODIFY`
+/// is enforced inside this handler so an operator cannot push a key to a remote they have no
+/// other authority on.
+async fn assign_key(
+    key: String,
+    remote: Option<String>,
+    node: Option<String>,
+    digest: Option<ConfigDigest>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    // Best-effort orphan-prevention for the unassign path: if the entry is currently bound and
+    // synced (the assigned key is the live current_key on its remote), refuse and direct the
+    // operator to Reissue Key. The live fetch must happen before the lock since that lock cannot
+    // span an .await; we re-check the binding under the lock and only refuse if it still
+    // matches what we observed live.
+    let synced_block = if remote.is_none() && node.is_none() {
+        check_synced_assignment_for_unassign(&key).await?
+    } else {
+        None
+    };
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+    config_digest.detect_modification(digest.as_ref())?;
+
+    let Some(stored_entry) = config.get(&key).cloned() else {
+        http_bail!(NOT_FOUND, "key '{key}' not found in pool");
+    };
+    let product_type = stored_entry.product_type;
+
+    match (&remote, &node) {
+        (Some(remote_name), Some(node_name)) => {
+            user_info.check_privs(
+                &auth_id,
+                &["resource", remote_name],
+                PRIV_RESOURCE_MODIFY,
+                false,
+            )?;
+
+            // Reassigning away from a previous remote requires modify on that remote too,
+            // otherwise an audit-only-on-A operator could effectively pull a key off A by
+            // re-binding it to a remote B they can modify and applying the push (which makes
+            // the shop reissue the serverid to B and invalidates A).
+            if let Some(prev_remote) = stored_entry.remote.as_deref() {
+                if prev_remote != remote_name {
+                    user_info.check_privs(
+                        &auth_id,
+                        &["resource", prev_remote],
+                        PRIV_RESOURCE_MODIFY,
+                        false,
+                    )?;
+                }
+            }
+
+            let (remotes_config, _) = pdm_config::remotes::config()?;
+            let remote_entry = remotes_config
+                .get(remote_name)
+                .ok_or_else(|| http_err!(NOT_FOUND, "remote '{remote_name}' not found"))?;
+
+            if !product_type.matches_remote_type(remote_entry.ty) {
+                http_bail!(
+                    BAD_REQUEST,
+                    "key type '{product_type}' does not match remote type '{}'",
+                    remote_entry.ty
+                );
+            }
+
+            for (_id, other) in config.iter() {
+                if other.key != key
+                    && other.remote.as_deref() == Some(remote_name.as_str())
+                    && other.node.as_deref() == Some(node_name.as_str())
+                {
+                    http_bail!(
+                        CONFLICT,
+                        "key '{}' is already assigned to {remote_name}/{node_name}",
+                        other.key
+                    );
+                }
+            }
+
+            let entry = config.get_mut(&key).unwrap();
+            entry.remote = Some(remote_name.clone());
+            entry.node = Some(node_name.clone());
+        }
+        (None, None) => {
+            // Unassign also requires modify on the previously-pinned remote, so an audit-only
+            // operator cannot rip a key off a node they cannot otherwise touch.
+            if let Some(prev_remote) = stored_entry.remote.as_deref() {
+                user_info.check_privs(
+                    &auth_id,
+                    &["resource", prev_remote],
+                    PRIV_RESOURCE_MODIFY,
+                    false,
+                )?;
+            }
+            // Honour the pre-lock synced check only if the binding still matches what we
+            // observed; if the binding moved between the live fetch and lock, the orphan check
+            // is moot and we let the unassign through.
+            if let Some((blocking_remote, blocking_node)) = synced_block {
+                if stored_entry.remote.as_deref() == Some(blocking_remote.as_str())
+                    && stored_entry.node.as_deref() == Some(blocking_node.as_str())
+                {
+                    http_bail!(
+                        BAD_REQUEST,
+                        "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
+                        use Reissue Key to remove it from the remote first"
+                    );
+                }
+            }
+            let entry = config.get_mut(&key).unwrap();
+            entry.remote = None;
+            entry.node = None;
+        }
+        _ => {
+            http_bail!(
+                BAD_REQUEST,
+                "both 'remote' and 'node' must be provided, or neither"
+            );
+        }
+    }
+
+    pdm_config::subscriptions::save_config(&config)?;
+
+    Ok(())
+}
+
+/// Pre-lock check for [`assign_key`]'s unassign path: returns the (remote, node) the entry is
+/// currently active on, if any, so the lock-protected branch can refuse the unassign and prompt
+/// the operator to Reissue Key instead. Returns `None` for entries with no binding, no live
+/// subscription, or a live subscription whose key does not match the entry.
+async fn check_synced_assignment_for_unassign(
+    key: &str,
+) -> Result<Option<(String, String)>, Error> {
+    let (config, _) = pdm_config::subscriptions::config()?;
+    let Some(entry) = config.get(key) else {
+        return Ok(None);
+    };
+    let (Some(prev_remote), Some(prev_node)) = (entry.remote.clone(), entry.node.clone()) else {
+        return Ok(None);
+    };
+    let (remotes_config, _) = pdm_config::remotes::config()?;
+    let Some(remote_entry) = remotes_config.get(&prev_remote) else {
+        return Ok(None);
+    };
+    let live = match get_subscription_info_for_remote(remote_entry, FRESH_NODE_STATUS_MAX_AGE).await
+    {
+        Ok(v) => v,
+        Err(_) => return Ok(None),
+    };
+    let synced = live
+        .get(&prev_node)
+        .and_then(|info| info.as_ref())
+        .map(|info| {
+            info.status == proxmox_subscription::SubscriptionStatus::Active
+                && info.key.as_deref() == Some(key)
+        })
+        .unwrap_or(false);
+    Ok(synced.then_some((prev_remote, prev_node)))
+}
+
+/// Push a single key to its assigned remote node. Operates on a borrowed `Remote` so the
+/// caller can fetch the remotes-config once and reuse it.
+async fn push_key_to_remote(remote: &Remote, key: &str, node_name: &str) -> Result<(), Error> {
+    let product_type =
+        ProductType::from_key(key).ok_or_else(|| format_err!("unrecognised key format: {key}"))?;
+
+    // PVE and PBS share `proxmox_client::Client`, so `make_pbs_client_and_login` works for both;
+    // only the PUT path differs.
+    let path = match product_type {
+        ProductType::Pve => format!("/api2/extjs/nodes/{node_name}/subscription"),
+        ProductType::Pbs => "/api2/extjs/nodes/localhost/subscription".to_string(),
+        ProductType::Pmg | ProductType::Pom => {
+            bail!("PDM cannot push '{product_type}' keys: no remote support yet");
+        }
+    };
+
+    let client = crate::connection::make_pbs_client_and_login(remote).await?;
+
+    client
+        .0
+        .put(&path, &serde_json::json!({ "key": key }))
+        .await?;
+    client.0.post(&path, &serde_json::json!({})).await?;
+
+    info!("pushed key '{key}' to {}/{node_name}", remote.id);
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            "max-age": {
+                type: u64,
+                optional: true,
+                description: "Override the cache freshness window in seconds. \
+                    Default 300 for panel views; pass 0 to force a fresh query.",
+            },
+        },
+    },
+    returns: {
+        type: Array,
+        description: "Subscription status of all remote nodes the user can audit.",
+        items: { type: RemoteNodeStatus },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// Get the subscription status of every remote node the caller can audit, combined with key pool
+/// assignment information.
+///
+/// Per-remote `PRIV_RESOURCE_AUDIT` is enforced inside the handler so users only see remotes
+/// they may audit.
+async fn node_status(
+    max_age: Option<u64>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<RemoteNodeStatus>, Error> {
+    collect_node_status(max_age.unwrap_or(PANEL_NODE_STATUS_MAX_AGE), rpcenv).await
+}
+
+/// Shared helper: fan out subscription queries to all remotes the caller has audit privilege on,
+/// in parallel, reusing the per-remote `SUBSCRIPTION_CACHE` via `get_subscription_info_for_remote`.
+/// Joins the results with the key-pool assignment table.
+async fn collect_node_status(
+    max_age: u64,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<RemoteNodeStatus>, Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    let visible_remotes: Vec<(String, Remote)> = crate::api::remotes::RemoteIterator::new()?
+        .any_privs(&user_info, &auth_id, PRIV_RESOURCE_AUDIT)
+        .into_iter()
+        .collect();
+
+    let (keys_config, _) = pdm_config::subscriptions::config()?;
+
+    // `get_subscription_info_for_remote` re-uses the per-remote `SUBSCRIPTION_CACHE` so this
+    // fan-out is safe to run concurrently.
+    let fetch = visible_remotes.iter().map(|(name, remote)| async move {
+        let res = get_subscription_info_for_remote(remote, max_age).await;
+        (name.clone(), remote.ty, res)
+    });
+    let results = join_all(fetch).await;
+
+    let mut out = Vec::new();
+    for (remote_name, remote_ty, result) in results {
+        let node_infos = match result {
+            Ok(info) => info,
+            Err(err) => {
+                warn!("failed to query subscription for remote {remote_name}: {err}");
+                continue;
+            }
+        };
+
+        for (node_name, node_info) in &node_infos {
+            let (status, level, sockets, current_key) = match node_info {
+                Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
+                None => (
+                    proxmox_subscription::SubscriptionStatus::NotFound,
+                    SubscriptionLevel::None,
+                    None,
+                    None,
+                ),
+            };
+
+            let assigned_key = keys_config
+                .iter()
+                .find(|(_id, entry)| {
+                    entry.remote.as_deref() == Some(remote_name.as_str())
+                        && entry.node.as_deref() == Some(node_name.as_str())
+                })
+                .map(|(_id, entry)| entry.key.clone());
+
+            out.push(RemoteNodeStatus {
+                remote: remote_name.clone(),
+                ty: remote_ty,
+                node: node_name.to_string(),
+                sockets,
+                status,
+                level,
+                assigned_key,
+                current_key,
+            });
+        }
+    }
+
+    out.sort_by(|a, b| (&a.remote, &a.node).cmp(&(&b.remote, &b.node)));
+    Ok(out)
+}
+
+#[api(
+    input: {
+        properties: {
+            apply: {
+                type: bool,
+                optional: true,
+                default: false,
+                description: "Actually apply the proposed assignments. Without this, only a preview is returned.",
+            },
+        },
+    },
+    returns: {
+        type: Array,
+        description: "List of proposed or applied assignments.",
+        items: { type: ProposedAssignment },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Propose or apply automatic key-to-node assignments.
+///
+/// Matches unused pool keys to remote nodes that do not yet have a pool-assigned key, picking
+/// the smallest PVE key that covers each node's socket count. When `apply=true`, the live node
+/// statuses are fetched first (without holding the config lock - sync locks must not span
+/// awaits), then proposals are computed and persisted under the lock with a per-key re-check
+/// against the now-current pool state, so a parallel admin edit between fetch and apply does
+/// not get silently overwritten.
+async fn auto_assign(
+    apply: Option<bool>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<ProposedAssignment>, Error> {
+    let apply = apply.unwrap_or(false);
+
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+
+    if !apply {
+        let (config, _digest) = pdm_config::subscriptions::config()?;
+        return Ok(compute_proposals(&config, &node_statuses));
+    }
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, _digest) = pdm_config::subscriptions::config()?;
+    let mut proposals = compute_proposals(&config, &node_statuses);
+
+    // Audit-only callers may see a remote in the preview but must not be able to stage a write
+    // for it that another admin would later push on their behalf.
+    proposals.retain(|p| {
+        user_info.lookup_privs(&auth_id, &["resource", &p.remote]) & PRIV_RESOURCE_MODIFY != 0
+    });
+
+    for p in &proposals {
+        if let Some(entry) = config.get_mut(&p.key) {
+            // Skip keys that another writer assigned between the preview and the lock.
+            if entry.remote.is_none() {
+                entry.remote = Some(p.remote.clone());
+                entry.node = Some(p.node.clone());
+            }
+        }
+    }
+    pdm_config::subscriptions::save_config(&config)?;
+
+    Ok(proposals)
+}
+
+fn compute_proposals(
+    config: &SectionConfigData,
+    node_statuses: &[RemoteNodeStatus],
+) -> Vec<ProposedAssignment> {
+    let mut target_nodes: Vec<&RemoteNodeStatus> = node_statuses
+        .iter()
+        .filter(|n| {
+            n.assigned_key.is_none() && n.status != proxmox_subscription::SubscriptionStatus::Active
+        })
+        .collect();
+
+    // Sort PVE nodes by socket count descending so large nodes get keys first.
+    target_nodes.sort_by(|a, b| b.sockets.unwrap_or(0).cmp(&a.sockets.unwrap_or(0)));
+
+    let mut proposals: Vec<ProposedAssignment> = Vec::new();
+    let mut taken: std::collections::HashSet<String> = std::collections::HashSet::new();
+
+    for node in &target_nodes {
+        let remote_type = node.ty;
+
+        let candidates = config.iter().filter(|(id, entry)| {
+            entry.remote.is_none()
+                && !taken.contains(*id)
+                && entry.product_type.matches_remote_type(remote_type)
+        });
+
+        let best_key = if remote_type == pdm_api_types::remotes::RemoteType::Pve {
+            let node_sockets = node.sockets.unwrap_or(1) as u32;
+            pick_best_pve_socket_key(
+                node_sockets,
+                candidates.map(|(id, entry)| (id.to_string(), entry.key.as_str())),
+            )
+        } else {
+            candidates.map(|(id, _)| id.to_string()).next()
+        };
+
+        if let Some(key_id) = best_key {
+            let ks = config
+                .get(&key_id)
+                .and_then(|e| socket_count_from_key(&e.key));
+            taken.insert(key_id.clone());
+            proposals.push(ProposedAssignment {
+                key: key_id,
+                remote: node.remote.clone(),
+                node: node.node.clone(),
+                key_sockets: ks,
+                node_sockets: node.sockets,
+            });
+        }
+    }
+
+    proposals
+}
+
+#[api(
+    returns: {
+        type: String,
+        optional: true,
+        description: "Task UPID; absent when there is nothing pending to push.",
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Push every pending key assignment to its remote node.
+///
+/// Pending = the live node does not confirm the assigned key as its current active subscription
+/// (status not Active, a different `current_key`, or the remote did not respond / is gone). Each
+/// push is logged from a worker task so the admin can follow progress.
+///
+/// The worker bails on the first failure; the remaining entries stay pending so the operator
+/// can fix the underlying issue (or clear that one assignment) and trigger another apply.
+///
+/// Returns `None` when nothing is pending so the caller can show a short info message instead of
+/// opening a task progress dialog for a no-op worker.
+async fn apply_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<Option<String>, Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+    let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+    if pending.is_empty() {
+        return Ok(None);
+    }
+
+    let worker_auth = auth_id.clone();
+    let upid = proxmox_rest_server::WorkerTask::spawn(
+        "subscription-push",
+        None,
+        auth_id.to_string(),
+        true,
+        move |_worker| async move { run_apply_pending(worker_auth).await },
+    )?;
+
+    Ok(Some(upid))
+}
+
+/// Re-validate and run the apply-pending plan from inside a worker.
+///
+/// The worker re-reads remotes and the pool config so a reassign or removal between the API call
+/// returning a UPID and the worker firing is honoured (pushing the old key to a node after the
+/// operator retracted the assignment was a real footgun).
+async fn run_apply_pending(auth_id: Authid) -> Result<(), Error> {
+    let user_info = CachedUserInfo::new()?;
+    let (remotes_config, _) = pdm_config::remotes::config()?;
+    let (config, _) = pdm_config::subscriptions::config()?;
+
+    let node_statuses = collect_status_uncached(&remotes_config).await;
+    let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+    if pending.is_empty() {
+        info!("apply-pending: nothing to do (state changed since the API call)");
+        return Ok(());
+    }
+
+    let total = pending.len();
+    let mut ok = 0usize;
+
+    for entry in pending {
+        let Some(remote) = remotes_config.get(&entry.remote) else {
+            bail!(
+                "remote '{}' vanished, aborting after {ok}/{total} successful pushes",
+                entry.remote,
+            );
+        };
+        // Honour the case where the operator unassigned the key while the worker was queued.
+        if !pool_assignment_still_valid(&config, &entry) {
+            info!(
+                "skipping {}/{}: pool assignment changed before worker ran",
+                entry.remote, entry.node
+            );
+            continue;
+        }
+
+        info!(
+            "pushing {} to {}/{}...",
+            entry.key, entry.remote, entry.node
+        );
+        if let Err(err) = push_key_to_remote(remote, &entry.key, &entry.node).await {
+            bail!(
+                "push of {} to {}/{} failed after {ok}/{total} successful pushes: {err}",
+                entry.key,
+                entry.remote,
+                entry.node,
+            );
+        }
+        info!(" success");
+        invalidate_subscription_info_for_remote(&entry.remote);
+        ok += 1;
+    }
+
+    info!("finished: {ok}/{total} pushes succeeded");
+    Ok(())
+}
+
+#[api(
+    returns: { type: ClearPendingResult },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Clear every pending assignment in one bulk transaction.
+///
+/// Pending = pool key bound to a remote node whose live state does not confirm the assignment
+/// (status not Active, a different `current_key`, or no row returned at all because the remote is
+/// unreachable / the node is gone). Clears only those entries the caller has
+/// `PRIV_RESOURCE_MODIFY` on; remotes the caller may only audit are skipped. Mirrors
+/// `apply-pending` but drops the assignments instead of pushing them, so an operator can disown
+/// stuck assignments without first having to bring the target back online.
+async fn clear_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<ClearPendingResult, Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+    let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+    if pending.is_empty() {
+        return Ok(ClearPendingResult { cleared: 0 });
+    }
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, _digest) = pdm_config::subscriptions::config()?;
+
+    let mut cleared: u32 = 0;
+    for entry in &pending {
+        // Re-check inside the lock so a concurrent reassign is not silently overwritten.
+        if let Some(stored) = config.get_mut(&entry.key) {
+            if stored.remote.as_deref() == Some(entry.remote.as_str())
+                && stored.node.as_deref() == Some(entry.node.as_str())
+            {
+                stored.remote = None;
+                stored.node = None;
+                cleared += 1;
+            }
+        }
+    }
+
+    if cleared > 0 {
+        pdm_config::subscriptions::save_config(&config)?;
+    }
+
+    Ok(ClearPendingResult { cleared })
+}
+
+/// Plan entry for one pending push.
+#[derive(Clone, Debug)]
+struct PendingEntry {
+    key: String,
+    remote: String,
+    node: String,
+}
+
+fn compute_pending(
+    user_info: &CachedUserInfo,
+    auth_id: &Authid,
+    node_statuses: &[RemoteNodeStatus],
+) -> Result<Vec<PendingEntry>, Error> {
+    let (config, _) = pdm_config::subscriptions::config()?;
+
+    Ok(config
+        .iter()
+        .filter_map(|(_id, entry)| {
+            let remote = entry.remote.as_deref()?;
+            let node = entry.node.as_deref()?;
+
+            if user_info.lookup_privs(auth_id, &["resource", remote]) & PRIV_RESOURCE_MODIFY == 0 {
+                return None;
+            }
+
+            // Treat anything other than "Active with the assigned key as the live current_key"
+            // as pending, including unreachable remotes, so an operator can clear stuck
+            // assignments without first having to bring the target back online.
+            let is_pending = match node_statuses
+                .iter()
+                .find(|n| n.remote == remote && n.node == node)
+            {
+                Some(n) => {
+                    n.status != proxmox_subscription::SubscriptionStatus::Active
+                        || n.current_key.as_deref() != Some(entry.key.as_str())
+                }
+                None => true,
+            };
+
+            is_pending.then(|| PendingEntry {
+                key: entry.key.clone(),
+                remote: remote.to_string(),
+                node: node.to_string(),
+            })
+        })
+        .collect())
+}
+
+fn pool_assignment_still_valid(
+    config: &SectionConfigData,
+    entry: &PendingEntry,
+) -> bool {
+    let Some(stored) = config.get(&entry.key) else {
+        return false;
+    };
+    stored.remote.as_deref() == Some(entry.remote.as_str())
+        && stored.node.as_deref() == Some(entry.node.as_str())
+}
+
+/// Like [`collect_node_status`] but bypasses the auth filter, for the apply-pending worker
+/// which gates each entry through its own per-remote priv check based on the persisted pool plan.
+async fn collect_status_uncached(
+    remotes_config: &SectionConfigData,
+) -> Vec<RemoteNodeStatus> {
+    let fetch = remotes_config.iter().map(|(name, remote)| async move {
+        let res = get_subscription_info_for_remote(remote, FRESH_NODE_STATUS_MAX_AGE).await;
+        (name.to_string(), remote.ty, res)
+    });
+    let results = join_all(fetch).await;
+
+    let mut out = Vec::new();
+    for (remote_name, remote_ty, result) in results {
+        let Ok(node_infos) = result else { continue };
+        for (node_name, node_info) in &node_infos {
+            let (status, level, sockets, current_key) = match node_info {
+                Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
+                None => (
+                    proxmox_subscription::SubscriptionStatus::NotFound,
+                    SubscriptionLevel::None,
+                    None,
+                    None,
+                ),
+            };
+            out.push(RemoteNodeStatus {
+                remote: remote_name.clone(),
+                ty: remote_ty,
+                node: node_name.to_string(),
+                sockets,
+                status,
+                level,
+                assigned_key: None,
+                current_key,
+            });
+        }
+    }
+    out
+}
-- 
2.47.3
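
Not part of the patch, just an illustration: compute_proposals() relies on
socket_count_from_key() and pick_best_pve_socket_key(), which are only
referenced in this file and defined elsewhere in the series. A minimal sketch
of the "smallest key that still covers the node" selection described in the
auto-assign doc comment, assuming PVE keys encode their socket count in the
digits following the product prefix (e.g. "pve4p-..." for four sockets):

/// Parse the socket count out of a PVE key (assumed format: "pve" + sockets + level + "-" + serial).
fn socket_count_from_key(key: &str) -> Option<u32> {
    let digits: String = key
        .strip_prefix("pve")?
        .chars()
        .take_while(|c| c.is_ascii_digit())
        .collect();
    digits.parse().ok()
}

/// Of the unassigned candidates, pick the smallest key that still covers `node_sockets`;
/// returns the chosen candidate id, or None when no pool key is large enough.
fn pick_best_pve_socket_key<'a>(
    node_sockets: u32,
    candidates: impl Iterator<Item = (String, &'a str)>,
) -> Option<String> {
    candidates
        .filter_map(|(id, key)| {
            let sockets = socket_count_from_key(key)?;
            (sockets >= node_sockets).then_some((sockets, id))
        })
        .min_by_key(|(sockets, _)| *sockets)
        .map(|(_, id)| id)
}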