From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 8FB5A1FF179 for ; Wed, 12 Nov 2025 10:54:31 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 3218F1CBC5; Wed, 12 Nov 2025 10:55:17 +0100 (CET) Message-ID: <6860f744-d1d0-46a4-aef4-5482834b19a0@proxmox.com> Date: Wed, 12 Nov 2025 10:55:10 +0100 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird From: Christian Ebner To: Proxmox Backup Server development discussion , Hannes Laimer References: <20251110134255.69132-1-h.laimer@proxmox.com> <20251110134255.69132-4-h.laimer@proxmox.com> Content-Language: en-US, de-DE In-Reply-To: <20251110134255.69132-4-h.laimer@proxmox.com> X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1762941286212 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.047 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pbs-devel] [PATCH proxmox v3 3/3] rest-server: propagate rate-limit tags from authenticated users X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Transfer-Encoding: 7bit Content-Type: text/plain; charset="us-ascii"; Format="flowed" Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" nit: needs reformatting via cargo fmt On 11/10/25 2:43 PM, Hannes Laimer wrote: > Tie REST connections rate-limiter callbacks to a shared tag handle > so we can push authenticated user IDs into the limiter, keeping > tags in sync whenever we clear or update them. > > Signed-off-by: Hannes Laimer > --- > proxmox-rest-server/src/connection.rs | 11 ++- > proxmox-rest-server/src/rest.rs | 137 +++++++++++++++++++++++++- > 2 files changed, 141 insertions(+), 7 deletions(-) > > diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs > index 9511b7cb..ff2ee139 100644 > --- a/proxmox-rest-server/src/connection.rs > +++ b/proxmox-rest-server/src/connection.rs > @@ -24,7 +24,7 @@ use tokio_openssl::SslStream; > use tokio_stream::wrappers::ReceiverStream; > > #[cfg(feature = "rate-limited-stream")] > -use proxmox_http::{RateLimitedStream, ShareableRateLimit}; > +use proxmox_http::{RateLimitedStream, RateLimiterTag, ShareableRateLimit}; > > #[cfg(feature = "rate-limited-stream")] > pub type SharedRateLimit = Arc; > @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin>; > type ClientStreamResult = Pin>>; > > #[cfg(feature = "rate-limited-stream")] > -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option, Option) > +type LookupRateLimiter = dyn Fn( > + std::net::SocketAddr, > + &[RateLimiterTag], > + ) -> (Option, Option) > + Send > + Sync > + 'static; > @@ -369,7 +372,9 @@ impl AcceptBuilder { > > #[cfg(feature = "rate-limited-stream")] > let socket = match self.lookup_rate_limiter.clone() { > - Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)), > + Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tags| { > + lookup(peer, tags) > + }), > None => RateLimitedStream::with_limiter(socket, None, None), > }; > > diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs > index b76c4bc9..6e9692ae 100644 > --- a/proxmox-rest-server/src/rest.rs > +++ b/proxmox-rest-server/src/rest.rs > @@ -29,6 +29,10 @@ use tower_service::Service; > use url::form_urlencoded; > > use proxmox_http::Body; > +#[cfg(feature = "rate-limited-stream")] > +use proxmox_http::{RateLimiterTag, RateLimiterTags}; > +#[cfg(not(feature = "rate-limited-stream"))] > +type RateLimiterTags = (); > use proxmox_router::{ > check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment, > RpcEnvironmentType, UserInformation, > @@ -86,10 +90,26 @@ impl RestServer { > } > } > > - pub fn api_service(&self, peer: &dyn PeerAddress) -> Result { > + #[cfg(not(feature = "rate-limited-stream"))] > + pub fn api_service(&self, peer: &T) -> Result > + where > + T: PeerAddress + ?Sized, > + { > + Ok(ApiService { > + peer: peer.peer_addr()?, > + api_config: Arc::clone(&self.api_config), > + }) > + } > + > + #[cfg(feature = "rate-limited-stream")] > + pub fn api_service(&self, peer: &T) -> Result > + where > + T: PeerAddress + PeerRateLimitTags + ?Sized, > + { > Ok(ApiService { > peer: peer.peer_addr()?, > api_config: Arc::clone(&self.api_config), > + rate_limit_tags: peer.rate_limiter_tag_handle(), > }) > } > } > @@ -185,6 +205,11 @@ pub trait PeerAddress { > fn peer_addr(&self) -> Result; > } > > +#[cfg(feature = "rate-limited-stream")] > +pub trait PeerRateLimitTags { > + fn rate_limiter_tag_handle(&self) -> Option>>; > +} > + > // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to > // accept before the peer address is requested, so let's just generally implement this for > // Pin> > @@ -221,6 +246,41 @@ impl PeerAddress for proxmox_http::RateLimitedStream { > } > } > > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for Pin> { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + T::rate_limiter_tag_handle(&**self) > + } > +} > + > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for tokio_openssl::SslStream { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + self.get_ref().rate_limiter_tag_handle() > + } > +} > + > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for tokio::net::TcpStream { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + None > + } > +} > + > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for tokio::net::UnixStream { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + None > + } > +} > + > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for proxmox_http::RateLimitedStream { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + self.tag_handle() > + } > +} > + > // Helper [Service] containing the peer Address > // > // The lower level connection [Service] implementation on > @@ -233,6 +293,8 @@ impl PeerAddress for proxmox_http::RateLimitedStream { > pub struct ApiService { > pub peer: std::net::SocketAddr, > pub api_config: Arc, > + #[cfg(feature = "rate-limited-stream")] > + pub rate_limit_tags: Option>>, > } > > impl ApiService { > @@ -354,6 +416,10 @@ impl Service> for ApiService { > Some(proxied_peer) => proxied_peer, > None => self.peer, > }; > + #[cfg(feature = "rate-limited-stream")] > + let rate_limit_tags = self.rate_limit_tags.clone(); > + #[cfg(not(feature = "rate-limited-stream"))] > + let rate_limit_tags: Option>> = None; > > let header = self.api_config > .auth_cookie_name > @@ -368,7 +434,15 @@ impl Service> for ApiService { > }); > > async move { > - let mut response = match Arc::clone(&config).handle_request(req, &peer).await { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > + > + let mut response = match Arc::clone(&config) > + .handle_request(req, &peer, rate_limit_tags.clone()) > + .await > + { > Ok(response) => response, > Err(err) => { > let (err, code) = match err.downcast_ref::() { > @@ -860,6 +934,8 @@ impl ApiConfig { > self: Arc, > req: Request, > peer: &std::net::SocketAddr, > + #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))] > + rate_limit_tags: Option>>, > ) -> Result, Error> { > let (parts, body) = req.into_parts(); > let method = parts.method.clone(); > @@ -890,6 +966,8 @@ impl ApiConfig { > full_path: &path, > relative_path_components, > rpcenv, > + #[cfg(feature = "rate-limited-stream")] > + rate_limit_tags: rate_limit_tags.clone(), > }) > .await; > } > @@ -901,13 +979,29 @@ impl ApiConfig { > if components.is_empty() { > match self.check_auth(&parts.headers, &method).await { > Ok((auth_id, _user_info)) => { > - rpcenv.set_auth_id(Some(auth_id)); > + rpcenv.set_auth_id(Some(auth_id.clone())); > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + let mut guard = handle.lock().unwrap(); > + guard.clear(); > + guard.push(RateLimiterTag::User(auth_id)); > + } > return Ok(self.get_index(rpcenv, parts).await); > } > Err(AuthError::Generic(_)) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await; > } > - Err(AuthError::NoData) => {} > + Err(AuthError::NoData) => > + { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > + } > } > Ok(self.get_index(rpcenv, parts).await) > } else { > @@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> { > full_path: &'a str, > relative_path_components: &'a [&'a str], > rpcenv: RestEnvironment, > + #[cfg(feature = "rate-limited-stream")] > + rate_limit_tags: Option>>, > } > > pub(crate) struct Formatted { > @@ -992,6 +1088,8 @@ impl Formatted { > full_path, > relative_path_components, > mut rpcenv, > + #[cfg(feature = "rate-limited-stream")] > + rate_limit_tags, > }: ApiRequestData<'_>, > ) -> Result, Error> { > if relative_path_components.is_empty() { > @@ -1026,10 +1124,20 @@ impl Formatted { > if auth_required { > match config.check_auth(&parts.headers, &parts.method).await { > Ok((authid, info)) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + let mut guard = handle.lock().unwrap(); > + guard.clear(); > + guard.push(RateLimiterTag::User(authid.clone())); > + } > rpcenv.set_auth_id(Some(authid)); > user_info = info; > } > Err(auth_err) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > let err = match auth_err { > AuthError::Generic(err) => err, > AuthError::NoData => { > @@ -1045,6 +1153,11 @@ impl Formatted { > return Err(err); > } > } > + } else { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > } > > match api_method { > @@ -1108,6 +1221,8 @@ impl Unformatted { > full_path, > relative_path_components, > mut rpcenv, > + #[cfg(feature = "rate-limited-stream")] > + rate_limit_tags, > }: ApiRequestData<'_>, > ) -> Result, Error> { > if relative_path_components.is_empty() { > @@ -1133,10 +1248,20 @@ impl Unformatted { > if auth_required { > match config.check_auth(&parts.headers, &parts.method).await { > Ok((authid, info)) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + let mut guard = handle.lock().unwrap(); > + guard.clear(); > + guard.push(RateLimiterTag::User(authid.clone())); > + } > rpcenv.set_auth_id(Some(authid)); > user_info = info; > } > Err(auth_err) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > let err = match auth_err { > AuthError::Generic(err) => err, > AuthError::NoData => { > @@ -1153,6 +1278,10 @@ impl Unformatted { > } > } > } else { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > user_info = Box::new(EmptyUserInformation {}); > } > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel