From: Thomas Lamprecht <t.lamprecht@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager v2 4/8] subscription: add key pool and node status API endpoints
Date: Thu, 7 May 2026 10:26:45 +0200
Message-ID: <20260507082943.2749725-5-t.lamprecht@proxmox.com>
In-Reply-To: <20260507082943.2749725-1-t.lamprecht@proxmox.com>
Add REST endpoints under /subscriptions for the pool, the combined
remote-vs-pool node-status view, and the bulk paths (auto-assign,
apply-pending, clear-pending).
Endpoints touching a specific remote require the matching resource
privilege on that remote in addition to the system-scope bit, so an
operator with global system access alone cannot push keys to remotes
they have no other authority on. Audit-only callers see only the
remotes they may audit on read paths; pending operations skip remotes
they may not modify.
The add endpoint takes an all-or-nothing list. Apply-pending runs in a
worker that re-reads its plan when it fires (honouring a re-assign
between API call and worker firing) and bails on the first push
failure, so the remaining entries stay pending for retry.
Mutating endpoints accept an optional ConfigDigest; reads expose it
in the response.
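For quick reference, the endpoints this wires up under /subscriptions
(request/response bodies abbreviated):

  GET    keys               list the pool (exposes digest)
  POST   keys               add keys, all-or-nothing
  GET    keys/{key}         single entry
  PUT    keys/{key}         assign to remote/node, or unassign
  DELETE keys/{key}         drop the entry from the pool
  GET    node-status        combined remote-vs-pool view
  POST   auto-assign        propose assignments; apply=true persists
  POST   apply-pending      push pending keys in a worker (UPID)
  POST   clear-pending      drop pending assignments (count)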
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
---
 lib/pdm-api-types/src/subscription.rs |   9 +
 server/src/api/mod.rs                 |   2 +
 server/src/api/resources.rs           |   8 +
 server/src/api/subscriptions/mod.rs   | 967 ++++++++++++++++++++++++++
 4 files changed, 986 insertions(+)
create mode 100644 server/src/api/subscriptions/mod.rs
diff --git a/lib/pdm-api-types/src/subscription.rs b/lib/pdm-api-types/src/subscription.rs
index 26ecfba..ead3c1b 100644
--- a/lib/pdm-api-types/src/subscription.rs
+++ b/lib/pdm-api-types/src/subscription.rs
@@ -529,6 +529,15 @@ pub struct RemoteNodeStatus {
pub current_key: Option<String>,
}
+#[api]
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "kebab-case")]
+/// Result of the bulk clear-pending API endpoint.
+pub struct ClearPendingResult {
+ /// Number of pool entries whose pending push or reissue was cleared.
+ pub cleared: u32,
+}
+
#[api]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs
index 110191b..9680edc 100644
--- a/server/src/api/mod.rs
+++ b/server/src/api/mod.rs
@@ -18,6 +18,7 @@ pub mod remotes;
pub mod resources;
mod rrd_common;
pub mod sdn;
+pub mod subscriptions;
#[sortable]
const SUBDIRS: SubdirMap = &sorted!([
@@ -31,6 +32,7 @@ const SUBDIRS: SubdirMap = &sorted!([
("resources", &resources::ROUTER),
("nodes", &nodes::ROUTER),
("sdn", &sdn::ROUTER),
+ ("subscriptions", &subscriptions::ROUTER),
("version", &Router::new().get(&API_METHOD_VERSION)),
]);
diff --git a/server/src/api/resources.rs b/server/src/api/resources.rs
index 50315b1..49c3974 100644
--- a/server/src/api/resources.rs
+++ b/server/src/api/resources.rs
@@ -848,6 +848,14 @@ fn get_cached_subscription_info(remote: &str, max_age: u64) -> Option<CachedSubs
}
}
+/// Drop the cached subscription state for a remote, forcing the next read to refetch.
+pub fn invalidate_subscription_info_for_remote(remote_id: &str) {
+ let mut cache = SUBSCRIPTION_CACHE
+ .write()
+ .expect("subscription mutex poisoned");
+ cache.remove(remote_id);
+}
+
/// Update cached subscription data.
///
/// If the cache already contains more recent data we don't insert the passed resources.
diff --git a/server/src/api/subscriptions/mod.rs b/server/src/api/subscriptions/mod.rs
new file mode 100644
index 0000000..26d9ecf
--- /dev/null
+++ b/server/src/api/subscriptions/mod.rs
@@ -0,0 +1,967 @@
+//! Subscription key pool management API.
+//!
+//! Manages a PDM-side pool of subscription keys, proposes key-to-node assignments, and pushes
+//! assigned keys to remote nodes. All entries are added manually for now; each entry is a bare
+//! `key` string with the product type derived from its prefix.
+
+use anyhow::{bail, format_err, Context, Error};
+use futures::future::join_all;
+
+use proxmox_access_control::CachedUserInfo;
+use proxmox_client::HttpApiClient;
+use proxmox_config_digest::ConfigDigest;
+use proxmox_log::{info, warn};
+use proxmox_router::{
+ http_bail, http_err, list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap,
+};
+use proxmox_schema::api;
+use proxmox_section_config::typed::SectionConfigData;
+use proxmox_sortable_macro::sortable;
+
+use pdm_api_types::remotes::{Remote, REMOTE_ID_SCHEMA};
+use pdm_api_types::subscription::{
+ pick_best_pve_socket_key, socket_count_from_key, ClearPendingResult, ProductType,
+ ProposedAssignment, RemoteNodeStatus, SubscriptionKeyEntry, SubscriptionKeySource,
+ SubscriptionLevel, SUBSCRIPTION_KEY_SCHEMA,
+};
+use pdm_api_types::{
+ Authid, NODE_SCHEMA, PRIV_RESOURCE_AUDIT, PRIV_RESOURCE_MODIFY, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+use crate::api::resources::{
+ get_subscription_info_for_remote, invalidate_subscription_info_for_remote,
+};
+
+pub const ROUTER: Router = Router::new()
+ .get(&list_subdirs_api_method!(SUBDIRS))
+ .subdirs(SUBDIRS);
+
+#[sortable]
+const SUBDIRS: SubdirMap = &sorted!([
+ (
+ "apply-pending",
+ &Router::new().post(&API_METHOD_APPLY_PENDING)
+ ),
+ ("auto-assign", &Router::new().post(&API_METHOD_AUTO_ASSIGN)),
+ (
+ "clear-pending",
+ &Router::new().post(&API_METHOD_CLEAR_PENDING)
+ ),
+ ("keys", &KEYS_ROUTER),
+ ("node-status", &Router::new().get(&API_METHOD_NODE_STATUS)),
+]);
+
+const KEYS_ROUTER: Router = Router::new()
+ .get(&API_METHOD_LIST_KEYS)
+ .post(&API_METHOD_ADD_KEYS)
+ .match_all("key", &KEY_ITEM_ROUTER);
+
+const KEY_ITEM_ROUTER: Router = Router::new()
+ .get(&API_METHOD_GET_KEY)
+ .put(&API_METHOD_ASSIGN_KEY)
+ .delete(&API_METHOD_DELETE_KEY);
+
+/// Force-fresh node-status query so the next view reflects the new state instead of returning a
+/// cached entry up to 5 minutes later. Used by auto-assign / apply-pending / clear-pending to
+/// avoid double-driving a node that has already moved to Active in the cache window.
+const FRESH_NODE_STATUS_MAX_AGE: u64 = 0;
+
+/// Cached node-status freshness used by read-only views. Five minutes matches the resource-cache
+/// convention and is short enough that admins rarely see stale data on the panel.
+const PANEL_NODE_STATUS_MAX_AGE: u64 = 5 * 60;
+
+#[api(
+ returns: {
+ type: Array,
+ description: "List of subscription keys in the pool.",
+ items: { type: SubscriptionKeyEntry },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// List all subscription keys in the key pool.
+fn list_keys(rpcenv: &mut dyn RpcEnvironment) -> Result<Vec<SubscriptionKeyEntry>, Error> {
+ let (config, digest) = pdm_config::subscriptions::config()?;
+ rpcenv["digest"] = digest.to_hex().into();
+ Ok(config
+ .into_iter()
+ .map(|(_id, mut entry)| {
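+ // Recompute the derived level from the key itself instead of trusting the persisted value.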
+ entry.level = SubscriptionLevel::from_key(Some(&entry.key));
+ entry
+ })
+ .collect())
+}
+
+#[api(
+ input: {
+ properties: {
+ keys: {
+ type: Array,
+ description: "Subscription keys to add to the pool.",
+ items: { schema: SUBSCRIPTION_KEY_SCHEMA },
+ },
+ digest: {
+ type: ConfigDigest,
+ optional: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Add one or more subscription keys to the pool.
+///
+/// The key prefix determines the product type via [`ProductType::from_key`]. The schema regex
+/// rejects anything that isn't a PVE or PBS key today; widen [`PRODUCT_KEY_REGEX`] in lockstep
+/// with `from_key` and `push_key_to_remote` when PMG/POM remote support lands.
+///
+/// All-or-nothing: every key is validated for prefix and uniqueness (against the existing pool
+/// and within the input list) before any change is persisted. A single bad key fails the
+/// request and leaves the pool untouched.
+fn add_keys(keys: Vec<String>, digest: Option<ConfigDigest>) -> Result<(), Error> {
+ if keys.is_empty() {
+ http_bail!(BAD_REQUEST, "no keys provided");
+ }
+
+ let mut entries: Vec<SubscriptionKeyEntry> = Vec::with_capacity(keys.len());
+ let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
+ for key in &keys {
+ if !seen.insert(key.as_str()) {
+ http_bail!(BAD_REQUEST, "duplicate key in input: '{key}'");
+ }
+ let product_type = ProductType::from_key(key).ok_or_else(|| {
+ warn!("rejecting unrecognised key prefix '{key}', possibly a new product line");
+ format_err!("unrecognised key format: {key}")
+ })?;
+ entries.push(SubscriptionKeyEntry {
+ key: key.clone(),
+ product_type,
+ level: SubscriptionLevel::from_key(Some(key)),
+ source: SubscriptionKeySource::Manual,
+ ..Default::default()
+ });
+ }
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+ config_digest.detect_modification(digest.as_ref())?;
+
+ for entry in &entries {
+ if config.contains_key(&entry.key) {
+ http_bail!(CONFLICT, "key '{}' already exists in pool", entry.key);
+ }
+ }
+
+ for entry in entries {
+ config.insert(entry.key.clone(), entry);
+ }
+
+ pdm_config::subscriptions::save_config(&config)
+}
+
+#[api(
+ input: {
+ properties: {
+ key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+ },
+ },
+ returns: { type: SubscriptionKeyEntry },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// Get details for a single key.
+fn get_key(key: String, rpcenv: &mut dyn RpcEnvironment) -> Result<SubscriptionKeyEntry, Error> {
+ let (config, digest) = pdm_config::subscriptions::config()?;
+ rpcenv["digest"] = digest.to_hex().into();
+ let mut entry = config
+ .get(&key)
+ .cloned()
+ .ok_or_else(|| http_err!(NOT_FOUND, "key '{key}' not found in pool"))?;
+ entry.level = SubscriptionLevel::from_key(Some(&entry.key));
+ Ok(entry)
+}
+
+#[api(
+ input: {
+ properties: {
+ key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+ digest: {
+ type: ConfigDigest,
+ optional: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Remove a key from the pool.
+///
+/// If the key is currently assigned to a remote node, the caller must also have
+/// `PRIV_RESOURCE_MODIFY` on that remote, so an audit-only operator cannot release a key
+/// another admin had pinned. Refuses if the key is currently the live active key on its bound
+/// node, since dropping the pool entry would orphan that subscription on the remote: the
+/// operator must use Reissue Key first.
+async fn delete_key(
+ key: String,
+ digest: Option<ConfigDigest>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ // Live fetch must happen before the lock since the lock cannot span an .await; the
+ // post-lock check below only refuses if the binding still matches what we observed.
+ let synced_block = check_synced_assignment_for_unassign(&key).await?;
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+ config_digest.detect_modification(digest.as_ref())?;
+ let mut shadow = pdm_config::subscriptions::shadow_config()?;
+
+ let Some(entry) = config.get(&key) else {
+ http_bail!(NOT_FOUND, "key '{key}' not found in pool");
+ };
+
+ if let Some(assigned_remote) = entry.remote.as_deref() {
+ user_info.check_privs(
+ &auth_id,
+ &["resource", assigned_remote],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )?;
+ }
+
+ if let Some((blocking_remote, blocking_node)) = synced_block {
+ if entry.remote.as_deref() == Some(blocking_remote.as_str())
+ && entry.node.as_deref() == Some(blocking_node.as_str())
+ {
+ http_bail!(
+ BAD_REQUEST,
+ "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
+ use Reissue Key to remove it from the remote first"
+ );
+ }
+ }
+
+ config.remove(&key);
+ shadow.remove(&key);
+ // Cascade the shadow drop. Persist the main config first, then the shadow file: if the
+ // second write fails we are at worst left with an orphaned shadow blob, never a pool
+ // entry that lost its signed blob.
+ pdm_config::subscriptions::save_config(&config)?;
+ pdm_config::subscriptions::save_shadow(&shadow)?;
+
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+ remote: {
+ schema: REMOTE_ID_SCHEMA,
+ optional: true,
+ },
+ node: {
+ // NODE_SCHEMA rejects path-traversal input before it ends up interpolated into
+ // the remote URL `/api2/extjs/nodes/{node}/subscription`.
+ schema: NODE_SCHEMA,
+ optional: true,
+ },
+ digest: {
+ type: ConfigDigest,
+ optional: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Assign a key to a remote node, or unassign it (omit remote and node).
+///
+/// `PRIV_SYS_MODIFY` lets the caller touch the pool config; per-remote `PRIV_RESOURCE_MODIFY`
+/// is enforced inside this handler so an operator cannot push a key to a remote they have no
+/// other authority on.
+async fn assign_key(
+ key: String,
+ remote: Option<String>,
+ node: Option<String>,
+ digest: Option<ConfigDigest>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ // Best-effort orphan-prevention for the unassign path: if the entry is currently bound and
+ // synced (the assigned key is the live current_key on its remote), refuse and direct the
+ // operator to Reissue Key. The live fetch must happen before the lock since that lock cannot
+ // span an .await; we re-check the binding under the lock and only refuse if it still
+ // matches what we observed live.
+ let synced_block = if remote.is_none() && node.is_none() {
+ check_synced_assignment_for_unassign(&key).await?
+ } else {
+ None
+ };
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+ config_digest.detect_modification(digest.as_ref())?;
+
+ let Some(stored_entry) = config.get(&key).cloned() else {
+ http_bail!(NOT_FOUND, "key '{key}' not found in pool");
+ };
+ let product_type = stored_entry.product_type;
+
+ match (&remote, &node) {
+ (Some(remote_name), Some(node_name)) => {
+ user_info.check_privs(
+ &auth_id,
+ &["resource", remote_name],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )?;
+
+ // Reassigning away from a previous remote requires modify on that remote too,
+ // otherwise an audit-only-on-A operator could effectively pull a key off A by
+ // re-binding it to a remote B they can modify and applying the push (which makes
+ // the shop reissue the serverid to B and invalidates A).
+ if let Some(prev_remote) = stored_entry.remote.as_deref() {
+ if prev_remote != remote_name {
+ user_info.check_privs(
+ &auth_id,
+ &["resource", prev_remote],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )?;
+ }
+ }
+
+ let (remotes_config, _) = pdm_config::remotes::config()?;
+ let remote_entry = remotes_config
+ .get(remote_name)
+ .ok_or_else(|| http_err!(NOT_FOUND, "remote '{remote_name}' not found"))?;
+
+ if !product_type.matches_remote_type(remote_entry.ty) {
+ http_bail!(
+ BAD_REQUEST,
+ "key type '{product_type}' does not match remote type '{}'",
+ remote_entry.ty
+ );
+ }
+
+ for (_id, other) in config.iter() {
+ if other.key != key
+ && other.remote.as_deref() == Some(remote_name.as_str())
+ && other.node.as_deref() == Some(node_name.as_str())
+ {
+ http_bail!(
+ CONFLICT,
+ "key '{}' is already assigned to {remote_name}/{node_name}",
+ other.key
+ );
+ }
+ }
+
+ let entry = config.get_mut(&key).unwrap();
+ entry.remote = Some(remote_name.clone());
+ entry.node = Some(node_name.clone());
+ }
+ (None, None) => {
+ // Unassign also requires modify on the previously-pinned remote, so an audit-only
+ // operator cannot rip a key off a node they cannot otherwise touch.
+ if let Some(prev_remote) = stored_entry.remote.as_deref() {
+ user_info.check_privs(
+ &auth_id,
+ &["resource", prev_remote],
+ PRIV_RESOURCE_MODIFY,
+ false,
+ )?;
+ }
+ // Honour the pre-lock synced check only if the binding still matches what we
+ // observed; if the binding moved between the live fetch and lock, the orphan check
+ // is moot and we let the unassign through.
+ if let Some((blocking_remote, blocking_node)) = synced_block {
+ if stored_entry.remote.as_deref() == Some(blocking_remote.as_str())
+ && stored_entry.node.as_deref() == Some(blocking_node.as_str())
+ {
+ http_bail!(
+ BAD_REQUEST,
+ "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
+ use Reissue Key to remove it from the remote first"
+ );
+ }
+ }
+ let entry = config.get_mut(&key).unwrap();
+ entry.remote = None;
+ entry.node = None;
+ }
+ _ => {
+ http_bail!(
+ BAD_REQUEST,
+ "both 'remote' and 'node' must be provided, or neither"
+ );
+ }
+ }
+
+ pdm_config::subscriptions::save_config(&config)?;
+
+ Ok(())
+}
+
+/// Pre-lock check for [`assign_key`]'s unassign path: returns the (remote, node) the entry is
+/// currently active on, if any, so the lock-protected branch can refuse the unassign and prompt
+/// the operator to Reissue Key instead. Returns `None` for entries with no binding, no live
+/// subscription, or a live subscription whose key does not match the entry.
+async fn check_synced_assignment_for_unassign(
+ key: &str,
+) -> Result<Option<(String, String)>, Error> {
+ let (config, _) = pdm_config::subscriptions::config()?;
+ let Some(entry) = config.get(key) else {
+ return Ok(None);
+ };
+ let (Some(prev_remote), Some(prev_node)) = (entry.remote.clone(), entry.node.clone()) else {
+ return Ok(None);
+ };
+ let (remotes_config, _) = pdm_config::remotes::config()?;
+ let Some(remote_entry) = remotes_config.get(&prev_remote) else {
+ return Ok(None);
+ };
+ let live = match get_subscription_info_for_remote(remote_entry, FRESH_NODE_STATUS_MAX_AGE).await
+ {
+ Ok(v) => v,
+ Err(_) => return Ok(None),
+ };
+ let synced = live
+ .get(&prev_node)
+ .and_then(|info| info.as_ref())
+ .map(|info| {
+ info.status == proxmox_subscription::SubscriptionStatus::Active
+ && info.key.as_deref() == Some(key)
+ })
+ .unwrap_or(false);
+ Ok(synced.then_some((prev_remote, prev_node)))
+}
+
+/// Push a single key to its assigned remote node. Operates on a borrowed `Remote` so the
+/// caller can fetch the remotes-config once and reuse it.
+async fn push_key_to_remote(remote: &Remote, key: &str, node_name: &str) -> Result<(), Error> {
+ let product_type =
+ ProductType::from_key(key).ok_or_else(|| format_err!("unrecognised key format: {key}"))?;
+
+ // PVE and PBS share `proxmox_client::Client`, so `make_pbs_client_and_login` works for both;
+ // only the PUT path differs.
+ let path = match product_type {
+ ProductType::Pve => format!("/api2/extjs/nodes/{node_name}/subscription"),
+ ProductType::Pbs => "/api2/extjs/nodes/localhost/subscription".to_string(),
+ ProductType::Pmg | ProductType::Pom => {
+ bail!("PDM cannot push '{product_type}' keys: no remote support yet");
+ }
+ };
+
+ let client = crate::connection::make_pbs_client_and_login(remote).await?;
+
+ client
+ .0
+ .put(&path, &serde_json::json!({ "key": key }))
+ .await?;
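+ // The follow-up POST triggers the node's subscription (re-)check, so its reported
+ // status reflects the newly set key right away.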
+ client.0.post(&path, &serde_json::json!({})).await?;
+
+ info!("pushed key '{key}' to {}/{node_name}", remote.id);
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ "max-age": {
+ type: u64,
+ optional: true,
+ description: "Override the cache freshness window in seconds. \
+ Default 300 for panel views; pass 0 to force a fresh query.",
+ },
+ },
+ },
+ returns: {
+ type: Array,
+ description: "Subscription status of all remote nodes the user can audit.",
+ items: { type: RemoteNodeStatus },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// Get the subscription status of every remote node the caller can audit, combined with key pool
+/// assignment information.
+///
+/// Per-remote `PRIV_RESOURCE_AUDIT` is enforced inside the handler so users only see remotes
+/// they may audit.
+async fn node_status(
+ max_age: Option<u64>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<RemoteNodeStatus>, Error> {
+ collect_node_status(max_age.unwrap_or(PANEL_NODE_STATUS_MAX_AGE), rpcenv).await
+}
+
+/// Shared helper: fan out subscription queries to all remotes the caller has audit privilege on,
+/// in parallel, reusing the per-remote `SUBSCRIPTION_CACHE` via `get_subscription_info_for_remote`.
+/// Joins the results with the key-pool assignment table.
+async fn collect_node_status(
+ max_age: u64,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<RemoteNodeStatus>, Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let visible_remotes: Vec<(String, Remote)> = crate::api::remotes::RemoteIterator::new()?
+ .any_privs(&user_info, &auth_id, PRIV_RESOURCE_AUDIT)
+ .into_iter()
+ .collect();
+
+ let (keys_config, _) = pdm_config::subscriptions::config()?;
+
+ // `get_subscription_info_for_remote` re-uses the per-remote `SUBSCRIPTION_CACHE` so this
+ // fan-out is safe to run concurrently.
+ let fetch = visible_remotes.iter().map(|(name, remote)| async move {
+ let res = get_subscription_info_for_remote(remote, max_age).await;
+ (name.clone(), remote.ty, res)
+ });
+ let results = join_all(fetch).await;
+
+ let mut out = Vec::new();
+ for (remote_name, remote_ty, result) in results {
+ let node_infos = match result {
+ Ok(info) => info,
+ Err(err) => {
+ warn!("failed to query subscription for remote {remote_name}: {err}");
+ continue;
+ }
+ };
+
+ for (node_name, node_info) in &node_infos {
+ let (status, level, sockets, current_key) = match node_info {
+ Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
+ None => (
+ proxmox_subscription::SubscriptionStatus::NotFound,
+ SubscriptionLevel::None,
+ None,
+ None,
+ ),
+ };
+
+ let assigned_key = keys_config
+ .iter()
+ .find(|(_id, entry)| {
+ entry.remote.as_deref() == Some(remote_name.as_str())
+ && entry.node.as_deref() == Some(node_name.as_str())
+ })
+ .map(|(_id, entry)| entry.key.clone());
+
+ out.push(RemoteNodeStatus {
+ remote: remote_name.clone(),
+ ty: remote_ty,
+ node: node_name.to_string(),
+ sockets,
+ status,
+ level,
+ assigned_key,
+ current_key,
+ });
+ }
+ }
+
+ out.sort_by(|a, b| (&a.remote, &a.node).cmp(&(&b.remote, &b.node)));
+ Ok(out)
+}
+
+#[api(
+ input: {
+ properties: {
+ apply: {
+ type: bool,
+ optional: true,
+ default: false,
+ description: "Actually apply the proposed assignments. Without this, only a preview is returned.",
+ },
+ },
+ },
+ returns: {
+ type: Array,
+ description: "List of proposed or applied assignments.",
+ items: { type: ProposedAssignment },
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Propose or apply automatic key-to-node assignments.
+///
+/// Matches unused pool keys to remote nodes that do not yet have a pool-assigned key, picking
+/// the smallest PVE key that covers each node's socket count. When `apply=true`, the live node
+/// statuses are fetched first (without holding the config lock - sync locks must not span
+/// awaits), then proposals are computed and persisted under the lock with a per-key re-check
+/// against the now-current pool state, so a parallel admin edit between fetch and apply does
+/// not get silently overwritten.
+async fn auto_assign(
+ apply: Option<bool>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<ProposedAssignment>, Error> {
+ let apply = apply.unwrap_or(false);
+
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+
+ if !apply {
+ let (config, _digest) = pdm_config::subscriptions::config()?;
+ return Ok(compute_proposals(&config, &node_statuses));
+ }
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, _digest) = pdm_config::subscriptions::config()?;
+ let mut proposals = compute_proposals(&config, &node_statuses);
+
+ // Audit-only callers may see a remote in the preview but must not be able to stage a write
+ // for it that another admin would later push on their behalf.
+ proposals.retain(|p| {
+ user_info.lookup_privs(&auth_id, &["resource", &p.remote]) & PRIV_RESOURCE_MODIFY != 0
+ });
+
+ for p in &proposals {
+ if let Some(entry) = config.get_mut(&p.key) {
+ // Skip keys that another writer assigned between the preview and the lock.
+ if entry.remote.is_none() {
+ entry.remote = Some(p.remote.clone());
+ entry.node = Some(p.node.clone());
+ }
+ }
+ }
+ pdm_config::subscriptions::save_config(&config)?;
+
+ Ok(proposals)
+}
+
+fn compute_proposals(
+ config: &SectionConfigData<SubscriptionKeyEntry>,
+ node_statuses: &[RemoteNodeStatus],
+) -> Vec<ProposedAssignment> {
+ let mut target_nodes: Vec<&RemoteNodeStatus> = node_statuses
+ .iter()
+ .filter(|n| {
+ n.assigned_key.is_none() && n.status != proxmox_subscription::SubscriptionStatus::Active
+ })
+ .collect();
+
+ // Sort nodes by socket count descending so large PVE nodes get keys first; PBS nodes
+ // report no socket count and therefore sort last.
+ target_nodes.sort_by(|a, b| b.sockets.unwrap_or(0).cmp(&a.sockets.unwrap_or(0)));
+
+ let mut proposals: Vec<ProposedAssignment> = Vec::new();
+ let mut taken: std::collections::HashSet<String> = std::collections::HashSet::new();
+
+ for node in &target_nodes {
+ let remote_type = node.ty;
+
+ let candidates = config.iter().filter(|(id, entry)| {
+ entry.remote.is_none()
+ && !taken.contains(*id)
+ && entry.product_type.matches_remote_type(remote_type)
+ });
+
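+ // PVE keys are socket-bound: take the smallest free key that still covers the node.
+ // PBS keys are not, so the first free key of the matching product type will do.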
+ let best_key = if remote_type == pdm_api_types::remotes::RemoteType::Pve {
+ let node_sockets = node.sockets.unwrap_or(1) as u32;
+ pick_best_pve_socket_key(
+ node_sockets,
+ candidates.map(|(id, entry)| (id.to_string(), entry.key.as_str())),
+ )
+ } else {
+ candidates.map(|(id, _)| id.to_string()).next()
+ };
+
+ if let Some(key_id) = best_key {
+ let ks = config
+ .get(&key_id)
+ .and_then(|e| socket_count_from_key(&e.key));
+ taken.insert(key_id.clone());
+ proposals.push(ProposedAssignment {
+ key: key_id,
+ remote: node.remote.clone(),
+ node: node.node.clone(),
+ key_sockets: ks,
+ node_sockets: node.sockets,
+ });
+ }
+ }
+
+ proposals
+}
+
+#[api(
+ returns: {
+ type: String,
+ optional: true,
+ description: "Task UPID; absent when there is nothing pending to push.",
+ },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Push every pending key assignment to its remote node.
+///
+/// Pending = the live node does not confirm the assigned key as its current active subscription
+/// (status not Active, a different `current_key`, or the remote did not respond / is gone). Each
+/// push is logged from a worker task so the admin can follow progress.
+///
+/// The worker bails on the first failure; the remaining entries stay pending so the operator
+/// can fix the underlying issue (or clear that one assignment) and trigger another apply.
+///
+/// Returns `None` when nothing is pending so the caller can show a short info message instead of
+/// opening a task progress dialog for a no-op worker.
+async fn apply_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<Option<String>, Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+ let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+ if pending.is_empty() {
+ return Ok(None);
+ }
+
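+ // Clone for the worker closure; `auth_id` itself is still needed for the task owner below.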
+ let worker_auth = auth_id.clone();
+ let upid = proxmox_rest_server::WorkerTask::spawn(
+ "subscription-push",
+ None,
+ auth_id.to_string(),
+ true,
+ move |_worker| async move { run_apply_pending(worker_auth).await },
+ )?;
+
+ Ok(Some(upid))
+}
+
+/// Re-validate and run the apply-pending plan from inside a worker.
+///
+/// The worker re-reads remotes and the pool config so a reassign or removal between the API call
+/// returning a UPID and the worker firing is honoured (pushing the old key to a node after the
+/// operator retracted the assignment was a real footgun).
+async fn run_apply_pending(auth_id: Authid) -> Result<(), Error> {
+ let user_info = CachedUserInfo::new()?;
+ let (remotes_config, _) = pdm_config::remotes::config()?;
+ let (config, _) = pdm_config::subscriptions::config()?;
+
+ let node_statuses = collect_status_uncached(&remotes_config).await;
+ let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+ if pending.is_empty() {
+ info!("apply-pending: nothing to do (state changed since the API call)");
+ return Ok(());
+ }
+
+ let total = pending.len();
+ let mut ok = 0usize;
+
+ for entry in pending {
+ let Some(remote) = remotes_config.get(&entry.remote) else {
+ bail!(
+ "remote '{}' vanished, aborting after {ok}/{total} successful pushes",
+ entry.remote,
+ );
+ };
+ // Honour the case where the operator unassigned the key while the worker was queued.
+ if !pool_assignment_still_valid(&config, &entry) {
+ info!(
+ "skipping {}/{}: pool assignment changed before worker ran",
+ entry.remote, entry.node
+ );
+ continue;
+ }
+
+ info!(
+ "pushing {} to {}/{}...",
+ entry.key, entry.remote, entry.node
+ );
+ if let Err(err) = push_key_to_remote(remote, &entry.key, &entry.node).await {
+ bail!(
+ "push of {} to {}/{} failed after {ok}/{total} successful pushes: {err}",
+ entry.key,
+ entry.remote,
+ entry.node,
+ );
+ }
+ info!(" success");
+ invalidate_subscription_info_for_remote(&entry.remote);
+ ok += 1;
+ }
+
+ info!("finished: {ok}/{total} pushes succeeded");
+ Ok(())
+}
+
+#[api(
+ returns: { type: ClearPendingResult },
+ access: {
+ permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Clear every pending assignment in one bulk transaction.
+///
+/// Pending = pool key bound to a remote node whose live state does not confirm the assignment
+/// (status not Active, a different `current_key`, or no row returned at all because the remote is
+/// unreachable / the node is gone). Clears only those entries the caller has
+/// `PRIV_RESOURCE_MODIFY` on; remotes the caller may only audit are skipped. Mirrors
+/// `apply-pending` but drops the assignments instead of pushing them, so an operator can disown
+/// stuck assignments without first having to bring the target back online.
+async fn clear_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<ClearPendingResult, Error> {
+ let auth_id: Authid = rpcenv
+ .get_auth_id()
+ .context("no authid available")?
+ .parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+ let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+ if pending.is_empty() {
+ return Ok(ClearPendingResult { cleared: 0 });
+ }
+
+ let _lock = pdm_config::subscriptions::lock_config()?;
+ let (mut config, _digest) = pdm_config::subscriptions::config()?;
+
+ let mut cleared: u32 = 0;
+ for entry in &pending {
+ // Re-check inside the lock so a concurrent reassign is not silently overwritten.
+ if let Some(stored) = config.get_mut(&entry.key) {
+ if stored.remote.as_deref() == Some(entry.remote.as_str())
+ && stored.node.as_deref() == Some(entry.node.as_str())
+ {
+ stored.remote = None;
+ stored.node = None;
+ cleared += 1;
+ }
+ }
+ }
+
+ if cleared > 0 {
+ pdm_config::subscriptions::save_config(&config)?;
+ }
+
+ Ok(ClearPendingResult { cleared })
+}
+
+/// Plan entry for one pending push.
+#[derive(Clone, Debug)]
+struct PendingEntry {
+ key: String,
+ remote: String,
+ node: String,
+}
+
+fn compute_pending(
+ user_info: &CachedUserInfo,
+ auth_id: &Authid,
+ node_statuses: &[RemoteNodeStatus],
+) -> Result<Vec<PendingEntry>, Error> {
+ let (config, _) = pdm_config::subscriptions::config()?;
+
+ Ok(config
+ .iter()
+ .filter_map(|(_id, entry)| {
+ let remote = entry.remote.as_deref()?;
+ let node = entry.node.as_deref()?;
+
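+ // Pending plans only ever cover remotes the caller can modify; audit-only remotes
+ // are filtered out here for both apply and clear.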
+ if user_info.lookup_privs(auth_id, &["resource", remote]) & PRIV_RESOURCE_MODIFY == 0 {
+ return None;
+ }
+
+ // Treat anything other than "Active with the assigned key as the live current_key"
+ // as pending, including unreachable remotes, so an operator can clear stuck
+ // assignments without first having to bring the target back online.
+ let is_pending = match node_statuses
+ .iter()
+ .find(|n| n.remote == remote && n.node == node)
+ {
+ Some(n) => {
+ n.status != proxmox_subscription::SubscriptionStatus::Active
+ || n.current_key.as_deref() != Some(entry.key.as_str())
+ }
+ None => true,
+ };
+
+ is_pending.then(|| PendingEntry {
+ key: entry.key.clone(),
+ remote: remote.to_string(),
+ node: node.to_string(),
+ })
+ })
+ .collect())
+}
+
+fn pool_assignment_still_valid(
+ config: &SectionConfigData<SubscriptionKeyEntry>,
+ entry: &PendingEntry,
+) -> bool {
+ let Some(stored) = config.get(&entry.key) else {
+ return false;
+ };
+ stored.remote.as_deref() == Some(entry.remote.as_str())
+ && stored.node.as_deref() == Some(entry.node.as_str())
+}
+
+/// Like [`collect_node_status`] but bypasses the auth filter, for the apply-pending worker
+/// which gates each entry through its own per-remote priv check based on the persisted pool plan.
+async fn collect_status_uncached(
+ remotes_config: &SectionConfigData<Remote>,
+) -> Vec<RemoteNodeStatus> {
+ let fetch = remotes_config.iter().map(|(name, remote)| async move {
+ let res = get_subscription_info_for_remote(remote, FRESH_NODE_STATUS_MAX_AGE).await;
+ (name.to_string(), remote.ty, res)
+ });
+ let results = join_all(fetch).await;
+
+ let mut out = Vec::new();
+ for (remote_name, remote_ty, result) in results {
+ let Ok(node_infos) = result else { continue };
+ for (node_name, node_info) in &node_infos {
+ let (status, level, sockets, current_key) = match node_info {
+ Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
+ None => (
+ proxmox_subscription::SubscriptionStatus::NotFound,
+ SubscriptionLevel::None,
+ None,
+ None,
+ ),
+ };
+ out.push(RemoteNodeStatus {
+ remote: remote_name.clone(),
+ ty: remote_ty,
+ node: node_name.to_string(),
+ sockets,
+ status,
+ level,
+ assigned_key: None,
+ current_key,
+ });
+ }
+ }
+ out
+}
--
2.47.3
Thread overview: 15+ messages
2026-05-07 8:26 [PATCH datacenter-manager v2 0/8] subscription: add central key pool registry with reissue support Thomas Lamprecht
2026-05-07 8:26 ` [PATCH datacenter-manager v2 1/8] api: subscription cache: ensure max_age=0 forces a fresh fetch Thomas Lamprecht
2026-05-07 13:23 ` Lukas Wagner
2026-05-08 12:43 ` applied: " Lukas Wagner
2026-05-07 8:26 ` [PATCH datacenter-manager v2 2/8] api types: subscription level: render full names Thomas Lamprecht
2026-05-07 13:23 ` Lukas Wagner
2026-05-07 8:26 ` [PATCH datacenter-manager v2 3/8] subscription: add key pool data model and config layer Thomas Lamprecht
2026-05-07 8:26 ` Thomas Lamprecht [this message]
2026-05-07 13:23 ` [PATCH datacenter-manager v2 4/8] subscription: add key pool and node status API endpoints Lukas Wagner
2026-05-07 8:26 ` [PATCH datacenter-manager v2 5/8] ui: add subscription registry with key pool and node status Thomas Lamprecht
2026-05-07 8:26 ` [PATCH datacenter-manager v2 6/8] cli: add subscription key pool management subcommands Thomas Lamprecht
2026-05-07 8:26 ` [PATCH datacenter-manager v2 7/8] docs: add subscription registry chapter Thomas Lamprecht
2026-05-07 8:26 ` [PATCH datacenter-manager v2 8/8] subscription: add Reissue Key action with pending-reissue queue Thomas Lamprecht
2026-05-07 8:34 ` [PATCH datacenter-manager v2 9/9] fixup! ui: add subscription registry with key pool and node status Thomas Lamprecht
2026-05-07 13:23 ` [PATCH datacenter-manager v2 0/8] subscription: add central key pool registry with reissue support Lukas Wagner