From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 38D501FF15E for ; Mon, 10 Nov 2025 09:52:30 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 6AE04FF20; Mon, 10 Nov 2025 09:53:15 +0100 (CET) Message-ID: Date: Mon, 10 Nov 2025 09:53:10 +0100 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird To: Proxmox Backup Server development discussion , Hannes Laimer References: <20251107132329.42965-1-h.laimer@proxmox.com> <20251107132329.42965-3-h.laimer@proxmox.com> Content-Language: en-US, de-DE From: Christian Ebner In-Reply-To: <20251107132329.42965-3-h.laimer@proxmox.com> X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1762764768790 X-SPAM-LEVEL: Spam detection results: 1 AWL -2.828 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SOMETLD_ARE_BAD_TLD 5 .bar, .beauty, .buzz, .cam, .casa, .cfd, .club, .date, .guru, .link, .live, .monster, .online, .press, .pw, .quest, .rest, .sbs, .shop, .stream, .top, .trade, .wiki, .work, .xyz TLD abuse PDS_OTHER_BAD_TLD 0.75 Untrustworthy TLDs SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pbs-devel] [PATCH proxmox v2 2/3] http: track user tag updates on rate-limited streams X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Transfer-Encoding: 7bit Content-Type: text/plain; charset="us-ascii"; Format="flowed" Errors-To: 
pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" On 11/7/25 2:23 PM, Hannes Laimer wrote: > Introduce rate-limit tags with a user variant and let rate-limited > streams hold a shared handle so callbacks can refresh limits > whenever the tag set changes. > > Signed-off-by: Hannes Laimer > --- > proxmox-http/src/lib.rs | 2 +- > proxmox-http/src/rate_limited_stream.rs | 35 ++++++++++++++++++++++++- > 2 files changed, 35 insertions(+), 2 deletions(-) > > diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs > index 8b6953b0..990f8f36 100644 > --- a/proxmox-http/src/lib.rs > +++ b/proxmox-http/src/lib.rs > @@ -34,7 +34,7 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi > #[cfg(feature = "rate-limited-stream")] > mod rate_limited_stream; > #[cfg(feature = "rate-limited-stream")] > -pub use rate_limited_stream::RateLimitedStream; > +pub use rate_limited_stream::{RateLimitedStream, RateLimiterTag, RateLimiterTags}; > > #[cfg(feature = "body")] > mod body; > diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs > index e24df7af..4bc5c414 100644 > --- a/proxmox-http/src/rate_limited_stream.rs > +++ b/proxmox-http/src/rate_limited_stream.rs > @@ -15,6 +15,13 @@ use super::{RateLimiter, ShareableRateLimit}; > > type SharedRateLimit = Arc; > > +#[derive(Clone, Debug, PartialEq, Eq)] > +pub enum RateLimiterTag { > + User(String), > +} note: just to add a justification for the enum here, which could also be part of the commit message. I see potential here to add group based filter if there will be a user group implementation as requested in [0]. [0] https://bugzilla.proxmox.com/show_bug.cgi?id=5867 > + > +pub type RateLimiterTags = Vec; > + > pub type RateLimiterCallback = > dyn Fn() -> (Option, Option) + Send; I still see potential to improve the code here as already mentioned in v1. 
If the list of RateLimiterTag entries is passed as a parameter via the callback, that will simplify the logic for the patches on the rest-server implementation. See the suggested diff below and on the next patch. > > @@ -26,6 +33,8 @@ pub struct RateLimitedStream { > write_delay: Option>>, > update_limiter_cb: Option>, > last_limiter_update: Instant, > + tag_handle: Option>>, > + last_tags: Option, > stream: S, > } > > @@ -53,6 +62,8 @@ impl RateLimitedStream { > write_delay: None, > update_limiter_cb: None, > last_limiter_update: Instant::now(), > + tag_handle: None, > + last_tags: None, > stream, > } > } > @@ -77,13 +88,26 @@ impl RateLimitedStream { > write_delay: None, > update_limiter_cb: Some(Box::new(update_limiter_cb)), > last_limiter_update: Instant::now(), > + tag_handle: None, > + last_tags: None, > stream, > } > } > > fn update_limiters(&mut self) { > if let Some(ref update_limiter_cb) = self.update_limiter_cb { > - if self.last_limiter_update.elapsed().as_secs() >= 5 { > + let mut force_update = false; > + let current_tags = self > + .tag_handle > + .as_ref() > + .map(|handle| handle.lock().unwrap().clone()); > + > + if self.last_tags != current_tags { > + self.last_tags = current_tags; > + force_update = true; > + } > + > + if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 { > self.last_limiter_update = Instant::now(); > let (read_limiter, write_limiter) = update_limiter_cb(); > self.read_limiter = read_limiter; > @@ -99,6 +123,15 @@ impl RateLimitedStream { > pub fn inner_mut(&mut self) -> &mut S { > &mut self.stream > } > + > + pub fn tag_handle(&self) -> Option>> { > + self.tag_handle.as_ref().map(Arc::clone) > + } > + > + pub fn set_tag_handle(&mut self, handle: Arc>) { > + self.tag_handle = Some(handle); > + self.last_tags = None; > + } > } > > fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option>> { diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs index 
4bc5c414..bbb6cd6d 100644 --- a/proxmox-http/src/rate_limited_stream.rs +++ b/proxmox-http/src/rate_limited_stream.rs @@ -23,7 +23,7 @@ pub enum RateLimiterTag { pub type RateLimiterTags = Vec; pub type RateLimiterCallback = - dyn Fn() -> (Option, Option) + Send; + dyn Fn(&[RateLimiterTag]) -> (Option, Option) + Send; /// A rate limited stream using [RateLimiter] pub struct RateLimitedStream { @@ -75,12 +75,13 @@ impl RateLimitedStream { /// Note: This function is called within an async context, so it /// should be fast and must not block. pub fn with_limiter_update_cb< - F: Fn() -> (Option, Option) + Send + 'static, + F: Fn(&[RateLimiterTag]) -> (Option, Option) + Send + 'static, >( stream: S, update_limiter_cb: F, ) -> Self { - let (read_limiter, write_limiter) = update_limiter_cb(); + let tag_handle = Some(Arc::new(Mutex::new(Vec::new()))); + let (read_limiter, write_limiter) = update_limiter_cb(&[]); Self { read_limiter, read_delay: None, @@ -88,7 +89,7 @@ impl RateLimitedStream { write_delay: None, update_limiter_cb: Some(Box::new(update_limiter_cb)), last_limiter_update: Instant::now(), - tag_handle: None, + tag_handle, last_tags: None, stream, } @@ -109,7 +110,8 @@ impl RateLimitedStream { if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 { self.last_limiter_update = Instant::now(); - let (read_limiter, write_limiter) = update_limiter_cb(); + let tags = self.last_tags.as_ref().map(|tags| tags.as_slice()).unwrap_or(&[]); + let (read_limiter, write_limiter) = update_limiter_cb(tags); self.read_limiter = read_limiter; self.write_limiter = write_limiter; } @@ -127,11 +129,6 @@ impl RateLimitedStream { pub fn tag_handle(&self) -> Option>> { self.tag_handle.as_ref().map(Arc::clone) } - - pub fn set_tag_handle(&mut self, handle: Arc>) { - self.tag_handle = Some(handle); - self.last_tags = None; - } } fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option>> { _______________________________________________ pbs-devel mailing 
list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel