From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 931D21FF14C for ; Fri, 29 May 2026 15:31:03 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 2E24ABE9A; Fri, 29 May 2026 15:31:03 +0200 (CEST) From: Dominik Csapak To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 4/4] server: pbs client: rework to use the back-off mechanism from remote cache Date: Fri, 29 May 2026 15:30:20 +0200 Message-ID: <20260529133026.3149896-5-d.csapak@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260529133026.3149896-1-d.csapak@proxmox.com> References: <20260529133026.3149896-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.049 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: AIW2VV7WPQ7UMAGRYDEK45D722OUK3MW X-Message-ID-Hash: AIW2VV7WPQ7UMAGRYDEK45D722OUK3MW X-MailFrom: d.csapak@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: instead of making the raw proxmox_client public (and implementing `Deref`), implement `HttpApiClient` for the PbsClient and use that to route the api calls through. 
This makes it possible to use the back-off mechanism in a similar way as in the PVE multi client. The `try_request` macro is heavily inspired by, and borrowed from, the multi client as well. This improves the error behavior for known offline remotes, making it the same as for PVE remotes. Note that, in contrast to PVE, there is no tokio timeout in the `try_request` macro, to keep the same behavior as before. Signed-off-by: Dominik Csapak --- server/src/connection.rs | 4 +- server/src/pbs_client.rs | 167 +++++++++++++++++++++++++++++---------- 2 files changed, 129 insertions(+), 42 deletions(-) diff --git a/server/src/connection.rs b/server/src/connection.rs index 7122713f..48e554dd 100644 --- a/server/src/connection.rs +++ b/server/src/connection.rs @@ -360,7 +360,7 @@ impl ClientFactory for DefaultClientFactory { fn make_pbs_client(&self, remote: &Remote) -> Result, Error> { let client = crate::connection::connect(remote, None)?; - Ok(Box::new(PbsClient(client))) + Ok(Box::new(PbsClient::new(client, remote.id.clone()))) } fn make_pve_client_with_endpoint( @@ -393,7 +393,7 @@ impl ClientFactory for DefaultClientFactory { async fn make_pbs_client_and_login(&self, remote: &Remote) -> Result, Error> { let client = connect_or_login(remote, None).await?; - Ok(Box::new(PbsClient(client))) + Ok(Box::new(PbsClient::new(client, remote.id.clone()))) } } diff --git a/server/src/pbs_client.rs b/server/src/pbs_client.rs index aa3397c5..535cc49a 100644 --- a/server/src/pbs_client.rs +++ b/server/src/pbs_client.rs @@ -4,14 +4,20 @@ //! API calls. This is a more organized client than what we get via the `pdm-client` crate within //! the PBS repo, which is huge and messy...
+use std::{future::Future, pin::Pin}; + use anyhow::bail; // don't import Error as default error in here +use http::Method; use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; -use proxmox_client::{ApiPathBuilder, ApiResponseData, Error, HttpApiClient}; +use proxmox_client::{ + ApiPathBuilder, ApiResponseData, Error, HttpApiClient, HttpApiResponse, HttpApiResponseStream, +}; use proxmox_router::stream::JsonRecords; use proxmox_schema::api; use proxmox_section_config::typed::SectionConfigData; +use proxmox_time::epoch_i64; use pbs_api_types::{Authid, BasicRealmInfo, Tokenname, TokennameRef, Userid}; @@ -49,19 +55,108 @@ pub fn connect_to_remote_by_id(id: &str) -> Result, anyhow::Error connect_to_remote(&remotes, id) } -pub struct PbsClient(pub proxmox_client::Client); - -impl std::ops::Deref for PbsClient { - type Target = proxmox_client::Client; +pub struct PbsClient { + client: proxmox_client::Client, + remote: String, +} - fn deref(&self) -> &Self::Target { - &self.0 +impl PbsClient { + /// Create a new [PbsClient] from a raw [proxmox_client::Client] + pub fn new(client: proxmox_client::Client, remote: String) -> Self { + Self { client, remote } } } -impl std::ops::DerefMut for PbsClient { - fn deref_mut(&mut self) -> &mut proxmox_client::Client { - &mut self.0 +// doing this via a generic method is currently tedious as it requires an extra helper trait to +// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from +// its input to its generic output future... and then you run into borrow-checker limitations... +macro_rules! 
try_request { + ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => { + let params = $params.map(serde_json::to_value); + Box::pin(async move { + // first check if the remote is reachable + { + let cache = crate::remote_cache::RemoteMappingCache::get(); + let (back_off_time, error) = + cache.remote_remaining_backoff_time(&$self.remote, epoch_i64()); + let error = error.unwrap_or("unknown error".to_string()); + if back_off_time > 0 { + return Err(proxmox_client::Error::Connect(error.into())); + } + } + + let params = params + .transpose() + .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?; + + let request = $self + .client + .$how($method.clone(), $path_and_query, params.as_ref()); + + match request.await { + Err(proxmox_client::Error::Connect(err)) => { + let path = $path_and_query; + log::error!("client error on request {path}, giving up - {err:?}"); + if let Ok(mut cache) = crate::remote_cache::RemoteMappingCache::write() { + let error = format!("remote not reachable: {err:?}"); + cache.mark_remote_reachable( + &$self.remote, + crate::remote_cache::RemoteState::Unreachable(error), + ); + let _ = cache.save(); + } + Err(proxmox_client::Error::Client(err)) + } + result => { + if let Ok(mut cache) = crate::remote_cache::RemoteMappingCache::write() { + cache.mark_remote_reachable( + &$self.remote, + crate::remote_cache::RemoteState::Reachable, + ); + let _ = cache.save(); + } + result + } + } + }) + }; +} + +impl HttpApiClient for PbsClient { + type ResponseFuture<'a> = + Pin> + Send + 'a>>; + + type ResponseStreamFuture<'a> = Pin< + Box< + dyn Future, proxmox_client::Error>> + + Send + + 'a, + >, + >; + type Body = proxmox_http::Body; + + fn request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseFuture<'a> + where + T: Serialize + 'a, + { + try_request! 
{ self, method, path_and_query, params, request } + } + + fn streaming_request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseStreamFuture<'a> + where + T: Serialize + 'a, + { + try_request! { self, method, path_and_query, params, streaming_request } } } @@ -162,19 +257,19 @@ pub struct TaskLogLine { impl PbsClient { /// API version details, including some parts of the global datacenter config. pub async fn version(&self) -> Result { - Ok(self.0.get("/api2/extjs/version").await?.expect_json()?.data) + Ok(self.get("/api2/extjs/version").await?.expect_json()?.data) } /// List available authentication realms (domains). pub async fn list_domains(&self) -> Result, Error> { let url = "/api2/extjs/access/domains"; - Ok(self.0.get(url).await?.expect_json()?.data) + Ok(self.get(url).await?.expect_json()?.data) } /// List the datastores. pub async fn list_datastores(&self) -> Result, Error> { let path = "/api2/extjs/config/datastore"; - Ok(self.0.get(path).await?.expect_json()?.data) + Ok(self.get(path).await?.expect_json()?.data) } /// List the namespaces of a datastores. @@ -188,7 +283,7 @@ impl PbsClient { .maybe_arg("parent", ¶m.parent) .maybe_arg("max-depth", ¶m.max_depth) .build(); - Ok(self.0.get(&path).await?.expect_json()?.data) + Ok(self.get(&path).await?.expect_json()?.data) } /// List a datastore's snapshots. @@ -201,7 +296,6 @@ impl PbsClient { .maybe_arg("ns", &namespace) .build(); let response = self - .0 .streaming_request(http::Method::GET, &path, None::<()>) .await?; @@ -251,7 +345,7 @@ impl PbsClient { "/api2/extjs/access/users/{userid}/token/{}", tokenid.as_str() ); - let token = self.0.post(&path, ¶ms).await?.expect_json()?.data; + let token = self.post(&path, ¶ms).await?.expect_json()?.data; // NOTE: While PVE has configurable privilege separation between user and tokens, PBS // avoided that to make tokens safer by default, so we need to give out an ACL explicitly. 
@@ -266,7 +360,7 @@ impl PbsClient { propagate: true, }; - self.0.put("/api2/extjs/access/acl", &acl).await?; + self.put("/api2/extjs/access/acl", &acl).await?; Ok(token) } @@ -281,26 +375,26 @@ impl PbsClient { ), tokenid.as_str() ); - self.0.delete(&path).await?.nodata()?; + self.delete(&path).await?.nodata()?; Ok(()) } /// Return the status the Proxmox Backup Server instance pub async fn node_status(&self) -> Result { let path = "/api2/extjs/nodes/localhost/status"; - Ok(self.0.get(path).await?.expect_json()?.data) + Ok(self.get(path).await?.expect_json()?.data) } /// Return a term ticket for calling the vncwebsocket endpoint pub async fn node_shell_termproxy(&self) -> Result { let path = "/api2/extjs/nodes/localhost/termproxy"; - Ok(self.0.post_without_body(path).await?.expect_json()?.data) + Ok(self.post_without_body(path).await?.expect_json()?.data) } /// Return the node config of the Proxmox Backup Server instance pub async fn node_config(&self) -> Result { let path = "/api2/extjs/nodes/localhost/config"; - Ok(self.0.get(path).await?.expect_json()?.data) + Ok(self.get(path).await?.expect_json()?.data) } /// Return the datastore status @@ -309,7 +403,7 @@ impl PbsClient { datastore: &str, ) -> Result { let path = format!("/api2/extjs/admin/datastore/{datastore}/status"); - Ok(self.0.get(&path).await?.expect_json()?.data) + Ok(self.get(&path).await?.expect_json()?.data) } /// Return datastore usages and estimates @@ -317,7 +411,7 @@ impl PbsClient { &self, ) -> Result, Error> { let path = "/api2/extjs/status/datastore-usage"; - Ok(self.0.get(path).await?.expect_json()?.data) + Ok(self.get(path).await?.expect_json()?.data) } /// Return backup server metrics. @@ -331,13 +425,12 @@ impl PbsClient { .maybe_arg("start-time", &start_time) .build(); - Ok(self.0.get(&path).await?.expect_json()?.data) + Ok(self.get(&path).await?.expect_json()?.data) } /// Return PBS subscription info. 
pub async fn get_subscription(&self) -> Result { Ok(self - .0 .get("/api2/extjs/nodes/localhost/subscription") .await? .expect_json()? @@ -349,16 +442,14 @@ impl PbsClient { &self, params: proxmox_subscription::SetSubscription, ) -> Result<(), Error> { - self.0 - .put("/api2/extjs/nodes/localhost/subscription", ¶ms) + self.put("/api2/extjs/nodes/localhost/subscription", ¶ms) .await?; Ok(()) } /// Tear down the subscription on the PBS node. pub async fn delete_subscription(&self) -> Result<(), Error> { - self.0 - .delete("/api2/extjs/nodes/localhost/subscription") + self.delete("/api2/extjs/nodes/localhost/subscription") .await?; Ok(()) } @@ -369,8 +460,7 @@ impl PbsClient { &self, params: proxmox_subscription::UpdateSubscription, ) -> Result<(), Error> { - self.0 - .post("/api2/extjs/nodes/localhost/subscription", ¶ms) + self.post("/api2/extjs/nodes/localhost/subscription", ¶ms) .await?; Ok(()) } @@ -378,7 +468,6 @@ impl PbsClient { /// Return a list of available system updates. pub async fn list_available_updates(&self) -> Result, Error> { Ok(self - .0 .get("/api2/extjs/nodes/localhost/apt/update") .await? .expect_json()? @@ -391,7 +480,6 @@ impl PbsClient { params: AptUpdateParams, ) -> Result { Ok(self - .0 .post("/api2/extjs/nodes/localhost/apt/update", ¶ms) .await? .expect_json()? @@ -412,13 +500,12 @@ impl PbsClient { .maybe_arg("version", &version) .build(); - Ok(self.0.get(&path).await?.expect_json()?.data) + Ok(self.get(&path).await?.expect_json()?.data) } /// Return a list of the most important package versions. pub async fn get_package_versions(&self) -> Result, Error> { Ok(self - .0 .get("/api2/extjs/nodes/localhost/apt/versions") .await? .expect_json()? @@ -430,7 +517,7 @@ impl PbsClient { &self, ) -> Result { let url = "/api2/extjs/nodes/localhost/apt/repositories"; - Ok(self.0.get(url).await?.expect_json()?.data) + Ok(self.get(url).await?.expect_json()?.data) } /// Get list of tasks. 
@@ -447,7 +534,7 @@ impl PbsClient { .maybe_arg("since", &since) .build(); - Ok(self.0.get(&url).await?.expect_json()?.data) + Ok(self.get(&url).await?.expect_json()?.data) } /// Read task log. @@ -464,7 +551,7 @@ impl PbsClient { .maybe_arg("start", &start) .build(); - self.0.get(&url).await?.expect_json() + self.get(&url).await?.expect_json() } /// Read task status. @@ -473,14 +560,14 @@ impl PbsClient { upid: &str, ) -> Result { let url = format!("/api2/extjs/nodes/localhost/tasks/{upid}/status"); - let response = self.0.get(&url).await?; + let response = self.get(&url).await?; Ok(response.expect_json()?.data) } /// Stop a task. pub async fn stop_task(&self, upid: &str) -> Result<(), Error> { let url = format!("/api2/extjs/nodes/localhost/tasks/{upid}"); - self.0.delete(&url).await?.nodata() + self.delete(&url).await?.nodata() } } -- 2.47.3