From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 56C311FF16B for ; Fri, 21 Nov 2025 14:51:12 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id A3CCA2279C; Fri, 21 Nov 2025 14:51:19 +0100 (CET) From: Hannes Laimer To: pbs-devel@lists.proxmox.com Date: Fri, 21 Nov 2025 14:50:39 +0100 Message-ID: <20251121135043.97142-3-h.laimer@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20251121135043.97142-1-h.laimer@proxmox.com> References: <20251121135043.97142-1-h.laimer@proxmox.com> MIME-Version: 1.0 X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1763733012897 X-SPAM-LEVEL: Spam detection results: 1 AWL -2.827 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SOMETLD_ARE_BAD_TLD 5 .bar, .beauty, .buzz, .cam, .casa, .cfd, .club, .date, .guru, .link, .live, .monster, .online, .press, .pw, .quest, .rest, .sbs, .shop, .stream, .top, .trade, .wiki, .work, .xyz TLD abuse PDS_OTHER_BAD_TLD 0.75 Untrustworthy TLDs RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [proxmox.com, self.stream, lib.rs] Subject: [pbs-devel] [PATCH proxmox v5 2/3] http: track user tag updates on rate-limited streams X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Introduce rate-limit tags with a user variant and let rate-limited streams hold a shared handle so callbacks can refresh limits whenever the tag set changes. If we decide to implement something like [1] in the future this could potentially include group rate-limits for example. [1] https://bugzilla.proxmox.com/show_bug.cgi?id=5867 Signed-off-by: Hannes Laimer --- proxmox-http/src/lib.rs | 4 +- proxmox-http/src/rate_limited_stream.rs | 71 +++++++++++++++++++++++-- 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs index 8b6953b0..2c7bb750 100644 --- a/proxmox-http/src/lib.rs +++ b/proxmox-http/src/lib.rs @@ -34,7 +34,9 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi #[cfg(feature = "rate-limited-stream")] mod rate_limited_stream; #[cfg(feature = "rate-limited-stream")] -pub use rate_limited_stream::RateLimitedStream; +pub use rate_limited_stream::{ + RateLimitedStream, RateLimiterTag, RateLimiterTags, RateLimiterTagsHandle, +}; #[cfg(feature = "body")] mod body; diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs index e24df7af..27f1d1f9 100644 --- a/proxmox-http/src/rate_limited_stream.rs +++ b/proxmox-http/src/rate_limited_stream.rs @@ -2,6 +2,7 @@ use std::future::Future; use std::io::IoSlice; use std::marker::Unpin; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -15,8 +16,39 @@ use super::{RateLimiter, ShareableRateLimit}; type SharedRateLimit = Arc; +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum RateLimiterTag { + User(String), +} + +pub type RateLimiterTags = Vec; + +#[derive(Clone, Debug)] +pub struct RateLimiterTagsHandle { + tags: Arc>, + dirty: Arc, +} + +impl RateLimiterTagsHandle { + fn new() -> Self { + Self { + tags: Arc::new(Mutex::new(Vec::new())), + dirty: Arc::new(AtomicBool::new(false)), + } + } + + pub fn lock(&self) -> std::sync::MutexGuard<'_, RateLimiterTags> { + self.tags.lock().unwrap() + } + + pub fn set_tags(&self, tags: RateLimiterTags) { + *self.tags.lock().unwrap() = tags; + self.dirty.store(true, Ordering::Release); + } +} + pub type RateLimiterCallback = - dyn Fn() -> (Option, Option) + Send; + dyn Fn(&[RateLimiterTag]) -> (Option, Option) + Send; /// A rate limited stream using [RateLimiter] pub struct RateLimitedStream { @@ -26,6 +58,7 @@ pub struct RateLimitedStream { write_delay: Option>>, update_limiter_cb: Option>, last_limiter_update: Instant, + tag_handle: Option, stream: S, } @@ -53,6 +86,7 @@ impl RateLimitedStream { write_delay: None, update_limiter_cb: None, last_limiter_update: Instant::now(), + tag_handle: None, stream, } } @@ -64,12 +98,15 @@ impl RateLimitedStream { /// Note: This function is called within an async context, so it /// should be fast and must not block. pub fn with_limiter_update_cb< - F: Fn() -> (Option, Option) + Send + 'static, + F: Fn(&[RateLimiterTag]) -> (Option, Option) + + Send + + 'static, >( stream: S, update_limiter_cb: F, ) -> Self { - let (read_limiter, write_limiter) = update_limiter_cb(); + let tag_handle = Some(RateLimiterTagsHandle::new()); + let (read_limiter, write_limiter) = update_limiter_cb(&[]); Self { read_limiter, read_delay: None, @@ -77,15 +114,29 @@ impl RateLimitedStream { write_delay: None, update_limiter_cb: Some(Box::new(update_limiter_cb)), last_limiter_update: Instant::now(), + tag_handle, stream, } } fn update_limiters(&mut self) { if let Some(ref update_limiter_cb) = self.update_limiter_cb { - if self.last_limiter_update.elapsed().as_secs() >= 5 { + let mut force_update = false; + + if let Some(ref handle) = self.tag_handle { + if handle.dirty.swap(false, Ordering::Acquire) { + force_update = true; + } + } + + if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 { self.last_limiter_update = Instant::now(); - let (read_limiter, write_limiter) = update_limiter_cb(); + let (read_limiter, write_limiter) = if let Some(ref handle) = self.tag_handle { + let tags = handle.lock(); + update_limiter_cb(&tags) + } else { + update_limiter_cb(&[]) + }; self.read_limiter = read_limiter; self.write_limiter = write_limiter; } @@ -99,6 +150,16 @@ impl RateLimitedStream { pub fn inner_mut(&mut self) -> &mut S { &mut self.stream } + + pub fn tag_handle(&self) -> Option>> { + self.tag_handle + .as_ref() + .map(|handle| Arc::clone(&handle.tags)) + } + + pub fn rate_limiter_tags_handle(&self) -> Option<&RateLimiterTagsHandle> { + self.tag_handle.as_ref() + } } fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option>> { -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel