public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite)
@ 2021-11-03 12:42 Dietmar Maurer
  2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
  2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer
  0 siblings, 2 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-http/src/client/mod.rs                |   6 +
 .../src/client/rate_limited_stream.rs         | 144 ++++++++++++++++++
 proxmox-http/src/client/rate_limiter.rs       |  76 +++++++++
 3 files changed, 226 insertions(+)
 create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
 create mode 100644 proxmox-http/src/client/rate_limiter.rs

diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs
index b6ee4b0..30e66d5 100644
--- a/proxmox-http/src/client/mod.rs
+++ b/proxmox-http/src/client/mod.rs
@@ -2,6 +2,12 @@
 //!
 //! Contains a lightweight wrapper around `hyper` with support for TLS connections.
 
+mod rate_limiter;
+pub use rate_limiter::RateLimiter;
+
+mod rate_limited_stream;
+pub use rate_limited_stream::RateLimitedStream;
+
 mod connector;
 pub use connector::HttpsConnector;
 
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
new file mode 100644
index 0000000..434f923
--- /dev/null
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -0,0 +1,144 @@
+use std::pin::Pin;
+use std::marker::Unpin;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::Future;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
+use tokio::time::Sleep;
+
+use std::task::{Context, Poll};
+
+use super::RateLimiter;
+
+/// A rate limited stream using [RateLimiter]
+pub struct RateLimitedStream<S> {
+    read_limiter: Option<Arc<Mutex<RateLimiter>>>, // `None` leaves reads unlimited
+    read_delay: Option<Pin<Box<Sleep>>>, // pending sleep that gates the next read
+    write_limiter: Option<Arc<Mutex<RateLimiter>>>, // `None` leaves writes unlimited
+    write_delay: Option<Pin<Box<Sleep>>>, // pending sleep that gates the next write
+    stream: S, // the wrapped stream all I/O is forwarded to
+}
+
+impl <S> RateLimitedStream<S> {
+
+    const MIN_DELAY: Duration = Duration::from_millis(20);
+
+    /// Creates a new instance with reads and writes limited to the same `rate`.
+    pub fn new(stream: S, rate: u64) -> Self {
+        let now = Instant::now();
+        let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, now)));
+        let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, now)));
+        Self::with_limiter(stream, Some(read_limiter), Some(write_limiter))
+    }
+
+    /// Creates a new instance with optional [RateLimiter]s for reads and writes (`None` = unlimited).
+    pub fn with_limiter(
+        stream: S,
+        read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+        write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    ) -> Self {
+        Self {
+            read_limiter,
+            read_delay: None,
+            write_limiter,
+            write_delay: None,
+            stream,
+        }
+    }
+}
+
+impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
+
+    fn poll_write(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        buf: &[u8]
+    ) -> Poll<Result<usize, std::io::Error>> {
+        let this = self.get_mut();
+
+        let is_ready = match this.write_delay {
+            Some(ref mut future) => {
+                future.as_mut().poll(ctx).is_ready()
+            }
+            None => true,
+        };
+
+        if !is_ready { return Poll::Pending; }
+
+        this.write_delay = None;
+
+        let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
+
+        if let Some(ref write_limiter) = this.write_limiter {
+            if let Poll::Ready(Ok(count)) = &result {
+                let now = Instant::now();
+                let delay = write_limiter.lock().unwrap()
+                    .register_traffic(now, *count as u64);
+                if delay >= Self::MIN_DELAY {
+                    let sleep = tokio::time::sleep(delay);
+                    this.write_delay = Some(Box::pin(sleep));
+                }
+            }
+        }
+
+        result
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        // Not rate limited: forwarded straight to the wrapped stream.
+        Pin::new(&mut self.get_mut().stream).poll_flush(ctx)
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        // Not rate limited: forwarded straight to the wrapped stream.
+        Pin::new(&mut self.get_mut().stream).poll_shutdown(ctx)
+    }
+}
+
+impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
+
+    fn poll_read(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        let this = self.get_mut();
+
+        let is_ready = match this.read_delay {
+            Some(ref mut future) => {
+                future.as_mut().poll(ctx).is_ready()
+            }
+            None => true,
+        };
+
+        if !is_ready { return Poll::Pending; }
+
+        this.read_delay = None;
+
+        let filled_len = buf.filled().len();
+        let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
+
+        if let Some(ref read_limiter) = this.read_limiter {
+            if let Poll::Ready(Ok(())) = &result {
+                let count = buf.filled().len() - filled_len;
+                let now = Instant::now();
+                let delay = read_limiter.lock().unwrap()
+                    .register_traffic(now, count as u64);
+                if delay >= Self::MIN_DELAY {
+                    let sleep = tokio::time::sleep(delay);
+                    this.read_delay = Some(Box::pin(sleep));
+                }
+            }
+        }
+
+        result
+    }
+
+}
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
new file mode 100644
index 0000000..f917f57
--- /dev/null
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -0,0 +1,76 @@
+use std::time::{Duration, Instant};
+use std::convert::TryInto;
+
+/// Token bucket based rate limiter
+pub struct RateLimiter {
+    rate: u64, // tokens/second
+    start_time: Instant, // reference point for `average_rate`
+    traffic: u64, // overall traffic
+    bucket_size: u64, // bucket capacity in tokens (three times `rate`)
+    last_update: Instant, // time of the most recent bucket refill
+    consumed_tokens: u64, // tokens currently in use; reduced over time by refills
+}
+
+impl RateLimiter {
+
+    const NO_DELAY: Duration = Duration::from_millis(0);
+
+    /// Creates a new instance, using [Instant::now] as start time.
+    pub fn new(rate: u64) -> Self {
+        Self::with_start_time(rate, Instant::now())
+    }
+
+    /// Creates a new instance with specified `rate` and `start_time`.
+    pub fn with_start_time(rate: u64, start_time: Instant) -> Self {
+        // capacity is three seconds worth of tokens (saturating for huge rates)
+        let bucket_size = rate.saturating_mul(3);
+        Self {
+            rate,
+            start_time,
+            traffic: 0,
+            bucket_size,
+            last_update: start_time,
+            // start with empty bucket (all tokens consumed)
+            consumed_tokens: bucket_size,
+        }
+    }
+
+    /// Returns the average rate (since `start_time`), or `0.0` if no time has elapsed.
+    pub fn average_rate(&self, current_time: Instant) -> f64 {
+        // saturates to zero instead of panicking if `current_time` predates `start_time`
+        let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
+        if time_diff > 0.0 { (self.traffic as f64) / time_diff } else { 0.0 }
+    }
+
+    /// Credits the bucket with the tokens earned since `last_update`.
+    fn refill_bucket(&mut self, current_time: Instant) {
+        // Users sharing one limiter may pass timestamps out of order. Plain `Instant`
+        // subtraction may panic in that case, so saturate to zero and skip the refill.
+        let time_diff = current_time.saturating_duration_since(self.last_update).as_nanos();
+        if time_diff == 0 {
+            return;
+        }
+        self.last_update = current_time;
+
+        let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000)
+            .try_into().unwrap_or(u64::MAX);
+
+        self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
+    }
+
+    /// Register traffic, returning a proposed delay to reach the expected rate.
+    ///
+    /// Returns [Duration::MAX] if tokens are owed but `rate` is zero (nothing ever refills).
+    pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
+        self.refill_bucket(current_time);
+
+        self.traffic = self.traffic.saturating_add(data_len);
+        self.consumed_tokens = self.consumed_tokens.saturating_add(data_len);
+
+        let excess = self.consumed_tokens.saturating_sub(self.bucket_size);
+        if excess == 0 { return Self::NO_DELAY; }
+        // `checked_div` guards against `rate == 0`, which used to divide by zero
+        excess.saturating_mul(1_000_000_000).checked_div(self.rate)
+            .map_or(Duration::MAX, Duration::from_nanos)
+    }
+}
-- 
2.30.2





^ permalink raw reply	[flat|nested] 3+ messages in thread

* [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored
  2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
@ 2021-11-03 12:42 ` Dietmar Maurer
  2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer
  1 sibling, 0 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 .../src/client/rate_limited_stream.rs         | 92 +++++++++++++------
 1 file changed, 62 insertions(+), 30 deletions(-)

diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 434f923..8b4123f 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::pin::Pin;
 use std::marker::Unpin;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
+use std::io::IoSlice;
 
 use futures::Future;
 use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
@@ -22,8 +23,6 @@ pub struct RateLimitedStream<S> {
 
 impl <S> RateLimitedStream<S> {
 
-    const MIN_DELAY: Duration = Duration::from_millis(20);
-
     /// Creates a new instance with reads and writes limited to the same `rate`.
     pub fn new(stream: S, rate: u64) -> Self {
         let now = Instant::now();
@@ -48,6 +47,33 @@ impl <S> RateLimitedStream<S> {
     }
 }
 
+/// Registers `count` bytes with `limiter` and returns the sleep to wait out,
+/// or `None` when the proposed delay is too short to bother with.
+fn register_traffic(
+    limiter: &Mutex<RateLimiter>,
+    count: usize,
+) -> Option<Pin<Box<Sleep>>> {
+    // Proposed delays below this threshold are ignored rather than slept.
+    const MIN_DELAY: Duration = Duration::from_millis(10);
+
+    let now = Instant::now();
+    let proposed = {
+        let mut guard = limiter.lock().unwrap();
+        guard.register_traffic(now, count as u64)
+    };
+
+    (proposed >= MIN_DELAY).then(|| Box::pin(tokio::time::sleep(proposed)))
+}
+
+/// Returns `true` when no delay is armed or the armed delay has already elapsed.
+fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
+    // An armed delay is polled with `ctx`; anything but `Ready` counts as
+    // "not ready". The caller is responsible for clearing a finished delay.
+    delay
+        .as_mut()
+        .map_or(true, |sleep| sleep.as_mut().poll(ctx).is_ready())
+}
+
 impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
 
     fn poll_write(
@@ -57,12 +83,7 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
     ) -> Poll<Result<usize, std::io::Error>> {
         let this = self.get_mut();
 
-        let is_ready = match this.write_delay {
-            Some(ref mut future) => {
-                future.as_mut().poll(ctx).is_ready()
-            }
-            None => true,
-        };
+        let is_ready = delay_is_ready(&mut this.write_delay, ctx);
 
         if !is_ready { return Poll::Pending; }
 
@@ -70,15 +91,37 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
 
         let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
 
-        if let Some(ref write_limiter) = this.write_limiter {
-            if let Poll::Ready(Ok(count)) = &result {
-                let now = Instant::now();
-                let delay = write_limiter.lock().unwrap()
-                    .register_traffic(now, *count as u64);
-                if delay >= Self::MIN_DELAY {
-                    let sleep = tokio::time::sleep(delay);
-                    this.write_delay = Some(Box::pin(sleep));
-                }
+        if let Some(ref limiter) = this.write_limiter {
+            if let Poll::Ready(Ok(count)) = result {
+                this.write_delay = register_traffic(limiter, count);
+            }
+        }
+
+        result
+    }
+
+    fn is_write_vectored(&self) -> bool {
+        self.stream.is_write_vectored() // answered by the wrapped stream
+    }
+
+    fn poll_write_vectored(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        bufs: &[IoSlice<'_>]
+    ) -> Poll<Result<usize, std::io::Error>> {
+        let this = self.get_mut();
+
+        let is_ready = delay_is_ready(&mut this.write_delay, ctx);
+
+        if !is_ready { return Poll::Pending; }
+
+        this.write_delay = None;
+
+        let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
+
+        if let Some(ref limiter) = this.write_limiter {
+            if let Poll::Ready(Ok(count)) = result {
+                this.write_delay = register_traffic(limiter, count);
             }
         }
 
@@ -111,12 +154,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
     ) -> Poll<Result<(), std::io::Error>> {
         let this = self.get_mut();
 
-        let is_ready = match this.read_delay {
-            Some(ref mut future) => {
-                future.as_mut().poll(ctx).is_ready()
-            }
-            None => true,
-        };
+        let is_ready = delay_is_ready(&mut this.read_delay, ctx);
 
         if !is_ready { return Poll::Pending; }
 
@@ -128,13 +166,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
         if let Some(ref read_limiter) = this.read_limiter {
             if let Poll::Ready(Ok(())) = &result {
                 let count = buf.filled().len() - filled_len;
-                let now = Instant::now();
-                let delay = read_limiter.lock().unwrap()
-                    .register_traffic(now, count as u64);
-                if delay >= Self::MIN_DELAY {
-                    let sleep = tokio::time::sleep(delay);
-                    this.read_delay = Some(Box::pin(sleep));
-                }
+                this.read_delay = register_traffic(read_limiter, count);
             }
         }
 
-- 
2.30.2





^ permalink raw reply	[flat|nested] 3+ messages in thread

* [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream
  2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
  2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
@ 2021-11-03 12:42 ` Dietmar Maurer
  1 sibling, 0 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
  To: pbs-devel

Wrap the TCP stream in a RateLimitedStream so that callers can limit the bandwidth used.

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-http/src/client/connector.rs          | 51 ++++++++++++++++---
 .../src/client/rate_limited_stream.rs         |  8 +++
 2 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
 use anyhow::{bail, format_err, Error};
 use std::os::unix::io::AsRawFd;
 use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
 use futures::*;
 use http::Uri;
 use hyper::client::HttpConnector;
 use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::net::TcpStream;
 use tokio_openssl::SslStream;
 
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
 use crate::tls::MaybeTlsStream;
 use crate::uri::build_authority;
 
+use super::{RateLimiter, RateLimitedStream};
+
 #[derive(Clone)]
 pub struct HttpsConnector {
     connector: HttpConnector,
     ssl_connector: Arc<SslConnector>,
     proxy: Option<ProxyConfig>,
     tcp_keepalive: u32,
+    read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    write_limiter: Option<Arc<Mutex<RateLimiter>>>,
 }
 
 impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
             ssl_connector: Arc::new(ssl_connector),
             proxy: None,
             tcp_keepalive,
+            read_limiter: None,
+            write_limiter: None,
         }
     }
 
