* [PATCH datacenter-manager 2/4] server: remote cache: prepare for back-off mechanism
2026-05-29 13:30 [PATCH datacenter-manager 0/4] implement back-off mechanism for Dominik Csapak
2026-05-29 13:30 ` [RFC PATCH datacenter-manager 1/4] server: connection: multi client: use correct client error for retrying Dominik Csapak
@ 2026-05-29 13:30 ` Dominik Csapak
2026-05-29 23:40 ` Thomas Lamprecht
2026-05-29 13:30 ` [PATCH datacenter-manager 3/4] server: connection: multi-client: use back-off state from remote cache Dominik Csapak
2026-05-29 13:30 ` [PATCH datacenter-manager 4/4] server: pbs client: rework to use the back-off mechanism " Dominik Csapak
3 siblings, 1 reply; 7+ messages in thread
From: Dominik Csapak @ 2026-05-29 13:30 UTC (permalink / raw)
To: pdm-devel
This introduces a new field for the entries (`RemoteMapping`) of the
RemoteMappingCache that contains the current 'BackOffState' of a remote.
This is intended to mark remotes as unreachable when the connection to
them fails and to only retry once enough time has elapsed. This prevents
sending out numerous connection attempts to a remote that is known to be
unreachable.
The back-off timeout is increased exponentially from 10 seconds up to
600 seconds, so it takes at most 10 minutes for a remote to be considered
reachable again after it was offline for a prolonged period of time.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
Note that it can now take up to 10 minutes for PDM to mark a remote as
reachable again, since it won't retry sooner. We could combat that by
e.g. retrying every 10th connection, even if the back-off timeout has
not run out yet (this probably has to be scaled by the number of nodes
and tasks we are running?). Another possibility would be a special API
call to force a refresh, but my guess is that most users would just
abuse that button?
I'm very open to other ideas on how to improve this; maybe it's just
a matter of fine-tuning the back-off scale and maximum to get a
well-working system.
server/src/remote_cache/mod.rs | 139 +++++++++++++++++++++++++++++++++
1 file changed, 139 insertions(+)
diff --git a/server/src/remote_cache/mod.rs b/server/src/remote_cache/mod.rs
index 39483e9f..d50a324c 100644
--- a/server/src/remote_cache/mod.rs
+++ b/server/src/remote_cache/mod.rs
@@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
use proxmox_product_config::replace_config;
use proxmox_product_config::{ApiLockGuard, open_api_lockfile};
+use proxmox_time::epoch_i64;
use pdm_api_types::remotes::RemoteType;
use pdm_config::ConfigVersionCache;
@@ -224,6 +225,84 @@ impl RemoteMappingCache {
self.info_by_hostname(remote, hostname)
.is_none_or(|info| info.reachable)
}
+
+    /// Records whether `remote` could be reached, updating its back-off state accordingly.
+    ///
+    /// Does nothing if there is no entry for `remote` in the cache.
+    pub fn mark_remote_reachable(&mut self, remote: &str, reachable: RemoteState) {
+        let Some(mapping) = self.remotes.get_mut(remote) else {
+            return;
+        };
+        mapping.set_reachable(reachable);
+    }
+
+    /// Returns the remaining back-off time in seconds for `remote` at `current_time`, together
+    /// with the last connection error if the remote currently has a back-off state.
+    ///
+    /// A remote without a cache entry, or without a back-off state, yields `(0, None)`.
+    // NOTE: open question whether remotes unknown to the cache should be allowed to be tried.
+    pub fn remote_remaining_backoff_time(
+        &self,
+        remote: &str,
+        current_time: i64,
+    ) -> (i64, Option<String>) {
+        self.remotes
+            .get(remote)
+            .and_then(|mapping| mapping.backoff.as_ref())
+            .map_or((0, None), |state| {
+                (
+                    state.remaining_backoff_time(current_time),
+                    Some(state.last_error.clone()),
+                )
+            })
+    }
+}
+
+/// Back-off time in seconds after the first failed connection to a remote.
+const BACK_OFF_BASE_TIME_S: i64 = 10;
+/// Upper bound in seconds (10 minutes) for the back-off between two connection attempts.
+const BACK_OFF_MAX_TIME_S: i64 = 10 * 60;
+
+/// Tracks how long to wait before contacting an unreachable remote again.
+#[derive(Clone, Deserialize, Serialize)]
+struct BackOffState {
+    /// Epoch timestamp (seconds) of the most recent connection attempt.
+    last_try: i64,
+    /// Number of times the back-off was doubled, starting from [BACK_OFF_BASE_TIME_S]; the
+    /// resulting back-off is capped at [BACK_OFF_MAX_TIME_S].
+    back_off_doubling_count: u32,
+    /// The error message of the most recent failed connection attempt.
+    last_error: String,
+}
+
+impl BackOffState {
+    /// Creates a new back-off state for a remote whose connection failed at `time` with `error`.
+    fn new(time: i64, error: String) -> Self {
+        Self {
+            last_try: time,
+            back_off_doubling_count: 0,
+            last_error: error,
+        }
+    }
+
+    /// Increases the back-off state when the remote is still unreachable.
+    ///
+    /// The doubling count only grows while the current back-off *duration* is below
+    /// [BACK_OFF_MAX_TIME_S]. Comparing the *remaining* time instead would let the count grow
+    /// without bound, since a retry (and thus a call to this) usually happens only after the
+    /// back-off expired, where the remaining time is zero or negative. That would eventually
+    /// overflow the `2^count` calculation.
+    fn increase(&mut self, time: i64, error: String) {
+        if self.back_off_time() < BACK_OFF_MAX_TIME_S {
+            self.back_off_doubling_count += 1;
+        }
+        self.last_try = time;
+        self.last_error = error;
+    }
+
+    /// Returns the current back-off duration in seconds, capped at [BACK_OFF_MAX_TIME_S].
+    fn back_off_time(&self) -> i64 {
+        // saturate instead of overflowing, e.g. for a large count read from an older cache file
+        let factor = 2i64.checked_pow(self.back_off_doubling_count).unwrap_or(i64::MAX);
+        BACK_OFF_BASE_TIME_S.saturating_mul(factor).min(BACK_OFF_MAX_TIME_S)
+    }
+
+    /// Returns the remaining time to try again in seconds.
+    /// If zero or negative, we're free to try again.
+    fn remaining_backoff_time(&self, current_time: i64) -> i64 {
+        self.last_try + self.back_off_time() - current_time
+    }
+}
+
+/// Whether a remote could be reached; used to update its back-off state.
+pub enum RemoteState {
+    /// The remote answered the request.
+    Reachable,
+    /// Connecting to the remote failed; carries the error message.
+    Unreachable(String),
}
/// An entry for a remote in a [`RemoteMappingCache`].
@@ -237,6 +316,9 @@ pub struct RemoteMapping {
/// Maps a node name to a hostname, for where we have that info.
pub node_to_host: HashMap<String, String>,
+
+ /// internal backoff state, only controlled via [Self::set_reachable]
+ backoff: Option<BackOffState>,
}
impl RemoteMapping {
@@ -245,6 +327,7 @@ impl RemoteMapping {
ty,
hosts: HashMap::new(),
node_to_host: HashMap::new(),
+ backoff: None,
}
}
@@ -260,6 +343,22 @@ impl RemoteMapping {
}
}
}
+
+    /// Updates the back-off state of this remote.
+    ///
+    /// [RemoteState::Reachable] clears any back-off. [RemoteState::Unreachable] starts a new
+    /// back-off or extends the existing one, using the current epoch time as the time of the try.
+    pub fn set_reachable(&mut self, reachable: RemoteState) {
+        let RemoteState::Unreachable(err) = reachable else {
+            self.backoff = None;
+            return;
+        };
+
+        let now = epoch_i64();
+        if let Some(state) = self.backoff.as_mut() {
+            state.increase(now, err);
+        } else {
+            self.backoff = Some(BackOffState::new(now, err));
+        }
+    }
}
/// All the data we keep cached for nodes found in [`RemoteMapping`].
@@ -289,3 +388,43 @@ impl HostInfo {
self.node_name.as_deref()
}
}
+
+#[cfg(test)]
+mod test {
+    use crate::remote_cache::BackOffState;
+
+    /// Increases `backoff` `count` times, always at time 0.
+    fn increase_times(backoff: &mut BackOffState, count: usize) {
+        for _ in 0..count {
+            backoff.increase(0, String::new());
+        }
+    }
+
+    #[test]
+    fn test_back_off_calculation() {
+        let mut backoff = BackOffState::new(0, String::new());
+        // timeout should be @ 10 seconds
+
+        assert_eq!(backoff.remaining_backoff_time(1), 9);
+        assert_eq!(backoff.remaining_backoff_time(5), 5);
+        assert_eq!(backoff.remaining_backoff_time(9), 1);
+        assert_eq!(backoff.remaining_backoff_time(10), 0);
+        assert_eq!(backoff.remaining_backoff_time(15), -5);
+
+        increase_times(&mut backoff, 1);
+        // timeout should be now @ 2*10 seconds
+
+        assert_eq!(backoff.remaining_backoff_time(10), 10);
+        assert_eq!(backoff.remaining_backoff_time(15), 5);
+        assert_eq!(backoff.remaining_backoff_time(20), 0);
+        assert_eq!(backoff.remaining_backoff_time(30), -10);
+
+        increase_times(&mut backoff, 3);
+        // timeout should be now at 2^4 * 10 seconds
+        assert_eq!(backoff.remaining_backoff_time(150), 10);
+
+        increase_times(&mut backoff, 4);
+        // 2^8 * 10 seconds would exceed the maximum, so the timeout is capped at 600 seconds
+        assert_eq!(backoff.remaining_backoff_time(590), 10);
+
+        // further increases must keep the timeout at the maximum
+        increase_times(&mut backoff, 10);
+        assert_eq!(backoff.remaining_backoff_time(590), 10);
+    }
+}
--
2.47.3
^ permalink raw reply related [flat|nested] 7+ messages in thread* [PATCH datacenter-manager 4/4] server: pbs client: rework to use the back-off mechanism from remote cache
2026-05-29 13:30 [PATCH datacenter-manager 0/4] implement back-off mechanism for Dominik Csapak
` (2 preceding siblings ...)
2026-05-29 13:30 ` [PATCH datacenter-manager 3/4] server: connection: multi-client: use back-off state from remote cache Dominik Csapak
@ 2026-05-29 13:30 ` Dominik Csapak
3 siblings, 0 replies; 7+ messages in thread
From: Dominik Csapak @ 2026-05-29 13:30 UTC (permalink / raw)
To: pdm-devel
Instead of making the raw proxmox_client public (and implementing
`Deref`), implement `HttpApiClient` for the PbsClient and route the
API calls through that.
This makes it possible to use the back-off mechanism in a similar way as
in the PVE multi client. The `try_request` macro is heavily inspired by and
borrowed from the multi client as well.
This improves the error behavior for known offline remotes, making it the
same as for PVE remotes.
Note that in contrast to PVE, we don't have a tokio timeout in the
`try_request` macro, to keep the same behavior as before.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
server/src/connection.rs | 4 +-
server/src/pbs_client.rs | 167 +++++++++++++++++++++++++++++----------
2 files changed, 129 insertions(+), 42 deletions(-)
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 7122713f..48e554dd 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -360,7 +360,7 @@ impl ClientFactory for DefaultClientFactory {
fn make_pbs_client(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
let client = crate::connection::connect(remote, None)?;
- Ok(Box::new(PbsClient(client)))
+ Ok(Box::new(PbsClient::new(client, remote.id.clone())))
}
fn make_pve_client_with_endpoint(
@@ -393,7 +393,7 @@ impl ClientFactory for DefaultClientFactory {
async fn make_pbs_client_and_login(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
let client = connect_or_login(remote, None).await?;
- Ok(Box::new(PbsClient(client)))
+ Ok(Box::new(PbsClient::new(client, remote.id.clone())))
}
}
diff --git a/server/src/pbs_client.rs b/server/src/pbs_client.rs
index aa3397c5..535cc49a 100644
--- a/server/src/pbs_client.rs
+++ b/server/src/pbs_client.rs
@@ -4,14 +4,20 @@
//! API calls. This is a more organized client than what we get via the `pdm-client` crate within
//! the PBS repo, which is huge and messy...
+use std::{future::Future, pin::Pin};
+
use anyhow::bail; // don't import Error as default error in here
+use http::Method;
use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
-use proxmox_client::{ApiPathBuilder, ApiResponseData, Error, HttpApiClient};
+use proxmox_client::{
+ ApiPathBuilder, ApiResponseData, Error, HttpApiClient, HttpApiResponse, HttpApiResponseStream,
+};
use proxmox_router::stream::JsonRecords;
use proxmox_schema::api;
use proxmox_section_config::typed::SectionConfigData;
+use proxmox_time::epoch_i64;
use pbs_api_types::{Authid, BasicRealmInfo, Tokenname, TokennameRef, Userid};
@@ -49,19 +55,108 @@ pub fn connect_to_remote_by_id(id: &str) -> Result<Box<PbsClient>, anyhow::Error
connect_to_remote(&remotes, id)
}
-pub struct PbsClient(pub proxmox_client::Client);
-
-impl std::ops::Deref for PbsClient {
- type Target = proxmox_client::Client;
+/// Client for the API of a PBS remote.
+///
+/// Requests go through its [HttpApiClient] implementation, so that the back-off state of the
+/// remote is taken into account.
+pub struct PbsClient {
+    /// The raw HTTP API client used for the actual requests.
+    client: proxmox_client::Client,
+    /// The ID of the remote this client talks to.
+    remote: String,
+}
- fn deref(&self) -> &Self::Target {
- &self.0
+impl PbsClient {
+    /// Wraps a raw [proxmox_client::Client] for the remote with the ID `remote`.
+    pub fn new(client: proxmox_client::Client, remote: String) -> Self {
+        PbsClient { client, remote }
}
}
-impl std::ops::DerefMut for PbsClient {
- fn deref_mut(&mut self) -> &mut proxmox_client::Client {
- &mut self.0
+// doing this via a generic method is currently tedious as it requires an extra helper trait to
+// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
+// its input to its generic output future... and then you run into borrow-checker limitations...
+//
+// The generated future refuses to contact a remote that is still within its back-off period,
+// marks the remote as unreachable on connection errors, and clears its back-off state once a
+// request got through. On success, the cache is only written if the remote actually had a
+// back-off state, to avoid taking the cache write lock and saving it on every request. A
+// back-off set concurrently by another request is cleared by the next request after it expired.
+macro_rules! try_request {
+    ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
+        let params = $params.map(serde_json::to_value);
+        Box::pin(async move {
+            // first check if the remote is reachable, and remember whether it currently has a
+            // back-off state at all (the last error is only returned in that case)
+            let had_back_off = {
+                let cache = crate::remote_cache::RemoteMappingCache::get();
+                let (back_off_time, error) =
+                    cache.remote_remaining_backoff_time(&$self.remote, epoch_i64());
+                if back_off_time > 0 {
+                    let error = error.unwrap_or_else(|| "unknown error".to_string());
+                    return Err(proxmox_client::Error::Connect(error.into()));
+                }
+                error.is_some()
+            };
+
+            let params = params
+                .transpose()
+                .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
+
+            let request = $self
+                .client
+                .$how($method.clone(), $path_and_query, params.as_ref());
+
+            match request.await {
+                Err(proxmox_client::Error::Connect(err)) => {
+                    let path = $path_and_query;
+                    log::error!("client error on request {path}, giving up - {err:?}");
+                    if let Ok(mut cache) = crate::remote_cache::RemoteMappingCache::write() {
+                        let error = format!("remote not reachable: {err:?}");
+                        cache.mark_remote_reachable(
+                            &$self.remote,
+                            crate::remote_cache::RemoteState::Unreachable(error),
+                        );
+                        let _ = cache.save();
+                    }
+                    Err(proxmox_client::Error::Client(err))
+                }
+                result => {
+                    // the remote answered; only touch the cache if there is a back-off to clear
+                    if had_back_off {
+                        if let Ok(mut cache) = crate::remote_cache::RemoteMappingCache::write() {
+                            cache.mark_remote_reachable(
+                                &$self.remote,
+                                crate::remote_cache::RemoteState::Reachable,
+                            );
+                            let _ = cache.save();
+                        }
+                    }
+                    result
+                }
+            }
+        })
+    };
+}
+
+impl HttpApiClient for PbsClient {
+    type Body = proxmox_http::Body;
+
+    type ResponseFuture<'a> =
+        Pin<Box<dyn Future<Output = Result<HttpApiResponse, proxmox_client::Error>> + Send + 'a>>;
+
+    type ResponseStreamFuture<'a> = Pin<
+        Box<
+            dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, proxmox_client::Error>>
+                + Send
+                + 'a,
+        >,
+    >;
+
+    // Both request kinds go through `try_request`, which checks the back-off state of the remote
+    // before forwarding the call to the inner client and updates it afterwards.
+
+    fn request<'a, T: Serialize + 'a>(
+        &'a self,
+        method: Method,
+        path_and_query: &'a str,
+        params: Option<T>,
+    ) -> Self::ResponseFuture<'a> {
+        try_request! { self, method, path_and_query, params, request }
+    }
+
+    fn streaming_request<'a, T: Serialize + 'a>(
+        &'a self,
+        method: Method,
+        path_and_query: &'a str,
+        params: Option<T>,
+    ) -> Self::ResponseStreamFuture<'a> {
+        try_request! { self, method, path_and_query, params, streaming_request }
}
}
@@ -162,19 +257,19 @@ pub struct TaskLogLine {
impl PbsClient {
/// API version details, including some parts of the global datacenter config.
pub async fn version(&self) -> Result<pve_api_types::VersionResponse, Error> {
- Ok(self.0.get("/api2/extjs/version").await?.expect_json()?.data)
+ Ok(self.get("/api2/extjs/version").await?.expect_json()?.data)
}
/// List available authentication realms (domains).
pub async fn list_domains(&self) -> Result<Vec<BasicRealmInfo>, Error> {
let url = "/api2/extjs/access/domains";
- Ok(self.0.get(url).await?.expect_json()?.data)
+ Ok(self.get(url).await?.expect_json()?.data)
}
/// List the datastores.
pub async fn list_datastores(&self) -> Result<Vec<pbs_api_types::DataStoreConfig>, Error> {
let path = "/api2/extjs/config/datastore";
- Ok(self.0.get(path).await?.expect_json()?.data)
+ Ok(self.get(path).await?.expect_json()?.data)
}
/// List the namespaces of a datastores.
@@ -188,7 +283,7 @@ impl PbsClient {
.maybe_arg("parent", ¶m.parent)
.maybe_arg("max-depth", ¶m.max_depth)
.build();
- Ok(self.0.get(&path).await?.expect_json()?.data)
+ Ok(self.get(&path).await?.expect_json()?.data)
}
/// List a datastore's snapshots.
@@ -201,7 +296,6 @@ impl PbsClient {
.maybe_arg("ns", &namespace)
.build();
let response = self
- .0
.streaming_request(http::Method::GET, &path, None::<()>)
.await?;
@@ -251,7 +345,7 @@ impl PbsClient {
"/api2/extjs/access/users/{userid}/token/{}",
tokenid.as_str()
);
- let token = self.0.post(&path, ¶ms).await?.expect_json()?.data;
+ let token = self.post(&path, ¶ms).await?.expect_json()?.data;
// NOTE: While PVE has configurable privilege separation between user and tokens, PBS
// avoided that to make tokens safer by default, so we need to give out an ACL explicitly.
@@ -266,7 +360,7 @@ impl PbsClient {
propagate: true,
};
- self.0.put("/api2/extjs/access/acl", &acl).await?;
+ self.put("/api2/extjs/access/acl", &acl).await?;
Ok(token)
}
@@ -281,26 +375,26 @@ impl PbsClient {
),
tokenid.as_str()
);
- self.0.delete(&path).await?.nodata()?;
+ self.delete(&path).await?.nodata()?;
Ok(())
}
/// Return the status the Proxmox Backup Server instance
pub async fn node_status(&self) -> Result<pbs_api_types::NodeStatus, Error> {
let path = "/api2/extjs/nodes/localhost/status";
- Ok(self.0.get(path).await?.expect_json()?.data)
+ Ok(self.get(path).await?.expect_json()?.data)
}
/// Return a term ticket for calling the vncwebsocket endpoint
pub async fn node_shell_termproxy(&self) -> Result<pbs_api_types::NodeShellTicket, Error> {
let path = "/api2/extjs/nodes/localhost/termproxy";
- Ok(self.0.post_without_body(path).await?.expect_json()?.data)
+ Ok(self.post_without_body(path).await?.expect_json()?.data)
}
/// Return the node config of the Proxmox Backup Server instance
pub async fn node_config(&self) -> Result<pbs_api_types::NodeConfig, Error> {
let path = "/api2/extjs/nodes/localhost/config";
- Ok(self.0.get(path).await?.expect_json()?.data)
+ Ok(self.get(path).await?.expect_json()?.data)
}
/// Return the datastore status
@@ -309,7 +403,7 @@ impl PbsClient {
datastore: &str,
) -> Result<pbs_api_types::DataStoreStatus, Error> {
let path = format!("/api2/extjs/admin/datastore/{datastore}/status");
- Ok(self.0.get(&path).await?.expect_json()?.data)
+ Ok(self.get(&path).await?.expect_json()?.data)
}
/// Return datastore usages and estimates
@@ -317,7 +411,7 @@ impl PbsClient {
&self,
) -> Result<Vec<pbs_api_types::DataStoreStatusListItem>, Error> {
let path = "/api2/extjs/status/datastore-usage";
- Ok(self.0.get(path).await?.expect_json()?.data)
+ Ok(self.get(path).await?.expect_json()?.data)
}
/// Return backup server metrics.
@@ -331,13 +425,12 @@ impl PbsClient {
.maybe_arg("start-time", &start_time)
.build();
- Ok(self.0.get(&path).await?.expect_json()?.data)
+ Ok(self.get(&path).await?.expect_json()?.data)
}
/// Return PBS subscription info.
pub async fn get_subscription(&self) -> Result<proxmox_subscription::SubscriptionInfo, Error> {
Ok(self
- .0
.get("/api2/extjs/nodes/localhost/subscription")
.await?
.expect_json()?
@@ -349,16 +442,14 @@ impl PbsClient {
&self,
params: proxmox_subscription::SetSubscription,
) -> Result<(), Error> {
- self.0
- .put("/api2/extjs/nodes/localhost/subscription", ¶ms)
+ self.put("/api2/extjs/nodes/localhost/subscription", ¶ms)
.await?;
Ok(())
}
/// Tear down the subscription on the PBS node.
pub async fn delete_subscription(&self) -> Result<(), Error> {
- self.0
- .delete("/api2/extjs/nodes/localhost/subscription")
+ self.delete("/api2/extjs/nodes/localhost/subscription")
.await?;
Ok(())
}
@@ -369,8 +460,7 @@ impl PbsClient {
&self,
params: proxmox_subscription::UpdateSubscription,
) -> Result<(), Error> {
- self.0
- .post("/api2/extjs/nodes/localhost/subscription", ¶ms)
+ self.post("/api2/extjs/nodes/localhost/subscription", ¶ms)
.await?;
Ok(())
}
@@ -378,7 +468,6 @@ impl PbsClient {
/// Return a list of available system updates.
pub async fn list_available_updates(&self) -> Result<Vec<pbs_api_types::APTUpdateInfo>, Error> {
Ok(self
- .0
.get("/api2/extjs/nodes/localhost/apt/update")
.await?
.expect_json()?
@@ -391,7 +480,6 @@ impl PbsClient {
params: AptUpdateParams,
) -> Result<pbs_api_types::UPID, Error> {
Ok(self
- .0
.post("/api2/extjs/nodes/localhost/apt/update", ¶ms)
.await?
.expect_json()?
@@ -412,13 +500,12 @@ impl PbsClient {
.maybe_arg("version", &version)
.build();
- Ok(self.0.get(&path).await?.expect_json()?.data)
+ Ok(self.get(&path).await?.expect_json()?.data)
}
/// Return a list of the most important package versions.
pub async fn get_package_versions(&self) -> Result<Vec<pbs_api_types::APTUpdateInfo>, Error> {
Ok(self
- .0
.get("/api2/extjs/nodes/localhost/apt/versions")
.await?
.expect_json()?
@@ -430,7 +517,7 @@ impl PbsClient {
&self,
) -> Result<pbs_api_types::APTRepositoriesResult, Error> {
let url = "/api2/extjs/nodes/localhost/apt/repositories";
- Ok(self.0.get(url).await?.expect_json()?.data)
+ Ok(self.get(url).await?.expect_json()?.data)
}
/// Get list of tasks.
@@ -447,7 +534,7 @@ impl PbsClient {
.maybe_arg("since", &since)
.build();
- Ok(self.0.get(&url).await?.expect_json()?.data)
+ Ok(self.get(&url).await?.expect_json()?.data)
}
/// Read task log.
@@ -464,7 +551,7 @@ impl PbsClient {
.maybe_arg("start", &start)
.build();
- self.0.get(&url).await?.expect_json()
+ self.get(&url).await?.expect_json()
}
/// Read task status.
@@ -473,14 +560,14 @@ impl PbsClient {
upid: &str,
) -> Result<pdm_api_types::pbs::TaskStatus, Error> {
let url = format!("/api2/extjs/nodes/localhost/tasks/{upid}/status");
- let response = self.0.get(&url).await?;
+ let response = self.get(&url).await?;
Ok(response.expect_json()?.data)
}
/// Stop a task.
pub async fn stop_task(&self, upid: &str) -> Result<(), Error> {
let url = format!("/api2/extjs/nodes/localhost/tasks/{upid}");
- self.0.delete(&url).await?.nodata()
+ self.delete(&url).await?.nodata()
}
}
--
2.47.3
^ permalink raw reply related [flat|nested] 7+ messages in thread