From: Dominik Csapak <d.csapak@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager 4/4] server: pbs client: rework to use the back-off mechanism from remote cache
Date: Fri, 29 May 2026 15:30:20 +0200 [thread overview]
Message-ID: <20260529133026.3149896-5-d.csapak@proxmox.com> (raw)
In-Reply-To: <20260529133026.3149896-1-d.csapak@proxmox.com>
instead of making the raw proxmox_client public (and implementing
`Deref`), implement `HttpApiClient` for the PbsClient and use that to
route the api calls through.
This makes it possible to use the back-off mechanism in a similar way as
in the PVE multi client. The `try_request` macro is heavily inspired and
borrowed from the multi client as well.
This makes the error behavior for known offline remotes better, the same
as for PVE remotes.
Note that in contrast to PVE, we don't have a tokio timeout in the
`try_request` macro, to have the same behavior as before.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
server/src/connection.rs | 4 +-
server/src/pbs_client.rs | 167 +++++++++++++++++++++++++++++----------
2 files changed, 129 insertions(+), 42 deletions(-)
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 7122713f..48e554dd 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -360,7 +360,7 @@ impl ClientFactory for DefaultClientFactory {
fn make_pbs_client(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
let client = crate::connection::connect(remote, None)?;
- Ok(Box::new(PbsClient(client)))
+ Ok(Box::new(PbsClient::new(client, remote.id.clone())))
}
fn make_pve_client_with_endpoint(
@@ -393,7 +393,7 @@ impl ClientFactory for DefaultClientFactory {
async fn make_pbs_client_and_login(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
let client = connect_or_login(remote, None).await?;
- Ok(Box::new(PbsClient(client)))
+ Ok(Box::new(PbsClient::new(client, remote.id.clone())))
}
}
diff --git a/server/src/pbs_client.rs b/server/src/pbs_client.rs
index aa3397c5..535cc49a 100644
--- a/server/src/pbs_client.rs
+++ b/server/src/pbs_client.rs
@@ -4,14 +4,20 @@
//! API calls. This is a more organized client than what we get via the `pdm-client` crate within
//! the PBS repo, which is huge and messy...
+use std::{future::Future, pin::Pin};
+
use anyhow::bail; // don't import Error as default error in here
+use http::Method;
use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
-use proxmox_client::{ApiPathBuilder, ApiResponseData, Error, HttpApiClient};
+use proxmox_client::{
+ ApiPathBuilder, ApiResponseData, Error, HttpApiClient, HttpApiResponse, HttpApiResponseStream,
+};
use proxmox_router::stream::JsonRecords;
use proxmox_schema::api;
use proxmox_section_config::typed::SectionConfigData;
+use proxmox_time::epoch_i64;
use pbs_api_types::{Authid, BasicRealmInfo, Tokenname, TokennameRef, Userid};
@@ -49,19 +55,108 @@ pub fn connect_to_remote_by_id(id: &str) -> Result<Box<PbsClient>, anyhow::Error
connect_to_remote(&remotes, id)
}
-pub struct PbsClient(pub proxmox_client::Client);
-
-impl std::ops::Deref for PbsClient {
- type Target = proxmox_client::Client;
+pub struct PbsClient {
+ client: proxmox_client::Client,
+ remote: String,
+}
- fn deref(&self) -> &Self::Target {
- &self.0
+impl PbsClient {
+ /// Create a new [PbsClient] from a raw [proxmox_client::Client]
+ pub fn new(client: proxmox_client::Client, remote: String) -> Self {
+ Self { client, remote }
}
}
-impl std::ops::DerefMut for PbsClient {
- fn deref_mut(&mut self) -> &mut proxmox_client::Client {
- &mut self.0
+// doing this via a generic method is currently tedious as it requires an extra helper trait to
+// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
+// its input to its generic output future... and then you run into borrow-checker limitations...
+//
+// Expands to a boxed future that:
+// 1. fails early with `Error::Connect` while the remote cache reports a remaining back-off time
+//    for `$self.remote`,
+// 2. otherwise forwards the call to `$self.client.$how(..)` (`$how` is the method name to call,
+//    i.e. `request` or `streaming_request`),
+// 3. records the outcome in the remote cache: `Unreachable` on a connect error, `Reachable` for
+//    every other result (this includes non-connect errors).
+macro_rules! try_request {
+    ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
+        // serialize eagerly, outside of the future; a serialization error is only reported
+        // once the future runs (see the `transpose` below)
+        let params = $params.map(serde_json::to_value);
+        Box::pin(async move {
+            // first check if the remote is reachable
+            {
+                let cache = crate::remote_cache::RemoteMappingCache::get();
+                let (back_off_time, error) =
+                    cache.remote_remaining_backoff_time(&$self.remote, epoch_i64());
+                if back_off_time > 0 {
+                    // only build the fallback message when we actually bail out, so the common
+                    // (reachable) path does not allocate a string on every request
+                    let error = error.unwrap_or_else(|| "unknown error".to_string());
+                    return Err(proxmox_client::Error::Connect(error.into()));
+                }
+            }
+
+            let params = params
+                .transpose()
+                .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
+
+            let request = $self
+                .client
+                .$how($method.clone(), $path_and_query, params.as_ref());
+
+            match request.await {
+                Err(proxmox_client::Error::Connect(err)) => {
+                    let path = $path_and_query;
+                    log::error!("client error on request {path}, giving up - {err:?}");
+                    // updating the cache is best effort, failing to lock or save it must not
+                    // hide the actual request error
+                    if let Ok(mut cache) = crate::remote_cache::RemoteMappingCache::write() {
+                        let error = format!("remote not reachable: {err:?}");
+                        cache.mark_remote_reachable(
+                            &$self.remote,
+                            crate::remote_cache::RemoteState::Unreachable(error),
+                        );
+                        let _ = cache.save();
+                    }
+                    // NOTE(review): the error was matched as `Connect` but is handed back as
+                    // `Client`, while the back-off path above returns `Connect` - confirm that
+                    // this re-wrapping is intended
+                    Err(proxmox_client::Error::Client(err))
+                }
+                result => {
+                    // any other outcome means that we got through to the remote
+                    if let Ok(mut cache) = crate::remote_cache::RemoteMappingCache::write() {
+                        cache.mark_remote_reachable(
+                            &$self.remote,
+                            crate::remote_cache::RemoteState::Reachable,
+                        );
+                        let _ = cache.save();
+                    }
+                    result
+                }
+            }
+        })
+    };
+}
+
+impl HttpApiClient for PbsClient {
+ type ResponseFuture<'a> =
+ Pin<Box<dyn Future<Output = Result<HttpApiResponse, proxmox_client::Error>> + Send + 'a>>;
+
+ type ResponseStreamFuture<'a> = Pin<
+ Box<
+ dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, proxmox_client::Error>>
+ + Send
+ + 'a,
+ >,
+ >;
+ type Body = proxmox_http::Body;
+
+ fn request<'a, T>(
+ &'a self,
+ method: Method,
+ path_and_query: &'a str,
+ params: Option<T>,
+ ) -> Self::ResponseFuture<'a>
+ where
+ T: Serialize + 'a,
+ {
+ try_request! { self, method, path_and_query, params, request }
+ }
+
+ fn streaming_request<'a, T>(
+ &'a self,
+ method: Method,
+ path_and_query: &'a str,
+ params: Option<T>,
+ ) -> Self::ResponseStreamFuture<'a>
+ where
+ T: Serialize + 'a,
+ {
+ try_request! { self, method, path_and_query, params, streaming_request }
}
}
@@ -162,19 +257,19 @@ pub struct TaskLogLine {
impl PbsClient {
/// API version details, including some parts of the global datacenter config.
pub async fn version(&self) -> Result<pve_api_types::VersionResponse, Error> {
- Ok(self.0.get("/api2/extjs/version").await?.expect_json()?.data)
+ Ok(self.get("/api2/extjs/version").await?.expect_json()?.data)
}
/// List available authentication realms (domains).
pub async fn list_domains(&self) -> Result<Vec<BasicRealmInfo>, Error> {
let url = "/api2/extjs/access/domains";
- Ok(self.0.get(url).await?.expect_json()?.data)
+ Ok(self.get(url).await?.expect_json()?.data)
}
/// List the datastores.
pub async fn list_datastores(&self) -> Result<Vec<pbs_api_types::DataStoreConfig>, Error> {
let path = "/api2/extjs/config/datastore";
- Ok(self.0.get(path).await?.expect_json()?.data)
+ Ok(self.get(path).await?.expect_json()?.data)
}
/// List the namespaces of a datastores.
@@ -188,7 +283,7 @@ impl PbsClient {
.maybe_arg("parent", ¶m.parent)
.maybe_arg("max-depth", ¶m.max_depth)
.build();
- Ok(self.0.get(&path).await?.expect_json()?.data)
+ Ok(self.get(&path).await?.expect_json()?.data)
}
/// List a datastore's snapshots.
@@ -201,7 +296,6 @@ impl PbsClient {
.maybe_arg("ns", &namespace)
.build();
let response = self
- .0
.streaming_request(http::Method::GET, &path, None::<()>)
.await?;
@@ -251,7 +345,7 @@ impl PbsClient {
"/api2/extjs/access/users/{userid}/token/{}",
tokenid.as_str()
);
- let token = self.0.post(&path, ¶ms).await?.expect_json()?.data;
+ let token = self.post(&path, ¶ms).await?.expect_json()?.data;
// NOTE: While PVE has configurable privilege separation between user and tokens, PBS
// avoided that to make tokens safer by default, so we need to give out an ACL explicitly.
@@ -266,7 +360,7 @@ impl PbsClient {
propagate: true,
};
- self.0.put("/api2/extjs/access/acl", &acl).await?;
+ self.put("/api2/extjs/access/acl", &acl).await?;
Ok(token)
}
@@ -281,26 +375,26 @@ impl PbsClient {
),
tokenid.as_str()
);
- self.0.delete(&path).await?.nodata()?;
+ self.delete(&path).await?.nodata()?;
Ok(())
}
/// Return the status the Proxmox Backup Server instance
pub async fn node_status(&self) -> Result<pbs_api_types::NodeStatus, Error> {
let path = "/api2/extjs/nodes/localhost/status";
- Ok(self.0.get(path).await?.expect_json()?.data)
+ Ok(self.get(path).await?.expect_json()?.data)
}
/// Return a term ticket for calling the vncwebsocket endpoint
pub async fn node_shell_termproxy(&self) -> Result<pbs_api_types::NodeShellTicket, Error> {
let path = "/api2/extjs/nodes/localhost/termproxy";
- Ok(self.0.post_without_body(path).await?.expect_json()?.data)
+ Ok(self.post_without_body(path).await?.expect_json()?.data)
}
/// Return the node config of the Proxmox Backup Server instance
pub async fn node_config(&self) -> Result<pbs_api_types::NodeConfig, Error> {
let path = "/api2/extjs/nodes/localhost/config";
- Ok(self.0.get(path).await?.expect_json()?.data)
+ Ok(self.get(path).await?.expect_json()?.data)
}
/// Return the datastore status
@@ -309,7 +403,7 @@ impl PbsClient {
datastore: &str,
) -> Result<pbs_api_types::DataStoreStatus, Error> {
let path = format!("/api2/extjs/admin/datastore/{datastore}/status");
- Ok(self.0.get(&path).await?.expect_json()?.data)
+ Ok(self.get(&path).await?.expect_json()?.data)
}
/// Return datastore usages and estimates
@@ -317,7 +411,7 @@ impl PbsClient {
&self,
) -> Result<Vec<pbs_api_types::DataStoreStatusListItem>, Error> {
let path = "/api2/extjs/status/datastore-usage";
- Ok(self.0.get(path).await?.expect_json()?.data)
+ Ok(self.get(path).await?.expect_json()?.data)
}
/// Return backup server metrics.
@@ -331,13 +425,12 @@ impl PbsClient {
.maybe_arg("start-time", &start_time)
.build();
- Ok(self.0.get(&path).await?.expect_json()?.data)
+ Ok(self.get(&path).await?.expect_json()?.data)
}
/// Return PBS subscription info.
pub async fn get_subscription(&self) -> Result<proxmox_subscription::SubscriptionInfo, Error> {
Ok(self
- .0
.get("/api2/extjs/nodes/localhost/subscription")
.await?
.expect_json()?
@@ -349,16 +442,14 @@ impl PbsClient {
&self,
params: proxmox_subscription::SetSubscription,
) -> Result<(), Error> {
- self.0
- .put("/api2/extjs/nodes/localhost/subscription", ¶ms)
+ self.put("/api2/extjs/nodes/localhost/subscription", ¶ms)
.await?;
Ok(())
}
/// Tear down the subscription on the PBS node.
pub async fn delete_subscription(&self) -> Result<(), Error> {
- self.0
- .delete("/api2/extjs/nodes/localhost/subscription")
+ self.delete("/api2/extjs/nodes/localhost/subscription")
.await?;
Ok(())
}
@@ -369,8 +460,7 @@ impl PbsClient {
&self,
params: proxmox_subscription::UpdateSubscription,
) -> Result<(), Error> {
- self.0
- .post("/api2/extjs/nodes/localhost/subscription", ¶ms)
+ self.post("/api2/extjs/nodes/localhost/subscription", ¶ms)
.await?;
Ok(())
}
@@ -378,7 +468,6 @@ impl PbsClient {
/// Return a list of available system updates.
pub async fn list_available_updates(&self) -> Result<Vec<pbs_api_types::APTUpdateInfo>, Error> {
Ok(self
- .0
.get("/api2/extjs/nodes/localhost/apt/update")
.await?
.expect_json()?
@@ -391,7 +480,6 @@ impl PbsClient {
params: AptUpdateParams,
) -> Result<pbs_api_types::UPID, Error> {
Ok(self
- .0
.post("/api2/extjs/nodes/localhost/apt/update", ¶ms)
.await?
.expect_json()?
@@ -412,13 +500,12 @@ impl PbsClient {
.maybe_arg("version", &version)
.build();
- Ok(self.0.get(&path).await?.expect_json()?.data)
+ Ok(self.get(&path).await?.expect_json()?.data)
}
/// Return a list of the most important package versions.
pub async fn get_package_versions(&self) -> Result<Vec<pbs_api_types::APTUpdateInfo>, Error> {
Ok(self
- .0
.get("/api2/extjs/nodes/localhost/apt/versions")
.await?
.expect_json()?
@@ -430,7 +517,7 @@ impl PbsClient {
&self,
) -> Result<pbs_api_types::APTRepositoriesResult, Error> {
let url = "/api2/extjs/nodes/localhost/apt/repositories";
- Ok(self.0.get(url).await?.expect_json()?.data)
+ Ok(self.get(url).await?.expect_json()?.data)
}
/// Get list of tasks.
@@ -447,7 +534,7 @@ impl PbsClient {
.maybe_arg("since", &since)
.build();
- Ok(self.0.get(&url).await?.expect_json()?.data)
+ Ok(self.get(&url).await?.expect_json()?.data)
}
/// Read task log.
@@ -464,7 +551,7 @@ impl PbsClient {
.maybe_arg("start", &start)
.build();
- self.0.get(&url).await?.expect_json()
+ self.get(&url).await?.expect_json()
}
/// Read task status.
@@ -473,14 +560,14 @@ impl PbsClient {
upid: &str,
) -> Result<pdm_api_types::pbs::TaskStatus, Error> {
let url = format!("/api2/extjs/nodes/localhost/tasks/{upid}/status");
- let response = self.0.get(&url).await?;
+ let response = self.get(&url).await?;
Ok(response.expect_json()?.data)
}
/// Stop a task.
pub async fn stop_task(&self, upid: &str) -> Result<(), Error> {
let url = format!("/api2/extjs/nodes/localhost/tasks/{upid}");
- self.0.delete(&url).await?.nodata()
+ self.delete(&url).await?.nodata()
}
}
--
2.47.3
prev parent reply other threads:[~2026-05-29 13:31 UTC|newest]
Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-05-29 13:30 [PATCH datacenter-manager 0/4] implement back-off mechanism for Dominik Csapak
2026-05-29 13:30 ` [RFC PATCH datacenter-manager 1/4] server: connection: multi client: use correct client error for retrying Dominik Csapak
2026-05-29 23:25 ` Thomas Lamprecht
2026-05-29 13:30 ` [PATCH datacenter-manager 2/4] server: remote cache: prepare for back-off mechanism Dominik Csapak
2026-05-29 23:40 ` Thomas Lamprecht
2026-05-29 13:30 ` [PATCH datacenter-manager 3/4] server: connection: multi-client: use back-off state from remote cache Dominik Csapak
2026-05-29 13:30 ` Dominik Csapak [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260529133026.3149896-5-d.csapak@proxmox.com \
--to=d.csapak@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.