@@ -45,13 +51,21 @@ impl HttpsConnector {
         self.proxy = Some(proxy);
     }
 
-    async fn secure_stream(
-        tcp_stream: TcpStream,
+    pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+        self.read_limiter = limiter; // cloned into the RateLimitedStreams the connector creates
+    }
+
+    pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+        self.write_limiter = limiter; // cloned into the RateLimitedStreams the connector creates
+    }
+
+    async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+        tcp_stream: S,
         ssl_connector: &SslConnector,
         host: &str,
-    ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+    ) -> Result<MaybeTlsStream<S>, Error> {
         let config = ssl_connector.configure()?;
-        let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+        let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
         Pin::new(&mut conn).connect().await?;
         Ok(MaybeTlsStream::Secured(conn))
     }
@@ -107,7 +121,7 @@ impl HttpsConnector {
 }
 
 impl hyper::service::Service<Uri> for HttpsConnector {
-    type Response = MaybeTlsStream<TcpStream>;
+    type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
     type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
         };
         let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
         let keepalive = self.tcp_keepalive;
+        let read_limiter = self.read_limiter.clone();
+        let write_limiter = self.write_limiter.clone();
+
 
         if let Some(ref proxy) = self.proxy {
             let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
             if use_connect {
                 async move {
-                    let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
                         format_err!("error connecting to {} - {}", proxy_authority, err)
                     })?;
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                    let mut tcp_stream = RateLimitedStream::with_limiter(
+                        tcp_stream,
+                        read_limiter,
+                        write_limiter,
+                    );
+
                     let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
                     if let Some(authorization) = authorization {
                         connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                    let tcp_stream = RateLimitedStream::with_limiter(
+                        tcp_stream,
+                        read_limiter,
+                        write_limiter,
+                    );
+
                     Ok(MaybeTlsStream::Proxied(tcp_stream))
                 }
                 .boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
                 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                let tcp_stream = RateLimitedStream::with_limiter(
+                    tcp_stream,
+                    read_limiter,
+                    write_limiter,
+                );
+
                 if is_https {
                     Self::secure_stream(tcp_stream, &ssl_connector, &host).await
                 } else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 8b4123f..d21f55c 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
 use futures::Future;
 use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
 use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
 
 use std::task::{Context, Poll};
 
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
     }
 
 }
+
+// Required by the hyper HTTP client: reports the wrapped stream's connection info.
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+    fn connected(&self) -> Connected {
+        self.stream.connected()
+    }
+}
-- 
2.30.2





^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2021-11-03 12:43 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal