Subject: Re: [PATCH datacenter-manager v2 4/8] subscription: add key pool and node status API endpoints
From: "Lukas Wagner" <l.wagner@proxmox.com>
To: "Thomas Lamprecht"
Date: Tue, 12 May 2026 14:21:50 +0200
In-Reply-To: <20260507082943.2749725-5-t.lamprecht@proxmox.com>

On Thu May 7, 2026 at 10:26 AM CEST, Thomas Lamprecht wrote:

[...]

> +/// Add one or more subscription keys to the pool.
> +///
> +/// The key prefix determines the product type via [`ProductType::from_key`]. The schema regex
> +/// rejects anything that isn't a PVE or PBS key today; widen [`PRODUCT_KEY_REGEX`] in lockstep
> +/// with `from_key` and `push_key_to_remote` when PMG/POM remote support lands.
> +///
> +/// All-or-nothing: every key is validated for prefix and uniqueness (against the existing pool
> +/// and within the input list) before any change is persisted. A single bad key fails the
> +/// request and leaves the pool untouched.
> +fn add_keys(keys: Vec<String>, digest: Option<ConfigDigest>) -> Result<(), Error> {
> +    if keys.is_empty() {
> +        http_bail!(BAD_REQUEST, "no keys provided");
> +    }
> +
> +    let mut entries: Vec<SubscriptionKeyEntry> = Vec::with_capacity(keys.len());
> +    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
> +    for key in &keys {
> +        if !seen.insert(key.as_str()) {
> +            http_bail!(BAD_REQUEST, "duplicate key in input: '{key}'");
> +        }
> +        let product_type = ProductType::from_key(key).ok_or_else(|| {
> +            warn!("rejecting unrecognised key prefix '{key}', possibly a new product line");
> +            format_err!("unrecognised key format: {key}")
> +        })?;
> +        entries.push(SubscriptionKeyEntry {
> +            key: key.clone(),
> +            product_type,
> +            level: SubscriptionLevel::from_key(Some(key)),
> +            source: SubscriptionKeySource::Manual,
> +            ..Default::default()
> +        });
> +    }
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;
> +    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
> +    config_digest.detect_modification(digest.as_ref())?;
> +
> +    for entry in &entries {
> +        if config.contains_key(&entry.key) {
> +            http_bail!(CONFLICT, "key '{}' already exists in pool", entry.key);
> +        }
> +    }
> +
> +    for entry in entries {
> +        config.insert(entry.key.clone(), entry);
> +    }

FWIW, you could combine both loops, since you bail out if a duplicate entry
exists and thus never save the config.

In theory, you could also check the return value of .insert() and use it to
check for a duplicate, e.g.

    for entry in entries {
        if let Some(existing_key) = config.insert(entry.key.clone(), entry) {
            http_bail!(
                CONFLICT,
                "key '{}' already exists in pool",
                existing_key.key
            );
        }
    }

But I'm not sure if that is actually easier to understand than an explicit
`contains_key`. Nevertheless, I'd still at least combine both loops.
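
Keeping the explicit `contains_key` but folding both passes into a single
loop would then look roughly like this (untested sketch, reusing the names
from the patch):

    for entry in entries {
        if config.contains_key(&entry.key) {
            http_bail!(CONFLICT, "key '{}' already exists in pool", entry.key);
        }
        config.insert(entry.key.clone(), entry);
    }

Since nothing is saved before the bail, the all-or-nothing semantics from the
doc comment stay intact.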

> +
> +    pdm_config::subscriptions::save_config(&config)
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
> +        },
> +    },
> +    returns: { type: SubscriptionKeyEntry },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
> +    },
> +)]
> +/// Get details for a single key.
> +fn get_key(key: String, rpcenv: &mut dyn RpcEnvironment) -> Result<SubscriptionKeyEntry, Error> {
> +    let (config, digest) = pdm_config::subscriptions::config()?;
> +    rpcenv["digest"] = digest.to_hex().into();
> +    let mut entry = config
> +        .get(&key)
> +        .cloned()
> +        .ok_or_else(|| http_err!(NOT_FOUND, "key '{key}' not found in pool"))?;
> +    entry.level = SubscriptionLevel::from_key(Some(&entry.key));
> +    Ok(entry)
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
> +            digest: {
> +                type: ConfigDigest,
> +                optional: true,
> +            },
> +        },
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
> +    },
> +)]
> +/// Remove a key from the pool.
> +///
> +/// If the key is currently assigned to a remote node, the caller must also have
> +/// `PRIV_RESOURCE_MODIFY` on that remote, so an audit-only operator cannot release a key
> +/// another admin had pinned. Refuses if the key is currently the live active key on its bound
> +/// node, since dropping the pool entry would orphan that subscription on the remote: the
> +/// operator must Reissue Key first.
> +async fn delete_key(
> +    key: String,
> +    digest: Option<ConfigDigest>,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<(), Error> {
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    // Live fetch must happen before the lock since the lock cannot span an .await; the
> +    // post-lock check below only refuses if the binding still matches what we observed.
> +    let synced_block = check_synced_assignment_for_unassign(&key).await?;
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;

This is potentially blocking the async handler; it should be in a
spawn_blocking block (maybe together with the other config reading).
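
Something along these lines, i.e. moving the whole lock + read-modify-write
section onto a blocking thread (untested sketch; the values the checks need,
like `key`, `digest` and the pre-fetched `synced_block`, would have to be
moved into the closure):

    tokio::task::spawn_blocking(move || {
        // waiting on the config flock can take a while if another writer
        // holds it, so don't do it on an executor thread
        let _lock = pdm_config::subscriptions::lock_config()?;
        let (mut config, config_digest) = pdm_config::subscriptions::config()?;
        config_digest.detect_modification(digest.as_ref())?;

        // ... same checks and mutations on `config` as below ...

        pdm_config::subscriptions::save_config(&config)
    })
    .await??;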

> +    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
> +    config_digest.detect_modification(digest.as_ref())?;
> +    let mut shadow = pdm_config::subscriptions::shadow_config()?;
> +
> +    let Some(entry) = config.get(&key) else {
> +        http_bail!(NOT_FOUND, "key '{key}' not found in pool");
> +    };
> +
> +    if let Some(assigned_remote) = entry.remote.as_deref() {
> +        user_info.check_privs(
> +            &auth_id,
> +            &["resource", assigned_remote],
> +            PRIV_RESOURCE_MODIFY,
> +            false,
> +        )?;
> +    }
> +
> +    if let Some((blocking_remote, blocking_node)) = synced_block {
> +        if entry.remote.as_deref() == Some(blocking_remote.as_str())
> +            && entry.node.as_deref() == Some(blocking_node.as_str())
> +        {
> +            http_bail!(
> +                BAD_REQUEST,
> +                "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
> +                use Reissue Key to remove it from the remote first"
> +            );
> +        }
> +    }
> +
> +    config.remove(&key);
> +    // Cascade the shadow drop. Order mirrors `pdm-config/src/remotes.rs`: shadow first, then
> +    // main, so a partial failure cannot leave a key entry that lost its signed blob.
> +    shadow.remove(&key);
> +    pdm_config::subscriptions::save_shadow(&shadow)?;
> +    pdm_config::subscriptions::save_config(&config)?;
> +
> +    Ok(())
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
> +            remote: {
> +                schema: REMOTE_ID_SCHEMA,
> +                optional: true,
> +            },
> +            node: {
> +                // NODE_SCHEMA rejects path-traversal input before it ends up interpolated into
> +                // the remote URL `/api2/extjs/nodes/{node}/subscription`.
> +                schema: NODE_SCHEMA,
> +                optional: true,
> +            },
> +            digest: {
> +                type: ConfigDigest,
> +                optional: true,
> +            },
> +        },
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
> +    },
> +)]
> +/// Assign a key to a remote node, or unassign it (omit remote and node).
> +///
> +/// `PRIV_SYS_MODIFY` lets the caller touch the pool config; per-remote `PRIV_RESOURCE_MODIFY`
> +/// is enforced inside this handler so an operator cannot push a key to a remote they have no
> +/// other authority on.
> +async fn assign_key(
> +    key: String,
> +    remote: Option<String>,
> +    node: Option<String>,
> +    digest: Option<ConfigDigest>,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<(), Error> {
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    // Best-effort orphan-prevention for the unassign path: if the entry is currently bound and
> +    // synced (the assigned key is the live current_key on its remote), refuse and direct the
> +    // operator to Reissue Key. The live fetch must happen before the lock since that lock cannot
> +    // span an .await; we re-check the binding under the lock and only refuse if it still
> +    // matches what we observed live.
> +    let synced_block = if remote.is_none() && node.is_none() {
> +        check_synced_assignment_for_unassign(&key).await?
> +    } else {
> +        None
> +    };
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;

Same here with regards to potentially blocking an async API handler.

> +    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
> +    config_digest.detect_modification(digest.as_ref())?;
> +
> +    let Some(stored_entry) = config.get(&key).cloned() else {
> +        http_bail!(NOT_FOUND, "key '{key}' not found in pool");
> +    };
> +    let product_type = stored_entry.product_type;
> +
> +    match (&remote, &node) {
> +        (Some(remote_name), Some(node_name)) => {
> +            user_info.check_privs(
> +                &auth_id,
> +                &["resource", remote_name],
> +                PRIV_RESOURCE_MODIFY,
> +                false,
> +            )?;
> +
> +            // Reassigning away from a previous remote requires modify on that remote too,
> +            // otherwise an audit-only-on-A operator could effectively pull a key off A by
> +            // re-binding it to a remote B they can modify and applying the push (which makes
> +            // the shop reissue the serverid to B and invalidates A).
> +            if let Some(prev_remote) = stored_entry.remote.as_deref() {
> +                if prev_remote != remote_name {
> +                    user_info.check_privs(
> +                        &auth_id,
> +                        &["resource", prev_remote],
> +                        PRIV_RESOURCE_MODIFY,
> +                        false,
> +                    )?;
> +                }
> +            }
> +
> +            let (remotes_config, _) = pdm_config::remotes::config()?;
> +            let remote_entry = remotes_config
> +                .get(remote_name)
> +                .ok_or_else(|| http_err!(NOT_FOUND, "remote '{remote_name}' not found"))?;
> +
> +            if !product_type.matches_remote_type(remote_entry.ty) {
> +                http_bail!(
> +                    BAD_REQUEST,
> +                    "key type '{product_type}' does not match remote type '{}'",
> +                    remote_entry.ty
> +                );
> +            }
> +
> +            for (_id, other) in config.iter() {
> +                if other.key != key
> +                    && other.remote.as_deref() == Some(remote_name.as_str())
> +                    && other.node.as_deref() == Some(node_name.as_str())
> +                {
> +                    http_bail!(
> +                        CONFLICT,
> +                        "key '{}' is already assigned to {remote_name}/{node_name}",
> +                        other.key
> +                    );
> +                }
> +            }
> +
> +            let entry = config.get_mut(&key).unwrap();

A short comment (or a message in .expect()) would be nice to highlight why
the unwrap is okay here.

> +            entry.remote = Some(remote_name.clone());
> +            entry.node = Some(node_name.clone());
> +        }
> +        (None, None) => {
> +            // Unassign also requires modify on the previously-pinned remote, so an audit-only
> +            // operator cannot rip a key off a node they cannot otherwise touch.
> +            if let Some(prev_remote) = stored_entry.remote.as_deref() {
> +                user_info.check_privs(
> +                    &auth_id,
> +                    &["resource", prev_remote],
> +                    PRIV_RESOURCE_MODIFY,
> +                    false,
> +                )?;
> +            }
> +            // Honour the pre-lock synced check only if the binding still matches what we
> +            // observed; if the binding moved between the live fetch and lock, the orphan check
> +            // is moot and we let the unassign through.
> +            if let Some((blocking_remote, blocking_node)) = synced_block {
> +                if stored_entry.remote.as_deref() == Some(blocking_remote.as_str())
> +                    && stored_entry.node.as_deref() == Some(blocking_node.as_str())
> +                {
> +                    http_bail!(
> +                        BAD_REQUEST,
> +                        "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
> +                        use Reissue Key to remove it from the remote first"
> +                    );
> +                }
> +            }
> +            let entry = config.get_mut(&key).unwrap();
> +            entry.remote = None;
> +            entry.node = None;
> +        }
> +        _ => {
> +            http_bail!(
> +                BAD_REQUEST,
> +                "both 'remote' and 'node' must be provided, or neither"
> +            );
> +        }
> +    }
> +
> +    pdm_config::subscriptions::save_config(&config)?;
> +
> +    Ok(())
> +}
> +
> +/// Pre-lock check for [`assign_key`]'s unassign path: returns the (remote, node) the entry is
> +/// currently active on, if any, so the lock-protected branch can refuse the unassign and prompt
> +/// the operator to Reissue Key instead. Returns `None` for entries with no binding, no live
> +/// subscription, or a live subscription whose key does not match the entry.
> +async fn check_synced_assignment_for_unassign(
> +    key: &str,
> +) -> Result<Option<(String, String)>, Error> {
> +    let (config, _) = pdm_config::subscriptions::config()?;
> +    let Some(entry) = config.get(key) else {
> +        return Ok(None);
> +    };
> +    let (Some(prev_remote), Some(prev_node)) = (entry.remote.clone(), entry.node.clone()) else {
> +        return Ok(None);
> +    };
> +    let (remotes_config, _) = pdm_config::remotes::config()?;
> +    let Some(remote_entry) = remotes_config.get(&prev_remote) else {
> +        return Ok(None);
> +    };
> +    let live = match get_subscription_info_for_remote(remote_entry, FRESH_NODE_STATUS_MAX_AGE).await
> +    {
> +        Ok(v) => v,
> +        Err(_) => return Ok(None),
> +    };
> +    let synced = live
> +        .get(&prev_node)
> +        .and_then(|info| info.as_ref())
> +        .map(|info| {
> +            info.status == proxmox_subscription::SubscriptionStatus::Active
> +                && info.key.as_deref() == Some(key)
> +        })
> +        .unwrap_or(false);
> +    Ok(synced.then_some((prev_remote, prev_node)))
> +}
> +
> +/// Push a single key to its assigned remote node. Operates on a borrowed `Remote` so the
> +/// caller can fetch the remotes-config once and reuse it.
> +async fn push_key_to_remote(remote: &Remote, key: &str, node_name: &str) -> Result<(), Error> {
> +    let product_type =
> +        ProductType::from_key(key).ok_or_else(|| format_err!("unrecognised key format: {key}"))?;
> +
> +    // PVE and PBS share `proxmox_client::Client`, so `make_pbs_client_and_login` works for both;
> +    // only the PUT path differs.
> +    let path = match product_type {
> +        ProductType::Pve => format!("/api2/extjs/nodes/{node_name}/subscription"),
> +        ProductType::Pbs => "/api2/extjs/nodes/localhost/subscription".to_string(),
> +        ProductType::Pmg | ProductType::Pom => {
> +            bail!("PDM cannot push '{product_type}' keys: no remote support yet");
> +        }
> +    };
> +
> +    let client = crate::connection::make_pbs_client_and_login(remote).await?;

Any reason why you went with the _and_login variant here? Usually we only use
the regular `make_{pbs,pve}_client` variants, as PDM always should have a
token for accessing remotes, thus not requiring a ticket.
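
I.e., roughly (sketch only; I did not check the exact signature of the
non-login helper and just assume it hands back the underlying
`proxmox_client::Client` directly):

    // token auth from the remote config, no extra login round trip needed
    let client = crate::connection::make_pbs_client(remote)?;

    client
        .put(&path, &serde_json::json!({ "key": key }))
        .await?;
    client.post(&path, &serde_json::json!({})).await?;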

> +
> +    client
> +        .0
> +        .put(&path, &serde_json::json!({ "key": key }))
> +        .await?;
> +    client.0.post(&path, &serde_json::json!({})).await?;
> +
> +    info!("pushed key '{key}' to {}/{node_name}", remote.id);
> +    Ok(())
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            "max-age": {
> +                type: u64,
> +                optional: true,
> +                description: "Override the cache freshness window in seconds. \
> +                    Default 300 for panel views; pass 0 to force a fresh query.",
> +            },
> +        },
> +    },
> +    returns: {
> +        type: Array,
> +        description: "Subscription status of all remote nodes the user can audit.",
> +        items: { type: RemoteNodeStatus },
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
> +    },
> +)]
> +/// Get the subscription status of every remote node the caller can audit, combined with key pool
> +/// assignment information.
> +///
> +/// Per-remote `PRIV_RESOURCE_AUDIT` is enforced inside the handler so users only see remotes
> +/// they may audit.
> +async fn node_status(
> +    max_age: Option<u64>,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<RemoteNodeStatus>, Error> {
> +    collect_node_status(max_age.unwrap_or(PANEL_NODE_STATUS_MAX_AGE), rpcenv).await
> +}
> +
> +/// Shared helper: fan out subscription queries to all remotes the caller has audit privilege on,
> +/// in parallel, reusing the per-remote `SUBSCRIPTION_CACHE` via `get_subscription_info_for_remote`.
> +/// Joins the results with the key-pool assignment table.
> +async fn collect_node_status(
> +    max_age: u64,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<RemoteNodeStatus>, Error> {
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    let visible_remotes: Vec<(String, Remote)> = crate::api::remotes::RemoteIterator::new()?
> +        .any_privs(&user_info, &auth_id, PRIV_RESOURCE_AUDIT)
> +        .into_iter()
> +        .collect();
> +
> +    let (keys_config, _) = pdm_config::subscriptions::config()?;
> +
> +    // `get_subscription_info_for_remote` re-uses the per-remote `SUBSCRIPTION_CACHE` so this
> +    // fan-out is safe to run concurrently.
> +    let fetch = visible_remotes.iter().map(|(name, remote)| async move {
> +        let res = get_subscription_info_for_remote(remote, max_age).await;
> +        (name.clone(), remote.ty, res)
> +    });
> +    let results = join_all(fetch).await;

I *think* you could use the ParallelFetcher helper here, so that you get
sane limits on concurrency for free.
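
I don't remember the exact interface of ParallelFetcher off the top of my
head, but even a plain bounded fan-out via the futures stream adapters would
already cap the number of in-flight remote queries (sketch, limit picked
arbitrarily; the completion order changes, but the output gets sorted at the
end anyway):

    use futures::stream::{self, StreamExt};

    // poll at most 10 of the per-remote futures concurrently
    let results: Vec<_> = stream::iter(visible_remotes.iter())
        .map(|(name, remote)| async move {
            let res = get_subscription_info_for_remote(remote, max_age).await;
            (name.clone(), remote.ty, res)
        })
        .buffer_unordered(10)
        .collect()
        .await;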

> +
> +    let mut out = Vec::new();
> +    for (remote_name, remote_ty, result) in results {
> +        let node_infos = match result {
> +            Ok(info) => info,
> +            Err(err) => {
> +                warn!("failed to query subscription for remote {remote_name}: {err}");
> +                continue;
> +            }
> +        };
> +
> +        for (node_name, node_info) in &node_infos {
> +            let (status, level, sockets, current_key) = match node_info {
> +                Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
> +                None => (
> +                    proxmox_subscription::SubscriptionStatus::NotFound,
> +                    SubscriptionLevel::None,
> +                    None,
> +                    None,
> +                ),
> +            };
> +
> +            let assigned_key = keys_config
> +                .iter()
> +                .find(|(_id, entry)| {
> +                    entry.remote.as_deref() == Some(remote_name.as_str())
> +                        && entry.node.as_deref() == Some(node_name.as_str())
> +                })
> +                .map(|(_id, entry)| entry.key.clone());
> +
> +            out.push(RemoteNodeStatus {
> +                remote: remote_name.clone(),
> +                ty: remote_ty,
> +                node: node_name.to_string(),
> +                sockets,
> +                status,
> +                level,
> +                assigned_key,
> +                current_key,
> +            });
> +        }
> +    }
> +
> +    out.sort_by(|a, b| (&a.remote, &a.node).cmp(&(&b.remote, &b.node)));
> +    Ok(out)
> +}
> +
> +#[api(
> +    input: {
> +        properties: {
> +            apply: {
> +                type: bool,
> +                optional: true,
> +                default: false,
> +                description: "Actually apply the proposed assignments. Without this, only a preview is returned.",
> +            },
> +        },
> +    },
> +    returns: {
> +        type: Array,
> +        description: "List of proposed or applied assignments.",
> +        items: { type: ProposedAssignment },
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
> +    },

This does not mention the required Modify/Audit permissions on the remote
itself.

> +)]
> +/// Propose or apply automatic key-to-node assignments.
> +///
> +/// Matches unused pool keys to remote nodes that do not yet have a pool-assigned key, picking
> +/// the smallest PVE key that covers each node's socket count. When `apply=true`, the live node
> +/// statuses are fetched first (without holding the config lock - sync locks must not span
> +/// awaits), then proposals are computed and persisted under the lock with a per-key re-check
> +/// against the now-current pool state, so a parallel admin edit between fetch and apply does
> +/// not get silently overwritten.
> +async fn auto_assign(
> +    apply: Option<bool>,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<ProposedAssignment>, Error> {
> +    let apply = apply.unwrap_or(false);
> +
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
> +
> +    if !apply {
> +        let (config, _digest) = pdm_config::subscriptions::config()?;
> +        return Ok(compute_proposals(&config, &node_statuses));
> +    }
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;

Same thing here with regards to potentially blocking the async handler.

> +    let (mut config, _digest) = pdm_config::subscriptions::config()?;
> +    let mut proposals = compute_proposals(&config, &node_statuses);
> +
> +    // Audit-only callers may see a remote in the preview but must not be able to stage a write
> +    // for it that another admin would later push on their behalf.
> +    proposals.retain(|p| {
> +        user_info.lookup_privs(&auth_id, &["resource", &p.remote]) & PRIV_RESOURCE_MODIFY != 0
> +    });
> +
> +    for p in &proposals {
> +        if let Some(entry) = config.get_mut(&p.key) {
> +            // Skip keys that another writer assigned between the preview and the lock.
> +            if entry.remote.is_none() {
> +                entry.remote = Some(p.remote.clone());
> +                entry.node = Some(p.node.clone());
> +            }
> +        }
> +    }
> +    pdm_config::subscriptions::save_config(&config)?;
> +
> +    Ok(proposals)
> +}
> +
> +fn compute_proposals(
> +    config: &SectionConfigData,
> +    node_statuses: &[RemoteNodeStatus],
> +) -> Vec<ProposedAssignment> {
> +    let mut target_nodes: Vec<&RemoteNodeStatus> = node_statuses
> +        .iter()
> +        .filter(|n| {
> +            n.assigned_key.is_none() && n.status != proxmox_subscription::SubscriptionStatus::Active
> +        })
> +        .collect();
> +
> +    // Sort PVE nodes by socket count descending so large nodes get keys first.
> +    target_nodes.sort_by(|a, b| b.sockets.unwrap_or(0).cmp(&a.sockets.unwrap_or(0)));
> +
> +    let mut proposals: Vec<ProposedAssignment> = Vec::new();
> +    let mut taken: std::collections::HashSet<String> = std::collections::HashSet::new();

I'd just import HashSet :)

> +
> +    for node in &target_nodes {
> +        let remote_type = node.ty;
> +
> +        let candidates = config.iter().filter(|(id, entry)| {
> +            entry.remote.is_none()
> +                && !taken.contains(*id)
> +                && entry.product_type.matches_remote_type(remote_type)
> +        });
> +
> +        let best_key = if remote_type == pdm_api_types::remotes::RemoteType::Pve {
> +            let node_sockets = node.sockets.unwrap_or(1) as u32;
> +            pick_best_pve_socket_key(
> +                node_sockets,
> +                candidates.map(|(id, entry)| (id.to_string(), entry.key.as_str())),
> +            )
> +        } else {
> +            candidates.map(|(id, _)| id.to_string()).next()
> +        };
> +
> +        if let Some(key_id) = best_key {
> +            let ks = config
> +                .get(&key_id)
> +                .and_then(|e| socket_count_from_key(&e.key));
> +            taken.insert(key_id.clone());
> +            proposals.push(ProposedAssignment {
> +                key: key_id,
> +                remote: node.remote.clone(),
> +                node: node.node.clone(),
> +                key_sockets: ks,
> +                node_sockets: node.sockets,
> +            });
> +        }
> +    }
> +
> +    proposals
> +}

^ In general, this function would benefit from a couple of test cases --
should be easy enough, since it's a pure function.

[...]

> +)]
> +/// Clear every pending assignment in one bulk transaction.
> +///
> +/// Pending = pool key bound to a remote node whose live state does not confirm the assignment
> +/// (status not Active, a different `current_key`, or no row returned at all because the remote is
> +/// unreachable / the node is gone). Clears only those entries the caller has
> +/// `PRIV_RESOURCE_MODIFY` on; remotes the caller may only audit are skipped. Mirrors
> +/// `apply-pending` but drops the assignments instead of pushing them, so an operator can disown
> +/// stuck assignments without first having to bring the target back online.
> +async fn clear_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<ClearPendingResult, Error> {
> +    let auth_id: Authid = rpcenv
> +        .get_auth_id()
> +        .context("no authid available")?
> +        .parse()?;
> +    let user_info = CachedUserInfo::new()?;
> +
> +    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
> +    let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
> +
> +    if pending.is_empty() {
> +        return Ok(ClearPendingResult { cleared: 0 });
> +    }
> +
> +    let _lock = pdm_config::subscriptions::lock_config()?;

Same thing here with regards to blocking in an async fn.

> +    let (mut config, _digest) = pdm_config::subscriptions::config()?;
> +
> +    let mut cleared: u32 = 0;
> +    for entry in &pending {
> +        // Re-check inside the lock so a concurrent reassign is not silently overwritten.
> +        if let Some(stored) = config.get_mut(&entry.key) {
> +            if stored.remote.as_deref() == Some(entry.remote.as_str())
> +                && stored.node.as_deref() == Some(entry.node.as_str())
> +            {
> +                stored.remote = None;
> +                stored.node = None;
> +                cleared += 1;
> +            }
> +        }
> +    }
> +
> +    if cleared > 0 {
> +        pdm_config::subscriptions::save_config(&config)?;
> +    }
> +
> +    Ok(ClearPendingResult { cleared })
> +}
> +

[...]