From: Thomas Lamprecht <t.lamprecht@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH 4/8] subscription: add key pool and node status API endpoints
Date: Thu, 7 May 2026 09:17:27 +0200 [thread overview]
Message-ID: <20260507072436.2649563-5-t.lamprecht@proxmox.com> (raw)
In-Reply-To: <20260507072436.2649563-1-t.lamprecht@proxmox.com>
Add REST endpoints under /subscriptions for the pool, the combined
remote-vs-pool node-status view, and the bulk paths (auto-assign,
apply-pending, clear-pending).
Endpoints touching a specific remote require the matching resource
privilege on that remote in addition to the system-scope bit, so an
operator with global system access alone cannot push keys to remotes
they have no other authority on. On read paths, audit-only callers
see only the remotes they may audit; pending operations skip remotes
they may not modify.
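The two-level check can be modelled as a tiny sketch (illustrative Rust, not
this patch's `CachedUserInfo` API; the bit values are made up):

```rust
// Hypothetical privilege bits; the real values live in pdm-api-types.
const PRIV_SYS_MODIFY: u64 = 1 << 0;
const PRIV_RESOURCE_MODIFY: u64 = 1 << 1;

/// Both the system-scope bit and the per-remote resource bit must be set;
/// global system access alone is not enough to push keys to a remote.
fn may_push_key(system_privs: u64, remote_privs: u64) -> bool {
    system_privs & PRIV_SYS_MODIFY != 0 && remote_privs & PRIV_RESOURCE_MODIFY != 0
}

fn main() {
    // Operator with both scopes may push; system-only access may not.
    assert!(may_push_key(PRIV_SYS_MODIFY, PRIV_RESOURCE_MODIFY));
    assert!(!may_push_key(PRIV_SYS_MODIFY, 0));
}
```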
The add endpoint takes an all-or-nothing key list. Apply-pending runs
in a worker that re-reads its plan when it fires (honouring a
reassignment between the API call and the worker firing) and bails on
the first push failure so the rest stays pending for retry.
Mutating endpoints accept an optional ConfigDigest; reads expose it
in the response.
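The digest handshake on mutating calls follows the usual Proxmox pattern; a
minimal sketch (hypothetical helper, not proxmox-config-digest itself):

```rust
/// The client echoes back the digest it read; a mismatch means the config
/// changed underneath it, so the mutation is refused instead of clobbering.
fn detect_modification(current: &str, client_digest: Option<&str>) -> Result<(), String> {
    match client_digest {
        Some(d) if d != current => Err(format!(
            "detected modified configuration (digest {d} != {current})"
        )),
        _ => Ok(()), // digest omitted or matching: proceed
    }
}

fn main() {
    assert!(detect_modification("abc123", Some("abc123")).is_ok());
    assert!(detect_modification("abc123", None).is_ok());
    assert!(detect_modification("abc123", Some("def456")).is_err());
}
```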
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
---
lib/pdm-api-types/src/subscription.rs |   9 +
server/src/api/mod.rs                 |   2 +
server/src/api/resources.rs           |   8 +
server/src/api/subscriptions/mod.rs   | 967 ++++++++++++++++++++++++++
4 files changed, 986 insertions(+)
create mode 100644 server/src/api/subscriptions/mod.rs
diff --git a/lib/pdm-api-types/src/subscription.rs b/lib/pdm-api-types/src/subscription.rs
index 26ecfba..ead3c1b 100644
--- a/lib/pdm-api-types/src/subscription.rs
+++ b/lib/pdm-api-types/src/subscription.rs
@@ -529,6 +529,15 @@ pub struct RemoteNodeStatus {
pub current_key: Option<String>,
}
+#[api]
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "kebab-case")]
+/// Result of the bulk clear-pending API endpoint.
+pub struct ClearPendingResult {
+ /// Number of pool entries whose pending push or reissue was cleared.
+ pub cleared: u32,
+}
+
#[api]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs
index 110191b..9680edc 100644
--- a/server/src/api/mod.rs
+++ b/server/src/api/mod.rs
@@ -18,6 +18,7 @@ pub mod remotes;
pub mod resources;
mod rrd_common;
pub mod sdn;
+pub mod subscriptions;
#[sortable]
const SUBDIRS: SubdirMap = &sorted!([
@@ -31,6 +32,7 @@ const SUBDIRS: SubdirMap = &sorted!([
("resources", &resources::ROUTER),
("nodes", &nodes::ROUTER),
("sdn", &sdn::ROUTER),
+ ("subscriptions", &subscriptions::ROUTER),
("version", &Router::new().get(&API_METHOD_VERSION)),
]);
diff --git a/server/src/api/resources.rs b/server/src/api/resources.rs
index 50315b1..49c3974 100644
--- a/server/src/api/resources.rs
+++ b/server/src/api/resources.rs
@@ -848,6 +848,14 @@ fn get_cached_subscription_info(remote: &str, max_age: u64) -> Option<CachedSubs
}
}
+/// Drop the cached subscription state for a remote, forcing the next read to refetch.
+pub fn invalidate_subscription_info_for_remote(remote_id: &str) {
+ let mut cache = SUBSCRIPTION_CACHE
+ .write()
+ .expect("subscription mutex poisoned");
+ cache.remove(remote_id);
+}
+
/// Update cached subscription data.
///
/// If the cache already contains more recent data we don't insert the passed resources.
diff --git a/server/src/api/subscriptions/mod.rs b/server/src/api/subscriptions/mod.rs
new file mode 100644
index 0000000..26d9ecf
--- /dev/null
+++ b/server/src/api/subscriptions/mod.rs
@@ -0,0 +1,967 @@
+//! Subscription key pool management API.
+//!
+//! Manages a PDM-side pool of subscription keys, proposes key-to-node assignments, and pushes
+//! assigned keys to remote nodes. All entries are added manually for now; each entry is a bare
+//! `key` string with the product type derived from its prefix.
+
+use anyhow::{bail, format_err, Context, Error};
+use futures::future::join_all;
+
+use proxmox_access_control::CachedUserInfo;
+use proxmox_client::HttpApiClient;
+use proxmox_config_digest::ConfigDigest;
+use proxmox_log::{info, warn};
+use proxmox_router::{
+ http_bail, http_err, list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap,
+};
+use proxmox_schema::api;
+use proxmox_section_config::typed::SectionConfigData;
+use proxmox_sortable_macro::sortable;
+
+use pdm_api_types::remotes::{Remote, REMOTE_ID_SCHEMA};
+use pdm_api_types::subscription::{
+ pick_best_pve_socket_key, socket_count_from_key, ClearPendingResult, ProductType,
+ ProposedAssignment, RemoteNodeStatus, SubscriptionKeyEntry, SubscriptionKeySource,
+ SubscriptionLevel, SUBSCRIPTION_KEY_SCHEMA,
+};
+use pdm_api_types::{
+ Authid, NODE_SCHEMA, PRIV_RESOURCE_AUDIT, PRIV_RESOURCE_MODIFY, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+use crate::api::resources::{
+ get_subscription_info_for_remote, invalidate_subscription_info_for_remote,
+};
+
+pub const ROUTER: Router = Router::new()
+ .get(&list_subdirs_api_method!(SUBDIRS))
+ .subdirs(SUBDIRS);
+
+#[sortable]
+const SUBDIRS: SubdirMap = &sorted!([
+ (
+ "apply-pending",
+ &Router::new().post(&API_METHOD_APPLY_PENDING)
+ ),
+ ("auto-assign", &Router::new().post(&API_METHOD_AUTO_ASSIGN)),
+ (
+ "clear-pending",
+ &Router::new().post(&API_METHOD_CLEAR_PENDING)
+ ),
+ ("keys", &KEYS_ROUTER),
+ ("node-status", &Router::new().get(&API_METHOD_NODE_STATUS)),
+]);
+
+const KEYS_ROUTER: Router = Router::new()
+ .get(&API_METHOD_LIST_KEYS)
+ .post(&API_METHOD_ADD_KEYS)
+ .match_all("key", &KEY_ITEM_ROUTER);
+
+const KEY_ITEM_ROUTER: Router = Router::new()
+ .get(&API_METHOD_GET_KEY)
+ .put(&API_METHOD_ASSIGN_KEY)
+ .delete(&API_METHOD_DELETE_KEY);
+
+/// Max-age of zero forces a fresh node-status query, so the next view reflects the new state
+/// instead of returning a cached entry up to five minutes later. Used by auto-assign /
+/// apply-pending / clear-pending to avoid double-driving a node that has already moved to
+/// Active within the cache window.
+const FRESH_NODE_STATUS_MAX_AGE: u64 = 0;
+
+/// Cached node-status freshness used by read-only views. Five minutes matches the resource-cache
+/// convention and is short enough that admins rarely see stale data on the panel.
+const PANEL_NODE_STATUS_MAX_AGE: u64 = 5 * 60;
+
+#[api(
+ returns: {
+ type: Array,
+ description: "List of subscription keys in the pool.",
+ items: { type: SubscriptionKeyEntry },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// List all subscription keys in the key pool.
+fn list_keys(rpcenv: &mut dyn RpcEnvironment) -> Result<Vec<SubscriptionKeyEntry>, Error> {
+ let (config, digest) = pdm_config::subscriptions::config()?;
+ rpcenv["digest"] = digest.to_hex().into();
+ Ok(config
+ .into_iter()
+ .map(|(_id, mut entry)| {
+ entry.level = SubscriptionLevel::from_key(Some(&entry.key));
+ entry
+ })
+ .collect())
+}
+
+#[api(
+ input: {
+ properties: {
+ keys: {
+ type: Array,
+ description: "Subscription keys to add to the pool.",
+ items: { schema: SUBSCRIPTION_KEY_SCHEMA },
+ },
+ digest: {
+ type: ConfigDigest,
+ optional: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Add one or more subscription keys to the pool.
+///
+/// The key prefix determines the product type via [`ProductType::from_key`]. The schema regex
+/// rejects anything that isn't a PVE or PBS key today; widen [`PRODUCT_KEY_REGEX`] in lockstep
+/// with `from_key` and `push_key_to_remote` when PMG/POM remote support lands.
+///
+/// All-or-nothing: every key is validated for prefix and uniqueness (against the existing pool
+/// and within the input list) before any change is persisted. A single bad key fails the
+/// request and leaves the pool untouched.
+fn add_keys(keys: Vec<String>, digest: Option<ConfigDigest>) -> Result<(), Error> {
+ if keys.is_empty() {
+ http_bail!(BAD_REQUEST, "no keys provided");
+ }
+
+ let mut entries: Vec<SubscriptionKeyEntry> = Vec::with_capacity(keys.len());
+ let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
+ for key in &keys {
+ if !seen.insert(key.as_str()) {
+ http_bail!(BAD_REQUEST, "duplicate key in input: '{key}'");
+ }
+ let product_type = ProductType::from_key(key).ok_or_else(|| {
+ warn!("rejecting unrecognised key prefix '{key}', possibly a new product line");
+ format_err!("unrecognised key format: {key}")
+ })?;
+ entries.push(SubscriptionKeyEntry {
+ key: key.clone(),
+ product_type,
+ level: SubscriptionLevel::from_key(Some(key)),
+ source: SubscriptionKeySource::Manual,
+ ..Default::default()
+ });
+ }
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+ config_digest.detect_modification(digest.as_ref())?;
+
+ for entry in &entries {
+ if config.contains_key(&entry.key) {
+ http_bail!(CONFLICT, "key '{}' already exists in pool", entry.key);
+ }
+ }
+
+ for entry in entries {
+ config.insert(entry.key.clone(), entry);
+ }
+
+ pdm_config::subscriptions::save_config(&config)
+}
+
+#[api(
+ input: {
+ properties: {
+ key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+ },
+ },
+ returns: { type: SubscriptionKeyEntry },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// Get details for a single key.
+fn get_key(key: String, rpcenv: &mut dyn RpcEnvironment) -> Result<SubscriptionKeyEntry, Error> {
+ let (config, digest) = pdm_config::subscriptions::config()?;
+ rpcenv["digest"] = digest.to_hex().into();
+ let mut entry = config
+ .get(&key)
+ .cloned()
+ .ok_or_else(|| http_err!(NOT_FOUND, "key '{key}' not found in pool"))?;
+ entry.level = SubscriptionLevel::from_key(Some(&entry.key));
+ Ok(entry)
+}
+
+#[api(
+ input: {
+ properties: {
+ key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+ digest: {
+ type: ConfigDigest,
+ optional: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Remove a key from the pool.
+///
+/// If the key is currently assigned to a remote node, the caller must also have
+/// `PRIV_RESOURCE_MODIFY` on that remote, so an audit-only operator cannot release a key
+/// another admin had pinned. Refuses if the key is currently the live active key on its bound
+/// node, since dropping the pool entry would orphan that subscription on the remote: the
+/// operator must Reissue Key first.
+async fn delete_key(
+ key: String,
+ digest: Option<ConfigDigest>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ // Live fetch must happen before the lock since the lock cannot span an .await; the
+ // post-lock check below only refuses if the binding still matches what we observed.
+ let synced_block = check_synced_assignment_for_unassign(&key).await?;
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+ config_digest.detect_modification(digest.as_ref())?;
+ let mut shadow = pdm_config::subscriptions::shadow_config()?;
+
+ let Some(entry) = config.get(&key) else {
+ http_bail!(NOT_FOUND, "key '{key}' not found in pool");
+ };
+
+ if let Some(assigned_remote) = entry.remote.as_deref() {
+ user_info.check_privs(
+ &auth_id,
+ &["resource", assigned_remote],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )?;
+ }
+
+ if let Some((blocking_remote, blocking_node)) = synced_block {
+ if entry.remote.as_deref() == Some(blocking_remote.as_str())
+ && entry.node.as_deref() == Some(blocking_node.as_str())
+ {
+ http_bail!(
+ BAD_REQUEST,
+ "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
+ use Reissue Key to remove it from the remote first"
+ );
+ }
+ }
+
+ config.remove(&key);
+ // Cascade the shadow drop. Order mirrors `pdm-config/src/remotes.rs`: shadow first, then
+ // main, so a partial failure can at worst leave a pool entry whose shadow blob is gone
+ // (retrying the delete recovers), never a stale signed blob for a key that is no longer
+ // in the pool.
+ shadow.remove(&key);
+ pdm_config::subscriptions::save_shadow(&shadow)?;
+ pdm_config::subscriptions::save_config(&config)?;
+
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+ remote: {
+ schema: REMOTE_ID_SCHEMA,
+ optional: true,
+ },
+ node: {
+ // NODE_SCHEMA rejects path-traversal input before it ends up interpolated into
+ // the remote URL `/api2/extjs/nodes/{node}/subscription`.
+ schema: NODE_SCHEMA,
+ optional: true,
+ },
+ digest: {
+ type: ConfigDigest,
+ optional: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Assign a key to a remote node, or unassign it (omit remote and node).
+///
+/// `PRIV_SYS_MODIFY` lets the caller touch the pool config; per-remote `PRIV_RESOURCE_MODIFY`
+/// is enforced inside this handler so an operator cannot push a key to a remote they have no
+/// other authority on.
+async fn assign_key(
+ key: String,
+ remote: Option<String>,
+ node: Option<String>,
+ digest: Option<ConfigDigest>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ // Best-effort orphan-prevention for the unassign path: if the entry is currently bound and
+ // synced (the assigned key is the live current_key on its remote), refuse and direct the
+ // operator to Reissue Key. The live fetch must happen before the lock since that lock cannot
+ // span an .await; we re-check the binding under the lock and only refuse if it still
+ // matches what we observed live.
+ let synced_block = if remote.is_none() && node.is_none() {
+ check_synced_assignment_for_unassign(&key).await?
+ } else {
+ None
+ };
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+ config_digest.detect_modification(digest.as_ref())?;
+
+ let Some(stored_entry) = config.get(&key).cloned() else {
+ http_bail!(NOT_FOUND, "key '{key}' not found in pool");
+ };
+ let product_type = stored_entry.product_type;
+
+ match (&remote, &node) {
+ (Some(remote_name), Some(node_name)) => {
+ user_info.check_privs(
+ &auth_id,
+ &["resource", remote_name],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )?;
+
+ // Reassigning away from a previous remote requires modify on that remote too,
+ // otherwise an audit-only-on-A operator could effectively pull a key off A by
+ // re-binding it to a remote B they can modify and applying the push (which makes
+ // the shop reissue the serverid to B and invalidates A).
+ if let Some(prev_remote) = stored_entry.remote.as_deref() {
+ if prev_remote != remote_name {
+ user_info.check_privs(
+ &auth_id,
+ &["resource", prev_remote],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )?;
+ }
+ }
+
+ let (remotes_config, _) = pdm_config::remotes::config()?;
+ let remote_entry = remotes_config
+ .get(remote_name)
+ .ok_or_else(|| http_err!(NOT_FOUND, "remote '{remote_name}' not found"))?;
+
+ if !product_type.matches_remote_type(remote_entry.ty) {
+ http_bail!(
+ BAD_REQUEST,
+ "key type '{product_type}' does not match remote type '{}'",
+ remote_entry.ty
+ );
+ }
+
+ for (_id, other) in config.iter() {
+ if other.key != key
+ && other.remote.as_deref() == Some(remote_name.as_str())
+ && other.node.as_deref() == Some(node_name.as_str())
+ {
+ http_bail!(
+ CONFLICT,
+ "key '{}' is already assigned to {remote_name}/{node_name}",
+ other.key
+ );
+ }
+ }
+
+ let entry = config.get_mut(&key).unwrap();
+ entry.remote = Some(remote_name.clone());
+ entry.node = Some(node_name.clone());
+ }
+ (None, None) => {
+ // Unassign also requires modify on the previously-pinned remote, so an audit-only
+ // operator cannot rip a key off a node they cannot otherwise touch.
+ if let Some(prev_remote) = stored_entry.remote.as_deref() {
+ user_info.check_privs(
+ &auth_id,
+ &["resource", prev_remote],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )?;
+ }
+ // Honour the pre-lock synced check only if the binding still matches what we
+ // observed; if the binding moved between the live fetch and lock, the orphan check
+ // is moot and we let the unassign through.
+ if let Some((blocking_remote, blocking_node)) = synced_block {
+ if stored_entry.remote.as_deref() == Some(blocking_remote.as_str())
+ && stored_entry.node.as_deref() == Some(blocking_node.as_str())
+ {
+ http_bail!(
+ BAD_REQUEST,
+ "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
+ use Reissue Key to remove it from the remote first"
+ );
+ }
+ }
+ let entry = config.get_mut(&key).unwrap();
+ entry.remote = None;
+ entry.node = None;
+ }
+ _ => {
+ http_bail!(
+ BAD_REQUEST,
+ "both 'remote' and 'node' must be provided, or neither"
+ );
+ }
+ }
+
+ pdm_config::subscriptions::save_config(&config)?;
+
+ Ok(())
+}
+
+/// Pre-lock check for [`assign_key`]'s unassign path: returns the (remote, node) the entry is
+/// currently active on, if any, so the lock-protected branch can refuse the unassign and prompt
+/// the operator to Reissue Key instead. Returns `None` for entries with no binding, no live
+/// subscription, or a live subscription whose key does not match the entry.
+async fn check_synced_assignment_for_unassign(
+ key: &str,
+) -> Result<Option<(String, String)>, Error> {
+ let (config, _) = pdm_config::subscriptions::config()?;
+ let Some(entry) = config.get(key) else {
+ return Ok(None);
+ };
+ let (Some(prev_remote), Some(prev_node)) = (entry.remote.clone(), entry.node.clone()) else {
+ return Ok(None);
+ };
+ let (remotes_config, _) = pdm_config::remotes::config()?;
+ let Some(remote_entry) = remotes_config.get(&prev_remote) else {
+ return Ok(None);
+ };
+ let live = match get_subscription_info_for_remote(remote_entry, FRESH_NODE_STATUS_MAX_AGE).await
+ {
+ Ok(v) => v,
+ Err(_) => return Ok(None),
+ };
+ let synced = live
+ .get(&prev_node)
+ .and_then(|info| info.as_ref())
+ .map(|info| {
+ info.status == proxmox_subscription::SubscriptionStatus::Active
+ && info.key.as_deref() == Some(key)
+ })
+ .unwrap_or(false);
+ Ok(synced.then_some((prev_remote, prev_node)))
+}
+
+/// Push a single key to its assigned remote node. Operates on a borrowed `Remote` so the
+/// caller can fetch the remotes-config once and reuse it.
+async fn push_key_to_remote(remote: &Remote, key: &str, node_name: &str) -> Result<(), Error> {
+ let product_type =
+ ProductType::from_key(key).ok_or_else(|| format_err!("unrecognised key format: {key}"))?;
+
+ // PVE and PBS share `proxmox_client::Client`, so `make_pbs_client_and_login` works for both;
+ // only the PUT path differs.
+ let path = match product_type {
+ ProductType::Pve => format!("/api2/extjs/nodes/{node_name}/subscription"),
+ ProductType::Pbs => "/api2/extjs/nodes/localhost/subscription".to_string(),
+ ProductType::Pmg | ProductType::Pom => {
+ bail!("PDM cannot push '{product_type}' keys: no remote support yet");
+ }
+ };
+
+ let client = crate::connection::make_pbs_client_and_login(remote).await?;
+
+ client
+ .0
+ .put(&path, &serde_json::json!({ "key": key }))
+ .await?;
+ client.0.post(&path, &serde_json::json!({})).await?;
+
+ info!("pushed key '{key}' to {}/{node_name}", remote.id);
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ "max-age": {
+ type: u64,
+ optional: true,
+ description: "Override the cache freshness window in seconds. \
+ Default 300 for panel views; pass 0 to force a fresh query.",
+ },
+ },
+ },
+ returns: {
+ type: Array,
+ description: "Subscription status of all remote nodes the user can audit.",
+ items: { type: RemoteNodeStatus },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// Get the subscription status of every remote node the caller can audit, combined with key pool
+/// assignment information.
+///
+/// Per-remote `PRIV_RESOURCE_AUDIT` is enforced inside the handler so users only see remotes
+/// they may audit.
+async fn node_status(
+ max_age: Option<u64>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<RemoteNodeStatus>, Error> {
+ collect_node_status(max_age.unwrap_or(PANEL_NODE_STATUS_MAX_AGE), rpcenv).await
+}
+
+/// Shared helper: fan out subscription queries to all remotes the caller has audit privilege on,
+/// in parallel, reusing the per-remote `SUBSCRIPTION_CACHE` via `get_subscription_info_for_remote`.
+/// Joins the results with the key-pool assignment table.
+async fn collect_node_status(
+ max_age: u64,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<RemoteNodeStatus>, Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let visible_remotes: Vec<(String, Remote)> = crate::api::remotes::RemoteIterator::new()?
+ .any_privs(&user_info, &auth_id, PRIV_RESOURCE_AUDIT)
+ .into_iter()
+ .collect();
+
+ let (keys_config, _) = pdm_config::subscriptions::config()?;
+
+ // `get_subscription_info_for_remote` re-uses the per-remote `SUBSCRIPTION_CACHE` so this
+ // fan-out is safe to run concurrently.
+ let fetch = visible_remotes.iter().map(|(name, remote)| async move {
+ let res = get_subscription_info_for_remote(remote, max_age).await;
+ (name.clone(), remote.ty, res)
+ });
+ let results = join_all(fetch).await;
+
+ let mut out = Vec::new();
+ for (remote_name, remote_ty, result) in results {
+ let node_infos = match result {
+ Ok(info) => info,
+ Err(err) => {
+ warn!("failed to query subscription for remote {remote_name}: {err}");
+ continue;
+ }
+ };
+
+ for (node_name, node_info) in &node_infos {
+ let (status, level, sockets, current_key) = match node_info {
+ Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
+ None => (
+ proxmox_subscription::SubscriptionStatus::NotFound,
+ SubscriptionLevel::None,
+ None,
+ None,
+ ),
+ };
+
+ let assigned_key = keys_config
+ .iter()
+ .find(|(_id, entry)| {
+ entry.remote.as_deref() == Some(remote_name.as_str())
+ && entry.node.as_deref() == Some(node_name.as_str())
+ })
+ .map(|(_id, entry)| entry.key.clone());
+
+ out.push(RemoteNodeStatus {
+ remote: remote_name.clone(),
+ ty: remote_ty,
+ node: node_name.to_string(),
+ sockets,
+ status,
+ level,
+ assigned_key,
+ current_key,
+ });
+ }
+ }
+
+ out.sort_by(|a, b| (&a.remote, &a.node).cmp(&(&b.remote, &b.node)));
+ Ok(out)
+}
+
+#[api(
+ input: {
+ properties: {
+ apply: {
+ type: bool,
+ optional: true,
+ default: false,
+ description: "Actually apply the proposed assignments. Without this, only a preview is returned.",
+ },
+ },
+ },
+ returns: {
+ type: Array,
+ description: "List of proposed or applied assignments.",
+ items: { type: ProposedAssignment },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Propose or apply automatic key-to-node assignments.
+///
+/// Matches unused pool keys to remote nodes that do not yet have a pool-assigned key, picking
+/// the smallest PVE key that covers each node's socket count. When `apply=true`, the live node
+/// statuses are fetched first (without holding the config lock - sync locks must not span
+/// awaits), then proposals are computed and persisted under the lock with a per-key re-check
+/// against the now-current pool state, so a parallel admin edit between fetch and apply does
+/// not get silently overwritten.
+async fn auto_assign(
+ apply: Option<bool>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<ProposedAssignment>, Error> {
+ let apply = apply.unwrap_or(false);
+
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+
+ if !apply {
+ let (config, _digest) = pdm_config::subscriptions::config()?;
+ return Ok(compute_proposals(&config, &node_statuses));
+ }
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, _digest) = pdm_config::subscriptions::config()?;
+ let mut proposals = compute_proposals(&config, &node_statuses);
+
+ // Audit-only callers may see a remote in the preview but must not be able to stage a write
+ // for it that another admin would later push on their behalf.
+ proposals.retain(|p| {
+ user_info.lookup_privs(&auth_id, &["resource", &p.remote]) & PRIV_RESOURCE_MODIFY != 0
+ });
+
+ for p in &proposals {
+ if let Some(entry) = config.get_mut(&p.key) {
+ // Skip keys that another writer assigned between the preview and the lock.
+ if entry.remote.is_none() {
+ entry.remote = Some(p.remote.clone());
+ entry.node = Some(p.node.clone());
+ }
+ }
+ }
+ pdm_config::subscriptions::save_config(&config)?;
+
+ Ok(proposals)
+}
+
+fn compute_proposals(
+ config: &SectionConfigData<SubscriptionKeyEntry>,
+ node_statuses: &[RemoteNodeStatus],
+) -> Vec<ProposedAssignment> {
+ let mut target_nodes: Vec<&RemoteNodeStatus> = node_statuses
+ .iter()
+ .filter(|n| {
+ n.assigned_key.is_none() && n.status != proxmox_subscription::SubscriptionStatus::Active
+ })
+ .collect();
+
+ // Sort by socket count descending so large PVE nodes get keys first; nodes without a
+ // socket count (e.g. PBS) sort last.
+ target_nodes.sort_by(|a, b| b.sockets.unwrap_or(0).cmp(&a.sockets.unwrap_or(0)));
+
+ let mut proposals: Vec<ProposedAssignment> = Vec::new();
+ let mut taken: std::collections::HashSet<String> = std::collections::HashSet::new();
+
+ for node in &target_nodes {
+ let remote_type = node.ty;
+
+ let candidates = config.iter().filter(|(id, entry)| {
+ entry.remote.is_none()
+ && !taken.contains(*id)
+ && entry.product_type.matches_remote_type(remote_type)
+ });
+
+ let best_key = if remote_type == pdm_api_types::remotes::RemoteType::Pve {
+ let node_sockets = node.sockets.unwrap_or(1) as u32;
+ pick_best_pve_socket_key(
+ node_sockets,
+ candidates.map(|(id, entry)| (id.to_string(), entry.key.as_str())),
+ )
+ } else {
+ candidates.map(|(id, _)| id.to_string()).next()
+ };
+
+ if let Some(key_id) = best_key {
+ let ks = config
+ .get(&key_id)
+ .and_then(|e| socket_count_from_key(&e.key));
+ taken.insert(key_id.clone());
+ proposals.push(ProposedAssignment {
+ key: key_id,
+ remote: node.remote.clone(),
+ node: node.node.clone(),
+ key_sockets: ks,
+ node_sockets: node.sockets,
+ });
+ }
+ }
+
+ proposals
+}
+
+#[api(
+ returns: {
+ type: String,
+ optional: true,
+ description: "Task UPID; absent when there is nothing pending to push.",
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Push every pending key assignment to its remote node.
+///
+/// Pending = the live node does not confirm the assigned key as its current active subscription
+/// (status not Active, a different `current_key`, or the remote did not respond / is gone). Each
+/// push is logged from a worker task so the admin can follow progress.
+///
+/// The worker bails on the first failure; the remaining entries stay pending so the operator
+/// can fix the underlying issue (or clear that one assignment) and trigger another apply.
+///
+/// Returns `None` when nothing is pending so the caller can show a short info message instead of
+/// opening a task progress dialog for a no-op worker.
+async fn apply_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<Option<String>, Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+ let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+ if pending.is_empty() {
+ return Ok(None);
+ }
+
+ let worker_auth = auth_id.clone();
+ let upid = proxmox_rest_server::WorkerTask::spawn(
+ "subscription-push",
+ None,
+ auth_id.to_string(),
+ true,
+ move |_worker| async move { run_apply_pending(worker_auth).await },
+ )?;
+
+ Ok(Some(upid))
+}
+
+/// Re-validate and run the apply-pending plan from inside a worker.
+///
+/// The worker re-reads remotes and the pool config so a reassign or removal between the API call
+/// returning a UPID and the worker firing is honoured (pushing the old key to a node after the
+/// operator retracted the assignment was a real footgun).
+async fn run_apply_pending(auth_id: Authid) -> Result<(), Error> {
+ let user_info = CachedUserInfo::new()?;
+ let (remotes_config, _) = pdm_config::remotes::config()?;
+ let (config, _) = pdm_config::subscriptions::config()?;
+
+ let node_statuses = collect_status_uncached(&remotes_config).await;
+ let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+ if pending.is_empty() {
+ info!("apply-pending: nothing to do (state changed since the API call)");
+ return Ok(());
+ }
+
+ let total = pending.len();
+ let mut ok = 0usize;
+
+ for entry in pending {
+ let Some(remote) = remotes_config.get(&entry.remote) else {
+ bail!(
+ "remote '{}' vanished, aborting after {ok}/{total} successful pushes",
+ entry.remote,
+ );
+ };
+ // Honour the case where the operator unassigned the key while the worker was queued.
+ if !pool_assignment_still_valid(&config, &entry) {
+ info!(
+ "skipping {}/{}: pool assignment changed before worker ran",
+ entry.remote, entry.node
+ );
+ continue;
+ }
+
+ info!(
+ "pushing {} to {}/{}...",
+ entry.key, entry.remote, entry.node
+ );
+ if let Err(err) = push_key_to_remote(remote, &entry.key, &entry.node).await {
+ bail!(
+ "push of {} to {}/{} failed after {ok}/{total} successful pushes: {err}",
+ entry.key,
+ entry.remote,
+ entry.node,
+ );
+ }
+ info!(" success");
+ invalidate_subscription_info_for_remote(&entry.remote);
+ ok += 1;
+ }
+
+ info!("finished: {ok}/{total} pushes succeeded");
+ Ok(())
+}
+
+#[api(
+ returns: { type: ClearPendingResult },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Clear every pending assignment in one bulk transaction.
+///
+/// Pending = pool key bound to a remote node whose live state does not confirm the assignment
+/// (status not Active, a different `current_key`, or no row returned at all because the remote is
+/// unreachable / the node is gone). Clears only those entries the caller has
+/// `PRIV_RESOURCE_MODIFY` on; remotes the caller may only audit are skipped. Mirrors
+/// `apply-pending` but drops the assignments instead of pushing them, so an operator can disown
+/// stuck assignments without first having to bring the target back online.
+async fn clear_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<ClearPendingResult, Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+ let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+ if pending.is_empty() {
+ return Ok(ClearPendingResult { cleared: 0 });
+ }
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, _digest) = pdm_config::subscriptions::config()?;
+
+ let mut cleared: u32 = 0;
+ for entry in &pending {
+ // Re-check inside the lock so a concurrent reassign is not silently overwritten.
+ if let Some(stored) = config.get_mut(&entry.key) {
+ if stored.remote.as_deref() == Some(entry.remote.as_str())
+ && stored.node.as_deref() == Some(entry.node.as_str())
+ {
+ stored.remote = None;
+ stored.node = None;
+ cleared += 1;
+ }
+ }
+ }
+
+ if cleared > 0 {
+ pdm_config::subscriptions::save_config(&config)?;
+ }
+
+ Ok(ClearPendingResult { cleared })
+}
+
+/// Plan entry for one pending push.
+#[derive(Clone, Debug)]
+struct PendingEntry {
+ key: String,
+ remote: String,
+ node: String,
+}
+
+fn compute_pending(
+ user_info: &CachedUserInfo,
+ auth_id: &Authid,
+ node_statuses: &[RemoteNodeStatus],
+) -> Result<Vec<PendingEntry>, Error> {
+ let (config, _) = pdm_config::subscriptions::config()?;
+
+ Ok(config
+ .iter()
+ .filter_map(|(_id, entry)| {
+ let remote = entry.remote.as_deref()?;
+ let node = entry.node.as_deref()?;
+
+ if user_info.lookup_privs(auth_id, &["resource", remote]) & PRIV_RESOURCE_MODIFY == 0 {
+ return None;
+ }
+
+ // Treat everything except "status Active with the assigned key reported as the
+ // live current_key" as pending, including unreachable remotes, so an operator
+ // can clear stuck assignments without first having to bring the target back online.
+ let is_pending = match node_statuses
+ .iter()
+ .find(|n| n.remote == remote && n.node == node)
+ {
+ Some(n) => {
+ n.status != proxmox_subscription::SubscriptionStatus::Active
+ || n.current_key.as_deref() != Some(entry.key.as_str())
+ }
+ None => true,
+ };
+
+ is_pending.then(|| PendingEntry {
+ key: entry.key.clone(),
+ remote: remote.to_string(),
+ node: node.to_string(),
+ })
+ })
+ .collect())
+}
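The pending decision in `compute_pending` boils down to one predicate: an assignment is confirmed only when the live row exists, reports Active, and echoes the assigned key; everything else, including a missing row, is pending. A hedged, standalone sketch of that predicate (the `LiveNode`/`Status` types here are simplified illustrations, not the real API types):

```rust
// Simplified model of a node's live subscription state.
#[derive(Clone, Debug, PartialEq)]
enum Status {
    Active,
    NotFound,
}

struct LiveNode {
    remote: String,
    node: String,
    status: Status,
    current_key: Option<String>,
}

// Pending unless the live node is found, is Active, and reports the
// assigned key as its current key. A missing row (unreachable remote,
// removed node) also counts as pending.
fn is_pending(assigned_key: &str, remote: &str, node: &str, live: &[LiveNode]) -> bool {
    match live.iter().find(|n| n.remote == remote && n.node == node) {
        Some(n) => {
            n.status != Status::Active || n.current_key.as_deref() != Some(assigned_key)
        }
        None => true,
    }
}

fn main() {
    let live = vec![LiveNode {
        remote: "pve1".into(),
        node: "node1".into(),
        status: Status::Active,
        current_key: Some("pve4p-abc".into()),
    }];
    assert!(!is_pending("pve4p-abc", "pve1", "node1", &live)); // confirmed
    assert!(is_pending("pve4p-xyz", "pve1", "node1", &live)); // key mismatch
    assert!(is_pending("pve4p-abc", "pve1", "node2", &live)); // no live row

    let down = vec![LiveNode {
        remote: "pve1".into(),
        node: "node1".into(),
        status: Status::NotFound,
        current_key: None,
    }];
    assert!(is_pending("pve4p-abc", "pve1", "node1", &down)); // not Active
}
```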
+
+fn pool_assignment_still_valid(
+ config: &SectionConfigData<SubscriptionKeyEntry>,
+ entry: &PendingEntry,
+) -> bool {
+ let Some(stored) = config.get(&entry.key) else {
+ return false;
+ };
+ stored.remote.as_deref() == Some(entry.remote.as_str())
+ && stored.node.as_deref() == Some(entry.node.as_str())
+}
+
+/// Like [`collect_node_status`] but bypasses the auth filter, for the apply-pending worker
+/// which gates each entry through its own per-remote priv check based on the persisted pool plan.
+async fn collect_status_uncached(
+ remotes_config: &SectionConfigData<Remote>,
+) -> Vec<RemoteNodeStatus> {
+ let fetch = remotes_config.iter().map(|(name, remote)| async move {
+ let res = get_subscription_info_for_remote(remote, FRESH_NODE_STATUS_MAX_AGE).await;
+ (name.to_string(), remote.ty, res)
+ });
+ let results = join_all(fetch).await;
+
+ let mut out = Vec::new();
+ for (remote_name, remote_ty, result) in results {
+ let Ok(node_infos) = result else { continue };
+ for (node_name, node_info) in &node_infos {
+ let (status, level, sockets, current_key) = match node_info {
+ Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
+ None => (
+ proxmox_subscription::SubscriptionStatus::NotFound,
+ SubscriptionLevel::None,
+ None,
+ None,
+ ),
+ };
+ out.push(RemoteNodeStatus {
+ remote: remote_name.clone(),
+ ty: remote_ty,
+ node: node_name.to_string(),
+ sockets,
+ status,
+ level,
+ assigned_key: None,
+ current_key,
+ });
+ }
+ }
+ out
+}
--
2.47.3
2026-05-07 7:17 [PATCH 0/8] subscription: add central key pool registry with reissue support Thomas Lamprecht
2026-05-07 7:17 ` [PATCH 1/8] api: subscription cache: ensure max_age=0 forces a fresh fetch Thomas Lamprecht
2026-05-07 7:17 ` [PATCH 2/8] api types: subscription level: render full names Thomas Lamprecht
2026-05-07 7:17 ` [PATCH 3/8] subscription: add key pool data model and config layer Thomas Lamprecht
2026-05-07 7:17 ` Thomas Lamprecht [this message]
2026-05-07 7:17 ` [PATCH 5/8] ui: add subscription registry with key pool and node status Thomas Lamprecht
2026-05-07 8:15 ` Lukas Wagner
2026-05-07 8:33 ` Thomas Lamprecht
2026-05-07 7:17 ` [PATCH 6/8] cli: add subscription key pool management subcommands Thomas Lamprecht
2026-05-07 7:17 ` [PATCH 7/8] docs: add subscription registry chapter Thomas Lamprecht
2026-05-07 7:17 ` [PATCH 8/8] subscription: add Reissue Key action with pending-reissue queue Thomas Lamprecht
2026-05-07 7:50 ` Lukas Wagner
2026-05-07 8:38 ` superseded: [PATCH 0/8] subscription: add central key pool registry with reissue support Thomas Lamprecht