From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 650C07BF90 for ; Wed, 3 Nov 2021 13:43:27 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 5BAB0139F5 for ; Wed, 3 Nov 2021 13:42:57 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 21F45139EB for ; Wed, 3 Nov 2021 13:42:56 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id F009945F36; Wed, 3 Nov 2021 13:42:55 +0100 (CET) From: Dietmar Maurer To: pbs-devel@lists.proxmox.com Date: Wed, 3 Nov 2021 13:42:45 +0100 Message-Id: <20211103124247.1727711-1-dietmar@proxmox.com> X-Mailer: git-send-email 2.30.2 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.527 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [mod.rs] Subject: [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 03 Nov 2021 12:43:27 -0000 Signed-off-by: Dietmar Maurer --- proxmox-http/src/client/mod.rs | 6 + .../src/client/rate_limited_stream.rs | 144 ++++++++++++++++++ proxmox-http/src/client/rate_limiter.rs | 76 +++++++++ 3 files changed, 226 insertions(+) create mode 100644 proxmox-http/src/client/rate_limited_stream.rs create mode 100644 proxmox-http/src/client/rate_limiter.rs diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs index b6ee4b0..30e66d5 100644 --- a/proxmox-http/src/client/mod.rs +++ b/proxmox-http/src/client/mod.rs @@ -2,6 +2,12 @@ //! //! Contains a lightweight wrapper around `hyper` with support for TLS connections. +mod rate_limiter; +pub use rate_limiter::RateLimiter; + +mod rate_limited_stream; +pub use rate_limited_stream::RateLimitedStream; + mod connector; pub use connector::HttpsConnector; diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs new file mode 100644 index 0000000..434f923 --- /dev/null +++ b/proxmox-http/src/client/rate_limited_stream.rs @@ -0,0 +1,144 @@ +use std::pin::Pin; +use std::marker::Unpin; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use futures::Future; +use tokio::io::{ReadBuf, AsyncRead, AsyncWrite}; +use tokio::time::Sleep; + +use std::task::{Context, Poll}; + +use super::RateLimiter; + +/// A rate limited stream using [RateLimiter] +pub struct RateLimitedStream { + read_limiter: Option>>, + read_delay: Option>>, + write_limiter: Option>>, + write_delay: Option>>, + stream: S, +} + +impl RateLimitedStream { + + const MIN_DELAY: Duration = Duration::from_millis(20); + + /// Creates a new instance with reads and writes limited to the same `rate`. + pub fn new(stream: S, rate: u64) -> Self { + let now = Instant::now(); + let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, now))); + let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, now))); + Self::with_limiter(stream, Some(read_limiter), Some(write_limiter)) + } + + /// Creates a new instance with specified [RateLimiters] for reads and writes. + pub fn with_limiter( + stream: S, + read_limiter: Option>>, + write_limiter: Option>>, + ) -> Self { + Self { + read_limiter, + read_delay: None, + write_limiter, + write_delay: None, + stream, + } + } +} + +impl AsyncWrite for RateLimitedStream { + + fn poll_write( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + buf: &[u8] + ) -> Poll> { + let this = self.get_mut(); + + let is_ready = match this.write_delay { + Some(ref mut future) => { + future.as_mut().poll(ctx).is_ready() + } + None => true, + }; + + if !is_ready { return Poll::Pending; } + + this.write_delay = None; + + let result = Pin::new(&mut this.stream).poll_write(ctx, buf); + + if let Some(ref write_limiter) = this.write_limiter { + if let Poll::Ready(Ok(count)) = &result { + let now = Instant::now(); + let delay = write_limiter.lock().unwrap() + .register_traffic(now, *count as u64); + if delay >= Self::MIN_DELAY { + let sleep = tokio::time::sleep(delay); + this.write_delay = Some(Box::pin(sleep)); + } + } + } + + result + } + + fn poll_flush( + self: Pin<&mut Self>, + ctx: &mut Context<'_> + ) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.stream).poll_flush(ctx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + ctx: &mut Context<'_> + ) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.stream).poll_shutdown(ctx) + } +} + +impl AsyncRead for RateLimitedStream { + + fn poll_read( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.get_mut(); + + let is_ready = match this.read_delay { + Some(ref mut future) => { + future.as_mut().poll(ctx).is_ready() + } + None => true, + }; + + if !is_ready { return Poll::Pending; } + + this.read_delay = None; + + let filled_len = buf.filled().len(); + let result = Pin::new(&mut this.stream).poll_read(ctx, buf); + + if let Some(ref read_limiter) = this.read_limiter { + if let Poll::Ready(Ok(())) = &result { + let count = buf.filled().len() - filled_len; + let now = Instant::now(); + let delay = read_limiter.lock().unwrap() + .register_traffic(now, count as u64); + if delay >= Self::MIN_DELAY { + let sleep = tokio::time::sleep(delay); + this.read_delay = Some(Box::pin(sleep)); + } + } + } + + result + } + +} diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs new file mode 100644 index 0000000..f917f57 --- /dev/null +++ b/proxmox-http/src/client/rate_limiter.rs @@ -0,0 +1,76 @@ +use std::time::{Duration, Instant}; +use std::convert::TryInto; + +/// Token bucket based rate limiter +pub struct RateLimiter { + rate: u64, // tokens/second + start_time: Instant, + traffic: u64, // overall traffic + bucket_size: u64, + last_update: Instant, + consumed_tokens: u64, +} + +impl RateLimiter { + + const NO_DELAY: Duration = Duration::from_millis(0); + + /// Creates a new instance, using [Instant::now] as start time. + pub fn new(rate: u64) -> Self { + let start_time = Instant::now(); + Self::with_start_time(rate, start_time) + } + + /// Creates a new instance with specified `rate` and `start_time`. + pub fn with_start_time(rate: u64, start_time: Instant) -> Self { + let bucket_size = rate * 3; + Self { + rate, + start_time, + traffic: 0, + bucket_size, + last_update: start_time, + // start with empty bucket (all tokens consumed) + consumed_tokens: bucket_size, + } + } + + /// Returns the average rate (since `start_time`) + pub fn average_rate(&self, current_time: Instant) -> f64 { + let time_diff = (current_time - self.start_time).as_secs_f64(); + if time_diff <= 0.0 { + 0.0 + } else { + (self.traffic as f64) / time_diff + } + } + + fn refill_bucket(&mut self, current_time: Instant) { + let time_diff = (current_time - self.last_update).as_nanos(); + + if time_diff <= 0 { + //log::error!("update_time: got negative time diff"); + return; + } + + self.last_update = current_time; + + let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000) + .try_into().unwrap_or(u64::MAX); + + self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic); + } + + /// Register traffic, returning a proposed delay to reach the expected rate. + pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration { + self.refill_bucket(current_time); + + self.traffic += data_len; + self.consumed_tokens += data_len; + + if self.consumed_tokens <= self.bucket_size { + return Self::NO_DELAY; + } + Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate) + } +} -- 2.30.2