From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 8F7D51FF16B for ; Fri, 7 Nov 2025 14:22:57 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id AFF8810FC5; Fri, 7 Nov 2025 14:23:38 +0100 (CET) From: Hannes Laimer To: pbs-devel@lists.proxmox.com Date: Fri, 7 Nov 2025 14:23:26 +0100 Message-ID: <20251107132329.42965-4-h.laimer@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20251107132329.42965-1-h.laimer@proxmox.com> References: <20251107132329.42965-1-h.laimer@proxmox.com> MIME-Version: 1.0 X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1762521793521 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.044 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox v2 3/3] rest-server: propagate rate-limit tags from authenticated users X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Tie REST connections rate-limiter callbacks to a shared tag handle so we can push authenticated user IDs into the limiter, keeping tags in sync whenever we clear or update them. Signed-off-by: Hannes Laimer --- proxmox-rest-server/src/connection.rs | 18 +++- proxmox-rest-server/src/rest.rs | 137 +++++++++++++++++++++++++- 2 files changed, 148 insertions(+), 7 deletions(-) diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs index 9511b7cb..47750dc8 100644 --- a/proxmox-rest-server/src/connection.rs +++ b/proxmox-rest-server/src/connection.rs @@ -24,7 +24,7 @@ use tokio_openssl::SslStream; use tokio_stream::wrappers::ReceiverStream; #[cfg(feature = "rate-limited-stream")] -use proxmox_http::{RateLimitedStream, ShareableRateLimit}; +use proxmox_http::{RateLimitedStream, RateLimiterTag, RateLimiterTags, ShareableRateLimit}; #[cfg(feature = "rate-limited-stream")] pub type SharedRateLimit = Arc; @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin>; type ClientStreamResult = Pin>>; #[cfg(feature = "rate-limited-stream")] -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option, Option) +type LookupRateLimiter = dyn Fn( + std::net::SocketAddr, + &[RateLimiterTag], + ) -> (Option, Option) + Send + Sync + 'static; @@ -369,7 +372,16 @@ impl AcceptBuilder { #[cfg(feature = "rate-limited-stream")] let socket = match self.lookup_rate_limiter.clone() { - Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)), + Some(lookup) => { + let tags: Arc> = Arc::new(Mutex::new(Vec::new())); + let tags_cb = Arc::clone(&tags); + let mut s = RateLimitedStream::with_limiter_update_cb(socket, move || { + let tags = tags_cb.lock().unwrap().clone(); + lookup(peer, &tags) + }); + s.set_tag_handle(tags); + s + } None => RateLimitedStream::with_limiter(socket, None, None), }; diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs index b76c4bc9..6e9692ae 100644 --- a/proxmox-rest-server/src/rest.rs +++ b/proxmox-rest-server/src/rest.rs @@ -29,6 +29,10 @@ use tower_service::Service; use url::form_urlencoded; use proxmox_http::Body; +#[cfg(feature = "rate-limited-stream")] +use proxmox_http::{RateLimiterTag, RateLimiterTags}; +#[cfg(not(feature = "rate-limited-stream"))] +type RateLimiterTags = (); use proxmox_router::{ check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment, RpcEnvironmentType, UserInformation, @@ -86,10 +90,26 @@ impl RestServer { } } - pub fn api_service(&self, peer: &dyn PeerAddress) -> Result { + #[cfg(not(feature = "rate-limited-stream"))] + pub fn api_service(&self, peer: &T) -> Result + where + T: PeerAddress + ?Sized, + { + Ok(ApiService { + peer: peer.peer_addr()?, + api_config: Arc::clone(&self.api_config), + }) + } + + #[cfg(feature = "rate-limited-stream")] + pub fn api_service(&self, peer: &T) -> Result + where + T: PeerAddress + PeerRateLimitTags + ?Sized, + { Ok(ApiService { peer: peer.peer_addr()?, api_config: Arc::clone(&self.api_config), + rate_limit_tags: peer.rate_limiter_tag_handle(), }) } } @@ -185,6 +205,11 @@ pub trait PeerAddress { fn peer_addr(&self) -> Result; } +#[cfg(feature = "rate-limited-stream")] +pub trait PeerRateLimitTags { + fn rate_limiter_tag_handle(&self) -> Option>>; +} + // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to // accept before the peer address is requested, so let's just generally implement this for // Pin> @@ -221,6 +246,41 @@ impl PeerAddress for proxmox_http::RateLimitedStream { } } +#[cfg(feature = "rate-limited-stream")] +impl PeerRateLimitTags for Pin> { + fn rate_limiter_tag_handle(&self) -> Option>> { + T::rate_limiter_tag_handle(&**self) + } +} + +#[cfg(feature = "rate-limited-stream")] +impl PeerRateLimitTags for tokio_openssl::SslStream { + fn rate_limiter_tag_handle(&self) -> Option>> { + self.get_ref().rate_limiter_tag_handle() + } +} + +#[cfg(feature = "rate-limited-stream")] +impl PeerRateLimitTags for tokio::net::TcpStream { + fn rate_limiter_tag_handle(&self) -> Option>> { + None + } +} + +#[cfg(feature = "rate-limited-stream")] +impl PeerRateLimitTags for tokio::net::UnixStream { + fn rate_limiter_tag_handle(&self) -> Option>> { + None + } +} + +#[cfg(feature = "rate-limited-stream")] +impl PeerRateLimitTags for proxmox_http::RateLimitedStream { + fn rate_limiter_tag_handle(&self) -> Option>> { + self.tag_handle() + } +} + // Helper [Service] containing the peer Address // // The lower level connection [Service] implementation on @@ -233,6 +293,8 @@ impl PeerAddress for proxmox_http::RateLimitedStream { pub struct ApiService { pub peer: std::net::SocketAddr, pub api_config: Arc, + #[cfg(feature = "rate-limited-stream")] + pub rate_limit_tags: Option>>, } impl ApiService { @@ -354,6 +416,10 @@ impl Service> for ApiService { Some(proxied_peer) => proxied_peer, None => self.peer, }; + #[cfg(feature = "rate-limited-stream")] + let rate_limit_tags = self.rate_limit_tags.clone(); + #[cfg(not(feature = "rate-limited-stream"))] + let rate_limit_tags: Option>> = None; let header = self.api_config .auth_cookie_name @@ -368,7 +434,15 @@ impl Service> for ApiService { }); async move { - let mut response = match Arc::clone(&config).handle_request(req, &peer).await { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + handle.lock().unwrap().clear(); + } + + let mut response = match Arc::clone(&config) + .handle_request(req, &peer, rate_limit_tags.clone()) + .await + { Ok(response) => response, Err(err) => { let (err, code) = match err.downcast_ref::() { @@ -860,6 +934,8 @@ impl ApiConfig { self: Arc, req: Request, peer: &std::net::SocketAddr, + #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))] + rate_limit_tags: Option>>, ) -> Result, Error> { let (parts, body) = req.into_parts(); let method = parts.method.clone(); @@ -890,6 +966,8 @@ impl ApiConfig { full_path: &path, relative_path_components, rpcenv, + #[cfg(feature = "rate-limited-stream")] + rate_limit_tags: rate_limit_tags.clone(), }) .await; } @@ -901,13 +979,29 @@ impl ApiConfig { if components.is_empty() { match self.check_auth(&parts.headers, &method).await { Ok((auth_id, _user_info)) => { - rpcenv.set_auth_id(Some(auth_id)); + rpcenv.set_auth_id(Some(auth_id.clone())); + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + let mut guard = handle.lock().unwrap(); + guard.clear(); + guard.push(RateLimiterTag::User(auth_id)); + } return Ok(self.get_index(rpcenv, parts).await); } Err(AuthError::Generic(_)) => { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + handle.lock().unwrap().clear(); + } tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await; } - Err(AuthError::NoData) => {} + Err(AuthError::NoData) => + { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + handle.lock().unwrap().clear(); + } + } } Ok(self.get_index(rpcenv, parts).await) } else { @@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> { full_path: &'a str, relative_path_components: &'a [&'a str], rpcenv: RestEnvironment, + #[cfg(feature = "rate-limited-stream")] + rate_limit_tags: Option>>, } pub(crate) struct Formatted { @@ -992,6 +1088,8 @@ impl Formatted { full_path, relative_path_components, mut rpcenv, + #[cfg(feature = "rate-limited-stream")] + rate_limit_tags, }: ApiRequestData<'_>, ) -> Result, Error> { if relative_path_components.is_empty() { @@ -1026,10 +1124,20 @@ impl Formatted { if auth_required { match config.check_auth(&parts.headers, &parts.method).await { Ok((authid, info)) => { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + let mut guard = handle.lock().unwrap(); + guard.clear(); + guard.push(RateLimiterTag::User(authid.clone())); + } rpcenv.set_auth_id(Some(authid)); user_info = info; } Err(auth_err) => { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + handle.lock().unwrap().clear(); + } let err = match auth_err { AuthError::Generic(err) => err, AuthError::NoData => { @@ -1045,6 +1153,11 @@ impl Formatted { return Err(err); } } + } else { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + handle.lock().unwrap().clear(); + } } match api_method { @@ -1108,6 +1221,8 @@ impl Unformatted { full_path, relative_path_components, mut rpcenv, + #[cfg(feature = "rate-limited-stream")] + rate_limit_tags, }: ApiRequestData<'_>, ) -> Result, Error> { if relative_path_components.is_empty() { @@ -1133,10 +1248,20 @@ impl Unformatted { if auth_required { match config.check_auth(&parts.headers, &parts.method).await { Ok((authid, info)) => { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + let mut guard = handle.lock().unwrap(); + guard.clear(); + guard.push(RateLimiterTag::User(authid.clone())); + } rpcenv.set_auth_id(Some(authid)); user_info = info; } Err(auth_err) => { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + handle.lock().unwrap().clear(); + } let err = match auth_err { AuthError::Generic(err) => err, AuthError::NoData => { @@ -1153,6 +1278,10 @@ impl Unformatted { } } } else { + #[cfg(feature = "rate-limited-stream")] + if let Some(handle) = rate_limit_tags.as_ref() { + handle.lock().unwrap().clear(); + } user_info = Box::new(EmptyUserInformation {}); } -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel