public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>,
	Hannes Laimer <h.laimer@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox v3 3/3] rest-server: propagate rate-limit tags from authenticated users
Date: Wed, 12 Nov 2025 10:55:10 +0100	[thread overview]
Message-ID: <6860f744-d1d0-46a4-aef4-5482834b19a0@proxmox.com> (raw)
In-Reply-To: <20251110134255.69132-4-h.laimer@proxmox.com>

nit: needs reformatting via cargo fmt

On 11/10/25 2:43 PM, Hannes Laimer wrote:
> Tie REST connections rate-limiter callbacks to a shared tag handle
> so we can push authenticated user IDs into the limiter, keeping
> tags in sync whenever we clear or update them.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>   proxmox-rest-server/src/connection.rs |  11 ++-
>   proxmox-rest-server/src/rest.rs       | 137 +++++++++++++++++++++++++-
>   2 files changed, 141 insertions(+), 7 deletions(-)
> 
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 9511b7cb..ff2ee139 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -24,7 +24,7 @@ use tokio_openssl::SslStream;
>   use tokio_stream::wrappers::ReceiverStream;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -use proxmox_http::{RateLimitedStream, ShareableRateLimit};
> +use proxmox_http::{RateLimitedStream, RateLimiterTag, ShareableRateLimit};
>   
>   #[cfg(feature = "rate-limited-stream")]
>   pub type SharedRateLimit = Arc<dyn ShareableRateLimit>;
> @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
>   type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> +type LookupRateLimiter = dyn Fn(
> +        std::net::SocketAddr,
> +        &[RateLimiterTag],
> +    ) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
>       + Send
>       + Sync
>       + 'static;
> @@ -369,7 +372,9 @@ impl AcceptBuilder {
>   
>           #[cfg(feature = "rate-limited-stream")]
>           let socket = match self.lookup_rate_limiter.clone() {
> -            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
> +            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tags| {
> +                lookup(peer, tags)
> +            }),
>               None => RateLimitedStream::with_limiter(socket, None, None),
>           };
>   
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index b76c4bc9..6e9692ae 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -29,6 +29,10 @@ use tower_service::Service;
>   use url::form_urlencoded;
>   
>   use proxmox_http::Body;
> +#[cfg(feature = "rate-limited-stream")]
> +use proxmox_http::{RateLimiterTag, RateLimiterTags};
> +#[cfg(not(feature = "rate-limited-stream"))]
> +type RateLimiterTags = ();
>   use proxmox_router::{
>       check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
>       RpcEnvironmentType, UserInformation,
> @@ -86,10 +90,26 @@ impl RestServer {
>           }
>       }
>   
> -    pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> +    #[cfg(not(feature = "rate-limited-stream"))]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + ?Sized,
> +    {
> +        Ok(ApiService {
> +            peer: peer.peer_addr()?,
> +            api_config: Arc::clone(&self.api_config),
> +        })
> +    }
> +
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + PeerRateLimitTags + ?Sized,
> +    {
>           Ok(ApiService {
>               peer: peer.peer_addr()?,
>               api_config: Arc::clone(&self.api_config),
> +            rate_limit_tags: peer.rate_limiter_tag_handle(),
>           })
>       }
>   }
> @@ -185,6 +205,11 @@ pub trait PeerAddress {
>       fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +pub trait PeerRateLimitTags {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>>;
> +}
> +
>   // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to
>   // accept before the peer address is requested, so let's just generally implement this for
>   // Pin<Box<T>>
> @@ -221,6 +246,41 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>       }
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for Pin<Box<T>> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        T::rate_limiter_tag_handle(&**self)
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for tokio_openssl::SslStream<T> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.get_ref().rate_limiter_tag_handle()
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::TcpStream {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::UnixStream {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T> PeerRateLimitTags for proxmox_http::RateLimitedStream<T> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.tag_handle()
> +    }
> +}
> +
>   // Helper [Service] containing the peer Address
>   //
>   // The lower level connection [Service] implementation on
> @@ -233,6 +293,8 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>   pub struct ApiService {
>       pub peer: std::net::SocketAddr,
>       pub api_config: Arc<ApiConfig>,
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>   }
>   
>   impl ApiService {
> @@ -354,6 +416,10 @@ impl Service<Request<Incoming>> for ApiService {
>               Some(proxied_peer) => proxied_peer,
>               None => self.peer,
>           };
> +        #[cfg(feature = "rate-limited-stream")]
> +        let rate_limit_tags = self.rate_limit_tags.clone();
> +        #[cfg(not(feature = "rate-limited-stream"))]
> +        let rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>> = None;
>   
>           let header = self.api_config
>               .auth_cookie_name
> @@ -368,7 +434,15 @@ impl Service<Request<Incoming>> for ApiService {
>                });
>   
>           async move {
> -            let mut response = match Arc::clone(&config).handle_request(req, &peer).await {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
> +
> +            let mut response = match Arc::clone(&config)
> +                .handle_request(req, &peer, rate_limit_tags.clone())
> +                .await
> +            {
>                   Ok(response) => response,
>                   Err(err) => {
>                       let (err, code) = match err.downcast_ref::<HttpError>() {
> @@ -860,6 +934,8 @@ impl ApiConfig {
>           self: Arc<ApiConfig>,
>           req: Request<Incoming>,
>           peer: &std::net::SocketAddr,
> +        #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))]
> +        rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>       ) -> Result<Response<Body>, Error> {
>           let (parts, body) = req.into_parts();
>           let method = parts.method.clone();
> @@ -890,6 +966,8 @@ impl ApiConfig {
>                       full_path: &path,
>                       relative_path_components,
>                       rpcenv,
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    rate_limit_tags: rate_limit_tags.clone(),
>                   })
>                   .await;
>           }
> @@ -901,13 +979,29 @@ impl ApiConfig {
>           if components.is_empty() {
>               match self.check_auth(&parts.headers, &method).await {
>                   Ok((auth_id, _user_info)) => {
> -                    rpcenv.set_auth_id(Some(auth_id));
> +                    rpcenv.set_auth_id(Some(auth_id.clone()));
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(auth_id));
> +                    }
>                       return Ok(self.get_index(rpcenv, parts).await);
>                   }
>                   Err(AuthError::Generic(_)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await;
>                   }
> -                Err(AuthError::NoData) => {}
> +                Err(AuthError::NoData) =>
> +                {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
> +                }
>               }
>               Ok(self.get_index(rpcenv, parts).await)
>           } else {
> @@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> {
>       full_path: &'a str,
>       relative_path_components: &'a [&'a str],
>       rpcenv: RestEnvironment,
> +    #[cfg(feature = "rate-limited-stream")]
> +    rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>   }
>   
>   pub(crate) struct Formatted {
> @@ -992,6 +1088,8 @@ impl Formatted {
>               full_path,
>               relative_path_components,
>               mut rpcenv,
> +            #[cfg(feature = "rate-limited-stream")]
> +            rate_limit_tags,
>           }: ApiRequestData<'_>,
>       ) -> Result<Response<Body>, Error> {
>           if relative_path_components.is_empty() {
> @@ -1026,10 +1124,20 @@ impl Formatted {
>           if auth_required {
>               match config.check_auth(&parts.headers, &parts.method).await {
>                   Ok((authid, info)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(authid.clone()));
> +                    }
>                       rpcenv.set_auth_id(Some(authid));
>                       user_info = info;
>                   }
>                   Err(auth_err) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       let err = match auth_err {
>                           AuthError::Generic(err) => err,
>                           AuthError::NoData => {
> @@ -1045,6 +1153,11 @@ impl Formatted {
>                       return Err(err);
>                   }
>               }
> +        } else {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
>           }
>   
>           match api_method {
> @@ -1108,6 +1221,8 @@ impl Unformatted {
>               full_path,
>               relative_path_components,
>               mut rpcenv,
> +            #[cfg(feature = "rate-limited-stream")]
> +            rate_limit_tags,
>           }: ApiRequestData<'_>,
>       ) -> Result<Response<Body>, Error> {
>           if relative_path_components.is_empty() {
> @@ -1133,10 +1248,20 @@ impl Unformatted {
>           if auth_required {
>               match config.check_auth(&parts.headers, &parts.method).await {
>                   Ok((authid, info)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(authid.clone()));
> +                    }
>                       rpcenv.set_auth_id(Some(authid));
>                       user_info = info;
>                   }
>                   Err(auth_err) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       let err = match auth_err {
>                           AuthError::Generic(err) => err,
>                           AuthError::NoData => {
> @@ -1153,6 +1278,10 @@ impl Unformatted {
>                   }
>               }
>           } else {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
>               user_info = Box::new(EmptyUserInformation {});
>           }
>   



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  reply	other threads:[~2025-11-12  9:54 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-11-10 13:42 [pbs-devel] [PATCH proxmox{, -backup} v3 0/6] add user specific rate-limits Hannes Laimer
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: allow traffic-control rules to match users Hannes Laimer
2025-11-12  9:46   ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 2/3] http: track user tag updates on rate-limited streams Hannes Laimer
2025-11-12  9:46   ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 3/3] rest-server: propagate rate-limit tags from authenticated users Hannes Laimer
2025-11-12  9:55   ` Christian Ebner [this message]
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 1/3] api: taffic-control: update/delete users on rule correctly Hannes Laimer
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 2/3] traffic-control: handle users specified in a " Hannes Laimer
2025-11-12  9:55   ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 3/3] ui: traffic-control: add users field in edit form and list Hannes Laimer
2025-11-12  9:55   ` Christian Ebner
2025-11-12 10:08 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/6] add user specific rate-limits Christian Ebner
2025-11-12 10:36 ` [pbs-devel] superseded: " Hannes Laimer

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=6860f744-d1d0-46a4-aef4-5482834b19a0@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=h.laimer@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal