all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Dietmar Maurer <dietmar@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite)
Date: Wed,  3 Nov 2021 13:42:45 +0100	[thread overview]
Message-ID: <20211103124247.1727711-1-dietmar@proxmox.com> (raw)

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-http/src/client/mod.rs                |   6 +
 .../src/client/rate_limited_stream.rs         | 144 ++++++++++++++++++
 proxmox-http/src/client/rate_limiter.rs       |  76 +++++++++
 3 files changed, 226 insertions(+)
 create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
 create mode 100644 proxmox-http/src/client/rate_limiter.rs

diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs
index b6ee4b0..30e66d5 100644
--- a/proxmox-http/src/client/mod.rs
+++ b/proxmox-http/src/client/mod.rs
@@ -2,6 +2,12 @@
 //!
 //! Contains a lightweight wrapper around `hyper` with support for TLS connections.
 
+mod rate_limiter;
+pub use rate_limiter::RateLimiter;
+
+mod rate_limited_stream;
+pub use rate_limited_stream::RateLimitedStream;
+
 mod connector;
 pub use connector::HttpsConnector;
 
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
new file mode 100644
index 0000000..434f923
--- /dev/null
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -0,0 +1,144 @@
+use std::pin::Pin;
+use std::marker::Unpin;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::Future;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
+use tokio::time::Sleep;
+
+use std::task::{Context, Poll};
+
+use super::RateLimiter;
+
+/// A rate limited stream using [RateLimiter] to throttle reads and writes of the wrapped stream
+pub struct RateLimitedStream<S> {
+    read_limiter: Option<Arc<Mutex<RateLimiter>>>, // limiter fed with read traffic; `None` = reads unlimited
+    read_delay: Option<Pin<Box<Sleep>>>, // sleep that must complete before the next read is attempted
+    write_limiter: Option<Arc<Mutex<RateLimiter>>>, // limiter fed with write traffic; `None` = writes unlimited
+    write_delay: Option<Pin<Box<Sleep>>>, // sleep that must complete before the next write is attempted
+    stream: S, // the wrapped stream all I/O is forwarded to
+}
+
+impl<S> RateLimitedStream<S> {
+    /// Delays shorter than this are ignored; no sleep is armed for them.
+    const MIN_DELAY: Duration = Duration::from_millis(20);
+
+    /// Wraps `stream`, limiting reads and writes to `rate` each.
+    ///
+    /// Both directions get their own, independent [RateLimiter]; the two
+    /// limiters share the same start time.
+    pub fn new(stream: S, rate: u64) -> Self {
+        let start = Instant::now();
+        let make_limiter =
+            || Some(Arc::new(Mutex::new(RateLimiter::with_start_time(rate, start))));
+        let (read_limiter, write_limiter) = (make_limiter(), make_limiter());
+        Self::with_limiter(stream, read_limiter, write_limiter)
+    }
+
+    /// Wraps `stream` using the given [RateLimiter]s for reads and writes.
+    ///
+    /// Passing `None` leaves the corresponding direction unlimited.
+    pub fn with_limiter(
+        stream: S,
+        read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+        write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    ) -> Self {
+        Self { read_limiter, read_delay: None, write_limiter, write_delay: None, stream }
+    }
+}
+
+impl<S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
+    /// Writes `buf` to the inner stream once any pending write delay has elapsed.
+    ///
+    /// Bytes of a successful write are registered with the write limiter (if any).
+    /// A proposed delay of at least `MIN_DELAY` is armed as a sleep that later
+    /// calls have to wait for before they may write again.
+    fn poll_write(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<Result<usize, std::io::Error>> {
+        let this = self.get_mut();
+
+        // Still throttled by an earlier write? Then nothing is written yet.
+        if let Some(pending) = this.write_delay.as_mut() {
+            if pending.as_mut().poll(ctx).is_pending() {
+                return Poll::Pending;
+            }
+        }
+        this.write_delay = None;
+
+        let outcome = Pin::new(&mut this.stream).poll_write(ctx, buf);
+
+        // Only completed, successful writes count as traffic.
+        if let (Some(limiter), Poll::Ready(Ok(written))) =
+            (this.write_limiter.as_ref(), &outcome)
+        {
+            let now = Instant::now();
+            let delay = limiter.lock().unwrap().register_traffic(now, *written as u64);
+            if delay >= Self::MIN_DELAY {
+                this.write_delay = Some(Box::pin(tokio::time::sleep(delay)));
+            }
+        }
+
+        outcome
+    }
+
+    /// Flushes the inner stream; flushing is never delayed.
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        Pin::new(&mut self.get_mut().stream).poll_flush(ctx)
+    }
+
+    /// Shuts down the inner stream; shutdown is never delayed.
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        Pin::new(&mut self.get_mut().stream).poll_shutdown(ctx)
+    }
+}
+
+impl<S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
+    /// Reads from the inner stream once any pending read delay has elapsed.
+    ///
+    /// Bytes a successful read added to `buf` are registered with the read limiter
+    /// (if any). A proposed delay of at least `MIN_DELAY` is armed as a sleep that
+    /// later calls have to wait for before they may read again.
+    fn poll_read(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        let this = self.get_mut();
+
+        // Still throttled by an earlier read? Then nothing is read yet.
+        if let Some(pending) = this.read_delay.as_mut() {
+            if pending.as_mut().poll(ctx).is_pending() {
+                return Poll::Pending;
+            }
+        }
+        this.read_delay = None;
+
+        // `buf` may already hold data, so remember its fill level up front.
+        let filled_before = buf.filled().len();
+        let outcome = Pin::new(&mut this.stream).poll_read(ctx, buf);
+
+        // Only completed, successful reads count as traffic.
+        if let (Some(limiter), Poll::Ready(Ok(()))) =
+            (this.read_limiter.as_ref(), &outcome)
+        {
+            let received = buf.filled().len() - filled_before;
+            let now = Instant::now();
+            let delay = limiter.lock().unwrap().register_traffic(now, received as u64);
+            if delay >= Self::MIN_DELAY {
+                this.read_delay = Some(Box::pin(tokio::time::sleep(delay)));
+            }
+        }
+
+        outcome
+    }
+}
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
new file mode 100644
index 0000000..f917f57
--- /dev/null
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -0,0 +1,76 @@
+use std::time::{Duration, Instant};
+use std::convert::TryInto;
+
+/// Token bucket based rate limiter
+pub struct RateLimiter {
+    rate: u64, // tokens/second
+    start_time: Instant, // reference point used by `average_rate`
+    traffic: u64, // overall traffic
+    bucket_size: u64, // bucket capacity in tokens (three times `rate`)
+    last_update: Instant, // time the bucket was last refilled
+    consumed_tokens: u64, // tokens currently taken out of the bucket; may exceed `bucket_size`
+}
+
+impl RateLimiter {
+    const NO_DELAY: Duration = Duration::from_millis(0);
+
+    /// Creates a new instance, using [Instant::now] as start time.
+    pub fn new(rate: u64) -> Self {
+        Self::with_start_time(rate, Instant::now())
+    }
+
+    /// Creates a new instance with specified `rate` (tokens/second) and `start_time`.
+    ///
+    /// The bucket holds three seconds worth of tokens and starts out empty.
+    pub fn with_start_time(rate: u64, start_time: Instant) -> Self {
+        let bucket_size = rate.saturating_mul(3); // saturate instead of overflowing
+        Self {
+            rate,
+            start_time,
+            traffic: 0,
+            bucket_size,
+            last_update: start_time,
+            // start with empty bucket (all tokens consumed)
+            consumed_tokens: bucket_size,
+        }
+    }
+
+    /// Returns the average rate since `start_time` (`0.0` if `current_time` is not later).
+    pub fn average_rate(&self, current_time: Instant) -> f64 {
+        let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
+        if time_diff <= 0.0 {
+            0.0
+        } else {
+            (self.traffic as f64) / time_diff
+        }
+    }
+
+    /// Puts the tokens earned since `last_update` back into the bucket.
+    fn refill_bucket(&mut self, current_time: Instant) {
+        let time_diff = current_time.saturating_duration_since(self.last_update).as_nanos();
+        let allowed_traffic: u64 = (time_diff.saturating_mul(self.rate as u128) / 1_000_000_000)
+            .try_into().unwrap_or(u64::MAX);
+        // Keep `last_update` until a whole token is earned; advancing it on every call
+        // would round each refill down to zero for frequent calls at low rates.
+        if allowed_traffic > 0 {
+            self.last_update = current_time;
+            self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
+        }
+    }
+
+    /// Register traffic, returning a proposed delay to reach the expected rate.
+    ///
+    /// A `rate` of zero yields a delay of `u64::MAX` nanoseconds instead of a panic.
+    pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
+        self.refill_bucket(current_time);
+        self.traffic = self.traffic.saturating_add(data_len);
+        self.consumed_tokens = self.consumed_tokens.saturating_add(data_len);
+        let excess = self.consumed_tokens.saturating_sub(self.bucket_size);
+        if excess == 0 {
+            return Self::NO_DELAY;
+        }
+        // u128 math: `excess * 1e9` cannot overflow here, unlike a u64 product
+        let nanos = (excess as u128 * 1_000_000_000).checked_div(self.rate as u128);
+        Duration::from_nanos(nanos.and_then(|n| n.try_into().ok()).unwrap_or(u64::MAX))
+    }
+}
-- 
2.30.2





             reply	other threads:[~2021-11-03 12:43 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-11-03 12:42 Dietmar Maurer [this message]
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20211103124247.1727711-1-dietmar@proxmox.com \
    --to=dietmar@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal