From: Thomas Lamprecht <t.lamprecht@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH 4/8] subscription: add key pool and node status API endpoints
Date: Thu,  7 May 2026 09:17:27 +0200	[thread overview]
Message-ID: <20260507072436.2649563-5-t.lamprecht@proxmox.com> (raw)
In-Reply-To: <20260507072436.2649563-1-t.lamprecht@proxmox.com>

Add REST endpoints under /subscriptions for the pool, the combined
remote-vs-pool node-status view, and the bulk paths (auto-assign,
apply-pending, clear-pending).

Endpoints touching a specific remote require the matching resource
privilege on that remote in addition to the system-scope bit, so an
operator with global system access alone cannot push keys to remotes
they have no other authority on. Audit-only callers see only the
remotes they may audit on read paths; pending operations skip remotes
they may not modify.

The add endpoint takes an all-or-nothing list of keys. Apply-pending
runs in a worker that re-reads its plan when it fires (honouring a
re-assign between API call and worker firing) and bails on the first
push failure, so the remaining entries stay pending for retry.

Mutating endpoints accept an optional ConfigDigest; reads expose it
in the response.
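
For reference, a rough sketch of how the new endpoints are driven
(illustrative only; assumes the usual /api2/json prefix, key values
are placeholders):

    # add keys to the pool (all-or-nothing)
    POST /api2/json/subscriptions/keys           {"keys": ["<key1>", "<key2>"]}
    # combined remote-vs-pool view, bypassing the 5 min cache
    GET  /api2/json/subscriptions/node-status?max-age=0
    # preview proposals, then persist them
    POST /api2/json/subscriptions/auto-assign
    POST /api2/json/subscriptions/auto-assign    {"apply": true}
    # push (or drop) whatever is still pending
    POST /api2/json/subscriptions/apply-pending
    POST /api2/json/subscriptions/clear-pending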

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
---
 lib/pdm-api-types/src/subscription.rs |   9 +
 server/src/api/mod.rs                 |   2 +
 server/src/api/resources.rs           |   8 +
 server/src/api/subscriptions/mod.rs   | 967 ++++++++++++++++++++++++++
 4 files changed, 986 insertions(+)
 create mode 100644 server/src/api/subscriptions/mod.rs

diff --git a/lib/pdm-api-types/src/subscription.rs b/lib/pdm-api-types/src/subscription.rs
index 26ecfba..ead3c1b 100644
--- a/lib/pdm-api-types/src/subscription.rs
+++ b/lib/pdm-api-types/src/subscription.rs
@@ -529,6 +529,15 @@ pub struct RemoteNodeStatus {
     pub current_key: Option<String>,
 }
 
+#[api]
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "kebab-case")]
+/// Result of the bulk clear-pending API endpoint.
+pub struct ClearPendingResult {
+    /// Number of pool entries whose pending push or reissue was cleared.
+    pub cleared: u32,
+}
+
 #[api]
 #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "kebab-case")]
diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs
index 110191b..9680edc 100644
--- a/server/src/api/mod.rs
+++ b/server/src/api/mod.rs
@@ -18,6 +18,7 @@ pub mod remotes;
 pub mod resources;
 mod rrd_common;
 pub mod sdn;
+pub mod subscriptions;
 
 #[sortable]
 const SUBDIRS: SubdirMap = &sorted!([
@@ -31,6 +32,7 @@ const SUBDIRS: SubdirMap = &sorted!([
     ("resources", &resources::ROUTER),
     ("nodes", &nodes::ROUTER),
     ("sdn", &sdn::ROUTER),
+    ("subscriptions", &subscriptions::ROUTER),
     ("version", &Router::new().get(&API_METHOD_VERSION)),
 ]);
 
diff --git a/server/src/api/resources.rs b/server/src/api/resources.rs
index 50315b1..49c3974 100644
--- a/server/src/api/resources.rs
+++ b/server/src/api/resources.rs
@@ -848,6 +848,14 @@ fn get_cached_subscription_info(remote: &str, max_age: u64) -> Option<CachedSubs
     }
 }
 
+/// Drop the cached subscription state for a remote, forcing the next read to refetch.
+pub fn invalidate_subscription_info_for_remote(remote_id: &str) {
+    let mut cache = SUBSCRIPTION_CACHE
+        .write()
+        .expect("subscription mutex poisoned");
+    cache.remove(remote_id);
+}
+
 /// Update cached subscription data.
 ///
 /// If the cache already contains more recent data we don't insert the passed resources.
diff --git a/server/src/api/subscriptions/mod.rs b/server/src/api/subscriptions/mod.rs
new file mode 100644
index 0000000..26d9ecf
--- /dev/null
+++ b/server/src/api/subscriptions/mod.rs
@@ -0,0 +1,967 @@
+//! Subscription key pool management API.
+//!
+//! Manages a PDM-side pool of subscription keys, proposes key-to-node assignments, and pushes
+//! assigned keys to remote nodes. All entries are added manually for now; each entry is a bare
+//! `key` string with the product type derived from its prefix.
+
+use anyhow::{bail, format_err, Context, Error};
+use futures::future::join_all;
+
+use proxmox_access_control::CachedUserInfo;
+use proxmox_client::HttpApiClient;
+use proxmox_config_digest::ConfigDigest;
+use proxmox_log::{info, warn};
+use proxmox_router::{
+    http_bail, http_err, list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap,
+};
+use proxmox_schema::api;
+use proxmox_section_config::typed::SectionConfigData;
+use proxmox_sortable_macro::sortable;
+
+use pdm_api_types::remotes::{Remote, REMOTE_ID_SCHEMA};
+use pdm_api_types::subscription::{
+    pick_best_pve_socket_key, socket_count_from_key, ClearPendingResult, ProductType,
+    ProposedAssignment, RemoteNodeStatus, SubscriptionKeyEntry, SubscriptionKeySource,
+    SubscriptionLevel, SUBSCRIPTION_KEY_SCHEMA,
+};
+use pdm_api_types::{
+    Authid, NODE_SCHEMA, PRIV_RESOURCE_AUDIT, PRIV_RESOURCE_MODIFY, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+use crate::api::resources::{
+    get_subscription_info_for_remote, invalidate_subscription_info_for_remote,
+};
+
+pub const ROUTER: Router = Router::new()
+    .get(&list_subdirs_api_method!(SUBDIRS))
+    .subdirs(SUBDIRS);
+
+#[sortable]
+const SUBDIRS: SubdirMap = &sorted!([
+    (
+        "apply-pending",
+        &Router::new().post(&API_METHOD_APPLY_PENDING)
+    ),
+    ("auto-assign", &Router::new().post(&API_METHOD_AUTO_ASSIGN)),
+    (
+        "clear-pending",
+        &Router::new().post(&API_METHOD_CLEAR_PENDING)
+    ),
+    ("keys", &KEYS_ROUTER),
+    ("node-status", &Router::new().get(&API_METHOD_NODE_STATUS)),
+]);
+
+const KEYS_ROUTER: Router = Router::new()
+    .get(&API_METHOD_LIST_KEYS)
+    .post(&API_METHOD_ADD_KEYS)
+    .match_all("key", &KEY_ITEM_ROUTER);
+
+const KEY_ITEM_ROUTER: Router = Router::new()
+    .get(&API_METHOD_GET_KEY)
+    .put(&API_METHOD_ASSIGN_KEY)
+    .delete(&API_METHOD_DELETE_KEY);
+
+/// Force-fresh node-status query so the next view reflects the new state instead of returning a
+/// cached entry up to 5 minutes later. Used by auto-assign / apply-pending / clear-pending to
+/// avoid double-driving a node that has already moved to Active in the cache window.
+const FRESH_NODE_STATUS_MAX_AGE: u64 = 0;
+
+/// Cached node-status freshness used by read-only views. Five minutes matches the resource-cache
+/// convention and is short enough that admins rarely see stale data on the panel.
+const PANEL_NODE_STATUS_MAX_AGE: u64 = 5 * 60;
+
+#[api(
+    returns: {
+        type: Array,
+        description: "List of subscription keys in the pool.",
+        items: { type: SubscriptionKeyEntry },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// List all subscription keys in the key pool.
+fn list_keys(rpcenv: &mut dyn RpcEnvironment) -> Result<Vec<SubscriptionKeyEntry>, Error> {
+    let (config, digest) = pdm_config::subscriptions::config()?;
+    rpcenv["digest"] = digest.to_hex().into();
+    Ok(config
+        .into_iter()
+        .map(|(_id, mut entry)| {
+            entry.level = SubscriptionLevel::from_key(Some(&entry.key));
+            entry
+        })
+        .collect())
+}
+
+#[api(
+    input: {
+        properties: {
+            keys: {
+                type: Array,
+                description: "Subscription keys to add to the pool.",
+                items: { schema: SUBSCRIPTION_KEY_SCHEMA },
+            },
+            digest: {
+                type: ConfigDigest,
+                optional: true,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Add one or more subscription keys to the pool.
+///
+/// The key prefix determines the product type via [`ProductType::from_key`]. The schema regex
+/// rejects anything that isn't a PVE or PBS key today; widen [`PRODUCT_KEY_REGEX`] in lockstep
+/// with `from_key` and `push_key_to_remote` when PMG/POM remote support lands.
+///
+/// All-or-nothing: every key is validated for prefix and uniqueness (against the existing pool
+/// and within the input list) before any change is persisted. A single bad key fails the
+/// request and leaves the pool untouched.
+fn add_keys(keys: Vec<String>, digest: Option<ConfigDigest>) -> Result<(), Error> {
+    if keys.is_empty() {
+        http_bail!(BAD_REQUEST, "no keys provided");
+    }
+
+    let mut entries: Vec<SubscriptionKeyEntry> = Vec::with_capacity(keys.len());
+    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
+    for key in &keys {
+        if !seen.insert(key.as_str()) {
+            http_bail!(BAD_REQUEST, "duplicate key in input: '{key}'");
+        }
+        let product_type = ProductType::from_key(key).ok_or_else(|| {
+            warn!("rejecting unrecognised key prefix '{key}', possibly a new product line");
+            format_err!("unrecognised key format: {key}")
+        })?;
+        entries.push(SubscriptionKeyEntry {
+            key: key.clone(),
+            product_type,
+            level: SubscriptionLevel::from_key(Some(key)),
+            source: SubscriptionKeySource::Manual,
+            ..Default::default()
+        });
+    }
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+    config_digest.detect_modification(digest.as_ref())?;
+
+    for entry in &entries {
+        if config.contains_key(&entry.key) {
+            http_bail!(CONFLICT, "key '{}' already exists in pool", entry.key);
+        }
+    }
+
+    for entry in entries {
+        config.insert(entry.key.clone(), entry);
+    }
+
+    pdm_config::subscriptions::save_config(&config)
+}
+
+#[api(
+    input: {
+        properties: {
+            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+        },
+    },
+    returns: { type: SubscriptionKeyEntry },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// Get details for a single key.
+fn get_key(key: String, rpcenv: &mut dyn RpcEnvironment) -> Result<SubscriptionKeyEntry, Error> {
+    let (config, digest) = pdm_config::subscriptions::config()?;
+    rpcenv["digest"] = digest.to_hex().into();
+    let mut entry = config
+        .get(&key)
+        .cloned()
+        .ok_or_else(|| http_err!(NOT_FOUND, "key '{key}' not found in pool"))?;
+    entry.level = SubscriptionLevel::from_key(Some(&entry.key));
+    Ok(entry)
+}
+
+#[api(
+    input: {
+        properties: {
+            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+            digest: {
+                type: ConfigDigest,
+                optional: true,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Remove a key from the pool.
+///
+/// If the key is currently assigned to a remote node, the caller must also have
+/// `PRIV_RESOURCE_MODIFY` on that remote, so an audit-only operator cannot release a key
+/// another admin had pinned. Refuses if the key is currently the live active key on its bound
+/// node, since dropping the pool entry would orphan that subscription on the remote: the
+/// operator must Reissue Key first.
+async fn delete_key(
+    key: String,
+    digest: Option<ConfigDigest>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    // Live fetch must happen before the lock since the lock cannot span an .await; the
+    // post-lock check below only refuses if the binding still matches what we observed.
+    let synced_block = check_synced_assignment_for_unassign(&key).await?;
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+    config_digest.detect_modification(digest.as_ref())?;
+    let mut shadow = pdm_config::subscriptions::shadow_config()?;
+
+    let Some(entry) = config.get(&key) else {
+        http_bail!(NOT_FOUND, "key '{key}' not found in pool");
+    };
+
+    if let Some(assigned_remote) = entry.remote.as_deref() {
+        user_info.check_privs(
+            &auth_id,
+            &["resource", assigned_remote],
+            PRIV_RESOURCE_MODIFY,
+            false,
+        )?;
+    }
+
+    if let Some((blocking_remote, blocking_node)) = synced_block {
+        if entry.remote.as_deref() == Some(blocking_remote.as_str())
+            && entry.node.as_deref() == Some(blocking_node.as_str())
+        {
+            http_bail!(
+                BAD_REQUEST,
+                "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
+                 use Reissue Key to remove it from the remote first"
+            );
+        }
+    }
+
+    config.remove(&key);
+    // Cascade the shadow drop. Order mirrors `pdm-config/src/remotes.rs`: shadow first, then
+    // main, so a partial failure cannot leave a key entry that lost its signed blob.
+    shadow.remove(&key);
+    pdm_config::subscriptions::save_shadow(&shadow)?;
+    pdm_config::subscriptions::save_config(&config)?;
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            key: { schema: SUBSCRIPTION_KEY_SCHEMA },
+            remote: {
+                schema: REMOTE_ID_SCHEMA,
+                optional: true,
+            },
+            node: {
+                // NODE_SCHEMA rejects path-traversal input before it ends up interpolated into
+                // the remote URL `/api2/extjs/nodes/{node}/subscription`.
+                schema: NODE_SCHEMA,
+                optional: true,
+            },
+            digest: {
+                type: ConfigDigest,
+                optional: true,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Assign a key to a remote node, or unassign it (omit remote and node).
+///
+/// `PRIV_SYS_MODIFY` lets the caller touch the pool config; per-remote `PRIV_RESOURCE_MODIFY`
+/// is enforced inside this handler so an operator cannot push a key to a remote they have no
+/// other authority on.
+async fn assign_key(
+    key: String,
+    remote: Option<String>,
+    node: Option<String>,
+    digest: Option<ConfigDigest>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    // Best-effort orphan-prevention for the unassign path: if the entry is currently bound and
+    // synced (the assigned key is the live current_key on its remote), refuse and direct the
+    // operator to Reissue Key. The live fetch must happen before the lock since that lock cannot
+    // span an .await; we re-check the binding under the lock and only refuse if it still
+    // matches what we observed live.
+    let synced_block = if remote.is_none() && node.is_none() {
+        check_synced_assignment_for_unassign(&key).await?
+    } else {
+        None
+    };
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, config_digest) = pdm_config::subscriptions::config()?;
+    config_digest.detect_modification(digest.as_ref())?;
+
+    let Some(stored_entry) = config.get(&key).cloned() else {
+        http_bail!(NOT_FOUND, "key '{key}' not found in pool");
+    };
+    let product_type = stored_entry.product_type;
+
+    match (&remote, &node) {
+        (Some(remote_name), Some(node_name)) => {
+            user_info.check_privs(
+                &auth_id,
+                &["resource", remote_name],
+                PRIV_RESOURCE_MODIFY,
+                false,
+            )?;
+
+            // Reassigning away from a previous remote requires modify on that remote too,
+            // otherwise an audit-only-on-A operator could effectively pull a key off A by
+            // re-binding it to a remote B they can modify and applying the push (which makes
+            // the shop reissue the serverid to B and invalidates A).
+            if let Some(prev_remote) = stored_entry.remote.as_deref() {
+                if prev_remote != remote_name {
+                    user_info.check_privs(
+                        &auth_id,
+                        &["resource", prev_remote],
+                        PRIV_RESOURCE_MODIFY,
+                        false,
+                    )?;
+                }
+            }
+
+            let (remotes_config, _) = pdm_config::remotes::config()?;
+            let remote_entry = remotes_config
+                .get(remote_name)
+                .ok_or_else(|| http_err!(NOT_FOUND, "remote '{remote_name}' not found"))?;
+
+            if !product_type.matches_remote_type(remote_entry.ty) {
+                http_bail!(
+                    BAD_REQUEST,
+                    "key type '{product_type}' does not match remote type '{}'",
+                    remote_entry.ty
+                );
+            }
+
+            for (_id, other) in config.iter() {
+                if other.key != key
+                    && other.remote.as_deref() == Some(remote_name.as_str())
+                    && other.node.as_deref() == Some(node_name.as_str())
+                {
+                    http_bail!(
+                        CONFLICT,
+                        "key '{}' is already assigned to {remote_name}/{node_name}",
+                        other.key
+                    );
+                }
+            }
+
+            let entry = config.get_mut(&key).unwrap();
+            entry.remote = Some(remote_name.clone());
+            entry.node = Some(node_name.clone());
+        }
+        (None, None) => {
+            // Unassign also requires modify on the previously-pinned remote, so an audit-only
+            // operator cannot rip a key off a node they cannot otherwise touch.
+            if let Some(prev_remote) = stored_entry.remote.as_deref() {
+                user_info.check_privs(
+                    &auth_id,
+                    &["resource", prev_remote],
+                    PRIV_RESOURCE_MODIFY,
+                    false,
+                )?;
+            }
+            // Honour the pre-lock synced check only if the binding still matches what we
+            // observed; if the binding moved between the live fetch and lock, the orphan check
+            // is moot and we let the unassign through.
+            if let Some((blocking_remote, blocking_node)) = synced_block {
+                if stored_entry.remote.as_deref() == Some(blocking_remote.as_str())
+                    && stored_entry.node.as_deref() == Some(blocking_node.as_str())
+                {
+                    http_bail!(
+                        BAD_REQUEST,
+                        "key '{key}' is currently active on {blocking_remote}/{blocking_node}; \
+                         use Reissue Key to remove it from the remote first"
+                    );
+                }
+            }
+            let entry = config.get_mut(&key).unwrap();
+            entry.remote = None;
+            entry.node = None;
+        }
+        _ => {
+            http_bail!(
+                BAD_REQUEST,
+                "both 'remote' and 'node' must be provided, or neither"
+            );
+        }
+    }
+
+    pdm_config::subscriptions::save_config(&config)?;
+
+    Ok(())
+}
+
+/// Pre-lock check for [`assign_key`]'s unassign path: returns the (remote, node) the entry is
+/// currently active on, if any, so the lock-protected branch can refuse the unassign and prompt
+/// the operator to Reissue Key instead. Returns `None` for entries with no binding, no live
+/// subscription, or a live subscription whose key does not match the entry.
+async fn check_synced_assignment_for_unassign(
+    key: &str,
+) -> Result<Option<(String, String)>, Error> {
+    let (config, _) = pdm_config::subscriptions::config()?;
+    let Some(entry) = config.get(key) else {
+        return Ok(None);
+    };
+    let (Some(prev_remote), Some(prev_node)) = (entry.remote.clone(), entry.node.clone()) else {
+        return Ok(None);
+    };
+    let (remotes_config, _) = pdm_config::remotes::config()?;
+    let Some(remote_entry) = remotes_config.get(&prev_remote) else {
+        return Ok(None);
+    };
+    let live = match get_subscription_info_for_remote(remote_entry, FRESH_NODE_STATUS_MAX_AGE).await
+    {
+        Ok(v) => v,
+        Err(_) => return Ok(None),
+    };
+    let synced = live
+        .get(&prev_node)
+        .and_then(|info| info.as_ref())
+        .map(|info| {
+            info.status == proxmox_subscription::SubscriptionStatus::Active
+                && info.key.as_deref() == Some(key)
+        })
+        .unwrap_or(false);
+    Ok(synced.then_some((prev_remote, prev_node)))
+}
+
+/// Push a single key to its assigned remote node. Operates on a borrowed `Remote` so the
+/// caller can fetch the remotes-config once and reuse it.
+async fn push_key_to_remote(remote: &Remote, key: &str, node_name: &str) -> Result<(), Error> {
+    let product_type =
+        ProductType::from_key(key).ok_or_else(|| format_err!("unrecognised key format: {key}"))?;
+
+    // PVE and PBS share `proxmox_client::Client`, so `make_pbs_client_and_login` works for both;
+    // only the PUT path differs.
+    let path = match product_type {
+        ProductType::Pve => format!("/api2/extjs/nodes/{node_name}/subscription"),
+        ProductType::Pbs => "/api2/extjs/nodes/localhost/subscription".to_string(),
+        ProductType::Pmg | ProductType::Pom => {
+            bail!("PDM cannot push '{product_type}' keys: no remote support yet");
+        }
+    };
+
+    let client = crate::connection::make_pbs_client_and_login(remote).await?;
+
+    client
+        .0
+        .put(&path, &serde_json::json!({ "key": key }))
+        .await?;
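+    // Follow up with a POST so the node re-checks the subscription right away instead of
+    // waiting for its periodic update job.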
+    client.0.post(&path, &serde_json::json!({})).await?;
+
+    info!("pushed key '{key}' to {}/{node_name}", remote.id);
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            "max-age": {
+                type: u64,
+                optional: true,
+                description: "Override the cache freshness window in seconds. \
+                              Default 300 for panel views; pass 0 to force a fresh query.",
+            },
+        },
+    },
+    returns: {
+        type: Array,
+        description: "Subscription status of all remote nodes the user can audit.",
+        items: { type: RemoteNodeStatus },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// Get the subscription status of every remote node the caller can audit, combined with key pool
+/// assignment information.
+///
+/// Per-remote `PRIV_RESOURCE_AUDIT` is enforced inside the handler so users only see remotes
+/// they may audit.
+async fn node_status(
+    max_age: Option<u64>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<RemoteNodeStatus>, Error> {
+    collect_node_status(max_age.unwrap_or(PANEL_NODE_STATUS_MAX_AGE), rpcenv).await
+}
+
+/// Shared helper: fan out subscription queries to all remotes the caller has audit privilege on,
+/// in parallel, reusing the per-remote `SUBSCRIPTION_CACHE` via `get_subscription_info_for_remote`.
+/// Joins the results with the key-pool assignment table.
+async fn collect_node_status(
+    max_age: u64,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<RemoteNodeStatus>, Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    let visible_remotes: Vec<(String, Remote)> = crate::api::remotes::RemoteIterator::new()?
+        .any_privs(&user_info, &auth_id, PRIV_RESOURCE_AUDIT)
+        .into_iter()
+        .collect();
+
+    let (keys_config, _) = pdm_config::subscriptions::config()?;
+
+    // `get_subscription_info_for_remote` re-uses the per-remote `SUBSCRIPTION_CACHE` so this
+    // fan-out is safe to run concurrently.
+    let fetch = visible_remotes.iter().map(|(name, remote)| async move {
+        let res = get_subscription_info_for_remote(remote, max_age).await;
+        (name.clone(), remote.ty, res)
+    });
+    let results = join_all(fetch).await;
+
+    let mut out = Vec::new();
+    for (remote_name, remote_ty, result) in results {
+        let node_infos = match result {
+            Ok(info) => info,
+            Err(err) => {
+                warn!("failed to query subscription for remote {remote_name}: {err}");
+                continue;
+            }
+        };
+
+        for (node_name, node_info) in &node_infos {
+            let (status, level, sockets, current_key) = match node_info {
+                Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
+                None => (
+                    proxmox_subscription::SubscriptionStatus::NotFound,
+                    SubscriptionLevel::None,
+                    None,
+                    None,
+                ),
+            };
+
+            let assigned_key = keys_config
+                .iter()
+                .find(|(_id, entry)| {
+                    entry.remote.as_deref() == Some(remote_name.as_str())
+                        && entry.node.as_deref() == Some(node_name.as_str())
+                })
+                .map(|(_id, entry)| entry.key.clone());
+
+            out.push(RemoteNodeStatus {
+                remote: remote_name.clone(),
+                ty: remote_ty,
+                node: node_name.to_string(),
+                sockets,
+                status,
+                level,
+                assigned_key,
+                current_key,
+            });
+        }
+    }
+
+    out.sort_by(|a, b| (&a.remote, &a.node).cmp(&(&b.remote, &b.node)));
+    Ok(out)
+}
+
+#[api(
+    input: {
+        properties: {
+            apply: {
+                type: bool,
+                optional: true,
+                default: false,
+                description: "Actually apply the proposed assignments. Without this, only a preview is returned.",
+            },
+        },
+    },
+    returns: {
+        type: Array,
+        description: "List of proposed or applied assignments.",
+        items: { type: ProposedAssignment },
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Propose or apply automatic key-to-node assignments.
+///
+/// Matches unused pool keys to remote nodes that do not yet have a pool-assigned key, picking
+/// the smallest PVE key that covers each node's socket count. When `apply=true`, the live node
+/// statuses are fetched first (without holding the config lock - sync locks must not span
+/// awaits), then proposals are computed and persisted under the lock with a per-key re-check
+/// against the now-current pool state, so a parallel admin edit between fetch and apply does
+/// not get silently overwritten.
+async fn auto_assign(
+    apply: Option<bool>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<ProposedAssignment>, Error> {
+    let apply = apply.unwrap_or(false);
+
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+
+    if !apply {
+        let (config, _digest) = pdm_config::subscriptions::config()?;
+        return Ok(compute_proposals(&config, &node_statuses));
+    }
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, _digest) = pdm_config::subscriptions::config()?;
+    let mut proposals = compute_proposals(&config, &node_statuses);
+
+    // Audit-only callers may see a remote in the preview but must not be able to stage a write
+    // for it that another admin would later push on their behalf.
+    proposals.retain(|p| {
+        user_info.lookup_privs(&auth_id, &["resource", &p.remote]) & PRIV_RESOURCE_MODIFY != 0
+    });
+
+    for p in &proposals {
+        if let Some(entry) = config.get_mut(&p.key) {
+            // Skip keys that another writer assigned between the preview and the lock.
+            if entry.remote.is_none() {
+                entry.remote = Some(p.remote.clone());
+                entry.node = Some(p.node.clone());
+            }
+        }
+    }
+    pdm_config::subscriptions::save_config(&config)?;
+
+    Ok(proposals)
+}
+
+fn compute_proposals(
+    config: &SectionConfigData<SubscriptionKeyEntry>,
+    node_statuses: &[RemoteNodeStatus],
+) -> Vec<ProposedAssignment> {
+    let mut target_nodes: Vec<&RemoteNodeStatus> = node_statuses
+        .iter()
+        .filter(|n| {
+            n.assigned_key.is_none() && n.status != proxmox_subscription::SubscriptionStatus::Active
+        })
+        .collect();
+
+    // Sort PVE nodes by socket count descending so large nodes get keys first.
+    target_nodes.sort_by(|a, b| b.sockets.unwrap_or(0).cmp(&a.sockets.unwrap_or(0)));
+
+    let mut proposals: Vec<ProposedAssignment> = Vec::new();
+    let mut taken: std::collections::HashSet<String> = std::collections::HashSet::new();
+
+    for node in &target_nodes {
+        let remote_type = node.ty;
+
+        let candidates = config.iter().filter(|(id, entry)| {
+            entry.remote.is_none()
+                && !taken.contains(*id)
+                && entry.product_type.matches_remote_type(remote_type)
+        });
+
+        let best_key = if remote_type == pdm_api_types::remotes::RemoteType::Pve {
+            let node_sockets = node.sockets.unwrap_or(1) as u32;
+            pick_best_pve_socket_key(
+                node_sockets,
+                candidates.map(|(id, entry)| (id.to_string(), entry.key.as_str())),
+            )
+        } else {
+            candidates.map(|(id, _)| id.to_string()).next()
+        };
+
+        if let Some(key_id) = best_key {
+            let ks = config
+                .get(&key_id)
+                .and_then(|e| socket_count_from_key(&e.key));
+            taken.insert(key_id.clone());
+            proposals.push(ProposedAssignment {
+                key: key_id,
+                remote: node.remote.clone(),
+                node: node.node.clone(),
+                key_sockets: ks,
+                node_sockets: node.sockets,
+            });
+        }
+    }
+
+    proposals
+}
+
+#[api(
+    returns: {
+        type: String,
+        optional: true,
+        description: "Task UPID; absent when there is nothing pending to push.",
+    },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Push every pending key assignment to its remote node.
+///
+/// Pending = the live node does not confirm the assigned key as its current active subscription
+/// (status not Active, a different `current_key`, or the remote did not respond / is gone). Each
+/// push is logged from a worker task so the admin can follow progress.
+///
+/// The worker bails on the first failure; the remaining entries stay pending so the operator
+/// can fix the underlying issue (or clear that one assignment) and trigger another apply.
+///
+/// Returns `None` when nothing is pending so the caller can show a short info message instead of
+/// opening a task progress dialog for a no-op worker.
+async fn apply_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<Option<String>, Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+    let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+    if pending.is_empty() {
+        return Ok(None);
+    }
+
+    let worker_auth = auth_id.clone();
+    let upid = proxmox_rest_server::WorkerTask::spawn(
+        "subscription-push",
+        None,
+        auth_id.to_string(),
+        true,
+        move |_worker| async move { run_apply_pending(worker_auth).await },
+    )?;
+
+    Ok(Some(upid))
+}
+
+/// Re-validate and run the apply-pending plan from inside a worker.
+///
+/// The worker re-reads remotes and the pool config so a reassign or removal between the API call
+/// returning a UPID and the worker firing is honoured (pushing the old key to a node after the
+/// operator retracted the assignment was a real footgun).
+async fn run_apply_pending(auth_id: Authid) -> Result<(), Error> {
+    let user_info = CachedUserInfo::new()?;
+    let (remotes_config, _) = pdm_config::remotes::config()?;
+    let (config, _) = pdm_config::subscriptions::config()?;
+
+    let node_statuses = collect_status_uncached(&remotes_config).await;
+    let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+    if pending.is_empty() {
+        info!("apply-pending: nothing to do (state changed since the API call)");
+        return Ok(());
+    }
+
+    let total = pending.len();
+    let mut ok = 0usize;
+
+    for entry in pending {
+        let Some(remote) = remotes_config.get(&entry.remote) else {
+            bail!(
+                "remote '{}' vanished, aborting after {ok}/{total} successful pushes",
+                entry.remote,
+            );
+        };
+        // Honour the case where the operator unassigned the key while the worker was queued.
+        if !pool_assignment_still_valid(&config, &entry) {
+            info!(
+                "skipping {}/{}: pool assignment changed before worker ran",
+                entry.remote, entry.node
+            );
+            continue;
+        }
+
+        info!(
+            "pushing {} to {}/{}...",
+            entry.key, entry.remote, entry.node
+        );
+        if let Err(err) = push_key_to_remote(remote, &entry.key, &entry.node).await {
+            bail!(
+                "push of {} to {}/{} failed after {ok}/{total} successful pushes: {err}",
+                entry.key,
+                entry.remote,
+                entry.node,
+            );
+        }
+        info!("  success");
+        invalidate_subscription_info_for_remote(&entry.remote);
+        ok += 1;
+    }
+
+    info!("finished: {ok}/{total} pushes succeeded");
+    Ok(())
+}
+
+#[api(
+    returns: { type: ClearPendingResult },
+    access: {
+        permission: &Permission::Privilege(&["system"], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Clear every pending assignment in one bulk transaction.
+///
+/// Pending = pool key bound to a remote node whose live state does not confirm the assignment
+/// (status not Active, a different `current_key`, or no row returned at all because the remote is
+/// unreachable / the node is gone). Clears only those entries the caller has
+/// `PRIV_RESOURCE_MODIFY` on; remotes the caller may only audit are skipped. Mirrors
+/// `apply-pending` but drops the assignments instead of pushing them, so an operator can disown
+/// stuck assignments without first having to bring the target back online.
+async fn clear_pending(rpcenv: &mut dyn RpcEnvironment) -> Result<ClearPendingResult, Error> {
+    let auth_id: Authid = rpcenv
+        .get_auth_id()
+        .context("no authid available")?
+        .parse()?;
+    let user_info = CachedUserInfo::new()?;
+
+    let node_statuses = collect_node_status(FRESH_NODE_STATUS_MAX_AGE, rpcenv).await?;
+    let pending = compute_pending(&user_info, &auth_id, &node_statuses)?;
+
+    if pending.is_empty() {
+        return Ok(ClearPendingResult { cleared: 0 });
+    }
+
+    let _lock = pdm_config::subscriptions::lock_config()?;
+    let (mut config, _digest) = pdm_config::subscriptions::config()?;
+
+    let mut cleared: u32 = 0;
+    for entry in &pending {
+        // Re-check inside the lock so a concurrent reassign is not silently overwritten.
+        if let Some(stored) = config.get_mut(&entry.key) {
+            if stored.remote.as_deref() == Some(entry.remote.as_str())
+                && stored.node.as_deref() == Some(entry.node.as_str())
+            {
+                stored.remote = None;
+                stored.node = None;
+                cleared += 1;
+            }
+        }
+    }
+
+    if cleared > 0 {
+        pdm_config::subscriptions::save_config(&config)?;
+    }
+
+    Ok(ClearPendingResult { cleared })
+}
+
+/// Plan entry for one pending push.
+#[derive(Clone, Debug)]
+struct PendingEntry {
+    key: String,
+    remote: String,
+    node: String,
+}
+
+fn compute_pending(
+    user_info: &CachedUserInfo,
+    auth_id: &Authid,
+    node_statuses: &[RemoteNodeStatus],
+) -> Result<Vec<PendingEntry>, Error> {
+    let (config, _) = pdm_config::subscriptions::config()?;
+
+    Ok(config
+        .iter()
+        .filter_map(|(_id, entry)| {
+            let remote = entry.remote.as_deref()?;
+            let node = entry.node.as_deref()?;
+
+            if user_info.lookup_privs(auth_id, &["resource", remote]) & PRIV_RESOURCE_MODIFY == 0 {
+                return None;
+            }
+
+            // Treat anything other than "Active with the assigned key as the live current_key"
+            // as pending, including unreachable remotes, so an operator can clear stuck
+            // assignments without first having to bring the target back online.
+            let is_pending = match node_statuses
+                .iter()
+                .find(|n| n.remote == remote && n.node == node)
+            {
+                Some(n) => {
+                    n.status != proxmox_subscription::SubscriptionStatus::Active
+                        || n.current_key.as_deref() != Some(entry.key.as_str())
+                }
+                None => true,
+            };
+
+            is_pending.then(|| PendingEntry {
+                key: entry.key.clone(),
+                remote: remote.to_string(),
+                node: node.to_string(),
+            })
+        })
+        .collect())
+}
+
+fn pool_assignment_still_valid(
+    config: &SectionConfigData<SubscriptionKeyEntry>,
+    entry: &PendingEntry,
+) -> bool {
+    let Some(stored) = config.get(&entry.key) else {
+        return false;
+    };
+    stored.remote.as_deref() == Some(entry.remote.as_str())
+        && stored.node.as_deref() == Some(entry.node.as_str())
+}
+
+/// Like [`collect_node_status`] but bypasses the auth filter, for the apply-pending worker
+/// which gates each entry through its own per-remote priv check based on the persisted pool plan.
+async fn collect_status_uncached(
+    remotes_config: &SectionConfigData<Remote>,
+) -> Vec<RemoteNodeStatus> {
+    let fetch = remotes_config.iter().map(|(name, remote)| async move {
+        let res = get_subscription_info_for_remote(remote, FRESH_NODE_STATUS_MAX_AGE).await;
+        (name.to_string(), remote.ty, res)
+    });
+    let results = join_all(fetch).await;
+
+    let mut out = Vec::new();
+    for (remote_name, remote_ty, result) in results {
+        let Ok(node_infos) = result else { continue };
+        for (node_name, node_info) in &node_infos {
+            let (status, level, sockets, current_key) = match node_info {
+                Some(info) => (info.status, info.level, info.sockets, info.key.clone()),
+                None => (
+                    proxmox_subscription::SubscriptionStatus::NotFound,
+                    SubscriptionLevel::None,
+                    None,
+                    None,
+                ),
+            };
+            out.push(RemoteNodeStatus {
+                remote: remote_name.clone(),
+                ty: remote_ty,
+                node: node_name.to_string(),
+                sockets,
+                status,
+                level,
+                assigned_key: None,
+                current_key,
+            });
+        }
+    }
+    out
+}
-- 
2.47.3





Thread overview: 13+ messages
2026-05-07  7:17 [PATCH 0/8] subscription: add central key pool registry with reissue support Thomas Lamprecht
2026-05-07  7:17 ` [PATCH 1/8] api: subscription cache: ensure max_age=0 forces a fresh fetch Thomas Lamprecht
2026-05-07  7:17 ` [PATCH 2/8] api types: subscription level: render full names Thomas Lamprecht
2026-05-07  7:17 ` [PATCH 3/8] subscription: add key pool data model and config layer Thomas Lamprecht
2026-05-07  7:17 ` Thomas Lamprecht [this message]
2026-05-07  7:17 ` [PATCH 5/8] ui: add subscription registry with key pool and node status Thomas Lamprecht
2026-05-07  8:15   ` Lukas Wagner
2026-05-07  8:33     ` Thomas Lamprecht
2026-05-07  7:17 ` [PATCH 6/8] cli: add subscription key pool management subcommands Thomas Lamprecht
2026-05-07  7:17 ` [PATCH 7/8] docs: add subscription registry chapter Thomas Lamprecht
2026-05-07  7:17 ` [PATCH 8/8] subscription: add Reissue Key action with pending-reissue queue Thomas Lamprecht
2026-05-07  7:50   ` Lukas Wagner
2026-05-07  8:38 ` superseded: [PATCH 0/8] subscription: add central key pool registry with reissue support Thomas Lamprecht
