From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox v5 2/3] http: track user tag updates on rate-limited streams
Date: Fri, 21 Nov 2025 14:50:39 +0100
Message-ID: <20251121135043.97142-3-h.laimer@proxmox.com>
In-Reply-To: <20251121135043.97142-1-h.laimer@proxmox.com>
Introduce rate-limit tags with a user variant and let rate-limited
streams hold a shared handle so callbacks can refresh limits
whenever the tag set changes.
If we decide to implement something like [1] in the future, the tag
enum could be extended with further variants, for example for group
rate limits.
[1] https://bugzilla.proxmox.com/show_bug.cgi?id=5867
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
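Note for readers: the following is a minimal, standalone sketch of the
dirty-flag pattern this patch relies on, not code from the series. The
names `Tag`, `TagsHandle`, `take_if_dirty` and `limit_for`, the user
"backup@pbs" and the 10 MiB/s value are assumptions made up for
illustration; the real types are `RateLimiterTag` and
`RateLimiterTagsHandle` in the diff below.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for `RateLimiterTag`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Tag {
    User(String),
}

/// Illustrative stand-in for `RateLimiterTagsHandle`: clones share the
/// same tag list and the same dirty flag.
#[derive(Clone, Debug)]
pub struct TagsHandle {
    tags: Arc<Mutex<Vec<Tag>>>,
    dirty: Arc<AtomicBool>,
}

impl TagsHandle {
    pub fn new() -> Self {
        Self {
            tags: Arc::new(Mutex::new(Vec::new())),
            dirty: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Writer side: replace the tags, then mark the handle dirty.
    pub fn set_tags(&self, tags: Vec<Tag>) {
        *self.tags.lock().unwrap() = tags;
        self.dirty.store(true, Ordering::Release);
    }

    /// Reader side (hypothetical helper): clear the flag and return the
    /// current tags only if they changed since the last call.
    pub fn take_if_dirty(&self) -> Option<Vec<Tag>> {
        if self.dirty.swap(false, Ordering::Acquire) {
            Some(self.tags.lock().unwrap().clone())
        } else {
            None
        }
    }
}

/// Hypothetical limit lookup in bytes per second, standing in for the
/// update callback that receives `&[RateLimiterTag]`.
pub fn limit_for(tags: &[Tag]) -> Option<u64> {
    tags.iter().find_map(|tag| match tag {
        Tag::User(user) if user.as_str() == "backup@pbs" => Some(10 * 1024 * 1024),
        _ => None,
    })
}
```

In this sketch the authentication layer would hold one clone of the
handle and call `set_tags` once the user is known, while the stream
polls `take_if_dirty` before each read or write and re-runs the lookup
when it returns `Some`.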
proxmox-http/src/lib.rs | 4 +-
proxmox-http/src/rate_limited_stream.rs | 71 +++++++++++++++++++++++--
2 files changed, 69 insertions(+), 6 deletions(-)
diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 8b6953b0..2c7bb750 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -34,7 +34,9 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
#[cfg(feature = "rate-limited-stream")]
mod rate_limited_stream;
#[cfg(feature = "rate-limited-stream")]
-pub use rate_limited_stream::RateLimitedStream;
+pub use rate_limited_stream::{
+ RateLimitedStream, RateLimiterTag, RateLimiterTags, RateLimiterTagsHandle,
+};
#[cfg(feature = "body")]
mod body;
diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
index e24df7af..27f1d1f9 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::future::Future;
use std::io::IoSlice;
use std::marker::Unpin;
use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -15,8 +16,39 @@ use super::{RateLimiter, ShareableRateLimit};
type SharedRateLimit = Arc<dyn ShareableRateLimit>;
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum RateLimiterTag {
+ User(String),
+}
+
+pub type RateLimiterTags = Vec<RateLimiterTag>;
+
+#[derive(Clone, Debug)]
+pub struct RateLimiterTagsHandle {
+ tags: Arc<Mutex<RateLimiterTags>>,
+ dirty: Arc<AtomicBool>,
+}
+
+impl RateLimiterTagsHandle {
+ fn new() -> Self {
+ Self {
+ tags: Arc::new(Mutex::new(Vec::new())),
+ dirty: Arc::new(AtomicBool::new(false)),
+ }
+ }
+
+ pub fn lock(&self) -> std::sync::MutexGuard<'_, RateLimiterTags> {
+ self.tags.lock().unwrap()
+ }
+
+ pub fn set_tags(&self, tags: RateLimiterTags) {
+ *self.tags.lock().unwrap() = tags;
+ self.dirty.store(true, Ordering::Release);
+ }
+}
+
pub type RateLimiterCallback =
- dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
+ dyn Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
/// A rate limited stream using [RateLimiter]
pub struct RateLimitedStream<S> {
@@ -26,6 +58,7 @@ pub struct RateLimitedStream<S> {
write_delay: Option<Pin<Box<Sleep>>>,
update_limiter_cb: Option<Box<RateLimiterCallback>>,
last_limiter_update: Instant,
+ tag_handle: Option<RateLimiterTagsHandle>,
stream: S,
}
@@ -53,6 +86,7 @@ impl<S> RateLimitedStream<S> {
write_delay: None,
update_limiter_cb: None,
last_limiter_update: Instant::now(),
+ tag_handle: None,
stream,
}
}
@@ -64,12 +98,15 @@ impl<S> RateLimitedStream<S> {
/// Note: This function is called within an async context, so it
/// should be fast and must not block.
pub fn with_limiter_update_cb<
- F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,
+ F: Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
+ + Send
+ + 'static,
>(
stream: S,
update_limiter_cb: F,
) -> Self {
- let (read_limiter, write_limiter) = update_limiter_cb();
+ let tag_handle = Some(RateLimiterTagsHandle::new());
+ let (read_limiter, write_limiter) = update_limiter_cb(&[]);
Self {
read_limiter,
read_delay: None,
@@ -77,15 +114,29 @@ impl<S> RateLimitedStream<S> {
write_delay: None,
update_limiter_cb: Some(Box::new(update_limiter_cb)),
last_limiter_update: Instant::now(),
+ tag_handle,
stream,
}
}
fn update_limiters(&mut self) {
if let Some(ref update_limiter_cb) = self.update_limiter_cb {
- if self.last_limiter_update.elapsed().as_secs() >= 5 {
+ let mut force_update = false;
+
+ if let Some(ref handle) = self.tag_handle {
+ if handle.dirty.swap(false, Ordering::Acquire) {
+ force_update = true;
+ }
+ }
+
+ if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
self.last_limiter_update = Instant::now();
- let (read_limiter, write_limiter) = update_limiter_cb();
+ let (read_limiter, write_limiter) = if let Some(ref handle) = self.tag_handle {
+ let tags = handle.lock();
+ update_limiter_cb(&tags)
+ } else {
+ update_limiter_cb(&[])
+ };
self.read_limiter = read_limiter;
self.write_limiter = write_limiter;
}
@@ -99,6 +150,16 @@ impl<S> RateLimitedStream<S> {
pub fn inner_mut(&mut self) -> &mut S {
&mut self.stream
}
+
+ pub fn tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+ self.tag_handle
+ .as_ref()
+ .map(|handle| Arc::clone(&handle.tags))
+ }
+
+ pub fn rate_limiter_tags_handle(&self) -> Option<&RateLimiterTagsHandle> {
+ self.tag_handle.as_ref()
+ }
}
fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option<Pin<Box<Sleep>>> {
--
2.47.3
Thread overview:
2025-11-21 13:50 [pbs-devel] [PATCH proxmox{, -backup} v5 0/6] add user specific rate-limits Hannes Laimer
2025-11-21 13:50 ` [pbs-devel] [PATCH proxmox v5 1/3] pbs-api-types: allow traffic-control rules to match users Hannes Laimer
2025-11-21 13:50 ` Hannes Laimer [this message]
2025-11-21 13:50 ` [pbs-devel] [PATCH proxmox v5 3/3] rest-server: propagate rate-limit tags from authenticated users Hannes Laimer
2025-11-21 13:50 ` [pbs-devel] [PATCH proxmox-backup v5 1/3] api: taffic-control: update/delete users on rule correctly Hannes Laimer
2025-11-21 13:50 ` [pbs-devel] [PATCH proxmox-backup v5 2/3] traffic-control: add user-specific rule matching and precedence Hannes Laimer
2025-11-21 13:50 ` [pbs-devel] [PATCH proxmox-backup v5 3/3] ui: traffic-control: add users field in edit form and list Hannes Laimer