all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>,
	Hannes Laimer <h.laimer@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox v2 2/3] http: track user tag updates on rate-limited streams
Date: Mon, 10 Nov 2025 09:53:10 +0100	[thread overview]
Message-ID: <fb39d013-7610-4ac9-ae24-f30bf78a0b7b@proxmox.com> (raw)
In-Reply-To: <20251107132329.42965-3-h.laimer@proxmox.com>

On 11/7/25 2:23 PM, Hannes Laimer wrote:
> Introduce rate-limit tags with a user variant and let rate-limited
> streams hold a shared handle so callbacks can refresh limits
> whenever the tag set changes.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>   proxmox-http/src/lib.rs                 |  2 +-
>   proxmox-http/src/rate_limited_stream.rs | 35 ++++++++++++++++++++++++-
>   2 files changed, 35 insertions(+), 2 deletions(-)
> 
> diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
> index 8b6953b0..990f8f36 100644
> --- a/proxmox-http/src/lib.rs
> +++ b/proxmox-http/src/lib.rs
> @@ -34,7 +34,7 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
>   #[cfg(feature = "rate-limited-stream")]
>   mod rate_limited_stream;
>   #[cfg(feature = "rate-limited-stream")]
> -pub use rate_limited_stream::RateLimitedStream;
> +pub use rate_limited_stream::{RateLimitedStream, RateLimiterTag, RateLimiterTags};
>   
>   #[cfg(feature = "body")]
>   mod body;
> diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
> index e24df7af..4bc5c414 100644
> --- a/proxmox-http/src/rate_limited_stream.rs
> +++ b/proxmox-http/src/rate_limited_stream.rs
> @@ -15,6 +15,13 @@ use super::{RateLimiter, ShareableRateLimit};
>   
>   type SharedRateLimit = Arc<dyn ShareableRateLimit>;
>   
> +#[derive(Clone, Debug, PartialEq, Eq)]
> +pub enum RateLimiterTag {
> +    User(String),
> +}

note: just to add a justification for the enum here, which could also be 
part of the commit message. I see potential here to add group based 
filter if there will be a user group implementation as requested in [0].

[0] https://bugzilla.proxmox.com/show_bug.cgi?id=5867

> +
> +pub type RateLimiterTags = Vec<RateLimiterTag>;
> +
>   pub type RateLimiterCallback =
>       dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;

I still see potential to improve the code here as already mentioned in 
v1. If the list of RateLimiterTag's is passed as paramter via the 
callback, that will simplify the logic for the patches on the 
rest-server implementation. See the suggested diff below an on the next 
patch.

>   
> @@ -26,6 +33,8 @@ pub struct RateLimitedStream<S> {
>       write_delay: Option<Pin<Box<Sleep>>>,
>       update_limiter_cb: Option<Box<RateLimiterCallback>>,
>       last_limiter_update: Instant,
> +    tag_handle: Option<Arc<Mutex<RateLimiterTags>>>,
> +    last_tags: Option<RateLimiterTags>,
>       stream: S,
>   }
>   
> @@ -53,6 +62,8 @@ impl<S> RateLimitedStream<S> {
>               write_delay: None,
>               update_limiter_cb: None,
>               last_limiter_update: Instant::now(),
> +            tag_handle: None,
> +            last_tags: None,
>               stream,
>           }
>       }
> @@ -77,13 +88,26 @@ impl<S> RateLimitedStream<S> {
>               write_delay: None,
>               update_limiter_cb: Some(Box::new(update_limiter_cb)),
>               last_limiter_update: Instant::now(),
> +            tag_handle: None,
> +            last_tags: None,
>               stream,
>           }
>       }
>   
>       fn update_limiters(&mut self) {
>           if let Some(ref update_limiter_cb) = self.update_limiter_cb {
> -            if self.last_limiter_update.elapsed().as_secs() >= 5 {
> +            let mut force_update = false;
> +            let current_tags = self
> +                .tag_handle
> +                .as_ref()
> +                .map(|handle| handle.lock().unwrap().clone());
> +
> +            if self.last_tags != current_tags {
> +                self.last_tags = current_tags;
> +                force_update = true;
> +            }
> +
> +            if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
>                   self.last_limiter_update = Instant::now();
>                   let (read_limiter, write_limiter) = update_limiter_cb();
>                   self.read_limiter = read_limiter;
> @@ -99,6 +123,15 @@ impl<S> RateLimitedStream<S> {
>       pub fn inner_mut(&mut self) -> &mut S {
>           &mut self.stream
>       }
> +
> +    pub fn tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.tag_handle.as_ref().map(Arc::clone)
> +    }
> +
> +    pub fn set_tag_handle(&mut self, handle: Arc<Mutex<RateLimiterTags>>) {
> +        self.tag_handle = Some(handle);
> +        self.last_tags = None;
> +    }
>   }
>   
>   fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option<Pin<Box<Sleep>>> {

diff --git a/proxmox-http/src/rate_limited_stream.rs 
b/proxmox-http/src/rate_limited_stream.rs
index 4bc5c414..bbb6cd6d 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -23,7 +23,7 @@ pub enum RateLimiterTag {
  pub type RateLimiterTags = Vec<RateLimiterTag>;

  pub type RateLimiterCallback =
-    dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
+    dyn Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, 
Option<SharedRateLimit>) + Send;

  /// A rate limited stream using [RateLimiter]
  pub struct RateLimitedStream<S> {
@@ -75,12 +75,13 @@ impl<S> RateLimitedStream<S> {
      /// Note: This function is called within an async context, so it
      /// should be fast and must not block.
      pub fn with_limiter_update_cb<
-        F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + 
Send + 'static,
+        F: Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, 
Option<SharedRateLimit>) + Send + 'static,
      >(
          stream: S,
          update_limiter_cb: F,
      ) -> Self {
-        let (read_limiter, write_limiter) = update_limiter_cb();
+        let tag_handle = Some(Arc::new(Mutex::new(Vec::new())));
+        let (read_limiter, write_limiter) = update_limiter_cb(&[]);
          Self {
              read_limiter,
              read_delay: None,
@@ -88,7 +89,7 @@ impl<S> RateLimitedStream<S> {
              write_delay: None,
              update_limiter_cb: Some(Box::new(update_limiter_cb)),
              last_limiter_update: Instant::now(),
-            tag_handle: None,
+            tag_handle,
              last_tags: None,
              stream,
          }
@@ -109,7 +110,8 @@ impl<S> RateLimitedStream<S> {

              if force_update || 
self.last_limiter_update.elapsed().as_secs() >= 5 {
                  self.last_limiter_update = Instant::now();
-                let (read_limiter, write_limiter) = update_limiter_cb();
+                let tags = self.last_tags.as_ref().map(|tags| 
tags.as_slice()).unwrap_or(&[]);
+                let (read_limiter, write_limiter) = 
update_limiter_cb(tags);
                  self.read_limiter = read_limiter;
                  self.write_limiter = write_limiter;
              }
@@ -127,11 +129,6 @@ impl<S> RateLimitedStream<S> {
      pub fn tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
          self.tag_handle.as_ref().map(Arc::clone)
      }
-
-    pub fn set_tag_handle(&mut self, handle: Arc<Mutex<RateLimiterTags>>) {
-        self.tag_handle = Some(handle);
-        self.last_tags = None;
-    }
  }

  fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> 
Option<Pin<Box<Sleep>>> {


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  reply	other threads:[~2025-11-10  8:52 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-11-07 13:23 [pbs-devel] [PATCH proxmox{, -backup} v2 0/6] add user specific rate-limits Hannes Laimer
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox v2 1/3] pbs-api-types: allow traffic-control rules to match users Hannes Laimer
2025-11-10  8:45   ` Christian Ebner
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox v2 2/3] http: track user tag updates on rate-limited streams Hannes Laimer
2025-11-10  8:53   ` Christian Ebner [this message]
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox v2 3/3] rest-server: propagate rate-limit tags from authenticated users Hannes Laimer
2025-11-10  8:55   ` Christian Ebner
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox-backup v2 1/3] api: taffic-control: update/delete users on rule correctly Hannes Laimer
2025-11-10  8:56   ` Christian Ebner
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox-backup v2 2/3] traffic-control: handle users specified in a " Hannes Laimer
2025-11-10  8:59   ` Christian Ebner
2025-11-07 13:23 ` [pbs-devel] [PATCH proxmox-backup v2 3/3] ui: traffic-control: add users field in edit form and list Hannes Laimer
2025-11-10  9:00   ` Christian Ebner
2025-11-10  9:03 ` [pbs-devel] [PATCH proxmox{, -backup} v2 0/6] add user specific rate-limits Christian Ebner
2025-11-10  9:13   ` Hannes Laimer
2025-11-10 13:44 ` [pbs-devel] superseded: " Hannes Laimer

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=fb39d013-7610-4ac9-ae24-f30bf78a0b7b@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=h.laimer@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal