* [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite)
@ 2021-11-03 12:42 Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer
0 siblings, 2 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/mod.rs | 6 +
.../src/client/rate_limited_stream.rs | 144 ++++++++++++++++++
proxmox-http/src/client/rate_limiter.rs | 76 +++++++++
3 files changed, 226 insertions(+)
create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
create mode 100644 proxmox-http/src/client/rate_limiter.rs
diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs
index b6ee4b0..30e66d5 100644
--- a/proxmox-http/src/client/mod.rs
+++ b/proxmox-http/src/client/mod.rs
@@ -2,6 +2,12 @@
//!
//! Contains a lightweight wrapper around `hyper` with support for TLS connections.
+mod rate_limiter;
+pub use rate_limiter::RateLimiter;
+
+mod rate_limited_stream;
+pub use rate_limited_stream::RateLimitedStream;
+
mod connector;
pub use connector::HttpsConnector;
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
new file mode 100644
index 0000000..434f923
--- /dev/null
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -0,0 +1,144 @@
+use std::pin::Pin;
+use std::marker::Unpin;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::Future;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
+use tokio::time::Sleep;
+
+use std::task::{Context, Poll};
+
+use super::RateLimiter;
+
+/// A rate limited stream using [RateLimiter]
+pub struct RateLimitedStream<S> {
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ read_delay: Option<Pin<Box<Sleep>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_delay: Option<Pin<Box<Sleep>>>,
+ stream: S,
+}
+
+impl <S> RateLimitedStream<S> {
+
+ const MIN_DELAY: Duration = Duration::from_millis(20);
+
+ /// Creates a new instance with reads and writes limited to the same `rate`.
+ pub fn new(stream: S, rate: u64) -> Self {
+ let now = Instant::now();
+ let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, now)));
+ let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, now)));
+ Self::with_limiter(stream, Some(read_limiter), Some(write_limiter))
+ }
+
+ /// Creates a new instance with specified [RateLimiters] for reads and writes.
+ pub fn with_limiter(
+ stream: S,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ ) -> Self {
+ Self {
+ read_limiter,
+ read_delay: None,
+ write_limiter,
+ write_delay: None,
+ stream,
+ }
+ }
+}
+
+impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
+
+ fn poll_write(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.write_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
+
+ if let Some(ref write_limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = &result {
+ let now = Instant::now();
+ let delay = write_limiter.lock().unwrap()
+ .register_traffic(now, *count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.write_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_flush(ctx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_shutdown(ctx)
+ }
+}
+
+impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.read_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.read_delay = None;
+
+ let filled_len = buf.filled().len();
+ let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
+
+ if let Some(ref read_limiter) = this.read_limiter {
+ if let Poll::Ready(Ok(())) = &result {
+ let count = buf.filled().len() - filled_len;
+ let now = Instant::now();
+ let delay = read_limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.read_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+}
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
new file mode 100644
index 0000000..f917f57
--- /dev/null
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -0,0 +1,76 @@
+use std::time::{Duration, Instant};
+use std::convert::TryInto;
+
+/// Token bucket based rate limiter
+pub struct RateLimiter {
+ rate: u64, // tokens/second
+ start_time: Instant,
+ traffic: u64, // overall traffic
+ bucket_size: u64,
+ last_update: Instant,
+ consumed_tokens: u64,
+}
+
+impl RateLimiter {
+
+ const NO_DELAY: Duration = Duration::from_millis(0);
+
+ /// Creates a new instance, using [Instant::now] as start time.
+ pub fn new(rate: u64) -> Self {
+ let start_time = Instant::now();
+ Self::with_start_time(rate, start_time)
+ }
+
+ /// Creates a new instance with specified `rate` and `start_time`.
+ pub fn with_start_time(rate: u64, start_time: Instant) -> Self {
+ let bucket_size = rate * 3;
+ Self {
+ rate,
+ start_time,
+ traffic: 0,
+ bucket_size,
+ last_update: start_time,
+ // start with empty bucket (all tokens consumed)
+ consumed_tokens: bucket_size,
+ }
+ }
+
+ /// Returns the average rate (since `start_time`)
+ pub fn average_rate(&self, current_time: Instant) -> f64 {
+ let time_diff = (current_time - self.start_time).as_secs_f64();
+ if time_diff <= 0.0 {
+ 0.0
+ } else {
+ (self.traffic as f64) / time_diff
+ }
+ }
+
+ fn refill_bucket(&mut self, current_time: Instant) {
+ let time_diff = (current_time - self.last_update).as_nanos();
+
+ if time_diff <= 0 {
+ //log::error!("update_time: got negative time diff");
+ return;
+ }
+
+ self.last_update = current_time;
+
+ let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000)
+ .try_into().unwrap_or(u64::MAX);
+
+ self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
+ }
+
+ /// Register traffic, returning a proposed delay to reach the expected rate.
+ pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
+ self.refill_bucket(current_time);
+
+ self.traffic += data_len;
+ self.consumed_tokens += data_len;
+
+ if self.consumed_tokens <= self.bucket_size {
+ return Self::NO_DELAY;
+ }
+ Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate)
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 3+ messages in thread
* [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
@ 2021-11-03 12:42 ` Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer
1 sibling, 0 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
.../src/client/rate_limited_stream.rs | 92 +++++++++++++------
1 file changed, 62 insertions(+), 30 deletions(-)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 434f923..8b4123f 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::pin::Pin;
use std::marker::Unpin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
+use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
@@ -22,8 +23,6 @@ pub struct RateLimitedStream<S> {
impl <S> RateLimitedStream<S> {
- const MIN_DELAY: Duration = Duration::from_millis(20);
-
/// Creates a new instance with reads and writes limited to the same `rate`.
pub fn new(stream: S, rate: u64) -> Self {
let now = Instant::now();
@@ -48,6 +47,33 @@ impl <S> RateLimitedStream<S> {
}
}
+fn register_traffic(
+ limiter: &Mutex<RateLimiter>,
+ count: usize,
+) -> Option<Pin<Box<Sleep>>>{
+
+ const MIN_DELAY: Duration = Duration::from_millis(10);
+
+ let now = Instant::now();
+ let delay = limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ Some(Box::pin(sleep))
+ } else {
+ None
+ }
+}
+
+fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
+ match delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ }
+}
+
impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
fn poll_write(
@@ -57,12 +83,7 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.write_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -70,15 +91,37 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
- if let Some(ref write_limiter) = this.write_limiter {
- if let Poll::Ready(Ok(count)) = &result {
- let now = Instant::now();
- let delay = write_limiter.lock().unwrap()
- .register_traffic(now, *count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.write_delay = Some(Box::pin(sleep));
- }
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
+ }
+ }
+
+ result
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.stream.is_write_vectored()
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
+
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
}
}
@@ -111,12 +154,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.read_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.read_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -128,13 +166,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
if let Some(ref read_limiter) = this.read_limiter {
if let Poll::Ready(Ok(())) = &result {
let count = buf.filled().len() - filled_len;
- let now = Instant::now();
- let delay = read_limiter.lock().unwrap()
- .register_traffic(now, count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.read_delay = Some(Box::pin(sleep));
- }
+ this.read_delay = register_traffic(read_limiter, count);
}
}
--
2.30.2
^ permalink raw reply [flat|nested] 3+ messages in thread
* [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
@ 2021-11-03 12:42 ` Dietmar Maurer
1 sibling, 0 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
To: pbs-devel
So that we can limit used bandwidth.
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/connector.rs | 51 ++++++++++++++++---
.../src/client/rate_limited_stream.rs | 8 +++
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
use anyhow::{bail, format_err, Error};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::*;
use http::Uri;
use hyper::client::HttpConnector;
use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
use crate::tls::MaybeTlsStream;
use crate::uri::build_authority;
+use super::{RateLimiter, RateLimitedStream};
+
#[derive(Clone)]
pub struct HttpsConnector {
connector: HttpConnector,
ssl_connector: Arc<SslConnector>,
proxy: Option<ProxyConfig>,
tcp_keepalive: u32,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
}
impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
ssl_connector: Arc::new(ssl_connector),
proxy: None,
tcp_keepalive,
+ read_limiter: None,
+ write_limiter: None,
}
}
@@ -45,13 +51,21 @@ impl HttpsConnector {
self.proxy = Some(proxy);
}
- async fn secure_stream(
- tcp_stream: TcpStream,
+ pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.read_limiter = limiter;
+ }
+
+ pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.write_limiter = limiter;
+ }
+
+ async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+ tcp_stream: S,
ssl_connector: &SslConnector,
host: &str,
- ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+ ) -> Result<MaybeTlsStream<S>, Error> {
let config = ssl_connector.configure()?;
- let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+ let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
Pin::new(&mut conn).connect().await?;
Ok(MaybeTlsStream::Secured(conn))
}
@@ -107,7 +121,7 @@ impl HttpsConnector {
}
impl hyper::service::Service<Uri> for HttpsConnector {
- type Response = MaybeTlsStream<TcpStream>;
+ type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
};
let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
let keepalive = self.tcp_keepalive;
+ let read_limiter = self.read_limiter.clone();
+ let write_limiter = self.write_limiter.clone();
+
if let Some(ref proxy) = self.proxy {
let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
if use_connect {
async move {
- let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+ let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
format_err!("error connecting to {} - {}", proxy_authority, err)
})?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let mut tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
if let Some(authorization) = authorization {
connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
Ok(MaybeTlsStream::Proxied(tcp_stream))
}
.boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
if is_https {
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
} else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 8b4123f..d21f55c 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
use std::task::{Context, Poll};
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
}
}
+
+// we need this for the hyper http client
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+ fn connected(&self) -> Connected {
+ self.stream.connected()
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2021-11-03 12:43 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.