From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id EB47A1FF17C for ; Wed, 3 Sep 2025 11:36:57 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id D2C262FA1B; Wed, 3 Sep 2025 11:37:12 +0200 (CEST) Mime-Version: 1.0 Date: Wed, 03 Sep 2025 11:37:07 +0200 Message-Id: To: "Proxmox Datacenter Manager development discussion" , "Stefan Hanreich" X-Mailer: aerc 0.20.0 References: <20250902140956.228031-1-s.hanreich@proxmox.com> <20250902140956.228031-17-s.hanreich@proxmox.com> In-Reply-To: <20250902140956.228031-17-s.hanreich@proxmox.com> From: "Shannon Sterz" X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1756892212791 X-SPAM-LEVEL: Spam detection results: 0 AWL -0.125 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_1 0.1 random spam to be learned in bayes POISEN_SPAM_PILL_3 0.1 random spam to be learned in bayes RCVD_IN_MSPIKE_H2 0.001 Average reputation (+2) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pdm-devel] [PATCH proxmox-datacenter-manager v3 01/15] server: add locked sdn client helpers X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Datacenter Manager development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" On Tue Sep 2, 2025 at 4:09 PM CEST, Stefan 
Hanreich wrote: > Add a new client that represents a remote with a locked SDN > configuration. It works by creating a new PveClient and then locking > the SDN configuration via the client. It ensures that, while the lock > is held, all methods are called with the proper lock secret. It also > provides helpers for applying / rollbacking the configuration and > releasing the lock. > > Additionally, a collection type is introduced, that can hold multiple > locked SDN clients. It provides a helper for executing SDN API calls > across multiple remotes and rolling back the SDN configuration of all > remotes if any API command on any remote fails. This client will be > used for making changes across all remotes in PDM worker tasks. > > After applying, we cannot rollback the configuration - so the > collection only logs any errors - waits for the execution to finish on > all remotes and then returns a Result indicating whether the operation > was successful on *all* remotes. > > Signed-off-by: Stefan Hanreich > --- > server/src/lib.rs | 1 + > server/src/sdn_client.rs | 432 +++++++++++++++++++++++++++++++++++++++ > 2 files changed, 433 insertions(+) > create mode 100644 server/src/sdn_client.rs > > diff --git a/server/src/lib.rs b/server/src/lib.rs > index 3f8b770..33213e1 100644 > --- a/server/src/lib.rs > +++ b/server/src/lib.rs > @@ -14,6 +14,7 @@ pub mod task_utils; > > pub mod connection; > pub mod pbs_client; > +pub mod sdn_client; > > #[cfg(any(remote_config = "faked", test))] > pub mod test_support; > diff --git a/server/src/sdn_client.rs b/server/src/sdn_client.rs > new file mode 100644 > index 0000000..13dfdd8 > --- /dev/null > +++ b/server/src/sdn_client.rs > @@ -0,0 +1,432 @@ > +use std::sync::Arc; > +use std::time::Duration; > + > +use anyhow::{self, bail, Context}; > + > +use futures::{future::join_all, stream::FuturesUnordered, StreamExt, TryFutureExt}; > +use pdm_api_types::{remotes::Remote, RemoteUpid}; > +use pve_api_types::{ > + client::PveClient, 
CreateSdnLock, CreateVnet, CreateZone, PveUpid, ReleaseSdnLock, ReloadSdn, > + RollbackSdn, > +}; > + > +use crate::api::pve::{connect, get_remote}; > + > +/// Wrapper for [`PveClient`] for representing a locked SDN configuration. > +/// > +/// It stores the client that has been locked, as well as the lock_token that is required for > +/// making changes to the SDN configuration. It provides methods that proxy the respective SDN > +/// endpoints, where it adds the lock_token when making the proxied calls. > +pub struct LockedSdnClient { > + secret: String, small nit: any reason not to name this field `lock_token` as well? IMO that would avoid some confusion down the line, since the doc comment above and the API parameters already call it `lock_token`; `secret` is a very generic term. > + client: Arc, > +} > + > +#[derive(Debug)] > +pub enum LockedSdnClientError { > + Client(proxmox_client::Error), > + Other(anyhow::Error), > +} > + > +impl std::error::Error for LockedSdnClientError {} > + > +impl std::fmt::Display for LockedSdnClientError { > + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { > + match self { > + Self::Client(err) => err.fmt(f), > + Self::Other(err) => err.fmt(f), > + } > + } > +} > + > +impl From for LockedSdnClientError { > + fn from(value: proxmox_client::Error) -> Self { > + Self::Client(value) > + } > +} > + > +impl From for LockedSdnClientError { > + fn from(value: anyhow::Error) -> Self { > + Self::Other(value) > + } > +} > + > +impl LockedSdnClient { > + /// Creates a new PveClient for a given [`Remote`] and locks the SDN configuration there. > + /// > + /// # Errors > + /// > + /// This function will return an error if locking the remote fails.
> + pub async fn new( > + remote: &Remote, > + allow_pending: impl Into>, > + ) -> Result { > + let client = connect(remote).map_err(LockedSdnClientError::from)?; > + > + let params = CreateSdnLock { > + allow_pending: allow_pending.into(), > + }; > + > + client > + .acquire_sdn_lock(params) > + .await > + .map(|secret| Self { secret, client }) > + .map_err(LockedSdnClientError::from) > + } > + > + /// proxies [`PveClient::create_vnet`] and adds lock_token to the passed parameters before > + /// making the call. > + pub async fn create_vnet(&self, mut params: CreateVnet) -> Result<(), proxmox_client::Error> { > + params.lock_token = Some(self.secret.clone()); > + > + self.client.create_vnet(params).await > + } > + > + /// proxies [`PveClient::create_zone`] and adds lock_token to the passed parameters before > + /// making the call. > + pub async fn create_zone(&self, mut params: CreateZone) -> Result<(), proxmox_client::Error> { > + params.lock_token = Some(self.secret.clone()); > + > + self.client.create_zone(params).await > + } > + > + /// applies the changes made while the client was locked and returns the original [`PveClient`] if the > + /// changes have been applied successfully. > + pub async fn apply_and_release( > + self, > + ) -> Result<(PveUpid, Arc), proxmox_client::Error> { > + let params = ReloadSdn { > + lock_token: Some(self.secret.clone()), > + release_lock: Some(true), > + }; > + > + self.client > + .sdn_apply(params) > + .await > + .map(move |upid| (upid, self.client)) > + } > + > + /// releases the lock on the [`PveClient`] without applying pending changes. 
> + pub async fn release( > + self, > + force: impl Into>, > + ) -> Result, proxmox_client::Error> { > + let params = ReleaseSdnLock { > + force: force.into(), > + lock_token: Some(self.secret), > + }; > + > + self.client.release_sdn_lock(params).await?; > + Ok(self.client) > + } > + > + /// rolls back all pending changes and then releases the lock > + pub async fn rollback_and_release( > + self, > + ) -> Result, proxmox_client::Error> { > + let params = RollbackSdn { > + lock_token: Some(self.secret), > + release_lock: Some(true), > + }; > + > + self.client.rollback_sdn_changes(params).await?; > + Ok(self.client) > + } > +} > + > +/// Context for [`LockedSdnClient`] stored in [`LockedSdnClients`]. > +pub struct LockedSdnClientContext { > + remote_id: String, > + data: C, > +} > + > +impl LockedSdnClientContext { > + fn new(remote_id: String, data: C) -> Self { > + Self { remote_id, data } > + } > + > + pub fn remote_id(&self) -> &str { > + &self.remote_id > + } > + > + pub fn data(&self) -> &C { > + &self.data > + } > +} > + > +/// A collection abstracting [`LockedSdnClient`] for multiple locked remotes. > +/// > +/// It can be used for running the same command across multiple remotes, while automatically > +/// handling rollback and releasing locks in case of failures across all remotes. If an API call > +/// made to one of the remotes fails, then this client will automatically take care of rolling back > +/// all changes made during the transaction and then releasing the locks. > +pub struct LockedSdnClients { > + clients: Vec<(LockedSdnClient, LockedSdnClientContext)>, > +} > + > +impl LockedSdnClients { > + /// A convenience function for creating locked clients for multiple remotes. > + /// > + /// For each remote a Context can be specified, which will be supplied to all callbacks that > + /// are using this [`LockedSdnClients`] to make calls across all remotes. 
> + /// > + /// # Errors > + /// > + /// This function will return an error if: > + /// * the remote configuration cannot be read > + /// * any of the supplied remotes is not contained in the configuration > + /// * locking the configuration on any remote fails > + /// > + /// If necessary, the configuration of all remotes will be unlocked, if possible. > + pub async fn from_remote_names>( > + remote_names: I, > + allow_pending: bool, > + ) -> Result { > + let (remote_config, _) = pdm_config::remotes::config()?; > + > + let mut clients = Vec::new(); > + > + for (remote_name, context) in remote_names { > + let remote = get_remote(&remote_config, &remote_name)?; > + proxmox_log::info!("acquiring lock for remote {}", remote.id); > + > + match LockedSdnClient::new(remote, allow_pending).await { > + Ok(client) => { > + let context = LockedSdnClientContext::new(remote_name, context); > + clients.push((client, context)); > + } > + Err(error) => { > + proxmox_log::info!( > + "encountered an error when locking a remote, releasing all locks" > + ); > + > + for (client, _) in clients { > + proxmox_log::info!("releasing lock for remote {}", remote.id); > + > + if let Err(error) = client.release(false).await { > + proxmox_log::error!( > + "could not release lock for remote {}: {error:#}", > + remote.id > + ) > + } > + } > + > + return match &error { > + LockedSdnClientError::Client(proxmox_client::Error::Api(status, _msg)) > + if *status == 501 => > + { > + bail!("remote {} does not support the sdn locking api, please upgrade to PVE 9 or newer!", remote.id) > + } > + _ => Err(error).with_context(|| { > + format!("could not lock sdn configuration for remote {}", remote.id) > + }), > + }; > + } > + }; > + } > + > + clients.sort_by(|(_, ctx_a), (_, ctx_b)| ctx_a.remote_id.cmp(&ctx_b.remote_id)); > + > + Ok(Self { clients }) > + } > + > + /// Executes the given callback for each [`LockedSdnClient`] in this collection. 
> + /// > + /// On error, it tries to rollback the configuration of *all* locked clients, releases the lock > + /// and returns the error. If rollbacking fails, an error will be logged and no further action > + /// is taken. > + pub async fn for_each(self, callback: F) -> Result > + where > + F: AsyncFn( > + &LockedSdnClient, > + &LockedSdnClientContext, > + ) -> Result<(), proxmox_client::Error>, > + { > + let futures = self.clients.iter().map(|(client, context)| { > + callback(client, context) > + .map_ok(|_| context.remote_id()) > + .map_err(|err| (err, context.remote_id())) > + }); > + > + let mut errors = false; > + > + for result in join_all(futures).await { > + match result { > + Ok(remote_id) => { > + proxmox_log::info!("succcessfully executed transaction on remote {remote_id}"); > + } > + Err((error, remote_id)) => { > + proxmox_log::error!( > + "failed to execute transaction on remote {remote_id}: {error:#}", > + ); > + errors = true; > + } > + } > + } > + > + if errors { > + let mut rollback_futures = FuturesUnordered::new(); > + > + for (client, ctx) in self.clients { > + let ctx = Arc::new(ctx); > + let err_ctx = ctx.clone(); > + > + rollback_futures.push( > + client > + .rollback_and_release() > + .map_ok(|_| ctx) > + .map_err(|err| (err, err_ctx)), > + ); > + } > + > + while let Some(result) = rollback_futures.next().await { > + match result { > + // older versions of PVE 9 potentially return 1 instead of an empty body, which > + // can trigger an BadApi Error in the client. Ignore the error here to work around > + // this issue. 
> + Ok(ctx) | Err((proxmox_client::Error::BadApi(_, _), ctx)) => { > + proxmox_log::info!( > + "successfully rolled back configuration for remote {}", > + ctx.remote_id() > + ) > + } > + Err((_, ctx)) => { > + proxmox_log::error!( > + "could not rollback and unlock configuration for remote {} - configuration needs to be manually unlocked via 'pvesh delete /cluster/sdn/lock --force 1'", > + ctx.remote_id() > + ) > + } > + } > + } > + > + bail!("running the transaction failed on at least one remote!"); > + } > + > + Ok(self) > + } > + > + // pve-http-server TCP connection timeout is 5 seconds, use a lower amount with some margin for > + // latency in order to avoid re-opening TCP connections for every polling request. > + const POLLING_INTERVAL: Duration = Duration::from_secs(3); > + > + /// Convenience function for polling a running task on a PVE remote. > + /// > + /// It polls a given task on a given node, waiting for the task to finish successfully. > + /// > + /// # Errors > + /// > + /// This function will return an error if: > + /// * There was a problem querying the task status (this does not necessarily mean the task failed). > + /// * The task finished unsuccessfully. > + async fn poll_task( > + node: String, > + upid: RemoteUpid, > + client: Arc, > + ) -> Result { > + loop { > + tokio::time::sleep(Self::POLLING_INTERVAL).await; > + > + let status = client.get_task_status(&node, &upid.upid).await?; > + > + if !status.is_running() { > + if status.finished_successfully() == Some(true) { > + return Ok(upid); > + } else { > + bail!( > + "task did not finish successfully on remote {}", > + upid.remote() > + ); > + } > + } > + } > + } > + > + /// Applies and Reloads the SDN configuration for all locked clients. > + /// > + /// This function tries to apply the SDN configuration for all supplied locked clients and, if > + /// it was successful, to reload the SDN configuration of the remote. It logs success and error > + /// messages via proxmox_log. 
Rollbacking in cases of failure is no longer possible, so this > + /// function then returns an error if applying or reloading the configuration was unsuccessful > + /// on at least one remote. > + /// > + /// # Errors This function returns an error if applying or reloading the configuration on one > + /// of the remotes failed. It will always wait for all futures to finish and only return an > + /// error afterwards. > + pub async fn apply_and_release(self) -> Result<(), anyhow::Error> { > + let mut futures = FuturesUnordered::new(); > + > + for (client, context) in self.clients { > + let ctx = Arc::new(context); > + let err_ctx = ctx.clone(); > + > + futures.push( > + client > + .apply_and_release() > + .map_ok(|(upid, client)| ((upid, client), ctx)) > + .map_err(|err| (err, err_ctx)), > + ); > + } > + > + let mut reload_futures = FuturesUnordered::new(); > + > + while let Some(result) = futures.next().await { > + match result { > + Ok(((upid, client), ctx)) => { > + proxmox_log::info!( > + "successfully applied sdn config on remote {}", > + ctx.remote_id() > + ); > + > + let Ok(remote_upid) = > + RemoteUpid::try_from((ctx.remote_id(), upid.to_string().as_str())) > + else { > + proxmox_log::error!("invalid UPID received from PVE: {upid}"); > + continue; > + }; > + > + reload_futures.push( > + Self::poll_task(upid.node.clone(), remote_upid, client) > + .map_err(move |err| (err, ctx)), > + ); > + } > + Err((error, ctx)) => { > + proxmox_log::error!( > + "failed to apply sdn configuration on remote {}: {error:#}, not reloading", > + ctx.remote_id() > + ); > + } > + } > + } > + > + proxmox_log::info!( > + "Waiting for reload tasks to finish on all remotes, this can take awhile" > + ); > + > + let mut errors = false; > + > + while let Some(result) = reload_futures.next().await { > + match result { > + Ok(upid) => { > + proxmox_log::info!( > + "successfully reloaded configuration on remote {}", > + upid.remote() > + ); > + } > + Err((error, ctx)) => { > + 
proxmox_log::error!( > + "could not reload configuration on remote {}: {error:#}", > + ctx.remote_id() > + ); > + > + errors = true; > + } > + } > + } > + > + if errors { > + bail!("failed to apply configuration on at least one remote"); > + } > + > + Ok(()) > + } > +} _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel