From: Christian Ebner <c.ebner@proxmox.com>
To: Proxmox Backup Server development discussion
<pbs-devel@lists.proxmox.com>,
Hannes Laimer <h.laimer@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox v3 3/3] rest-server: propagate rate-limit tags from authenticated users
Date: Wed, 12 Nov 2025 10:55:10 +0100 [thread overview]
Message-ID: <6860f744-d1d0-46a4-aef4-5482834b19a0@proxmox.com> (raw)
In-Reply-To: <20251110134255.69132-4-h.laimer@proxmox.com>
nit: needs reformatting via cargo fmt
On 11/10/25 2:43 PM, Hannes Laimer wrote:
> Tie REST connections rate-limiter callbacks to a shared tag handle
> so we can push authenticated user IDs into the limiter, keeping
> tags in sync whenever we clear or update them.
>
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
> proxmox-rest-server/src/connection.rs | 11 ++-
> proxmox-rest-server/src/rest.rs | 137 +++++++++++++++++++++++++-
> 2 files changed, 141 insertions(+), 7 deletions(-)
>
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 9511b7cb..ff2ee139 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -24,7 +24,7 @@ use tokio_openssl::SslStream;
> use tokio_stream::wrappers::ReceiverStream;
>
> #[cfg(feature = "rate-limited-stream")]
> -use proxmox_http::{RateLimitedStream, ShareableRateLimit};
> +use proxmox_http::{RateLimitedStream, RateLimiterTag, ShareableRateLimit};
>
> #[cfg(feature = "rate-limited-stream")]
> pub type SharedRateLimit = Arc<dyn ShareableRateLimit>;
> @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
> type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
>
> #[cfg(feature = "rate-limited-stream")]
> -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> +type LookupRateLimiter = dyn Fn(
> + std::net::SocketAddr,
> + &[RateLimiterTag],
> + ) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> + Send
> + Sync
> + 'static;
> @@ -369,7 +372,9 @@ impl AcceptBuilder {
>
> #[cfg(feature = "rate-limited-stream")]
> let socket = match self.lookup_rate_limiter.clone() {
> - Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
> + Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tags| {
> + lookup(peer, tags)
> + }),
> None => RateLimitedStream::with_limiter(socket, None, None),
> };
>
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index b76c4bc9..6e9692ae 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -29,6 +29,10 @@ use tower_service::Service;
> use url::form_urlencoded;
>
> use proxmox_http::Body;
> +#[cfg(feature = "rate-limited-stream")]
> +use proxmox_http::{RateLimiterTag, RateLimiterTags};
> +#[cfg(not(feature = "rate-limited-stream"))]
> +type RateLimiterTags = ();
> use proxmox_router::{
> check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
> RpcEnvironmentType, UserInformation,
> @@ -86,10 +90,26 @@ impl RestServer {
> }
> }
>
> - pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> + #[cfg(not(feature = "rate-limited-stream"))]
> + pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> + where
> + T: PeerAddress + ?Sized,
> + {
> + Ok(ApiService {
> + peer: peer.peer_addr()?,
> + api_config: Arc::clone(&self.api_config),
> + })
> + }
> +
> + #[cfg(feature = "rate-limited-stream")]
> + pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> + where
> + T: PeerAddress + PeerRateLimitTags + ?Sized,
> + {
> Ok(ApiService {
> peer: peer.peer_addr()?,
> api_config: Arc::clone(&self.api_config),
> + rate_limit_tags: peer.rate_limiter_tag_handle(),
> })
> }
> }
> @@ -185,6 +205,11 @@ pub trait PeerAddress {
> fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
> }
>
> +#[cfg(feature = "rate-limited-stream")]
> +pub trait PeerRateLimitTags {
> + fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>>;
> +}
> +
> // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to
> // accept before the peer address is requested, so let's just generally implement this for
> // Pin<Box<T>>
> @@ -221,6 +246,41 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
> }
> }
>
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for Pin<Box<T>> {
> + fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> + T::rate_limiter_tag_handle(&**self)
> + }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for tokio_openssl::SslStream<T> {
> + fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> + self.get_ref().rate_limiter_tag_handle()
> + }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::TcpStream {
> + fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> + None
> + }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::UnixStream {
> + fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> + None
> + }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T> PeerRateLimitTags for proxmox_http::RateLimitedStream<T> {
> + fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> + self.tag_handle()
> + }
> +}
> +
> // Helper [Service] containing the peer Address
> //
> // The lower level connection [Service] implementation on
> @@ -233,6 +293,8 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
> pub struct ApiService {
> pub peer: std::net::SocketAddr,
> pub api_config: Arc<ApiConfig>,
> + #[cfg(feature = "rate-limited-stream")]
> + pub rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
> }
>
> impl ApiService {
> @@ -354,6 +416,10 @@ impl Service<Request<Incoming>> for ApiService {
> Some(proxied_peer) => proxied_peer,
> None => self.peer,
> };
> + #[cfg(feature = "rate-limited-stream")]
> + let rate_limit_tags = self.rate_limit_tags.clone();
> + #[cfg(not(feature = "rate-limited-stream"))]
> + let rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>> = None;
>
> let header = self.api_config
> .auth_cookie_name
> @@ -368,7 +434,15 @@ impl Service<Request<Incoming>> for ApiService {
> });
>
> async move {
> - let mut response = match Arc::clone(&config).handle_request(req, &peer).await {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + handle.lock().unwrap().clear();
> + }
> +
> + let mut response = match Arc::clone(&config)
> + .handle_request(req, &peer, rate_limit_tags.clone())
> + .await
> + {
> Ok(response) => response,
> Err(err) => {
> let (err, code) = match err.downcast_ref::<HttpError>() {
> @@ -860,6 +934,8 @@ impl ApiConfig {
> self: Arc<ApiConfig>,
> req: Request<Incoming>,
> peer: &std::net::SocketAddr,
> + #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))]
> + rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
> ) -> Result<Response<Body>, Error> {
> let (parts, body) = req.into_parts();
> let method = parts.method.clone();
> @@ -890,6 +966,8 @@ impl ApiConfig {
> full_path: &path,
> relative_path_components,
> rpcenv,
> + #[cfg(feature = "rate-limited-stream")]
> + rate_limit_tags: rate_limit_tags.clone(),
> })
> .await;
> }
> @@ -901,13 +979,29 @@ impl ApiConfig {
> if components.is_empty() {
> match self.check_auth(&parts.headers, &method).await {
> Ok((auth_id, _user_info)) => {
> - rpcenv.set_auth_id(Some(auth_id));
> + rpcenv.set_auth_id(Some(auth_id.clone()));
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + let mut guard = handle.lock().unwrap();
> + guard.clear();
> + guard.push(RateLimiterTag::User(auth_id));
> + }
> return Ok(self.get_index(rpcenv, parts).await);
> }
> Err(AuthError::Generic(_)) => {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + handle.lock().unwrap().clear();
> + }
> tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await;
> }
> - Err(AuthError::NoData) => {}
> + Err(AuthError::NoData) =>
> + {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + handle.lock().unwrap().clear();
> + }
> + }
> }
> Ok(self.get_index(rpcenv, parts).await)
> } else {
> @@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> {
> full_path: &'a str,
> relative_path_components: &'a [&'a str],
> rpcenv: RestEnvironment,
> + #[cfg(feature = "rate-limited-stream")]
> + rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
> }
>
> pub(crate) struct Formatted {
> @@ -992,6 +1088,8 @@ impl Formatted {
> full_path,
> relative_path_components,
> mut rpcenv,
> + #[cfg(feature = "rate-limited-stream")]
> + rate_limit_tags,
> }: ApiRequestData<'_>,
> ) -> Result<Response<Body>, Error> {
> if relative_path_components.is_empty() {
> @@ -1026,10 +1124,20 @@ impl Formatted {
> if auth_required {
> match config.check_auth(&parts.headers, &parts.method).await {
> Ok((authid, info)) => {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + let mut guard = handle.lock().unwrap();
> + guard.clear();
> + guard.push(RateLimiterTag::User(authid.clone()));
> + }
> rpcenv.set_auth_id(Some(authid));
> user_info = info;
> }
> Err(auth_err) => {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + handle.lock().unwrap().clear();
> + }
> let err = match auth_err {
> AuthError::Generic(err) => err,
> AuthError::NoData => {
> @@ -1045,6 +1153,11 @@ impl Formatted {
> return Err(err);
> }
> }
> + } else {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + handle.lock().unwrap().clear();
> + }
> }
>
> match api_method {
> @@ -1108,6 +1221,8 @@ impl Unformatted {
> full_path,
> relative_path_components,
> mut rpcenv,
> + #[cfg(feature = "rate-limited-stream")]
> + rate_limit_tags,
> }: ApiRequestData<'_>,
> ) -> Result<Response<Body>, Error> {
> if relative_path_components.is_empty() {
> @@ -1133,10 +1248,20 @@ impl Unformatted {
> if auth_required {
> match config.check_auth(&parts.headers, &parts.method).await {
> Ok((authid, info)) => {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + let mut guard = handle.lock().unwrap();
> + guard.clear();
> + guard.push(RateLimiterTag::User(authid.clone()));
> + }
> rpcenv.set_auth_id(Some(authid));
> user_info = info;
> }
> Err(auth_err) => {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + handle.lock().unwrap().clear();
> + }
> let err = match auth_err {
> AuthError::Generic(err) => err,
> AuthError::NoData => {
> @@ -1153,6 +1278,10 @@ impl Unformatted {
> }
> }
> } else {
> + #[cfg(feature = "rate-limited-stream")]
> + if let Some(handle) = rate_limit_tags.as_ref() {
> + handle.lock().unwrap().clear();
> + }
> user_info = Box::new(EmptyUserInformation {});
> }
>
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-11-12 9:54 UTC|newest]
Thread overview: 14+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-11-10 13:42 [pbs-devel] [PATCH proxmox{, -backup} v3 0/6] add user specific rate-limits Hannes Laimer
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: allow traffic-control rules to match users Hannes Laimer
2025-11-12 9:46 ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 2/3] http: track user tag updates on rate-limited streams Hannes Laimer
2025-11-12 9:46 ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox v3 3/3] rest-server: propagate rate-limit tags from authenticated users Hannes Laimer
2025-11-12 9:55 ` Christian Ebner [this message]
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 1/3] api: taffic-control: update/delete users on rule correctly Hannes Laimer
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 2/3] traffic-control: handle users specified in a " Hannes Laimer
2025-11-12 9:55 ` Christian Ebner
2025-11-10 13:42 ` [pbs-devel] [PATCH proxmox-backup v3 3/3] ui: traffic-control: add users field in edit form and list Hannes Laimer
2025-11-12 9:55 ` Christian Ebner
2025-11-12 10:08 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/6] add user specific rate-limits Christian Ebner
2025-11-12 10:36 ` [pbs-devel] superseded: " Hannes Laimer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=6860f744-d1d0-46a4-aef4-5482834b19a0@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=h.laimer@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox