all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>,
	Hannes Laimer <h.laimer@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox v3 3/3] rest-server: propagate rate-limit tags from authenticated users
Date: Wed, 12 Nov 2025 10:55:10 +0100	[thread overview]
Message-ID: <6860f744-d1d0-46a4-aef4-5482834b19a0@proxmox.com> (raw)
In-Reply-To: <20251110134255.69132-4-h.laimer@proxmox.com>

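nit: this patch needs reformatting via `cargo fmt` (for example, the `Err(AuthError::NoData) =>` match arm in rest.rs puts its opening brace on a separate line)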

On 11/10/25 2:43 PM, Hannes Laimer wrote:
> Tie the REST connection's rate-limiter callbacks to a shared tag handle
> so we can push authenticated user IDs into the limiter, keeping
> tags in sync whenever we clear or update them.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>   proxmox-rest-server/src/connection.rs |  11 ++-
>   proxmox-rest-server/src/rest.rs       | 137 +++++++++++++++++++++++++-
>   2 files changed, 141 insertions(+), 7 deletions(-)
> 
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 9511b7cb..ff2ee139 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -24,7 +24,7 @@ use tokio_openssl::SslStream;
>   use tokio_stream::wrappers::ReceiverStream;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -use proxmox_http::{RateLimitedStream, ShareableRateLimit};
> +use proxmox_http::{RateLimitedStream, RateLimiterTag, ShareableRateLimit};
>   
>   #[cfg(feature = "rate-limited-stream")]
>   pub type SharedRateLimit = Arc<dyn ShareableRateLimit>;
> @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
>   type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> +type LookupRateLimiter = dyn Fn(
> +        std::net::SocketAddr,
> +        &[RateLimiterTag],
> +    ) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
>       + Send
>       + Sync
>       + 'static;
> @@ -369,7 +372,9 @@ impl AcceptBuilder {
>   
>           #[cfg(feature = "rate-limited-stream")]
>           let socket = match self.lookup_rate_limiter.clone() {
> -            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
> +            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tags| {
> +                lookup(peer, tags)
> +            }),
>               None => RateLimitedStream::with_limiter(socket, None, None),
>           };
>   
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index b76c4bc9..6e9692ae 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -29,6 +29,10 @@ use tower_service::Service;
>   use url::form_urlencoded;
>   
>   use proxmox_http::Body;
> +#[cfg(feature = "rate-limited-stream")]
> +use proxmox_http::{RateLimiterTag, RateLimiterTags};
> +#[cfg(not(feature = "rate-limited-stream"))]
> +type RateLimiterTags = ();
>   use proxmox_router::{
>       check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
>       RpcEnvironmentType, UserInformation,
> @@ -86,10 +90,26 @@ impl RestServer {
>           }
>       }
>   
> -    pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> +    #[cfg(not(feature = "rate-limited-stream"))]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + ?Sized,
> +    {
> +        Ok(ApiService {
> +            peer: peer.peer_addr()?,
> +            api_config: Arc::clone(&self.api_config),
> +        })
> +    }
> +
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + PeerRateLimitTags + ?Sized,
> +    {
>           Ok(ApiService {
>               peer: peer.peer_addr()?,
>               api_config: Arc::clone(&self.api_config),
> +            rate_limit_tags: peer.rate_limiter_tag_handle(),
>           })
>       }
>   }
> @@ -185,6 +205,11 @@ pub trait PeerAddress {
>       fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +pub trait PeerRateLimitTags {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>>;
> +}
> +
>   // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to
>   // accept before the peer address is requested, so let's just generally implement this for
>   // Pin<Box<T>>
> @@ -221,6 +246,41 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>       }
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for Pin<Box<T>> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        T::rate_limiter_tag_handle(&**self)
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for tokio_openssl::SslStream<T> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.get_ref().rate_limiter_tag_handle()
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::TcpStream {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::UnixStream {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T> PeerRateLimitTags for proxmox_http::RateLimitedStream<T> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.tag_handle()
> +    }
> +}
> +
>   // Helper [Service] containing the peer Address
>   //
>   // The lower level connection [Service] implementation on
> @@ -233,6 +293,8 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>   pub struct ApiService {
>       pub peer: std::net::SocketAddr,
>       pub api_config: Arc<ApiConfig>,
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>   }
>   
>   impl ApiService {
> @@ -354,6 +416,10 @@ impl Service<Request<Incoming>> for ApiService {
>               Some(proxied_peer) => proxied_peer,
>               None => self.peer,
>           };
> +        #[cfg(feature = "rate-limited-stream")]
> +        let rate_limit_tags = self.rate_limit_tags.clone();
> +        #[cfg(not(feature = "rate-limited-stream"))]
> +        let rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>> = None;
>   
>           let header = self.api_config
>               .auth_cookie_name
> @@ -368,7 +434,15 @@ impl Service<Request<Incoming>> for ApiService {
>                });
>   
>           async move {
> -            let mut response = match Arc::clone(&config).handle_request(req, &peer).await {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
> +
> +            let mut response = match Arc::clone(&config)
> +                .handle_request(req, &peer, rate_limit_tags.clone())
> +                .await
> +            {
>                   Ok(response) => response,
>                   Err(err) => {
>                       let (err, code) = match err.downcast_ref::<HttpError>() {
> @@ -860,6 +934,8 @@ impl ApiConfig {
>           self: Arc<ApiConfig>,
>           req: Request<Incoming>,
>           peer: &std::net::SocketAddr,
> +        #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))]
> +        rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>       ) -> Result<Response<Body>, Error> {
>           let (parts, body) = req.into_parts();
>           let method = parts.method.clone();
> @@ -890,6 +966,8 @@ impl ApiConfig {
>                       full_path: &path,
>                       relative_path_components,
>                       rpcenv,
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    rate_limit_tags: rate_limit_tags.clone(),
>                   })
>                   .await;
>           }
> @@ -901,13 +979,29 @@ impl ApiConfig {
>           if components.is_empty() {
>               match self.check_auth(&parts.headers, &method).await {
>                   Ok((auth_id, _user_info)) => {
> -                    rpcenv.set_auth_id(Some(auth_id));
> +                    rpcenv.set_auth_id(Some(auth_id.clone()));
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(auth_id));
> +                    }
>                       return Ok(self.get_index(rpcenv, parts).await);
>                   }
>                   Err(AuthError::Generic(_)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await;
>                   }
> -                Err(AuthError::NoData) => {}
> +                Err(AuthError::NoData) =>
> +                {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
> +                }
>               }
>               Ok(self.get_index(rpcenv, parts).await)
>           } else {
> @@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> {
>       full_path: &'a str,
>       relative_path_components: &'a [&'a str],
>       rpcenv: RestEnvironment,
> +    #[cfg(feature = "rate-limited-stream")]
> +    rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>   }
>   
>   pub(crate) struct Formatted {
> @@ -992,6 +1088,8 @@ impl Formatted {
>               full_path,
>               relative_path_components,
>               mut rpcenv,
> +            #[cfg(feature = "rate-limited-stream")]
> +            rate_limit_tags,
>           }: ApiRequestData<'_>,
>       ) -> Result<Response<Body>, Error> {
>           if relative_path_components.is_empty() {
> @@ -1026,10 +1124,20 @@ impl Formatted {
>           if auth_required {
>               match config.check_auth(&parts.headers, &parts.method).await {
>                   Ok((authid, info)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(authid.clone()));
> +                    }
>                       rpcenv.set_auth_id(Some(authid));
>                       user_info = info;
>                   }
>                   Err(auth_err) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       let err = match auth_err {
>                           AuthError::Generic(err) => err,
>                           AuthError::NoData => {
> @@ -1045,6 +1153,11 @@ impl Formatted {
>                       return Err(err);
>                   }
>               }
> +        } else {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
>           }
>   
>           match api_method {
> @@ -1108,6 +1221,8 @@ impl Unformatted {
>               full_path,
>               relative_path_components,
>               mut rpcenv,
> +            #[cfg(feature = "rate-limited-stream")]
> +            rate_limit_tags,
>           }: ApiRequestData<'_>,
>       ) -> Result<Response<Body>, Error> {
>           if relative_path_components.is_empty() {
> @@ -1133,10 +1248,20 @@ impl Unformatted {
>           if auth_required {
>               match config.check_auth(&parts.headers, &parts.method).await {
>                   Ok((authid, info)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(authid.clone()));
> +                    }
>                       rpcenv.set_auth_id(Some(authid));
>                       user_info = info;
>                   }
>                   Err(auth_err) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       let err = match auth_err {
>                           AuthError::Generic(err) => err,
>                           AuthError::NoData => {
> @@ -1153,6 +1278,10 @@ impl Unformatted {
>                   }
>               }
>           } else {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
>               user_info = Box::new(EmptyUserInformation {});
>           }
>   



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  reply	other threads:[~2025-11-12  9:54 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-11-10 13:42 [pbs-devel] [PATCH proxmox{, -backup} v3 0/6] add user specific rate-limits Hannes Laimer
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: allow traffic-control rules to match users Hannes Laimer
2025-11-12  9:46   ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 2/3] http: track user tag updates on rate-limited streams Hannes Laimer
2025-11-12  9:46   ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 3/3] rest-server: propagate rate-limit tags from authenticated users Hannes Laimer
2025-11-12  9:55   ` Christian Ebner [this message]
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 1/3] api: traffic-control: update/delete users on rule correctly Hannes Laimer
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 2/3] traffic-control: handle users specified in a " Hannes Laimer
2025-11-12  9:55   ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 3/3] ui: traffic-control: add users field in edit form and list Hannes Laimer
2025-11-12  9:55   ` Christian Ebner
2025-11-12 10:08 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/6] add user specific rate-limits Christian Ebner
2025-11-12 10:36 ` [pbs-devel] superseded: " Hannes Laimer

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=6860f744-d1d0-46a4-aef4-5482834b19a0@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=h.laimer@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal