From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper
Date: Tue, 4 Feb 2025 10:55:48 +0100 [thread overview]
Message-ID: <20250204095554.39501-2-w.bumiller@proxmox.com> (raw)
In-Reply-To: <20250204095554.39501-1-w.bumiller@proxmox.com>
We'll use this to instantiate multiple clients for a pve cluster.
Unfortunately we cannot just transparently "connect to different
nodes" on the connection layer, since different nodes may have
different certificate fingerprints and may (e.g. if a reverse proxy is
used) require different `Host:` headers. Therefore the entire request
needs to be recreated when we need to switch servers.
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
server/src/connection.rs | 284 +++++++++++++++++++++++++++++++++++++--
1 file changed, 270 insertions(+), 14 deletions(-)
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 0adeba2..767a2f9 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -3,14 +3,21 @@
//! Make sure to call [`init`] to inject a concrete [`ClientFactory`]
//! instance before calling any of the provided functions.
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::Mutex as StdMutex;
use std::sync::OnceLock;
+use std::time::Duration;
use anyhow::{bail, format_err, Error};
use http::uri::Authority;
+use http::Method;
+use serde::Serialize;
-use proxmox_client::{Client, TlsOptions};
+use proxmox_client::{Client, HttpApiClient, HttpApiResponse, HttpApiResponseStream, TlsOptions};
-use pdm_api_types::remotes::{Remote, RemoteType};
+use pdm_api_types::remotes::{NodeUrl, Remote, RemoteType};
use pve_api_types::client::{PveClient, PveClientImpl};
use crate::pbs_client::PbsClient;
@@ -41,6 +48,26 @@ fn prepare_connect_client(
Some(endpoint) => format_err!("{endpoint} not configured for remote"),
None => format_err!("no nodes configured for remote"),
})?;
+
+ let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+ RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+ RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+ };
+
+ let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+
+ Ok(ConnectInfo {
+ client,
+ prefix,
+ perl_compat,
+ })
+}
+
+fn prepare_connect_client_to_node(
+ node: &NodeUrl,
+ default_port: u16,
+ pve_compat: bool,
+) -> Result<Client, Error> {
let mut options = TlsOptions::default();
if let Some(fp) = &node.fingerprint {
@@ -49,11 +76,6 @@ fn prepare_connect_client(
let host_port: Authority = node.hostname.parse()?;
- let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
- RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
- RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
- };
-
let uri: http::uri::Uri = format!(
"https://{}:{}",
host_port.host(),
@@ -64,12 +86,7 @@ fn prepare_connect_client(
let mut client =
proxmox_client::Client::with_options(uri.clone(), options, Default::default())?;
client.set_pve_compatibility(pve_compat);
-
- Ok(ConnectInfo {
- client,
- prefix,
- perl_compat,
- })
+ Ok(client)
}
/// Constructs a [`Client`] for the given [`Remote`] for an API token
@@ -92,6 +109,33 @@ fn connect(remote: &Remote, target_endpoint: Option<&str>) -> Result<Client, any
Ok(client)
}
+/// Like [`connect()`], but creates one authenticated client for each node configured for the remote.
+fn multi_connect(remote: &Remote) -> Result<MultiClient, anyhow::Error> {
+ let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+ RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+ RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+ };
+
+ let mut clients = Vec::new();
+
+ for node in &remote.nodes {
+ let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+ client.set_authentication(proxmox_client::Token {
+ userid: remote.authid.to_string(),
+ prefix: prefix.clone(),
+ value: remote.token.to_string(),
+ perl_compat,
+ });
+ clients.push(Arc::new(client));
+ }
+
+ if clients.is_empty() {
+ bail!("no nodes configured for remote");
+ }
+
+ Ok(MultiClient::new(clients))
+}
+
/// Constructs a [`Client`] for the given [`Remote`] for an API token or user
///
/// In case the remote has a user configured (instead of an API token), it will connect and get a
@@ -183,7 +227,7 @@ pub struct DefaultClientFactory;
#[async_trait::async_trait]
impl ClientFactory for DefaultClientFactory {
fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
- let client = crate::connection::connect(remote, None)?;
+ let client = crate::connection::multi_connect(remote)?;
Ok(Box::new(PveClientImpl(client)))
}
@@ -279,3 +323,215 @@ pub fn init(instance: Box<dyn ClientFactory + Send + Sync>) {
panic!("connection factory instance already set");
}
}
+
+/// This is another wrapper around the actual HTTP client responsible for dealing with connection
+/// problems: if we cannot reach a node of a cluster, this will attempt to retry a request on
+/// another node.
+///
+/// # Possible improvements
+///
+/// - For `GET` requests we could also start a 2nd request after a shorter timeout (e.g. 10s).
+/// - We could use RRD data for a "best guess" where to start, e.g. if we know a node was offline
+/// on the last RRD polling we'd start with a different one.
+/// For this, we still need to include the node names in the clients here somehow so that we can
+/// actually manage/re-shuffle them from the outside after this struct is already created.
+struct MultiClient {
+ state: StdMutex<MultiClientState>,
+ timeout: Duration,
+}
+
+impl MultiClient {
+ fn new(clients: Vec<Arc<Client>>) -> Self {
+ Self {
+ state: StdMutex::new(MultiClientState::new(clients)),
+ timeout: Duration::from_secs(60),
+ }
+ }
+}
+
+/// Keeps track of which client (iow. which specific node of a remote) we're supposed to be using
+/// right now.
+struct MultiClientState {
+ /// The current index *not* modulo the client count.
+ current: usize,
+ clients: Vec<Arc<Client>>,
+}
+
+impl MultiClientState {
+ fn new(clients: Vec<Arc<Client>>) -> Self {
+ Self {
+ current: 0,
+ clients,
+ }
+ }
+
+ /// Whenever a request fails with the *current* client we move the current entry forward.
+ ///
+ /// # Note:
+ ///
+ /// With our current strategy `failed_index` is never greater than `current`, but if we change
+ /// the strategy, we may want to change this to something like `1 + max(current, failed)`.
+ fn failed(&mut self, failed_index: usize) {
+ if self.current == failed_index {
+ self.current = self.current.wrapping_add(1);
+ }
+ }
+
+ /// Get `current` as an *index* (i.e. modulo `clients.len()`).
+ fn index(&self) -> usize {
+ self.current % self.clients.len()
+ }
+
+ /// Get the current client and its index which can be passed to `failed()` if the client fails
+ /// to connect.
+ fn get(&self) -> (Arc<Client>, usize) {
+ let index = self.index();
+ (Arc::clone(&self.clients[index]), self.current)
+ }
+
+ /// Check if we already tried all clients since a specific starting index.
+ ///
+ /// When an API request is made we loop through the possible clients.
+ /// Since multiple requests might be running simultaneously, it's possible that multiple tasks
+ /// mark the same *or* *multiple* clients as failed already.
+ ///
+ /// We don't want to try clients which we know are currently non-functional, so a
+ /// request-retry-loop will fail as soon as the same *number* of clients since its starting
+ /// point were marked as faulty without retrying them all.
+ fn tried_all_since(&self, start: usize) -> bool {
+ self.tried_clients(start) >= self.clients.len()
+ }
+
+ /// We store the current index continuously without wrapping it modulo the client count (and
+ /// only do that when indexing the `clients` array), so that we can easily check if "all
+ /// currently running tasks taken together" have already tried all clients by comparing our
+ /// starting point to the current index.
+ fn tried_clients(&self, start: usize) -> usize {
+ self.current.wrapping_sub(start)
+ }
+}
+
+impl MultiClient {
+ /// This is the client usage strategy.
+ ///
+ /// This is basically a "generator" for clients to try.
+ ///
+ /// We share the "state" with other tasks. When a client fails, it is "marked" as failed and
+ /// the state "rotates" through the clients.
+ /// We might be skipping clients if other tasks already tried "more" clients, but that's fine,
+ /// since there's no point in trying the same remote twice simultaneously if it is currently
+ /// offline...
+ fn try_clients(&self) -> impl Iterator<Item = Arc<Client>> + '_ {
+ let mut start_current = None;
+ let state = &self.state;
+ std::iter::from_fn(move || {
+ let mut state = state.lock().unwrap();
+ match start_current {
+ None => {
+ // first attempt, just use the current client and remember the starting index
+ let (client, index) = state.get();
+ start_current = Some((index, index));
+ Some(client)
+ }
+ Some((start, current)) => {
+ // If our last request failed, the retry-loop asks for another client, mark the
+ // one we just used as failed and check if all clients have gone through a
+ // retry loop...
+ state.failed(current);
+ if state.tried_all_since(start) {
+ // This iterator (and therefore this retry-loop) has tried all clients.
+ // Give up.
+ return None;
+ }
+ // finally just get the new current client and update `current` for the later
+ // call to `failed()`
+ let (client, current) = state.get();
+ start_current = Some((start, current));
+ Some(client)
+ }
+ }
+ })
+ .fuse()
+ }
+}
+
+// doing this via a generic method is currently tedious as it requires an extra helper trait to
+// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
+// its input to its generic output future... and then you run into borrow-checker limitations...
+macro_rules! try_request {
+ ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
+ let params = $params.map(serde_json::to_value);
+ Box::pin(async move {
+ let params = params
+ .transpose()
+ .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
+
+ let mut last_err = None;
+ let mut timed_out = false;
+ // The iterator in use here will automatically mark a client as faulty if we move on to
+ // the `next()` one.
+ for client in $self.try_clients() {
+ if let Some(err) = last_err.take() {
+ log::error!("API client error, trying another remote - {err:?}");
+ }
+ if timed_out {
+ timed_out = false;
+ log::error!("API client timed out, trying another remote");
+ }
+
+ let request = client.$how($method.clone(), $path_and_query, params.as_ref());
+ match tokio::time::timeout($self.timeout, request).await {
+ Ok(Err(proxmox_client::Error::Client(err))) => {
+ last_err = Some(err);
+ }
+ Ok(result) => return result,
+ Err(_) => {
+ timed_out = true;
+ }
+ }
+ }
+
+ Err(proxmox_client::Error::Other(
+ "failed to perform API request",
+ ))
+ })
+ };
+}
+
+impl HttpApiClient for MultiClient {
+ type ResponseFuture<'a> =
+ Pin<Box<dyn Future<Output = Result<HttpApiResponse, proxmox_client::Error>> + Send + 'a>>;
+
+ type ResponseStreamFuture<'a> = Pin<
+ Box<
+ dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, proxmox_client::Error>>
+ + Send
+ + 'a,
+ >,
+ >;
+ type Body = hyper::Body;
+
+ fn request<'a, T>(
+ &'a self,
+ method: Method,
+ path_and_query: &'a str,
+ params: Option<T>,
+ ) -> Self::ResponseFuture<'a>
+ where
+ T: Serialize + 'a,
+ {
+ try_request! { self, method, path_and_query, params, request }
+ }
+
+ fn streaming_request<'a, T>(
+ &'a self,
+ method: Method,
+ path_and_query: &'a str,
+ params: Option<T>,
+ ) -> Self::ResponseStreamFuture<'a>
+ where
+ T: Serialize + 'a,
+ {
+ try_request! { self, method, path_and_query, params, streaming_request }
+ }
+}
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
next prev parent reply other threads:[~2025-02-04 9:56 UTC|newest]
Thread overview: 11+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
2025-02-04 9:55 ` Wolfgang Bumiller [this message]
2025-02-11 14:50 ` [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper Lukas Wagner
2025-02-12 9:07 ` Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 2/7] server: store pve MultiClient for re-use Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 3/7] server: separate ConnectInfo from client creation Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 4/7] server: cache pve node reachability and names Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 5/7] server: don't try to connect to known-unreachable servers Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 6/7] server: try previously unreachable clients as last resort Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 7/7] server: add some tracing instrumentation Wolfgang Bumiller
2025-02-11 14:50 ` [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250204095554.39501-2-w.bumiller@proxmox.com \
--to=w.bumiller@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.