* [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache
@ 2025-02-04 9:55 Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper Wolfgang Bumiller
` (7 more replies)
0 siblings, 8 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-04 9:55 UTC (permalink / raw)
To: pdm-devel
This consists of two parts, plus a question at the end:
1) Patches 1 through 3:
The `MultiClient`, which implements cycling through a remote's nodes
when requests fail due to network issues.
2) Patches 4 through 6:
A task caching the remote reachability state as well as mapping
hostnames to the pve-side node-names. Currently this simply runs every
60 seconds and goes through the current remotes+nodes and checks for
reachability.
If the remote.cfg has changed in the meantime while the polling task
is still running, the task is aborted and restarted with the new
config (a rough sketch of this loop follows after this list).
Finally, the reachability information will be used and updated by the
`MultiClient` implementation.
3) Patch 7 is mainly for debugging. I'm not sure whether we want to include
tracing instrumentation in general, via a `#[cfg_attr]`, or only for
debug builds...
It would probably also be nice to implicitly `#[instrument]` all the API
methods so we can follow a trace from a specific API method.
(Currently I did this only temporarily, without committing anything, by
attaching, for example, an `#[instrument(name =
"api_method_list_nodes")]` attribute to the `list_nodes` API call to
more easily see the client usage via `journalctl -f
SPAN_NAME=api_method_list_nodes` on the shell; a small sketch of this
follows below as well.)
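For illustration, a rough sketch of the polling loop described in 2) (the
real implementation is in patch 4; only the shape is shown here, error
handling and cache pruning are omitted):

    async fn remote_node_mapping_loop() {
        let mut last_digest = None;
        loop {
            if let Ok((_config, digest)) = pdm_config::remotes::config() {
                if last_digest.as_ref() != Some(&digest) {
                    last_digest = Some(digest);
                    // config changed: abort a still-running node-name query
                    // task (JoinHandle::abort) and start over for the
                    // remotes in `_config`
                }
                // otherwise (re)start the query task only if the previous
                // one has finished
            }
            // poll roughly every 60 seconds
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }
    }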
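And a small sketch of the temporary instrumentation mentioned in 3) (not
part of this series; the span name is only an example):

    use tracing::instrument;

    // A fixed span name makes it easy to follow the client usage an API
    // method triggers, e.g. via: journalctl -f SPAN_NAME=api_method_list_nodes
    #[instrument(name = "api_method_list_nodes", skip_all)]
    async fn list_nodes(/* ...usual api parameters... */) {
        // requests issued from here inherit this span
    }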
Wolfgang Bumiller (7):
server: generic multi-client wrapper
server: store pve MultiClient for re-use
server: separate ConnectInfo from client creation
server: cache pve node reachability and names
server: don't try to connect to known-unreachable servers
server: try previously unreachable clients as last resort
server: add some tracing instrumentation
lib/pdm-config/src/config_version_cache.rs | 33 +-
server/src/api/pve/mod.rs | 20 +-
.../main.rs} | 3 +
.../bin/proxmox-datacenter-api/tasks/mod.rs | 1 +
.../tasks/remote_node_mapping.rs | 228 +++++++
server/src/connection.rs | 635 ++++++++++++++++--
server/src/lib.rs | 1 +
server/src/remote_cache/mod.rs | 291 ++++++++
8 files changed, 1140 insertions(+), 72 deletions(-)
rename server/src/bin/{proxmox-datacenter-api.rs => proxmox-datacenter-api/main.rs} (99%)
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/mod.rs
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs
create mode 100644 server/src/remote_cache/mod.rs
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
* [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
@ 2025-02-04 9:55 ` Wolfgang Bumiller
2025-02-11 14:50 ` Lukas Wagner
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 2/7] server: store pve MultiClient for re-use Wolfgang Bumiller
` (6 subsequent siblings)
7 siblings, 1 reply; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-04 9:55 UTC (permalink / raw)
To: pdm-devel
We'll use this to instantiate multiple clients for a PVE cluster.
Unfortunately we cannot transparently just "connect to a different
node" at the connection layer, since the individual nodes may have
different certificate fingerprints and may (e.g. if a reverse proxy is
used) require different `Host:` headers; therefore the entire request
needs to be recreated when we switch servers.
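From a caller's perspective nothing changes; a minimal usage sketch
(assuming the client factory hands out the new wrapper, and using the
existing `cluster_status` call as an example):

    async fn example(remote: &pdm_api_types::remotes::Remote) -> Result<(), anyhow::Error> {
        // backed by the MultiClient wrapper after this patch
        let client = server::connection::make_pve_client(remote)?;
        // on a connection error the request is recreated and re-issued
        // through the client prepared for another configured node
        let _status = client.cluster_status().await?;
        Ok(())
    }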
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
server/src/connection.rs | 284 +++++++++++++++++++++++++++++++++++++--
1 file changed, 270 insertions(+), 14 deletions(-)
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 0adeba2..767a2f9 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -3,14 +3,21 @@
//! Make sure to call [`init`] to inject a concrete [`ClientFactory`]
//! instance before calling any of the provided functions.
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::Mutex as StdMutex;
use std::sync::OnceLock;
+use std::time::Duration;
use anyhow::{bail, format_err, Error};
use http::uri::Authority;
+use http::Method;
+use serde::Serialize;
-use proxmox_client::{Client, TlsOptions};
+use proxmox_client::{Client, HttpApiClient, HttpApiResponse, HttpApiResponseStream, TlsOptions};
-use pdm_api_types::remotes::{Remote, RemoteType};
+use pdm_api_types::remotes::{NodeUrl, Remote, RemoteType};
use pve_api_types::client::{PveClient, PveClientImpl};
use crate::pbs_client::PbsClient;
@@ -41,6 +48,26 @@ fn prepare_connect_client(
Some(endpoint) => format_err!("{endpoint} not configured for remote"),
None => format_err!("no nodes configured for remote"),
})?;
+
+ let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+ RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+ RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+ };
+
+ let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+
+ Ok(ConnectInfo {
+ client,
+ prefix,
+ perl_compat,
+ })
+}
+
+fn prepare_connect_client_to_node(
+ node: &NodeUrl,
+ default_port: u16,
+ pve_compat: bool,
+) -> Result<Client, Error> {
let mut options = TlsOptions::default();
if let Some(fp) = &node.fingerprint {
@@ -49,11 +76,6 @@ fn prepare_connect_client(
let host_port: Authority = node.hostname.parse()?;
- let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
- RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
- RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
- };
-
let uri: http::uri::Uri = format!(
"https://{}:{}",
host_port.host(),
@@ -64,12 +86,7 @@ fn prepare_connect_client(
let mut client =
proxmox_client::Client::with_options(uri.clone(), options, Default::default())?;
client.set_pve_compatibility(pve_compat);
-
- Ok(ConnectInfo {
- client,
- prefix,
- perl_compat,
- })
+ Ok(client)
}
/// Constructs a [`Client`] for the given [`Remote`] for an API token
@@ -92,6 +109,33 @@ fn connect(remote: &Remote, target_endpoint: Option<&str>) -> Result<Client, any
Ok(client)
}
+/// Like [`connect()`], but for remotes which have multiple clients.
+fn multi_connect(remote: &Remote) -> Result<MultiClient, anyhow::Error> {
+ let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+ RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+ RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+ };
+
+ let mut clients = Vec::new();
+
+ for node in &remote.nodes {
+ let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+ client.set_authentication(proxmox_client::Token {
+ userid: remote.authid.to_string(),
+ prefix: prefix.clone(),
+ value: remote.token.to_string(),
+ perl_compat,
+ });
+ clients.push(Arc::new(client));
+ }
+
+ if clients.is_empty() {
+ bail!("no nodes configured for remote");
+ }
+
+ Ok(MultiClient::new(clients))
+}
+
/// Constructs a [`Client`] for the given [`Remote`] for an API token or user
///
/// In case the remote has a user configured (instead of an API token), it will connect and get a
@@ -183,7 +227,7 @@ pub struct DefaultClientFactory;
#[async_trait::async_trait]
impl ClientFactory for DefaultClientFactory {
fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
- let client = crate::connection::connect(remote, None)?;
+ let client = crate::connection::multi_connect(remote)?;
Ok(Box::new(PveClientImpl(client)))
}
@@ -279,3 +323,215 @@ pub fn init(instance: Box<dyn ClientFactory + Send + Sync>) {
panic!("connection factory instance already set");
}
}
+
+/// This is another wrapper around the actual HTTP client responsible for dealing with connection
+/// problems: if we cannot reach a node of a cluster, this will attempt to retry a request on
+/// another node.
+///
+/// # Possible improvements
+///
+/// - For `GET` requests we could also start a 2nd request after a shorter time out (eg. 10s).
+/// - We could use RRD data for a "best guess" where to start eg. if we know a node was offline on
+/// the last rrd polling we'd start with a different one.
+/// For this, we still need to include the node names in the clients here somehow so that we can
+/// actually manage/re-shuffle them from the outside after this struct is already created.
+struct MultiClient {
+ state: StdMutex<MultiClientState>,
+ timeout: Duration,
+}
+
+impl MultiClient {
+ fn new(clients: Vec<Arc<Client>>) -> Self {
+ Self {
+ state: StdMutex::new(MultiClientState::new(clients)),
+ timeout: Duration::from_secs(60),
+ }
+ }
+}
+
+/// Keeps track of which client (iow. which specific node of a remote) we're supposed to be using
+/// right now.
+struct MultiClientState {
+ /// The current index *not* modulo the client count.
+ current: usize,
+ clients: Vec<Arc<Client>>,
+}
+
+impl MultiClientState {
+ fn new(clients: Vec<Arc<Client>>) -> Self {
+ Self {
+ current: 0,
+ clients,
+ }
+ }
+
+ /// Whenever a request fails with the *current* client we move the current entry forward.
+ ///
+ /// # Note:
+ ///
+ /// With our current strategy `failed_index` is always less than `current`, but if we change
+ /// the strategy, we may want to change this to something like `1 + max(current, failed)`.
+ fn failed(&mut self, failed_index: usize) {
+ if self.current == failed_index {
+ self.current = self.current.wrapping_add(1);
+ }
+ }
+
+ /// Get `current` as an *index* (i.e. modulo `clients.len()`).
+ fn index(&self) -> usize {
+ self.current % self.clients.len()
+ }
+
+ /// Get the current client and its index which can be passed to `failed()` if the client fails
+ /// to connect.
+ fn get(&self) -> (Arc<Client>, usize) {
+ let index = self.index();
+ (Arc::clone(&self.clients[index]), self.current)
+ }
+
+ /// Check if we already tried all clients since a specific starting index.
+ ///
+ /// When an API request is made we loop through the possible clients.
+ /// Since multiple requests might be running simultaneously, it's possible that multiple tasks
+ /// mark the same *or* *multiple* clients as failed already.
+ ///
+ /// We don't want to try clients which we know are currently non-functional, so a
+ /// request-retry-loop will fail as soon as the same *number* of clients since its starting
+ /// point were marked as faulty without retrying them all.
+ fn tried_all_since(&self, start: usize) -> bool {
+ self.tried_clients(start) >= self.clients.len()
+ }
+
+ /// We store the current index continuously without wrapping it modulo the client count (and
+ /// only do that when indexing the `clients` array), so that we can easily check if "all
+ /// currently running tasks taken together" have already tried all clients by comparing our
+ /// starting point to the current index.
+ fn tried_clients(&self, start: usize) -> usize {
+ self.current.wrapping_sub(start)
+ }
+}
+
+impl MultiClient {
+ /// This is the client usage strategy.
+ ///
+ /// This is basically a "generator" for clients to try.
+ ///
+ /// We share the "state" with other tasks. When a client fails, it is "marked" as failed and
+ /// the state "rotates" through the clients.
+ /// We might be skipping clients if other tasks already tried "more" clients, but that's fine,
+ /// since there's no point in trying the same remote twice simultaneously if it is currently
+ /// offline...
+ fn try_clients(&self) -> impl Iterator<Item = Arc<Client>> + '_ {
+ let mut start_current = None;
+ let state = &self.state;
+ std::iter::from_fn(move || {
+ let mut state = state.lock().unwrap();
+ match start_current {
+ None => {
+ // first attempt, just use the current client and remember the starting index
+ let (client, index) = state.get();
+ start_current = Some((index, index));
+ Some(client)
+ }
+ Some((start, current)) => {
+ // If our last request failed, the retry-loop asks for another client, mark the
+ // one we just used as failed and check if all clients have gone through a
+ // retry loop...
+ state.failed(current);
+ if state.tried_all_since(start) {
+ // This iterator (and therefore this retry-loop) has tried all clients.
+ // Give up.
+ return None;
+ }
+ // finally just get the new current client and update `current` for the later
+ // call to `failed()`
+ let (client, current) = state.get();
+ start_current = Some((start, current));
+ Some(client)
+ }
+ }
+ })
+ .fuse()
+ }
+}
+
+// doing this via a generic method is currently tedious as it requires an extra helper trait to
+// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
+// its input to its generic output future... and then you run into borrow-checker limitations...
+macro_rules! try_request {
+ ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
+ let params = $params.map(serde_json::to_value);
+ Box::pin(async move {
+ let params = params
+ .transpose()
+ .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
+
+ let mut last_err = None;
+ let mut timed_out = false;
+ // The iterator in use here will automatically mark a client as faulty if we move on to
+ // the `next()` one.
+ for client in $self.try_clients() {
+ if let Some(err) = last_err.take() {
+ log::error!("API client error, trying another remote - {err:?}");
+ }
+ if timed_out {
+ timed_out = false;
+ log::error!("API client timed out, trying another remote");
+ }
+
+ let request = client.$how($method.clone(), $path_and_query, params.as_ref());
+ match tokio::time::timeout($self.timeout, request).await {
+ Ok(Err(proxmox_client::Error::Client(err))) => {
+ last_err = Some(err);
+ }
+ Ok(result) => return result,
+ Err(_) => {
+ timed_out = true;
+ }
+ }
+ }
+
+ Err(proxmox_client::Error::Other(
+ "failed to perform API request",
+ ))
+ })
+ };
+}
+
+impl HttpApiClient for MultiClient {
+ type ResponseFuture<'a> =
+ Pin<Box<dyn Future<Output = Result<HttpApiResponse, proxmox_client::Error>> + Send + 'a>>;
+
+ type ResponseStreamFuture<'a> = Pin<
+ Box<
+ dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, proxmox_client::Error>>
+ + Send
+ + 'a,
+ >,
+ >;
+ type Body = hyper::Body;
+
+ fn request<'a, T>(
+ &'a self,
+ method: Method,
+ path_and_query: &'a str,
+ params: Option<T>,
+ ) -> Self::ResponseFuture<'a>
+ where
+ T: Serialize + 'a,
+ {
+ try_request! { self, method, path_and_query, params, request }
+ }
+
+ fn streaming_request<'a, T>(
+ &'a self,
+ method: Method,
+ path_and_query: &'a str,
+ params: Option<T>,
+ ) -> Self::ResponseStreamFuture<'a>
+ where
+ T: Serialize + 'a,
+ {
+ try_request! { self, method, path_and_query, params, streaming_request }
+ }
+}
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
* [pdm-devel] [PATCH datacenter-manager 2/7] server: store pve MultiClient for re-use
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper Wolfgang Bumiller
@ 2025-02-04 9:55 ` Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 3/7] server: separate ConnectInfo from client creation Wolfgang Bumiller
` (5 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-04 9:55 UTC (permalink / raw)
To: pdm-devel
The trait for creating the clients needs adapting so that clients can
be kept around and reused, keyed by remote name.
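A short behavior sketch of what this enables (assuming the remote config is
unchanged between the calls and the stale timeout has not elapsed):

    use std::sync::Arc;

    fn example(remote: &pdm_api_types::remotes::Remote) -> Result<(), anyhow::Error> {
        let a = server::connection::make_pve_client(remote)?;
        let b = server::connection::make_pve_client(remote)?;
        // both calls now return the same cached client instance
        assert!(Arc::ptr_eq(&a, &b));
        Ok(())
    }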
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
server/src/api/pve/mod.rs | 20 +++---
server/src/connection.rs | 127 ++++++++++++++++++++++++++++++--------
2 files changed, 113 insertions(+), 34 deletions(-)
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index 2cefbb4..7b81504 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -20,14 +20,16 @@ use pdm_api_types::{
PRIV_SYS_MODIFY,
};
-use pve_api_types::client::PveClient;
-use pve_api_types::{
- ClusterNodeStatus, ClusterResourceKind, ClusterResourceType, ListRealm, PveUpid,
-};
+use pve_api_types::ClusterNodeStatus;
+use pve_api_types::ListRealm;
+use pve_api_types::PveUpid;
+use pve_api_types::{ClusterResourceKind, ClusterResourceType};
use super::resources::{map_pve_lxc, map_pve_node, map_pve_qemu, map_pve_storage};
-use crate::{connection, task_cache};
+use crate::connection;
+use crate::connection::PveClient;
+use crate::task_cache;
mod lxc;
mod node;
@@ -91,18 +93,18 @@ pub(crate) fn get_remote<'a>(
Ok(remote)
}
-pub async fn connect_or_login(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub async fn connect_or_login(remote: &Remote) -> Result<Arc<PveClient>, Error> {
connection::make_pve_client_and_login(remote).await
}
-pub fn connect(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub fn connect(remote: &Remote) -> Result<Arc<PveClient>, Error> {
connection::make_pve_client(remote)
}
fn connect_to_remote(
config: &SectionConfigData<Remote>,
id: &str,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+) -> Result<Arc<PveClient>, Error> {
connect(get_remote(config, id)?)
}
@@ -264,7 +266,7 @@ fn check_guest_permissions(
async fn find_node_for_vm(
node: Option<String>,
vmid: u32,
- pve: &(dyn PveClient + Send + Sync),
+ pve: &PveClient,
) -> Result<String, Error> {
// FIXME: The pve client should cache the resources
Ok(match node {
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 767a2f9..bee4959 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -3,12 +3,15 @@
//! Make sure to call [`init`] to inject a concrete [`ClientFactory`]
//! instance before calling any of the provided functions.
+use std::collections::HashMap;
use std::future::Future;
-use std::pin::Pin;
+use std::pin::{pin, Pin};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
-use std::sync::OnceLock;
-use std::time::Duration;
+use std::sync::Once;
+use std::sync::{LazyLock, OnceLock};
+use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Error};
use http::uri::Authority;
@@ -18,7 +21,7 @@ use serde::Serialize;
use proxmox_client::{Client, HttpApiClient, HttpApiResponse, HttpApiResponseStream, TlsOptions};
use pdm_api_types::remotes::{NodeUrl, Remote, RemoteType};
-use pve_api_types::client::{PveClient, PveClientImpl};
+use pve_api_types::client::PveClientImpl;
use crate::pbs_client::PbsClient;
@@ -183,7 +186,7 @@ async fn connect_or_login(
#[async_trait::async_trait]
pub trait ClientFactory {
/// Create a new API client for PVE remotes
- fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+ fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, Error>;
/// Create a new API client for PBS remotes
fn make_pbs_client(&self, remote: &Remote) -> Result<Box<PbsClient>, Error>;
@@ -193,7 +196,7 @@ pub trait ClientFactory {
&self,
remote: &Remote,
target_endpoint: Option<&str>,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+ ) -> Result<Arc<PveClient>, Error>;
/// Create a new API client for PVE remotes.
///
@@ -204,10 +207,7 @@ pub trait ClientFactory {
/// This is intended for API calls that accept a user in addition to tokens.
///
/// Note: currently does not support two factor authentication.
- async fn make_pve_client_and_login(
- &self,
- remote: &Remote,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+ async fn make_pve_client_and_login(&self, remote: &Remote) -> Result<Arc<PveClient>, Error>;
/// Create a new API client for PBS remotes.
///
@@ -224,11 +224,93 @@ pub trait ClientFactory {
/// Default production client factory
pub struct DefaultClientFactory;
+pub type PveClient = dyn pve_api_types::client::PveClient + Send + Sync;
+
+/// A cached client for a remote (to reuse connections and share info about connection issues in
+/// remotes with multiple nodes...).
+struct ClientEntry<T: ?Sized> {
+ last_used: SystemTime,
+ client: Arc<T>,
+ remote: Remote,
+}
+
+/// Contains the cached clients and handle to the future dealing with timing them out.
+#[derive(Default)]
+struct ConnectionCache {
+ pve_clients: StdMutex<HashMap<String, ClientEntry<PveClient>>>,
+}
+
+/// This cache is a singleton.
+static CONNECTION_CACHE: LazyLock<ConnectionCache> = LazyLock::new(Default::default);
+static CLEANUP_FUTURE_STARTED: Once = Once::new();
+
+impl ConnectionCache {
+ const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
+ const STALE_TIMEOUT: Duration = Duration::from_secs(30);
+
+ /// Access the cache
+ fn get() -> &'static Self {
+ let this = &CONNECTION_CACHE;
+ this.init();
+ this
+ }
+
+ /// If it hasn't already, spawn the cleanup future.
+ fn init(&self) {
+ CLEANUP_FUTURE_STARTED.call_once(|| {
+ tokio::spawn(async move {
+ let future = pin!(CONNECTION_CACHE.cleanup_future());
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(future, abort_future).await;
+ });
+ });
+ }
+
+ /// Run a cleanup operation every 30 seconds.
+ async fn cleanup_future(&self) {
+ loop {
+ tokio::time::sleep(Self::CLEANUP_INTERVAL).await;
+ self.cleanup_cycle();
+ }
+ }
+
+ /// Clean out cached clients older than 30 seconds.
+ fn cleanup_cycle(&self) {
+ let oldest_time = SystemTime::now() - Self::STALE_TIMEOUT;
+ self.pve_clients
+ .lock()
+ .unwrap()
+ .retain(|_remote_name, client| client.last_used >= oldest_time)
+ }
+
+ fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, anyhow::Error> {
+ let mut pve_clients = self.pve_clients.lock().unwrap();
+ if let Some(client) = pve_clients.get_mut(&remote.id) {
+ // Verify the remote is still the same:
+ if client.remote == *remote {
+ client.last_used = SystemTime::now();
+ return Ok(Arc::clone(&client.client));
+ }
+ }
+
+ let client: Arc<PveClient> =
+ Arc::new(PveClientImpl(crate::connection::multi_connect(remote)?));
+ pve_clients.insert(
+ remote.id.clone(),
+ ClientEntry {
+ last_used: SystemTime::now(),
+ client: Arc::clone(&client),
+ remote: remote.clone(),
+ },
+ );
+ Ok(client)
+ }
+}
+
#[async_trait::async_trait]
impl ClientFactory for DefaultClientFactory {
- fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
- let client = crate::connection::multi_connect(remote)?;
- Ok(Box::new(PveClientImpl(client)))
+ fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, Error> {
+ ConnectionCache::get().make_pve_client(remote)
}
fn make_pbs_client(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
@@ -240,17 +322,14 @@ impl ClientFactory for DefaultClientFactory {
&self,
remote: &Remote,
target_endpoint: Option<&str>,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+ ) -> Result<Arc<PveClient>, Error> {
let client = crate::connection::connect(remote, target_endpoint)?;
- Ok(Box::new(PveClientImpl(client)))
+ Ok(Arc::new(PveClientImpl(client)))
}
- async fn make_pve_client_and_login(
- &self,
- remote: &Remote,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+ async fn make_pve_client_and_login(&self, remote: &Remote) -> Result<Arc<PveClient>, Error> {
let client = connect_or_login(remote, None).await?;
- Ok(Box::new(PveClientImpl(client)))
+ Ok(Arc::new(PveClientImpl(client)))
}
async fn make_pbs_client_and_login(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
@@ -270,7 +349,7 @@ fn instance() -> &'static (dyn ClientFactory + Send + Sync) {
}
/// Create a new API client for PVE remotes
-pub fn make_pve_client(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub fn make_pve_client(remote: &Remote) -> Result<Arc<PveClient>, Error> {
instance().make_pve_client(remote)
}
@@ -278,7 +357,7 @@ pub fn make_pve_client(remote: &Remote) -> Result<Box<dyn PveClient + Send + Syn
pub fn make_pve_client_with_endpoint(
remote: &Remote,
target_endpoint: Option<&str>,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+) -> Result<Arc<PveClient>, Error> {
instance().make_pve_client_with_endpoint(remote, target_endpoint)
}
@@ -296,9 +375,7 @@ pub fn make_pbs_client(remote: &Remote) -> Result<Box<PbsClient>, Error> {
/// This is intended for API calls that accept a user in addition to tokens.
///
/// Note: currently does not support two factor authentication.
-pub async fn make_pve_client_and_login(
- remote: &Remote,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub async fn make_pve_client_and_login(remote: &Remote) -> Result<Arc<PveClient>, Error> {
instance().make_pve_client_and_login(remote).await
}
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
* [pdm-devel] [PATCH datacenter-manager 3/7] server: separate ConnectInfo from client creation
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 2/7] server: store pve MultiClient for re-use Wolfgang Bumiller
@ 2025-02-04 9:55 ` Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 4/7] server: cache pve node reachability and names Wolfgang Bumiller
` (4 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-04 9:55 UTC (permalink / raw)
To: pdm-devel
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
server/src/connection.rs | 141 +++++++++++++++++++++++----------------
1 file changed, 83 insertions(+), 58 deletions(-)
diff --git a/server/src/connection.rs b/server/src/connection.rs
index bee4959..aeea089 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -29,43 +29,29 @@ static INSTANCE: OnceLock<Box<dyn ClientFactory + Send + Sync>> = OnceLock::new(
/// Connection Info returned from [`prepare_connect_client`]
struct ConnectInfo {
- pub client: Client,
- pub prefix: String,
- pub perl_compat: bool,
+ prefix: String,
+ perl_compat: bool,
+ pve_compat: bool,
+ default_port: u16,
}
-/// Returns a [`proxmox_client::Client`] and a token prefix for the specified
-/// [`pdm_api_types::Remote`]
-fn prepare_connect_client(
- remote: &Remote,
- target_endpoint: Option<&str>,
-) -> Result<ConnectInfo, Error> {
- let node = remote
- .nodes
- .iter()
- .find(|endpoint| match target_endpoint {
- Some(target) => target == endpoint.hostname,
- None => true,
- })
- .ok_or_else(|| match target_endpoint {
- Some(endpoint) => format_err!("{endpoint} not configured for remote"),
- None => format_err!("no nodes configured for remote"),
- })?;
+impl ConnectInfo {
+ fn for_remote(remote: &Remote) -> Self {
+ let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+ RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+ RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+ };
- let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
- RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
- RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
- };
-
- let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
-
- Ok(ConnectInfo {
- client,
- prefix,
- perl_compat,
- })
+ ConnectInfo {
+ prefix,
+ perl_compat,
+ pve_compat,
+ default_port,
+ }
+ }
}
-
+///
+/// Returns a [`proxmox_client::Client`] set up to connect to a specific node.
fn prepare_connect_client_to_node(
node: &NodeUrl,
default_port: u16,
@@ -92,51 +78,82 @@ fn prepare_connect_client_to_node(
Ok(client)
}
+/// Returns a [`proxmox_client::Client`] and connection info required to set token authentication
+/// data for the [`pdm_api_types::Remote`].
+fn prepare_connect_client(
+ remote: &Remote,
+ target_endpoint: Option<&str>,
+) -> Result<(Client, ConnectInfo), Error> {
+ let node = remote
+ .nodes
+ .iter()
+ .find(|endpoint| match target_endpoint {
+ Some(target) => target == endpoint.hostname,
+ None => true,
+ })
+ .ok_or_else(|| match target_endpoint {
+ Some(endpoint) => format_err!("{endpoint} not configured for remote"),
+ None => format_err!("no nodes configured for remote"),
+ })?;
+
+ let info = ConnectInfo::for_remote(remote);
+
+ let client = prepare_connect_client_to_node(node, info.default_port, info.pve_compat)?;
+
+ Ok((client, info))
+}
+
/// Constructs a [`Client`] for the given [`Remote`] for an API token
///
/// It does not actually opens a connection there, but prepares the client with the correct
/// authentication information and settings for the [`RemoteType`]
fn connect(remote: &Remote, target_endpoint: Option<&str>) -> Result<Client, anyhow::Error> {
- let ConnectInfo {
- client,
- perl_compat,
- prefix,
- } = prepare_connect_client(remote, target_endpoint)?;
+ let (client, info) = prepare_connect_client(remote, target_endpoint)?;
client.set_authentication(proxmox_client::Token {
userid: remote.authid.to_string(),
- prefix,
value: remote.token.to_string(),
- perl_compat,
+ prefix: info.prefix,
+ perl_compat: info.perl_compat,
});
-
Ok(client)
}
-/// Like [`connect()`], but for remotes which have multiple clients.
-fn multi_connect(remote: &Remote) -> Result<MultiClient, anyhow::Error> {
- let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
- RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
- RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+/// Returns a [`MultiClient`] and connection info required to set token authentication
+/// data for the [`pdm_api_types::Remote`].
+fn prepare_connect_multi_client(remote: &Remote) -> Result<(MultiClient, ConnectInfo), Error> {
+ if remote.nodes.is_empty() {
+ bail!("no nodes configured for remote");
};
+ let info = ConnectInfo::for_remote(remote);
+
let mut clients = Vec::new();
for node in &remote.nodes {
- let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+ clients.push(Arc::new(prepare_connect_client_to_node(
+ node,
+ info.default_port,
+ info.pve_compat,
+ )?));
+ }
+
+ Ok((MultiClient::new(clients), info))
+}
+
+/// Like [`connect()`], but with failover support for remotes which can have multiple nodes.
+fn multi_connect(remote: &Remote) -> Result<MultiClient, anyhow::Error> {
+ let (client, info) = prepare_connect_multi_client(remote)?;
+
+ client.for_each_client(|client| {
client.set_authentication(proxmox_client::Token {
userid: remote.authid.to_string(),
- prefix: prefix.clone(),
value: remote.token.to_string(),
- perl_compat,
+ prefix: info.prefix.clone(),
+ perl_compat: info.perl_compat,
});
- clients.push(Arc::new(client));
- }
-
- if clients.is_empty() {
- bail!("no nodes configured for remote");
- }
+ });
- Ok(MultiClient::new(clients))
+ Ok(client)
}
/// Constructs a [`Client`] for the given [`Remote`] for an API token or user
@@ -155,8 +172,7 @@ async fn connect_or_login(
if remote.authid.is_token() {
connect(remote, target_endpoint)
} else {
- let info = prepare_connect_client(remote, target_endpoint)?;
- let client = info.client;
+ let (client, _info) = prepare_connect_client(remote, target_endpoint)?;
match client
.login(proxmox_login::Login::new(
client.api_url().to_string(),
@@ -424,6 +440,15 @@ impl MultiClient {
timeout: Duration::from_secs(60),
}
}
+
+ fn for_each_client<F>(&self, func: F)
+ where
+ F: Fn(&Arc<Client>),
+ {
+ for client in &self.state.lock().unwrap().clients {
+ func(client);
+ }
+ }
}
/// Keeps track of which client (iow. which specific node of a remote) we're supposed to be using
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
* [pdm-devel] [PATCH datacenter-manager 4/7] server: cache pve node reachability and names
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
` (2 preceding siblings ...)
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 3/7] server: separate ConnectInfo from client creation Wolfgang Bumiller
@ 2025-02-04 9:55 ` Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 5/7] server: don't try to connect to known-unreachable servers Wolfgang Bumiller
` (3 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-04 9:55 UTC (permalink / raw)
To: pdm-devel
Add a `remote_cache` module to access the cache which maps
(remote, host) to a struct holding the node name and a "reachable"
boolean. This can be used to connect to specific nodes or to skip
nodes known to be currently unreachable (although the latter part is
not implemented yet).
The ClientFactory trait gains helpers to connect to specific nodes.
The unprivileged API gets a `tasks` submodule with a task which
fills the node cache and watches the remote config for updates (via
the config version cache).
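For illustration, a short sketch of reading the new cache (the remote and
node names here are made up):

    fn example() {
        let cache = server::remote_cache::RemoteMappingCache::get();
        // resolve a cluster-side node name back to its configured hostname
        // and check whether it was reachable at the last poll
        if let Some(info) = cache.info_by_node_name("my-remote", "pve-node1") {
            println!("{} reachable: {}", info.hostname, info.reachable);
        }
    }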
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
lib/pdm-config/src/config_version_cache.rs | 33 +-
.../main.rs} | 3 +
.../bin/proxmox-datacenter-api/tasks/mod.rs | 1 +
.../tasks/remote_node_mapping.rs | 226 ++++++++++++++
server/src/connection.rs | 32 ++
server/src/lib.rs | 1 +
server/src/remote_cache/mod.rs | 285 ++++++++++++++++++
7 files changed, 580 insertions(+), 1 deletion(-)
rename server/src/bin/{proxmox-datacenter-api.rs => proxmox-datacenter-api/main.rs} (99%)
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/mod.rs
create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs
create mode 100644 server/src/remote_cache/mod.rs
diff --git a/lib/pdm-config/src/config_version_cache.rs b/lib/pdm-config/src/config_version_cache.rs
index b4a47a8..5c4b61a 100644
--- a/lib/pdm-config/src/config_version_cache.rs
+++ b/lib/pdm-config/src/config_version_cache.rs
@@ -25,6 +25,8 @@ struct ConfigVersionCacheDataInner {
user_cache_generation: AtomicUsize,
// Traffic control (traffic-control.cfg) generation/version.
traffic_control_generation: AtomicUsize,
+ // Tracks updates to the remote/hostname/nodename mapping cache.
+ remote_mapping_cache: AtomicUsize,
// Add further atomics here
}
@@ -81,11 +83,23 @@ pub struct ConfigVersionCache {
static INSTANCE: OnceCell<Arc<ConfigVersionCache>> = OnceCell::new();
impl ConfigVersionCache {
- /// Open the memory based communication channel singleton.
+ /// Open the memory backed version cache.
pub fn new() -> Result<Arc<Self>, Error> {
INSTANCE.get_or_try_init(Self::open).cloned()
}
+ /// Convenience method to call [`ConfigVersionCache::new`] while turning the error into a log
+ /// message.
+ pub fn new_log_error() -> Option<Arc<Self>> {
+ match Self::new() {
+ Ok(this) => Some(this),
+ Err(err) => {
+ log::error!("failed to open config version cache - {err:?}");
+ None
+ }
+ }
+ }
+
// Actual work of `new`:
fn open() -> Result<Arc<Self>, Error> {
let user = crate::api_user()?;
@@ -141,4 +155,21 @@ impl ConfigVersionCache {
.traffic_control_generation
.fetch_add(1, Ordering::AcqRel);
}
+
+ /// Return the current remote mapping cache generation.
+ pub fn remote_mapping_cache(&self) -> usize {
+ self.shmem
+ .data()
+ .remote_mapping_cache
+ .load(Ordering::Relaxed)
+ }
+
+ /// Increase the remote mapping cache number.
+ pub fn increase_remote_mapping_cache(&self) -> usize {
+ self.shmem
+ .data()
+ .remote_mapping_cache
+ .fetch_add(1, Ordering::Relaxed)
+ + 1
+ }
}
diff --git a/server/src/bin/proxmox-datacenter-api.rs b/server/src/bin/proxmox-datacenter-api/main.rs
similarity index 99%
rename from server/src/bin/proxmox-datacenter-api.rs
rename to server/src/bin/proxmox-datacenter-api/main.rs
index a79094d..25852c8 100644
--- a/server/src/bin/proxmox-datacenter-api.rs
+++ b/server/src/bin/proxmox-datacenter-api/main.rs
@@ -31,6 +31,8 @@ use server::metric_collection;
use server::resource_cache;
use server::task_utils;
+mod tasks;
+
pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 5 * 60;
const PDM_LISTEN_ADDR: SocketAddr = SocketAddr::new(
@@ -287,6 +289,7 @@ async fn run(debug: bool) -> Result<(), Error> {
start_task_scheduler();
metric_collection::start_task();
+ tasks::remote_node_mapping::start_task();
resource_cache::start_task();
server.await?;
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
new file mode 100644
index 0000000..e6ead88
--- /dev/null
+++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
@@ -0,0 +1 @@
+pub mod remote_node_mapping;
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs
new file mode 100644
index 0000000..5912365
--- /dev/null
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs
@@ -0,0 +1,226 @@
+//! PVE remotes have multiple nodes which have names we cannot necessarily infer from the
+//! "hostname" field, since that might be a different address, potentially with a port.
+//!
+//! We also do not want users to have to maintain the PDM host/node-name combinations (in case they
+//! rename or reinstall nodes). Renaming would break the PDM config, reinstalling would break eg. a
+//! "machine-id" based mapping.
+//!
+//! We also cannot rely on the TLS fingerprints, because a whole cluster could potentially use a
+//! single wildcard certificate.
+//!
+//! Instead, we maintain a cached mapping of `address ↔ name` in `/var`, which gets polled
+//! regularly.
+//! For PVE we can query an address' `/cluster/status` and look for an entry marked as `local:1`.
+//! Later this might be changed to looking for the node name in the result of
+//! `/nodes/localhost/status` - once this is implemented and rolled out long enough in PVE.
+
+use std::future::Future;
+use std::pin::pin;
+
+use anyhow::{bail, Error};
+use tokio::task::JoinHandle;
+
+use proxmox_config_digest::ConfigDigest;
+use proxmox_section_config::typed::SectionConfigData;
+
+use pdm_api_types::remotes::{Remote, RemoteType};
+
+use server::remote_cache::{self, RemoteMappingCache};
+use server::task_utils;
+
+const CONFIG_POLL_INTERVAL: u64 = 60;
+
+fn spawn_aborted_on_shutdown<F>(future: F) -> JoinHandle<()>
+where
+ F: Future + Send + 'static,
+{
+ tokio::spawn(async move {
+ // TODO: The wrapping in a select(shutdown_future, fut) should probably be a helper in
+ // `proxmox_daemon`.
+ let future = pin!(future);
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(future, abort_future).await;
+ })
+}
+
+pub fn start_task() {
+ spawn_aborted_on_shutdown(CachingTask::default().run());
+}
+
+async fn poll_interval() {
+ let delay_target = task_utils::next_aligned_instant(CONFIG_POLL_INTERVAL);
+ tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+}
+
+#[derive(Default)]
+struct CachingTask {
+ current_name_task: Option<JoinHandle<()>>,
+ last_config_digest: Option<ConfigDigest>,
+}
+
+impl CachingTask {
+ /// The caching task's main entry point.
+ async fn run(mut self) {
+ loop {
+ self.run_once().await;
+ poll_interval().await;
+ }
+ }
+
+ /// A single iteration of the caching task.
+ async fn run_once(&mut self) {
+ let (config, digest) = match pdm_config::remotes::config() {
+ Ok(cd) => cd,
+ Err(err) => {
+ log::error!("failed to load remote config, not updating cache - {err:?}");
+ return;
+ }
+ };
+
+ if self
+ .last_config_digest
+ .as_ref()
+ .is_none_or(|d| digest != *d)
+ {
+ tracing::debug!("new config - updating remote node name cache");
+ self.last_config_digest = Some(digest);
+
+ // the config got updated - abort the current name-fetching task, we'll
+ // spawn a new one
+ if let Some(name_task) = self.current_name_task.take() {
+ tracing::debug!("aborting query task");
+ name_task.abort();
+ }
+
+ if let Err(err) = self.config_update(&config) {
+ log::error!("error updating remote node cache: {err:?}");
+ }
+ //} else {
+ // tracing::debug!("no change to the config");
+ }
+
+ if self
+ .current_name_task
+ .as_ref()
+ .is_none_or(|task| task.is_finished())
+ {
+ log::debug!("name task finished, starting reachability query task");
+ self.current_name_task =
+ Some(spawn_aborted_on_shutdown(Self::query_node_names(config)));
+ }
+ }
+
+ /// If the `remotes.cfg` was updated we need to go over all the remotes and see if there is any
+ /// information we need to update.
+ fn config_update(&mut self, config: &SectionConfigData<Remote>) -> Result<(), Error> {
+ let mut cache = RemoteMappingCache::write()?;
+
+ // prune remotes which were removed:
+ cache
+ .remotes
+ .retain(|name, _entry| config.contains_key(name));
+
+ // now update the existing remotes:
+ for (name, remote) in config {
+ self.prune_remote_nodes(&mut cache, name, remote);
+ }
+
+ cache.save()?;
+
+ Ok(())
+ }
+
+ fn prune_remote_nodes(&mut self, cache: &mut RemoteMappingCache, name: &str, remote: &Remote) {
+ let entry = cache
+ .remotes
+ .entry(name.to_string())
+ .or_insert_with(|| remote_cache::RemoteMapping::new(remote.ty));
+
+ // if the entry changed type, clear it
+ if entry.ty != remote.ty {
+ *entry = remote_cache::RemoteMapping::new(remote.ty);
+ }
+
+ // Only PVE entries currently have a node cache, so skip non-PVE remotes:
+ if remote.ty != RemoteType::Pve {
+ return;
+ }
+
+ // prune nodes which were removed:
+ entry.hosts.retain(|hostname, info| {
+ let retain = remote.nodes.iter().any(|node| node.hostname == *hostname);
+ if !retain {
+ if let Some(node_name) = info.node_name() {
+ entry.node_to_host.remove(node_name);
+ }
+ }
+ retain
+ });
+
+ // make sure currently known hostnames exist in the cache at least empty:
+ for node in &remote.nodes {
+ if !entry.hosts.contains_key(&node.hostname) {
+ entry.hosts.insert(
+ node.hostname.clone(),
+ remote_cache::HostInfo::new(node.hostname.clone()),
+ );
+ }
+ }
+ }
+
+ async fn query_node_names(config: SectionConfigData<Remote>) {
+ for (_name, remote) in &config {
+ if let Err(err) = Self::query_node_names_for_remote(remote).await {
+ log::error!("error updating node name cache - {err:?}");
+ }
+ }
+ }
+
+ async fn query_node_names_for_remote(remote: &Remote) -> Result<(), Error> {
+ // Only PVE entries currently have a node cache, so skip non-PVE remotes:
+ if remote.ty != RemoteType::Pve {
+ return Ok(());
+ }
+
+ // now add new nodes
+ for node in &remote.nodes {
+ tracing::debug!("querying remote {:?} node {:?}", remote.id, node.hostname);
+
+ // if the host is new, we need to query its name
+ let query_result = match query_node_name(remote, &node.hostname).await {
+ Ok(node_name) => Some(node_name),
+ Err(err) => {
+ log::error!(
+ "failed to query info for remote '{}' node '{}' - {err:?}",
+ remote.id,
+ node.hostname
+ );
+ None
+ }
+ };
+
+ let mut cache = RemoteMappingCache::write()?;
+ if let Some(info) = cache.info_by_hostname_mut(&remote.id, &node.hostname) {
+ info.reachable = query_result.is_some();
+ }
+ if let Some(node_name) = query_result {
+ cache.set_node_name(&remote.id, &node.hostname, Some(node_name));
+ }
+ cache.save()?;
+ }
+
+ Ok(())
+ }
+}
+
+/// Calls `/cluster/status` directly on a specific node to find its name.
+async fn query_node_name(remote: &Remote, hostname: &str) -> Result<String, Error> {
+ let client = server::connection::make_pve_client_with_endpoint(remote, Some(hostname))?;
+ let node_status_list = client.cluster_status().await?;
+ for node in node_status_list {
+ if node.local == Some(true) {
+ return Ok(node.name);
+ }
+ }
+ bail!("failed to connect to node {hostname}");
+}
diff --git a/server/src/connection.rs b/server/src/connection.rs
index aeea089..24e2e44 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -214,6 +214,19 @@ pub trait ClientFactory {
target_endpoint: Option<&str>,
) -> Result<Arc<PveClient>, Error>;
+ /// Create a new API client for PVE remotes, but for a specific *node*.
+ ///
+ /// The default implementation ignores the `node` parameter and forwards to
+ /// `make_pve_client()`.
+ fn make_pve_client_with_node(
+ &self,
+ remote: &Remote,
+ node: &str,
+ ) -> Result<Arc<PveClient>, Error> {
+ let _ = node;
+ self.make_pve_client(remote)
+ }
+
/// Create a new API client for PVE remotes.
///
/// In case the remote has a user configured (instead of an API token), it will connect and get
@@ -343,6 +356,20 @@ impl ClientFactory for DefaultClientFactory {
Ok(Arc::new(PveClientImpl(client)))
}
+ fn make_pve_client_with_node(
+ &self,
+ remote: &Remote,
+ node: &str,
+ ) -> Result<Arc<PveClient>, Error> {
+ let cache = crate::remote_cache::RemoteMappingCache::get();
+ match cache.info_by_node_name(&remote.id, node) {
+ Some(info) if info.reachable => {
+ self.make_pve_client_with_endpoint(remote, Some(&info.hostname))
+ }
+ _ => self.make_pve_client(remote),
+ }
+ }
+
async fn make_pve_client_and_login(&self, remote: &Remote) -> Result<Arc<PveClient>, Error> {
let client = connect_or_login(remote, None).await?;
Ok(Arc::new(PveClientImpl(client)))
@@ -377,6 +404,11 @@ pub fn make_pve_client_with_endpoint(
instance().make_pve_client_with_endpoint(remote, target_endpoint)
}
+/// Create a new API client for PVE remotes and try to make it connect to a specific *node*.
+pub fn make_pve_client_with_node(remote: &Remote, node: &str) -> Result<Arc<PveClient>, Error> {
+ instance().make_pve_client_with_node(remote, node)
+}
+
/// Create a new API client for PBS remotes
pub fn make_pbs_client(remote: &Remote) -> Result<Box<PbsClient>, Error> {
instance().make_pbs_client(remote)
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 12dc912..dcb24f0 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -6,6 +6,7 @@ pub mod auth;
pub mod context;
pub mod env;
pub mod metric_collection;
+pub mod remote_cache;
pub mod resource_cache;
pub mod task_cache;
pub mod task_utils;
diff --git a/server/src/remote_cache/mod.rs b/server/src/remote_cache/mod.rs
new file mode 100644
index 0000000..69e79f1
--- /dev/null
+++ b/server/src/remote_cache/mod.rs
@@ -0,0 +1,285 @@
+//! This currently only matters for PVE remotes.
+//!
+//! PVE remotes have multiple nodes which have names we cannot necessarily infer from the
+//! "hostname" field, since that might be a different address, potentially with a port.
+//!
+//! We also do not want users to have to maintain the PDM host/node-name combinations (in case they
+//! rename or reinstall nodes). Renaming would break the PDM config, reinstalling would break eg. a
+//! "machine-id" based mapping.
+//!
+//! We also cannot rely on the TLS fingerprints, because a whole cluster could potentially use a
+//! single wildcard certificate.
+//!
+//! Instead, we maintain a cached mapping of `address ↔ name` in `/var`, which gets polled
+//! regularly.
+//! For PVE we can query an address' `/cluster/status` and look for an entry marked as `local:1`.
+//! Later this might be changed to looking for the node name in the result of
+//! `/nodes/localhost/status` - once this is implemented and rolled out long enough in PVE.
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use anyhow::{Context as _, Error};
+use serde::{Deserialize, Serialize};
+
+use proxmox_product_config::replace_config;
+use proxmox_product_config::{open_api_lockfile, ApiLockGuard};
+
+use pdm_api_types::remotes::RemoteType;
+use pdm_config::ConfigVersionCache;
+
+const CACHE_FILENAME: &str = concat!(
+ pdm_buildcfg::PDM_CACHE_DIR_M!(),
+ "/remote-mapping-cache.json"
+);
+
+const LOCK_FILE: &str = concat!(
+ pdm_buildcfg::PDM_CACHE_DIR_M!(),
+ "/.remote-mapping-cache.json.lock"
+);
+
+static CURRENT_CACHE: Mutex<Option<CacheState>> = Mutex::new(None);
+
+#[derive(Clone)]
+struct CacheState {
+ cache: Arc<RemoteMappingCache>,
+ generation: usize,
+}
+
+impl CacheState {
+ fn get() -> Self {
+ let mut cache = CURRENT_CACHE.lock().unwrap();
+
+ let version_cache = ConfigVersionCache::new_log_error();
+
+ if let Some(cache) = cache.clone() {
+ if let Some(version_cache) = version_cache.as_deref() {
+ if cache.generation == version_cache.remote_mapping_cache() {
+ return cache;
+ }
+ // outdated, fall back to reloading
+ }
+ // outdated, or we failed to query the version cache, fall through to the load
+ }
+
+ // we have no valid cache yet:
+ let generation = version_cache.map(|c| c.remote_mapping_cache()).unwrap_or(0);
+
+ let data = Arc::new(RemoteMappingCache::load());
+ let this = CacheState {
+ cache: Arc::clone(&data),
+ generation,
+ };
+ *cache = Some(this.clone());
+ this
+ }
+
+ fn update(cache: RemoteMappingCache) {
+ let mut current_cache = CURRENT_CACHE.lock().unwrap();
+ let generation = match pdm_config::ConfigVersionCache::new_log_error() {
+ Some(version_cache) => version_cache.increase_remote_mapping_cache(),
+ None => 0,
+ };
+ *current_cache = Some(CacheState {
+ generation,
+ cache: Arc::new(cache),
+ });
+ }
+}
+
+pub struct WriteRemoteMappingCache {
+ pub data: RemoteMappingCache,
+ _lock: ApiLockGuard,
+}
+
+impl std::ops::Deref for WriteRemoteMappingCache {
+ type Target = RemoteMappingCache;
+
+ fn deref(&self) -> &Self::Target {
+ &self.data
+ }
+}
+
+impl std::ops::DerefMut for WriteRemoteMappingCache {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.data
+ }
+}
+
+impl WriteRemoteMappingCache {
+ fn new(lock: ApiLockGuard, data: RemoteMappingCache) -> Self {
+ Self { _lock: lock, data }
+ }
+
+ pub fn save(self) -> Result<(), Error> {
+ self.data.save()?;
+ CacheState::update(self.data);
+ Ok(())
+ }
+}
+
+/// File format for `/var/cache/proxmox-datacenter-manager/remote-mapping-cache.json`
+#[derive(Clone, Default, Deserialize, Serialize)]
+pub struct RemoteMappingCache {
+ /// This maps a remote name to its mapping.
+ pub remotes: HashMap<String, RemoteMapping>,
+}
+
+impl RemoteMappingCache {
+ /// Get read only access to the current cache.
+ pub fn get() -> Arc<Self> {
+ Arc::clone(&CacheState::get().cache)
+ }
+
+ /// *Lock* the cache lock file and get mutable access to the current contents.
+ pub fn write() -> Result<WriteRemoteMappingCache, Error> {
+ let write_lock = open_api_lockfile(LOCK_FILE, None, true)?;
+
+ Ok(WriteRemoteMappingCache::new(
+ write_lock,
+ Self::clone(&Self::get()),
+ ))
+ }
+
+ /// Load the current remote mapping cache. This always succeeds and may return an empty cache.
+ fn load() -> Self {
+ fn do_load() -> Result<Option<RemoteMappingCache>, Error> {
+ Ok(proxmox_sys::fs::file_read_optional_string(CACHE_FILENAME)?
+ .map(|content| serde_json::from_str(&content))
+ .transpose()?)
+ }
+
+ match do_load() {
+ Ok(Some(data)) => return data,
+ Ok(None) => (),
+ Err(err) => {
+ log::error!("corrupted remote-mapping-cache.json file, discarding - {err:?}");
+ }
+ }
+
+ Self::default()
+ }
+
+ /// Save the current remote mapping cache. This should only be done by the remote mapping task.
+ fn save(&self) -> Result<(), Error> {
+ let raw = serde_json::to_vec(self).context("failed to serialize remote mapping cache")?;
+ replace_config(CACHE_FILENAME, &raw)
+ }
+
+ /// Attempt to retrieve the host name from a node name.
+ pub fn node_name_to_hostname(&self, remote: &str, node_name: &str) -> Option<&str> {
+ Some(self.remotes.get(remote)?.node_to_host.get(node_name)?)
+ }
+
+ /// Attempt to get the node info for a node name.
+ pub fn info_by_node_name(&self, remote_name: &str, node_name: &str) -> Option<&HostInfo> {
+ let remote = self.remotes.get(remote_name)?;
+ let host = remote.node_to_host.get(node_name)?;
+ remote.hosts.get(host)
+ }
+
+ pub fn info_by_node_name_mut(
+ &mut self,
+ remote_name: &str,
+ node_name: &str,
+ ) -> Option<&mut HostInfo> {
+ let remote = self.remotes.get_mut(remote_name)?;
+ let host = remote.node_to_host.get(node_name)?;
+ remote.hosts.get_mut(host)
+ }
+
+ /// Attempt to retrieve the node name from a host name.
+ pub fn info_by_hostname(&self, remote: &str, hostname: &str) -> Option<&HostInfo> {
+ self.remotes.get(remote)?.hosts.get(hostname)
+ }
+
+ pub fn info_by_hostname_mut(&mut self, remote: &str, hostname: &str) -> Option<&mut HostInfo> {
+ self.remotes.get_mut(remote)?.hosts.get_mut(hostname)
+ }
+
+ /// Mark a host as reachable or unreachable.
+ pub fn mark_host_reachable(&mut self, remote_name: &str, hostname: &str, reachable: bool) {
+ if let Some(info) = self.info_by_hostname_mut(remote_name, hostname) {
+ info.reachable = reachable;
+ }
+ }
+
+ /// Mark a node as reachable or unreachable.
+ pub fn mark_node_reachable(&mut self, remote_name: &str, node_name: &str, reachable: bool) {
+ if let Some(info) = self.info_by_node_name_mut(remote_name, node_name) {
+ info.reachable = reachable;
+ }
+ }
+
+ /// Update the node name for a host, if the remote and host exist (otherwise this does
+ /// nothing).
+ pub fn set_node_name(&mut self, remote_name: &str, hostname: &str, node_name: Option<String>) {
+ if let Some(remote) = self.remotes.get_mut(remote_name) {
+ remote.set_node_name(hostname, node_name);
+ }
+ }
+}
+
+/// An entry for a remote in a [`RemoteMappingCache`].
+#[derive(Clone, Deserialize, Serialize)]
+pub struct RemoteMapping {
+ /// The remote type.
+ pub ty: RemoteType,
+
+ /// Maps a `hostname` to information we keep about it.
+ pub hosts: HashMap<String, HostInfo>,
+
+ /// Maps a node name to a hostname, for where we have that info.
+ pub node_to_host: HashMap<String, String>,
+}
+
+impl RemoteMapping {
+ pub fn new(ty: RemoteType) -> Self {
+ Self {
+ ty,
+ hosts: HashMap::new(),
+ node_to_host: HashMap::new(),
+ }
+ }
+
+ /// Update the node name for a host, if the host exists (otherwise this does nothing).
+ pub fn set_node_name(&mut self, hostname: &str, node_name: Option<String>) {
+ if let Some(info) = self.hosts.get_mut(hostname) {
+ if let Some(old) = info.node_name.take() {
+ self.node_to_host.remove(&old);
+ }
+ info.node_name = node_name;
+ if let Some(new) = &info.node_name {
+ self.node_to_host.insert(new.clone(), hostname.to_string());
+ }
+ }
+ }
+}
+
+/// All the data we keep cached for nodes found in [`RemoteMapping`].
+#[derive(Clone, Deserialize, Serialize)]
+pub struct HostInfo {
+ /// This is the host name associated with this node.
+ pub hostname: String,
+
+ /// This is the cluster side node name, if we know it.
+ node_name: Option<String>,
+
+ /// This means we were able to reach the node.
+ /// When a client fails to connect it may update this to mark it as unreachable.
+ pub reachable: bool,
+}
+
+impl HostInfo {
+ pub fn new(hostname: String) -> Self {
+ Self {
+ hostname,
+ node_name: None,
+ reachable: true,
+ }
+ }
+
+ pub fn node_name(&self) -> Option<&str> {
+ self.node_name.as_deref()
+ }
+}
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
* [pdm-devel] [PATCH datacenter-manager 5/7] server: don't try to connect to known-unreachable servers
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
` (3 preceding siblings ...)
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 4/7] server: cache pve node reachability and names Wolfgang Bumiller
@ 2025-02-04 9:55 ` Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 6/7] server: try previously unreachable clients as last resort Wolfgang Bumiller
` (2 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-04 9:55 UTC (permalink / raw)
To: pdm-devel
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
server/src/connection.rs | 93 +++++++++++++++++++++++++---------
server/src/remote_cache/mod.rs | 6 +++
2 files changed, 74 insertions(+), 25 deletions(-)
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 24e2e44..7c1c237 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -130,14 +130,17 @@ fn prepare_connect_multi_client(remote: &Remote) -> Result<(MultiClient, Connect
let mut clients = Vec::new();
for node in &remote.nodes {
- clients.push(Arc::new(prepare_connect_client_to_node(
- node,
- info.default_port,
- info.pve_compat,
- )?));
+ clients.push(MultiClientEntry {
+ client: Arc::new(prepare_connect_client_to_node(
+ node,
+ info.default_port,
+ info.pve_compat,
+ )?),
+ hostname: node.hostname.clone(),
+ });
}
- Ok((MultiClient::new(clients), info))
+ Ok((MultiClient::new(remote.id.clone(), clients), info))
}
/// Like [`connect()`], but with failover support for remotes which can have multiple nodes.
@@ -449,6 +452,14 @@ pub fn init(instance: Box<dyn ClientFactory + Send + Sync>) {
}
}
+/// In order to allow the [`MultiClient`] to check the cached reachability state of a client, we
+/// need to know which remote it belongs to, so store the metadata alongside the actual `Client`
+/// struct.
+struct MultiClientEntry {
+ client: Arc<Client>,
+ hostname: String,
+}
+
/// This is another wrapper around the actual HTTP client responsible for dealing with connection
/// problems: if we cannot reach a node of a cluster, this will attempt to retry a request on
/// another node.
@@ -456,19 +467,15 @@ pub fn init(instance: Box<dyn ClientFactory + Send + Sync>) {
/// # Possible improvements
///
/// - For `GET` requests we could also start a 2nd request after a shorter time out (eg. 10s).
-/// - We could use RRD data for a "best guess" where to start eg. if we know a node was offline on
-/// the last rrd polling we'd start with a different one.
-/// For this, we still need to include the node names in the clients here somehow so that we can
-/// actually manage/re-shuffle them from the outside after this struct is already created.
struct MultiClient {
state: StdMutex<MultiClientState>,
timeout: Duration,
}
impl MultiClient {
- fn new(clients: Vec<Arc<Client>>) -> Self {
+ fn new(remote: String, entries: Vec<MultiClientEntry>) -> Self {
Self {
- state: StdMutex::new(MultiClientState::new(clients)),
+ state: StdMutex::new(MultiClientState::new(remote, entries)),
timeout: Duration::from_secs(60),
}
}
@@ -477,8 +484,8 @@ impl MultiClient {
where
F: Fn(&Arc<Client>),
{
- for client in &self.state.lock().unwrap().clients {
- func(client);
+ for entry in &self.state.lock().unwrap().entries {
+ func(&entry.client);
}
}
}
@@ -488,15 +495,24 @@ impl MultiClient {
struct MultiClientState {
/// The current index *not* modulo the client count.
current: usize,
- clients: Vec<Arc<Client>>,
+ remote: String,
+ entries: Vec<MultiClientEntry>,
}
impl MultiClientState {
- fn new(clients: Vec<Arc<Client>>) -> Self {
- Self {
+ fn new(remote: String, entries: Vec<MultiClientEntry>) -> Self {
+ let mut this = Self {
current: 0,
- clients,
- }
+ remote,
+ entries,
+ };
+ this.skip_unreachable();
+ this
+ }
+
+ /// Move to the next entry; the counter wraps around instead of overflowing.
+ fn next(&mut self) {
+ self.current = self.current.wrapping_add(1);
}
/// Whenever a request fails with the *current* client we move the current entry forward.
@@ -507,20 +523,47 @@ impl MultiClientState {
/// the strategy, we may want to change this to something like `1 + max(current, failed)`.
fn failed(&mut self, failed_index: usize) {
if self.current == failed_index {
- self.current = self.current.wrapping_add(1);
+ let entry = self.get_entry();
+ log::error!("marking client {} as unreachable", entry.hostname);
+ if let Ok(mut cache) = crate::remote_cache::RemoteMappingCache::write() {
+ cache.mark_host_reachable(&self.remote, &entry.hostname, false);
+ let _ = cache.save();
+ }
+ self.next();
+ self.skip_unreachable();
+ }
+ }
+
+ /// Skip ahead as long as we're pointing at an unreachable entry.
+ fn skip_unreachable(&mut self) {
+ let cache = crate::remote_cache::RemoteMappingCache::get();
+ // loop at most as many times as we have entries...
+ for _ in 0..self.entries.len() {
+ let entry = self.get_entry();
+ if !cache.host_is_reachable(&self.remote, &entry.hostname) {
+ log::error!("skipping host {} - marked unreachable", entry.hostname);
+ self.next();
+ } else {
+ return;
+ }
}
}
- /// Get `current` as an *index* (i.e. modulo `clients.len()`).
+ /// Get `current` as an *index* (i.e. modulo `entries.len()`).
fn index(&self) -> usize {
- self.current % self.clients.len()
+ self.current % self.entries.len()
+ }
+
+ /// Get the current entry.
+ fn get_entry(&self) -> &MultiClientEntry {
+ &self.entries[self.index()]
}
/// Get the current client and its index which can be passed to `failed()` if the client fails
/// to connect.
fn get(&self) -> (Arc<Client>, usize) {
let index = self.index();
- (Arc::clone(&self.clients[index]), self.current)
+ (Arc::clone(&self.entries[index].client), self.current)
}
/// Check if we already tried all clients since a specific starting index.
@@ -533,11 +576,11 @@ impl MultiClientState {
/// request-retry-loop will fail as soon as the same *number* of clients since its starting
/// point were marked as faulty without retrying them all.
fn tried_all_since(&self, start: usize) -> bool {
- self.tried_clients(start) >= self.clients.len()
+ self.tried_clients(start) >= self.entries.len()
}
/// We store the current index continuously without wrapping it modulo the client count (and
- /// only do that when indexing the `clients` array), so that we can easily check if "all
+ /// only do that when indexing the `entries` array), so that we can easily check if "all
/// currently running tasks taken together" have already tried all clients by comparing our
/// starting point to the current index.
fn tried_clients(&self, start: usize) -> usize {
diff --git a/server/src/remote_cache/mod.rs b/server/src/remote_cache/mod.rs
index 69e79f1..57e4bf3 100644
--- a/server/src/remote_cache/mod.rs
+++ b/server/src/remote_cache/mod.rs
@@ -218,6 +218,12 @@ impl RemoteMappingCache {
remote.set_node_name(hostname, node_name);
}
}
+
+ /// Check if a host is reachable.
+ pub fn host_is_reachable(&self, remote: &str, hostname: &str) -> bool {
+ self.info_by_hostname(remote, hostname)
+ .is_none_or(|info| info.reachable)
+ }
}
/// An entry for a remote in a [`RemoteMappingCache`].
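To make the non-wrapping bookkeeping concrete, a small sketch (purely illustrative; it only assumes that `tried_clients()` boils down to the difference between the shared counter and a loop's starting value):
    // 3 entries; a retry loop starts while the shared counter is at 5
    let len = 3usize;
    let start = 5usize;
    // concurrent failures in other tasks move the counter on to 8 meanwhile
    let current = 8usize;
    // indexing always happens modulo the entry count ...
    assert_eq!(current % len, 2);
    // ... but the "tried since start" count does not wrap, so this loop can
    // tell that 3 distinct failures happened in the meantime and gives up
    assert!(current.wrapping_sub(start) >= len);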
--
2.39.5
* [pdm-devel] [PATCH datacenter-manager 6/7] server: try previously unreachable clients as last resort
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
` (4 preceding siblings ...)
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 5/7] server: don't try to connect to known-unreachable servers Wolfgang Bumiller
@ 2025-02-04 9:55 ` Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 7/7] server: add some tracing instrumentation Wolfgang Bumiller
2025-02-11 14:50 ` [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Lukas Wagner
7 siblings, 0 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-04 9:55 UTC (permalink / raw)
To: pdm-devel
and mark them as reachable again if they succeed
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
server/src/connection.rs | 92 ++++++++++++++++++++++++++++++++++------
1 file changed, 80 insertions(+), 12 deletions(-)
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 7c1c237..7ba38f1 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -469,13 +469,15 @@ struct MultiClientEntry {
/// - For `GET` requests we could also start a 2nd request after a shorter time out (eg. 10s).
struct MultiClient {
state: StdMutex<MultiClientState>,
+ remote: String,
timeout: Duration,
}
impl MultiClient {
fn new(remote: String, entries: Vec<MultiClientEntry>) -> Self {
Self {
- state: StdMutex::new(MultiClientState::new(remote, entries)),
+ state: StdMutex::new(MultiClientState::new(remote.clone(), entries)),
+ remote,
timeout: Duration::from_secs(60),
}
}
@@ -559,11 +561,16 @@ impl MultiClientState {
&self.entries[self.index()]
}
- /// Get the current client and its index which can be passed to `failed()` if the client fails
+ /// Get the current entry and its index which can be passed to `failed()` if the client fails
/// to connect.
- fn get(&self) -> (Arc<Client>, usize) {
+ fn get(&self) -> (&MultiClientEntry, usize) {
let index = self.index();
- (Arc::clone(&self.entries[index].client), self.current)
+ (&self.entries[index], self.current)
+ }
+
+ /// Get a client at a specific point (which still needs to be converted to an index).
+ fn get_at(&self, at: usize) -> &MultiClientEntry {
+ &self.entries[at % self.entries.len()]
}
/// Check if we already tried all clients since a specific starting index.
@@ -588,6 +595,30 @@ impl MultiClientState {
}
}
+struct TryClient {
+ client: Arc<Client>,
+ reachable: bool,
+ hostname: String,
+}
+
+impl TryClient {
+ fn reachable(entry: &MultiClientEntry) -> Self {
+ Self {
+ client: Arc::clone(&entry.client),
+ hostname: entry.hostname.clone(),
+ reachable: true,
+ }
+ }
+
+ fn unreachable(entry: &MultiClientEntry) -> Self {
+ Self {
+ client: Arc::clone(&entry.client),
+ hostname: entry.hostname.clone(),
+ reachable: false,
+ }
+ }
+}
+
impl MultiClient {
/// This is the client usage strategy.
///
@@ -598,17 +629,28 @@ impl MultiClient {
/// We might be skipping clients if other tasks already tried "more" clients, but that's fine,
/// since there's no point in trying the same remote twice simultaneously if it is currently
/// offline...
- fn try_clients(&self) -> impl Iterator<Item = Arc<Client>> + '_ {
+ fn try_clients(&self) -> impl Iterator<Item = TryClient> + '_ {
let mut start_current = None;
let state = &self.state;
+
+ let mut unreachable_clients = Vec::new();
+ let mut try_unreachable = None::<std::vec::IntoIter<_>>;
+
std::iter::from_fn(move || {
let mut state = state.lock().unwrap();
+
+ if let Some(ref mut try_unreachable) = try_unreachable {
+ return Some(TryClient::unreachable(
+ state.get_at(try_unreachable.next()?),
+ ));
+ }
+
match start_current {
None => {
// first attempt, just use the current client and remember the starting index
let (client, index) = state.get();
start_current = Some((index, index));
- Some(client)
+ Some(TryClient::reachable(client))
}
Some((start, current)) => {
// If our last request failed, the retry-loop asks for another client, mark the
@@ -618,13 +660,24 @@ impl MultiClient {
if state.tried_all_since(start) {
// This iterator (and therefore this retry-loop) has tried all clients.
// Give up.
- return None;
+ try_unreachable =
+ Some(std::mem::take(&mut unreachable_clients).into_iter());
+ return Some(TryClient::unreachable(
+ state.get_at(try_unreachable.as_mut()?.next()?),
+ ));
}
// finally just get the new current client and update `current` for the later
// call to `failed()`
- let (client, current) = state.get();
- start_current = Some((start, current));
- Some(client)
+ let (client, new_current) = state.get();
+ start_current = Some((start, new_current));
+
+ // remember all the clients we skipped:
+ let mut at = current + 1;
+ while at != new_current {
+ unreachable_clients.push(at);
+ at = at.wrapping_add(1);
+ }
+ Some(TryClient::reachable(client))
}
}
})
@@ -647,7 +700,12 @@ macro_rules! try_request {
let mut timed_out = false;
// The iterator in use here will automatically mark a client as faulty if we move on to
// the `next()` one.
- for client in $self.try_clients() {
+ for TryClient {
+ client,
+ hostname,
+ reachable,
+ } in $self.try_clients()
+ {
if let Some(err) = last_err.take() {
log::error!("API client error, trying another remote - {err:?}");
}
@@ -661,7 +719,17 @@ macro_rules! try_request {
Ok(Err(proxmox_client::Error::Client(err))) => {
last_err = Some(err);
}
- Ok(result) => return result,
+ Ok(result) => {
+ if !reachable {
+ log::error!("marking {hostname:?} as reachable again!");
+ if let Ok(mut cache) = crate::remote_cache::RemoteMappingCache::write()
+ {
+ cache.mark_host_reachable(&$self.remote, &hostname, true);
+ let _ = cache.save();
+ }
+ }
+ return result;
+ }
Err(_) => {
timed_out = true;
}
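As a concrete example of the resulting order: with hosts A, B and C, where B is currently cached as unreachable, a request first goes to A; if that fails, the loop skips B (but remembers it) and tries C; only once the reachable hosts are exhausted does it fall back to B as a last resort, and a successful reply from B marks it as reachable again in the cache.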
--
2.39.5
* [pdm-devel] [PATCH datacenter-manager 7/7] server: add some tracing instrumentation
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
` (5 preceding siblings ...)
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 6/7] server: try previously unreachable clients as last resort Wolfgang Bumiller
@ 2025-02-04 9:55 ` Wolfgang Bumiller
2025-02-11 14:50 ` [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Lukas Wagner
7 siblings, 0 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-04 9:55 UTC (permalink / raw)
To: pdm-devel
For debugging the client usage.
To see messages, set PROXMOX_DEBUG=trace and use, for instance:
# journalctl -f SPAN_NAME=remote_node_caching
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
.../tasks/remote_node_mapping.rs | 14 ++++++++------
server/src/connection.rs | 10 ++++++++++
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs
index 5912365..f678d4c 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_node_mapping.rs
@@ -68,6 +68,7 @@ impl CachingTask {
}
/// A single iteration of the caching task.
+ #[tracing::instrument(skip_all, name = "remote_node_caching")]
async fn run_once(&mut self) {
let (config, digest) = match pdm_config::remotes::config() {
Ok(cd) => cd,
@@ -82,21 +83,19 @@ impl CachingTask {
.as_ref()
.is_none_or(|d| digest != *d)
{
- tracing::debug!("new config - updating remote node name cache");
+ log::trace!("new config - updating remote node name cache");
self.last_config_digest = Some(digest);
// the config got updated - abort the current name-fetching task, we'll
// spawn a new one
if let Some(name_task) = self.current_name_task.take() {
- tracing::debug!("aborting query task");
+ log::trace!("aborting query task");
name_task.abort();
}
if let Err(err) = self.config_update(&config) {
log::error!("error updating remote node cache: {err:?}");
}
- //} else {
- // tracing::debug!("no change to the config");
}
if self
@@ -104,7 +103,7 @@ impl CachingTask {
.as_ref()
.is_none_or(|task| task.is_finished())
{
- log::debug!("name task finished, starting reachability query task");
+ log::trace!("name task finished, starting reachability query task");
self.current_name_task =
Some(spawn_aborted_on_shutdown(Self::query_node_names(config)));
}
@@ -168,8 +167,10 @@ impl CachingTask {
}
}
+ #[tracing::instrument(skip_all)]
async fn query_node_names(config: SectionConfigData<Remote>) {
for (_name, remote) in &config {
+ log::trace!("update remote {:?}", remote.id);
if let Err(err) = Self::query_node_names_for_remote(remote).await {
log::error!("error updating node name cache - {err:?}");
}
@@ -184,7 +185,7 @@ impl CachingTask {
// now add new nodes
for node in &remote.nodes {
- tracing::debug!("querying remote {:?} node {:?}", remote.id, node.hostname);
+ log::debug!("querying remote {:?} node {:?}", remote.id, node.hostname);
// if the host is new, we need to query its name
let query_result = match query_node_name(remote, &node.hostname).await {
@@ -215,6 +216,7 @@ impl CachingTask {
/// Calls `/cluster/status` directly on a specific node to find its name.
async fn query_node_name(remote: &Remote, hostname: &str) -> Result<String, Error> {
+ log::trace!("querying node name {hostname:?} for remote {:?}", remote.id);
let client = server::connection::make_pve_client_with_endpoint(remote, Some(hostname))?;
let node_status_list = client.cluster_status().await?;
for node in node_status_list {
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 7ba38f1..397b62f 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -603,6 +603,7 @@ struct TryClient {
impl TryClient {
fn reachable(entry: &MultiClientEntry) -> Self {
+ log::trace!("trying reachable client for host {:?}", entry.hostname);
Self {
client: Arc::clone(&entry.client),
hostname: entry.hostname.clone(),
@@ -611,6 +612,10 @@ impl TryClient {
}
fn unreachable(entry: &MultiClientEntry) -> Self {
+ log::trace!(
+ "trying previouslsy unreachable client for host {:?}",
+ entry.hostname
+ );
Self {
client: Arc::clone(&entry.client),
hostname: entry.hostname.clone(),
@@ -637,6 +642,8 @@ impl MultiClient {
let mut try_unreachable = None::<std::vec::IntoIter<_>>;
std::iter::from_fn(move || {
+ let _enter = tracing::span!(tracing::Level::TRACE, "multi_client_iterator").entered();
+
let mut state = state.lock().unwrap();
if let Some(ref mut try_unreachable) = try_unreachable {
@@ -650,6 +657,7 @@ impl MultiClient {
// first attempt, just use the current client and remember the starting index
let (client, index) = state.get();
start_current = Some((index, index));
+ log::trace!("trying reachable client {index}");
Some(TryClient::reachable(client))
}
Some((start, current)) => {
@@ -674,9 +682,11 @@ impl MultiClient {
// remember all the clients we skipped:
let mut at = current + 1;
while at != new_current {
+ log::trace!("(remembering unreachable client {at})");
unreachable_clients.push(at);
at = at.wrapping_add(1);
}
+ log::trace!("trying reachable client {new_current}");
Some(TryClient::reachable(client))
}
}
--
2.39.5
* Re: [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
` (6 preceding siblings ...)
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 7/7] server: add some tracing instrumentation Wolfgang Bumiller
@ 2025-02-11 14:50 ` Lukas Wagner
7 siblings, 0 replies; 11+ messages in thread
From: Lukas Wagner @ 2025-02-11 14:50 UTC (permalink / raw)
To: Proxmox Datacenter Manager development discussion, Wolfgang Bumiller
On 2025-02-04 10:55, Wolfgang Bumiller wrote:
> This consists of two parts (and a question (at the end)):
>
> 1) Patches 1 through 3:
> The `MultiClient` which implements cycling through multiple remotes
> when requests fail due to network issues.
>
> 2) Patches 4 through 6:
> A task caching the remote reachability state as well as mapping
> hostnames to the pve-side node-names. Currently this simply runs every
> 60 seconds and goes through the current remotes+nodes and checks for
> reachability.
> If at that time the remote.cfg changed and the polling
> task is still ongoing it will be aborted and started over with the new
> config.
> Finally, the reachability information will be used and updated by the
> `MultiClient` implementation.
>
> 3) Patch 7 is mainly for debugging. I'm not sure whether we want to include
> tracing instrumentation in general, or via a `#[cfg_attr]` or only for
> debug builds...
I quite like the approach you took in patch 7 and used the same one in my metric collection improvement
patch series, so that's a +1 from me.
Not a super deep review, but I looked through the code, tried to understand it, and checked whether anything
looked off. Looks good to me.
Reviewed-by: Lukas Wagner <l.wagner@proxmox.com>
--
- Lukas
* Re: [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper Wolfgang Bumiller
@ 2025-02-11 14:50 ` Lukas Wagner
2025-02-12 9:07 ` Wolfgang Bumiller
0 siblings, 1 reply; 11+ messages in thread
From: Lukas Wagner @ 2025-02-11 14:50 UTC (permalink / raw)
To: Proxmox Datacenter Manager development discussion, Wolfgang Bumiller
On 2025-02-04 10:55, Wolfgang Bumiller wrote:
> +
> +// doing this via a generic method is currently tedious as it requires an extra helper trait to
> +// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
> +// its input to its generic output future... and then you run into borrow-checker limitations...
> +macro_rules! try_request {
> + ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
> + let params = $params.map(serde_json::to_value);
> + Box::pin(async move {
> + let params = params
> + .transpose()
> + .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
> +
> + let mut last_err = None;
> + let mut timed_out = false;
> + // The iterator in use here will automatically mark a client as faulty if we move on to
> + // the `next()` one.
> + for client in $self.try_clients() {
> + if let Some(err) = last_err.take() {
> + log::error!("API client error, trying another remote - {err:?}");
> + }
> + if timed_out {
> + timed_out = false;
> + log::error!("API client timed out, trying another remote");
> + }
> +
> + let request = client.$how($method.clone(), $path_and_query, params.as_ref());
> + match tokio::time::timeout($self.timeout, request).await {
> + Ok(Err(proxmox_client::Error::Client(err))) => {
> + last_err = Some(err);
> + }
> + Ok(result) => return result,
> + Err(_) => {
> + timed_out = true;
> + }
> + }
> + }
maybe add another
if let Some(err) = last_err {
log::error!("... {err} ...");
}
so that the actual `err` from the last client you tried is also logged?
Also, if the *last* client to try times out, I guess we would like to log that as well? (see the sketch after the quoted hunk)
> +
> + Err(proxmox_client::Error::Other(
> + "failed to perform API request",
> + ))
> + })
> + };
> +}
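Something along those lines after the loop, i.e. (untested, just to illustrate; names as in the quoted macro):
    if let Some(err) = last_err {
        log::error!("API client error on last remote - {err:?}");
    }
    if timed_out {
        log::error!("API client timed out on last remote");
    }
    Err(proxmox_client::Error::Other(
        "failed to perform API request",
    ))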
--
- Lukas
* Re: [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper
2025-02-11 14:50 ` Lukas Wagner
@ 2025-02-12 9:07 ` Wolfgang Bumiller
0 siblings, 0 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2025-02-12 9:07 UTC (permalink / raw)
To: Lukas Wagner; +Cc: Proxmox Datacenter Manager development discussion
On Tue, Feb 11, 2025 at 03:50:24PM +0100, Lukas Wagner wrote:
>
>
> On 2025-02-04 10:55, Wolfgang Bumiller wrote:
> > +
> > +// doing this via a generic method is currently tedious as it requires an extra helper trait to
> > +// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
> > +// its input to its generic output future... and then you run into borrow-checker limitations...
> > +macro_rules! try_request {
> > + ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
> > + let params = $params.map(serde_json::to_value);
> > + Box::pin(async move {
> > + let params = params
> > + .transpose()
> > + .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
> > +
> > + let mut last_err = None;
> > + let mut timed_out = false;
> > + // The iterator in use here will automatically mark a client as faulty if we move on to
> > + // the `next()` one.
> > + for client in $self.try_clients() {
> > + if let Some(err) = last_err.take() {
> > + log::error!("API client error, trying another remote - {err:?}");
> > + }
> > + if timed_out {
> > + timed_out = false;
> > + log::error!("API client timed out, trying another remote");
> > + }
> > +
> > + let request = client.$how($method.clone(), $path_and_query, params.as_ref());
> > + match tokio::time::timeout($self.timeout, request).await {
> > + Ok(Err(proxmox_client::Error::Client(err))) => {
> > + last_err = Some(err);
> > + }
> > + Ok(result) => return result,
> > + Err(_) => {
> > + timed_out = true;
> > + }
> > + }
> > + }
>
> maybe add another
>
> if let Some(err) = last_err {
> log::error!("... {err} ...");
> }
>
> so that the actual `err` from the last client you tried is also logged?
> Also if the *last* client to try times out I guess we would like to log that as well?
Good point, will include in a v2.
Thread overview: 11+ messages
2025-02-04 9:55 [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper Wolfgang Bumiller
2025-02-11 14:50 ` Lukas Wagner
2025-02-12 9:07 ` Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 2/7] server: store pve MultiClient for re-use Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 3/7] server: separate ConnectInfo from client creation Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 4/7] server: cache pve node reachability and names Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 5/7] server: don't try to connect to known-unreachable servers Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 6/7] server: try previously unreachable clients as last resort Wolfgang Bumiller
2025-02-04 9:55 ` [pdm-devel] [PATCH datacenter-manager 7/7] server: add some tracing instrumentation Wolfgang Bumiller
2025-02-11 14:50 ` [pdm-devel] [PATCH pdm 0/7] multi-remote client and node reachability cache Lukas Wagner