From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id E4BC31FF15E for ; Mon, 10 Nov 2025 09:55:02 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B2FA1FF7C; Mon, 10 Nov 2025 09:55:47 +0100 (CET) Message-ID: <40eb63b7-fb47-4d68-b9b4-9bc4716df4b9@proxmox.com> Date: Mon, 10 Nov 2025 09:55:12 +0100 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird To: Proxmox Backup Server development discussion , Hannes Laimer References: <20251107132329.42965-1-h.laimer@proxmox.com> <20251107132329.42965-4-h.laimer@proxmox.com> Content-Language: en-US, de-DE From: Christian Ebner In-Reply-To: <20251107132329.42965-4-h.laimer@proxmox.com> X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1762764890669 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.047 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pbs-devel] [PATCH proxmox v2 3/3] rest-server: propagate rate-limit tags from authenticated users X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Transfer-Encoding: 7bit Content-Type: text/plain; charset="us-ascii"; Format="flowed" Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" On 11/7/25 2:23 PM, Hannes Laimer wrote: > Tie REST connections rate-limiter callbacks to a shared tag handle > so we can push authenticated user IDs into the limiter, keeping > tags in sync whenever we clear or update them. > > Signed-off-by: Hannes Laimer > --- > proxmox-rest-server/src/connection.rs | 18 +++- > proxmox-rest-server/src/rest.rs | 137 +++++++++++++++++++++++++- > 2 files changed, 148 insertions(+), 7 deletions(-) > > diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs > index 9511b7cb..47750dc8 100644 > --- a/proxmox-rest-server/src/connection.rs > +++ b/proxmox-rest-server/src/connection.rs > @@ -24,7 +24,7 @@ use tokio_openssl::SslStream; > use tokio_stream::wrappers::ReceiverStream; > > #[cfg(feature = "rate-limited-stream")] > -use proxmox_http::{RateLimitedStream, ShareableRateLimit}; > +use proxmox_http::{RateLimitedStream, RateLimiterTag, RateLimiterTags, ShareableRateLimit}; > > #[cfg(feature = "rate-limited-stream")] > pub type SharedRateLimit = Arc; > @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin>; > type ClientStreamResult = Pin>>; > > #[cfg(feature = "rate-limited-stream")] > -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option, Option) > +type LookupRateLimiter = dyn Fn( > + std::net::SocketAddr, > + &[RateLimiterTag], > + ) -> (Option, Option) > + Send > + Sync > + 'static; > @@ -369,7 +372,16 @@ impl AcceptBuilder { > > #[cfg(feature = "rate-limited-stream")] > let socket = match self.lookup_rate_limiter.clone() { > - Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)), > + Some(lookup) => { > + let tags: Arc> = Arc::new(Mutex::new(Vec::new())); > + let tags_cb = Arc::clone(&tags); > + let mut s = RateLimitedStream::with_limiter_update_cb(socket, move || { > + let tags = tags_cb.lock().unwrap().clone(); > + lookup(peer, &tags) > + }); > + s.set_tag_handle(tags); > + s > + } as mentioned on the previous patch, this can greatly be simplified if the tag handle is instantiated within the helper and the tag list passed along as parameter to the callback, see the suggested diff below. > None => RateLimitedStream::with_limiter(socket, None, None), > }; > > diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs > index b76c4bc9..6e9692ae 100644 > --- a/proxmox-rest-server/src/rest.rs > +++ b/proxmox-rest-server/src/rest.rs > @@ -29,6 +29,10 @@ use tower_service::Service; > use url::form_urlencoded; > > use proxmox_http::Body; > +#[cfg(feature = "rate-limited-stream")] > +use proxmox_http::{RateLimiterTag, RateLimiterTags}; > +#[cfg(not(feature = "rate-limited-stream"))] > +type RateLimiterTags = (); > use proxmox_router::{ > check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment, > RpcEnvironmentType, UserInformation, > @@ -86,10 +90,26 @@ impl RestServer { > } > } > > - pub fn api_service(&self, peer: &dyn PeerAddress) -> Result { > + #[cfg(not(feature = "rate-limited-stream"))] > + pub fn api_service(&self, peer: &T) -> Result > + where > + T: PeerAddress + ?Sized, > + { > + Ok(ApiService { > + peer: peer.peer_addr()?, > + api_config: Arc::clone(&self.api_config), > + }) > + } > + > + #[cfg(feature = "rate-limited-stream")] > + pub fn api_service(&self, peer: &T) -> Result > + where > + T: PeerAddress + PeerRateLimitTags + ?Sized, > + { > Ok(ApiService { > peer: peer.peer_addr()?, > api_config: Arc::clone(&self.api_config), > + rate_limit_tags: peer.rate_limiter_tag_handle(), > }) > } > } > @@ -185,6 +205,11 @@ pub trait PeerAddress { > fn peer_addr(&self) -> Result; > } > > +#[cfg(feature = "rate-limited-stream")] > +pub trait PeerRateLimitTags { > + fn rate_limiter_tag_handle(&self) -> Option>>; > +} > + > // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to > // accept before the peer address is requested, so let's just generally implement this for > // Pin> > @@ -221,6 +246,41 @@ impl PeerAddress for proxmox_http::RateLimitedStream { > } > } > > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for Pin> { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + T::rate_limiter_tag_handle(&**self) > + } > +} > + > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for tokio_openssl::SslStream { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + self.get_ref().rate_limiter_tag_handle() > + } > +} > + > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for tokio::net::TcpStream { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + None > + } > +} > + > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for tokio::net::UnixStream { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + None > + } > +} > + > +#[cfg(feature = "rate-limited-stream")] > +impl PeerRateLimitTags for proxmox_http::RateLimitedStream { > + fn rate_limiter_tag_handle(&self) -> Option>> { > + self.tag_handle() > + } > +} > + > // Helper [Service] containing the peer Address > // > // The lower level connection [Service] implementation on > @@ -233,6 +293,8 @@ impl PeerAddress for proxmox_http::RateLimitedStream { > pub struct ApiService { > pub peer: std::net::SocketAddr, > pub api_config: Arc, > + #[cfg(feature = "rate-limited-stream")] > + pub rate_limit_tags: Option>>, > } > > impl ApiService { > @@ -354,6 +416,10 @@ impl Service> for ApiService { > Some(proxied_peer) => proxied_peer, > None => self.peer, > }; > + #[cfg(feature = "rate-limited-stream")] > + let rate_limit_tags = self.rate_limit_tags.clone(); > + #[cfg(not(feature = "rate-limited-stream"))] > + let rate_limit_tags: Option>> = None; > > let header = self.api_config > .auth_cookie_name > @@ -368,7 +434,15 @@ impl Service> for ApiService { > }); > > async move { > - let mut response = match Arc::clone(&config).handle_request(req, &peer).await { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > + > + let mut response = match Arc::clone(&config) > + .handle_request(req, &peer, rate_limit_tags.clone()) > + .await > + { > Ok(response) => response, > Err(err) => { > let (err, code) = match err.downcast_ref::() { > @@ -860,6 +934,8 @@ impl ApiConfig { > self: Arc, > req: Request, > peer: &std::net::SocketAddr, > + #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))] > + rate_limit_tags: Option>>, > ) -> Result, Error> { > let (parts, body) = req.into_parts(); > let method = parts.method.clone(); > @@ -890,6 +966,8 @@ impl ApiConfig { > full_path: &path, > relative_path_components, > rpcenv, > + #[cfg(feature = "rate-limited-stream")] > + rate_limit_tags: rate_limit_tags.clone(), > }) > .await; > } > @@ -901,13 +979,29 @@ impl ApiConfig { > if components.is_empty() { > match self.check_auth(&parts.headers, &method).await { > Ok((auth_id, _user_info)) => { > - rpcenv.set_auth_id(Some(auth_id)); > + rpcenv.set_auth_id(Some(auth_id.clone())); > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + let mut guard = handle.lock().unwrap(); > + guard.clear(); > + guard.push(RateLimiterTag::User(auth_id)); > + } > return Ok(self.get_index(rpcenv, parts).await); > } > Err(AuthError::Generic(_)) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await; > } > - Err(AuthError::NoData) => {} > + Err(AuthError::NoData) => > + { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > + } > } > Ok(self.get_index(rpcenv, parts).await) > } else { > @@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> { > full_path: &'a str, > relative_path_components: &'a [&'a str], > rpcenv: RestEnvironment, > + #[cfg(feature = "rate-limited-stream")] > + rate_limit_tags: Option>>, > } > > pub(crate) struct Formatted { > @@ -992,6 +1088,8 @@ impl Formatted { > full_path, > relative_path_components, > mut rpcenv, > + #[cfg(feature = "rate-limited-stream")] > + rate_limit_tags, > }: ApiRequestData<'_>, > ) -> Result, Error> { > if relative_path_components.is_empty() { > @@ -1026,10 +1124,20 @@ impl Formatted { > if auth_required { > match config.check_auth(&parts.headers, &parts.method).await { > Ok((authid, info)) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + let mut guard = handle.lock().unwrap(); > + guard.clear(); > + guard.push(RateLimiterTag::User(authid.clone())); > + } > rpcenv.set_auth_id(Some(authid)); > user_info = info; > } > Err(auth_err) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > let err = match auth_err { > AuthError::Generic(err) => err, > AuthError::NoData => { > @@ -1045,6 +1153,11 @@ impl Formatted { > return Err(err); > } > } > + } else { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > } > > match api_method { > @@ -1108,6 +1221,8 @@ impl Unformatted { > full_path, > relative_path_components, > mut rpcenv, > + #[cfg(feature = "rate-limited-stream")] > + rate_limit_tags, > }: ApiRequestData<'_>, > ) -> Result, Error> { > if relative_path_components.is_empty() { > @@ -1133,10 +1248,20 @@ impl Unformatted { > if auth_required { > match config.check_auth(&parts.headers, &parts.method).await { > Ok((authid, info)) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + let mut guard = handle.lock().unwrap(); > + guard.clear(); > + guard.push(RateLimiterTag::User(authid.clone())); > + } > rpcenv.set_auth_id(Some(authid)); > user_info = info; > } > Err(auth_err) => { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > let err = match auth_err { > AuthError::Generic(err) => err, > AuthError::NoData => { > @@ -1153,6 +1278,10 @@ impl Unformatted { > } > } > } else { > + #[cfg(feature = "rate-limited-stream")] > + if let Some(handle) = rate_limit_tags.as_ref() { > + handle.lock().unwrap().clear(); > + } > user_info = Box::new(EmptyUserInformation {}); > } > diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs index 47750dc8..e9f51828 100644 --- a/proxmox-rest-server/src/connection.rs +++ b/proxmox-rest-server/src/connection.rs @@ -372,16 +372,9 @@ impl AcceptBuilder { #[cfg(feature = "rate-limited-stream")] let socket = match self.lookup_rate_limiter.clone() { - Some(lookup) => { - let tags: Arc> = Arc::new(Mutex::new(Vec::new())); - let tags_cb = Arc::clone(&tags); - let mut s = RateLimitedStream::with_limiter_update_cb(socket, move || { - let tags = tags_cb.lock().unwrap().clone(); - lookup(peer, &tags) - }); - s.set_tag_handle(tags); - s - } + Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tags| { + lookup(peer, tags) + }), None => RateLimitedStream::with_limiter(socket, None, None), }; _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel