From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox v2 3/3] rest-server: propagate rate-limit tags from authenticated users
Date: Fri, 7 Nov 2025 14:23:26 +0100 [thread overview]
Message-ID: <20251107132329.42965-4-h.laimer@proxmox.com> (raw)
In-Reply-To: <20251107132329.42965-1-h.laimer@proxmox.com>
Tie REST connections rate-limiter callbacks to a shared tag handle
so we can push authenticated user IDs into the limiter, keeping
tags in sync whenever we clear or update them.
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
proxmox-rest-server/src/connection.rs | 18 +++-
proxmox-rest-server/src/rest.rs | 137 +++++++++++++++++++++++++-
2 files changed, 148 insertions(+), 7 deletions(-)
diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 9511b7cb..47750dc8 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -24,7 +24,7 @@ use tokio_openssl::SslStream;
use tokio_stream::wrappers::ReceiverStream;
#[cfg(feature = "rate-limited-stream")]
-use proxmox_http::{RateLimitedStream, ShareableRateLimit};
+use proxmox_http::{RateLimitedStream, RateLimiterTag, RateLimiterTags, ShareableRateLimit};
#[cfg(feature = "rate-limited-stream")]
pub type SharedRateLimit = Arc<dyn ShareableRateLimit>;
@@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
#[cfg(feature = "rate-limited-stream")]
-type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
+type LookupRateLimiter = dyn Fn(
+ std::net::SocketAddr,
+ &[RateLimiterTag],
+ ) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
+ Send
+ Sync
+ 'static;
@@ -369,7 +372,16 @@ impl AcceptBuilder {
#[cfg(feature = "rate-limited-stream")]
let socket = match self.lookup_rate_limiter.clone() {
- Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
+ Some(lookup) => {
+ let tags: Arc<Mutex<RateLimiterTags>> = Arc::new(Mutex::new(Vec::new()));
+ let tags_cb = Arc::clone(&tags);
+ let mut s = RateLimitedStream::with_limiter_update_cb(socket, move || {
+ let tags = tags_cb.lock().unwrap().clone();
+ lookup(peer, &tags)
+ });
+ s.set_tag_handle(tags);
+ s
+ }
None => RateLimitedStream::with_limiter(socket, None, None),
};
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index b76c4bc9..6e9692ae 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -29,6 +29,10 @@ use tower_service::Service;
use url::form_urlencoded;
use proxmox_http::Body;
+#[cfg(feature = "rate-limited-stream")]
+use proxmox_http::{RateLimiterTag, RateLimiterTags};
+#[cfg(not(feature = "rate-limited-stream"))]
+type RateLimiterTags = ();
use proxmox_router::{
check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
RpcEnvironmentType, UserInformation,
@@ -86,10 +90,26 @@ impl RestServer {
}
}
- pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
+ #[cfg(not(feature = "rate-limited-stream"))]
+ pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
+ where
+ T: PeerAddress + ?Sized,
+ {
+ Ok(ApiService {
+ peer: peer.peer_addr()?,
+ api_config: Arc::clone(&self.api_config),
+ })
+ }
+
+ #[cfg(feature = "rate-limited-stream")]
+ pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
+ where
+ T: PeerAddress + PeerRateLimitTags + ?Sized,
+ {
Ok(ApiService {
peer: peer.peer_addr()?,
api_config: Arc::clone(&self.api_config),
+ rate_limit_tags: peer.rate_limiter_tag_handle(),
})
}
}
@@ -185,6 +205,11 @@ pub trait PeerAddress {
fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
}
+#[cfg(feature = "rate-limited-stream")]
+pub trait PeerRateLimitTags {
+ fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>>;
+}
+
// tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to
// accept before the peer address is requested, so let's just generally implement this for
// Pin<Box<T>>
@@ -221,6 +246,41 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
}
}
+#[cfg(feature = "rate-limited-stream")]
+impl<T: PeerRateLimitTags> PeerRateLimitTags for Pin<Box<T>> {
+ fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+ T::rate_limiter_tag_handle(&**self)
+ }
+}
+
+#[cfg(feature = "rate-limited-stream")]
+impl<T: PeerRateLimitTags> PeerRateLimitTags for tokio_openssl::SslStream<T> {
+ fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+ self.get_ref().rate_limiter_tag_handle()
+ }
+}
+
+#[cfg(feature = "rate-limited-stream")]
+impl PeerRateLimitTags for tokio::net::TcpStream {
+ fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+ None
+ }
+}
+
+#[cfg(feature = "rate-limited-stream")]
+impl PeerRateLimitTags for tokio::net::UnixStream {
+ fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+ None
+ }
+}
+
+#[cfg(feature = "rate-limited-stream")]
+impl<T> PeerRateLimitTags for proxmox_http::RateLimitedStream<T> {
+ fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+ self.tag_handle()
+ }
+}
+
// Helper [Service] containing the peer Address
//
// The lower level connection [Service] implementation on
@@ -233,6 +293,8 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
pub struct ApiService {
pub peer: std::net::SocketAddr,
pub api_config: Arc<ApiConfig>,
+ #[cfg(feature = "rate-limited-stream")]
+ pub rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
}
impl ApiService {
@@ -354,6 +416,10 @@ impl Service<Request<Incoming>> for ApiService {
Some(proxied_peer) => proxied_peer,
None => self.peer,
};
+ #[cfg(feature = "rate-limited-stream")]
+ let rate_limit_tags = self.rate_limit_tags.clone();
+ #[cfg(not(feature = "rate-limited-stream"))]
+ let rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>> = None;
let header = self.api_config
.auth_cookie_name
@@ -368,7 +434,15 @@ impl Service<Request<Incoming>> for ApiService {
});
async move {
- let mut response = match Arc::clone(&config).handle_request(req, &peer).await {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ handle.lock().unwrap().clear();
+ }
+
+ let mut response = match Arc::clone(&config)
+ .handle_request(req, &peer, rate_limit_tags.clone())
+ .await
+ {
Ok(response) => response,
Err(err) => {
let (err, code) = match err.downcast_ref::<HttpError>() {
@@ -860,6 +934,8 @@ impl ApiConfig {
self: Arc<ApiConfig>,
req: Request<Incoming>,
peer: &std::net::SocketAddr,
+ #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))]
+ rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
) -> Result<Response<Body>, Error> {
let (parts, body) = req.into_parts();
let method = parts.method.clone();
@@ -890,6 +966,8 @@ impl ApiConfig {
full_path: &path,
relative_path_components,
rpcenv,
+ #[cfg(feature = "rate-limited-stream")]
+ rate_limit_tags: rate_limit_tags.clone(),
})
.await;
}
@@ -901,13 +979,29 @@ impl ApiConfig {
if components.is_empty() {
match self.check_auth(&parts.headers, &method).await {
Ok((auth_id, _user_info)) => {
- rpcenv.set_auth_id(Some(auth_id));
+ rpcenv.set_auth_id(Some(auth_id.clone()));
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ let mut guard = handle.lock().unwrap();
+ guard.clear();
+ guard.push(RateLimiterTag::User(auth_id));
+ }
return Ok(self.get_index(rpcenv, parts).await);
}
Err(AuthError::Generic(_)) => {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ handle.lock().unwrap().clear();
+ }
tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await;
}
- Err(AuthError::NoData) => {}
+ Err(AuthError::NoData) =>
+ {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ handle.lock().unwrap().clear();
+ }
+ }
}
Ok(self.get_index(rpcenv, parts).await)
} else {
@@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> {
full_path: &'a str,
relative_path_components: &'a [&'a str],
rpcenv: RestEnvironment,
+ #[cfg(feature = "rate-limited-stream")]
+ rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
}
pub(crate) struct Formatted {
@@ -992,6 +1088,8 @@ impl Formatted {
full_path,
relative_path_components,
mut rpcenv,
+ #[cfg(feature = "rate-limited-stream")]
+ rate_limit_tags,
}: ApiRequestData<'_>,
) -> Result<Response<Body>, Error> {
if relative_path_components.is_empty() {
@@ -1026,10 +1124,20 @@ impl Formatted {
if auth_required {
match config.check_auth(&parts.headers, &parts.method).await {
Ok((authid, info)) => {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ let mut guard = handle.lock().unwrap();
+ guard.clear();
+ guard.push(RateLimiterTag::User(authid.clone()));
+ }
rpcenv.set_auth_id(Some(authid));
user_info = info;
}
Err(auth_err) => {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ handle.lock().unwrap().clear();
+ }
let err = match auth_err {
AuthError::Generic(err) => err,
AuthError::NoData => {
@@ -1045,6 +1153,11 @@ impl Formatted {
return Err(err);
}
}
+ } else {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ handle.lock().unwrap().clear();
+ }
}
match api_method {
@@ -1108,6 +1221,8 @@ impl Unformatted {
full_path,
relative_path_components,
mut rpcenv,
+ #[cfg(feature = "rate-limited-stream")]
+ rate_limit_tags,
}: ApiRequestData<'_>,
) -> Result<Response<Body>, Error> {
if relative_path_components.is_empty() {
@@ -1133,10 +1248,20 @@ impl Unformatted {
if auth_required {
match config.check_auth(&parts.headers, &parts.method).await {
Ok((authid, info)) => {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ let mut guard = handle.lock().unwrap();
+ guard.clear();
+ guard.push(RateLimiterTag::User(authid.clone()));
+ }
rpcenv.set_auth_id(Some(authid));
user_info = info;
}
Err(auth_err) => {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ handle.lock().unwrap().clear();
+ }
let err = match auth_err {
AuthError::Generic(err) => err,
AuthError::NoData => {
@@ -1153,6 +1278,10 @@ impl Unformatted {
}
}
} else {
+ #[cfg(feature = "rate-limited-stream")]
+ if let Some(handle) = rate_limit_tags.as_ref() {
+ handle.lock().unwrap().clear();
+ }
user_info = Box::new(EmptyUserInformation {});
}
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-11-07 13:22 UTC|newest]
Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-11-07 13:23 [pbs-devel] [PATCH proxmox{, -backup} v2 0/6] add user specific rate-limits Hannes Laimer
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox v2 1/3] pbs-api-types: allow traffic-control rules to match users Hannes Laimer
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox v2 2/3] http: track user tag updates on rate-limited streams Hannes Laimer
2025-11-07 13:23 ` Hannes Laimer [this message]
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox-backup v2 1/3] api: taffic-control: update/delete users on rule correctly Hannes Laimer
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox-backup v2 2/3] traffic-control: handle users specified in a " Hannes Laimer
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox-backup v2 3/3] ui: traffic-control: add users field in edit form and list Hannes Laimer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20251107132329.42965-4-h.laimer@proxmox.com \
--to=h.laimer@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.