From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: pdm-devel@lists.proxmox.com
Date: Wed, 5 Mar 2025 16:01:02 +0100
Message-Id: <20250305150108.245584-2-w.bumiller@proxmox.com>
In-Reply-To: <20250305150108.245584-1-w.bumiller@proxmox.com>
References: <20250305150108.245584-1-w.bumiller@proxmox.com>
Subject: [pdm-devel] [PATCH v2 datacenter-manager 1/7] server: generic multi-client wrapper

We'll use this to instantiate multiple clients for a PVE cluster.

Unfortunately, we cannot transparently just "connect to different nodes"
at the connection layer: the individual nodes may have different
certificate fingerprints and may (e.g. if a reverse proxy is used)
require different `Host:` headers, so the entire request needs to be
recreated whenever we switch servers.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
Changes to v1:
  * Also log the final client's error if all clients fail.
    (Previously this was silently dropped.) Noted by Lukas.
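[Editor's illustration, not part of the patch: the failover idea described
above, reduced to its core. `Client` and `request_with_failover` here are
simplified stand-ins, not the real proxmox_client types; the loop mirrors
what the `try_request!` macro below generates: try each per-node client in
turn, log intermediate failures, and report the final client's error if all
nodes fail.]

    // Editor's illustration (hypothetical, simplified stand-in types).

    struct Client {
        host: String,
    }

    impl Client {
        // Pretend request: hosts named "down*" simulate unreachable nodes.
        fn request(&self, path: &str) -> Result<String, String> {
            if self.host.starts_with("down") {
                Err(format!("{}: connection refused", self.host))
            } else {
                Ok(format!("{}{} -> 200 OK", self.host, path))
            }
        }
    }

    // Try each per-node client in turn. A separate client per node is needed
    // because fingerprints and `Host:` headers differ per node, so the request
    // must be rebuilt rather than re-routed. The last error is returned (not
    // silently dropped) if every node fails.
    fn request_with_failover(clients: &[Client], path: &str) -> Result<String, String> {
        let mut last_err = None;
        for client in clients {
            match client.request(path) {
                Ok(response) => return Ok(response),
                Err(err) => {
                    eprintln!("API client error, trying another remote - {err}");
                    last_err = Some(err);
                }
            }
        }
        Err(last_err.unwrap_or_else(|| "no nodes configured for remote".to_string()))
    }

    fn main() {
        let clients = vec![
            Client { host: "down1.example".into() },
            Client { host: "node2.example".into() },
        ];
        println!("{:?}", request_with_failover(&clients, "/api2/json/version"));
    }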
 server/src/connection.rs | 290 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 276 insertions(+), 14 deletions(-)

diff --git a/server/src/connection.rs b/server/src/connection.rs
index 0adeba2..9f93eb3 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -3,14 +3,21 @@
 //! Make sure to call [`init`] to inject a concrete [`ClientFactory`]
 //! instance before calling any of the provided functions.
 
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::Mutex as StdMutex;
 use std::sync::OnceLock;
+use std::time::Duration;
 
 use anyhow::{bail, format_err, Error};
 use http::uri::Authority;
+use http::Method;
+use serde::Serialize;
 
-use proxmox_client::{Client, TlsOptions};
+use proxmox_client::{Client, HttpApiClient, HttpApiResponse, HttpApiResponseStream, TlsOptions};
 
-use pdm_api_types::remotes::{Remote, RemoteType};
+use pdm_api_types::remotes::{NodeUrl, Remote, RemoteType};
 use pve_api_types::client::{PveClient, PveClientImpl};
 
 use crate::pbs_client::PbsClient;
@@ -41,6 +48,26 @@ fn prepare_connect_client(
         Some(endpoint) => format_err!("{endpoint} not configured for remote"),
         None => format_err!("no nodes configured for remote"),
     })?;
+
+    let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+        RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+        RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+    };
+
+    let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+
+    Ok(ConnectInfo {
+        client,
+        prefix,
+        perl_compat,
+    })
+}
+
+fn prepare_connect_client_to_node(
+    node: &NodeUrl,
+    default_port: u16,
+    pve_compat: bool,
+) -> Result<Client, Error> {
     let mut options = TlsOptions::default();
 
     if let Some(fp) = &node.fingerprint {
@@ -49,11 +76,6 @@
 
     let host_port: Authority = node.hostname.parse()?;
 
-    let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
-        RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
-        RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
-    };
-
     let uri: http::uri::Uri = format!(
         "https://{}:{}",
         host_port.host(),
@@ -64,12 +86,7 @@
     let mut client = proxmox_client::Client::with_options(uri.clone(), options, Default::default())?;
 
     client.set_pve_compatibility(pve_compat);
-
-    Ok(ConnectInfo {
-        client,
-        prefix,
-        perl_compat,
-    })
+    Ok(client)
 }
 
 /// Constructs a [`Client`] for the given [`Remote`] for an API token
@@ -92,6 +109,33 @@ fn connect(remote: &Remote, target_endpoint: Option<&str>) -> Result<Client, any
     Ok(client)
 }
 
+/// Like [`connect()`], but for remotes which have multiple clients.
+fn multi_connect(remote: &Remote) -> Result<MultiClient, anyhow::Error> {
+    let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+        RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+        RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+    };
+
+    let mut clients = Vec::new();
+
+    for node in &remote.nodes {
+        let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+        client.set_authentication(proxmox_client::Token {
+            userid: remote.authid.to_string(),
+            prefix: prefix.clone(),
+            value: remote.token.to_string(),
+            perl_compat,
+        });
+        clients.push(Arc::new(client));
+    }
+
+    if clients.is_empty() {
+        bail!("no nodes configured for remote");
+    }
+
+    Ok(MultiClient::new(clients))
+}
+
 /// Constructs a [`Client`] for the given [`Remote`] for an API token or user
 ///
 /// In case the remote has a user configured (instead of an API token), it will connect and get a
@@ -183,7 +227,7 @@ pub struct DefaultClientFactory;
 
 #[async_trait::async_trait]
 impl ClientFactory for DefaultClientFactory {
     fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
-        let client = crate::connection::connect(remote, None)?;
+        let client = crate::connection::multi_connect(remote)?;
         Ok(Box::new(PveClientImpl(client)))
     }
@@ -279,3 +323,221 @@ pub fn init(instance: Box<dyn ClientFactory + Send + Sync>) {
         panic!("connection factory instance already set");
     }
 }
+
+/// This is another wrapper around the actual HTTP client responsible for dealing with connection
+/// problems: if we cannot reach a node of a cluster, this will attempt to retry a request on
+/// another node.
+///
+/// # Possible improvements
+///
+/// - For `GET` requests we could also start a 2nd request after a shorter timeout (e.g. 10s).
+/// - We could use RRD data for a "best guess" where to start, e.g. if we know a node was offline
+///   on the last rrd polling we'd start with a different one.
+///   For this, we still need to include the node names in the clients here somehow so that we can
+///   actually manage/re-shuffle them from the outside after this struct is already created.
+struct MultiClient {
+    state: StdMutex<MultiClientState>,
+    timeout: Duration,
+}
+
+impl MultiClient {
+    fn new(clients: Vec<Arc<Client>>) -> Self {
+        Self {
+            state: StdMutex::new(MultiClientState::new(clients)),
+            timeout: Duration::from_secs(60),
+        }
+    }
+}
+
+/// Keeps track of which client (iow. which specific node of a remote) we're supposed to be using
+/// right now.
+struct MultiClientState {
+    /// The current index *not* modulo the client count.
+    current: usize,
+    clients: Vec<Arc<Client>>,
+}
+
+impl MultiClientState {
+    fn new(clients: Vec<Arc<Client>>) -> Self {
+        Self {
+            current: 0,
+            clients,
+        }
+    }
+
+    /// Whenever a request fails with the *current* client we move the current entry forward.
+    ///
+    /// # Note:
+    ///
+    /// With our current strategy `failed_index` is never greater than `current`, but if we change
+    /// the strategy, we may want to change this to something like `1 + max(current, failed)`.
+    fn failed(&mut self, failed_index: usize) {
+        if self.current == failed_index {
+            self.current = self.current.wrapping_add(1);
+        }
+    }
+
+    /// Get `current` as an *index* (i.e. modulo `clients.len()`).
+    fn index(&self) -> usize {
+        self.current % self.clients.len()
+    }
+
+    /// Get the current client and its index, which can be passed to `failed()` if the client fails
+    /// to connect.
+    fn get(&self) -> (Arc<Client>, usize) {
+        let index = self.index();
+        (Arc::clone(&self.clients[index]), self.current)
+    }
+
+    /// Check if we already tried all clients since a specific starting index.
+    ///
+    /// When an API request is made, we loop through the possible clients.
+    /// Since multiple requests might be running simultaneously, it's possible that multiple tasks
+    /// mark the same *or* *multiple* clients as failed already.
+    ///
+    /// We don't want to try clients which we know are currently non-functional, so a
+    /// request-retry-loop will fail as soon as the same *number* of clients since its starting
+    /// point were marked as faulty, without retrying them all.
+    fn tried_all_since(&self, start: usize) -> bool {
+        self.tried_clients(start) >= self.clients.len()
+    }
+
+    /// We store the current index continuously, without wrapping it modulo the client count (we
+    /// only do that when indexing the `clients` array), so that we can easily check if "all
+    /// currently running tasks taken together" have already tried all clients, by comparing our
+    /// starting point to the current index.
+    fn tried_clients(&self, start: usize) -> usize {
+        self.current.wrapping_sub(start)
+    }
+}
+
+impl MultiClient {
+    /// This is the client usage strategy.
+    ///
+    /// This is basically a "generator" for clients to try.
+    ///
+    /// We share the "state" with other tasks. When a client fails, it is "marked" as failed and
+    /// the state "rotates" through the clients.
+    /// We might be skipping clients if other tasks already tried "more" clients, but that's fine,
+    /// since there's no point in trying the same remote twice simultaneously if it is currently
+    /// offline...
+    fn try_clients(&self) -> impl Iterator<Item = Arc<Client>> + '_ {
+        let mut start_current = None;
+        let state = &self.state;
+        std::iter::from_fn(move || {
+            let mut state = state.lock().unwrap();
+            match start_current {
+                None => {
+                    // First attempt: just use the current client and remember the starting index.
+                    let (client, index) = state.get();
+                    start_current = Some((index, index));
+                    Some(client)
+                }
+                Some((start, current)) => {
+                    // If our last request failed, the retry-loop asks for another client: mark the
+                    // one we just used as failed and check if all clients have gone through a
+                    // retry loop...
+                    state.failed(current);
+                    if state.tried_all_since(start) {
+                        // This iterator (and therefore this retry-loop) has tried all clients.
+                        // Give up.
+                        return None;
+                    }
+                    // Finally, just get the new current client and update `current` for the later
+                    // call to `failed()`.
+                    let (client, current) = state.get();
+                    start_current = Some((start, current));
+                    Some(client)
+                }
+            }
+        })
+        .fuse()
+    }
+}
+
+// Doing this via a generic method is currently tedious, as it requires an extra helper trait to
+// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
+// its input to its generic output future... and then you run into borrow-checker limitations...
+macro_rules! try_request {
+    ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
+        let params = $params.map(serde_json::to_value);
+        Box::pin(async move {
+            let params = params
+                .transpose()
+                .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
+
+            let mut last_err = None;
+            let mut timed_out = false;
+            // The iterator in use here will automatically mark a client as faulty if we move on to
+            // the `next()` one.
+            for client in $self.try_clients() {
+                if let Some(err) = last_err.take() {
+                    log::error!("API client error, trying another remote - {err:?}");
+                }
+                if timed_out {
+                    timed_out = false;
+                    log::error!("API client timed out, trying another remote");
+                }
+
+                let request = client.$how($method.clone(), $path_and_query, params.as_ref());
+                match tokio::time::timeout($self.timeout, request).await {
+                    Ok(Err(proxmox_client::Error::Client(err))) => {
+                        last_err = Some(err);
+                    }
+                    Ok(result) => return result,
+                    Err(_) => {
+                        timed_out = true;
+                    }
+                }
+            }
+
+            if let Some(err) = last_err {
+                log::error!("API client error (giving up) - {err:?}");
+            } else if timed_out {
+                log::error!("API client timed out, no remotes reachable, giving up");
+            }
+
+            Err(proxmox_client::Error::Other(
+                "failed to perform API request",
+            ))
+        })
+    };
+}
+
+impl HttpApiClient for MultiClient {
+    type ResponseFuture<'a> =
+        Pin<Box<dyn Future<Output = Result<HttpApiResponse, proxmox_client::Error>> + Send + 'a>>;
+
+    type ResponseStreamFuture<'a> = Pin<
+        Box<
+            dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, proxmox_client::Error>>
+                + Send
+                + 'a,
+        >,
+    >;
+    type Body = hyper::Body;
+
+    fn request<'a, T>(
+        &'a self,
+        method: Method,
+        path_and_query: &'a str,
+        params: Option<T>,
+    ) -> Self::ResponseFuture<'a>
+    where
+        T: Serialize + 'a,
+    {
+        try_request! { self, method, path_and_query, params, request }
+    }
+
+    fn streaming_request<'a, T>(
+        &'a self,
+        method: Method,
+        path_and_query: &'a str,
+        params: Option<T>,
+    ) -> Self::ResponseStreamFuture<'a>
+    where
+        T: Serialize + 'a,
+    {
+        try_request! { self, method, path_and_query, params, streaming_request }
+    }
+}
-- 
2.39.5
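[Editor's illustration, not part of the patch: the wrapping-counter
bookkeeping behind `MultiClientState::tried_all_since`/`tried_clients`,
shown in isolation. `tried_all_since` here is a hypothetical free function
mirroring the methods' logic: `current` counts failures monotonically
across all tasks, so a retry loop that remembers the counter value at its
start can give up once the shared counter has advanced by the number of
configured clients, no matter which tasks caused the advances.]

    // Editor's illustration (hypothetical free function mirroring
    // MultiClientState::tried_all_since / tried_clients from the patch).
    fn tried_all_since(current: usize, start: usize, num_clients: usize) -> bool {
        // `current` is never reduced modulo the client count; wrapping_sub
        // keeps the distance correct even if the counter wraps past usize::MAX.
        current.wrapping_sub(start) >= num_clients
    }

    fn main() {
        let num_clients = 3;
        // A retry loop that started at shared counter value 5 gives up only
        // once the counter has advanced by 3 (one failure per client).
        assert!(!tried_all_since(6, 5, num_clients)); // one client marked failed
        assert!(!tried_all_since(7, 5, num_clients)); // two clients marked failed
        assert!(tried_all_since(8, 5, num_clients)); // all three tried -> give up
        println!("exhaustion check matches the documented behavior");
    }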