* [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite)
@ 2021-11-03 12:42 Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer
0 siblings, 2 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/mod.rs | 6 +
.../src/client/rate_limited_stream.rs | 144 ++++++++++++++++++
proxmox-http/src/client/rate_limiter.rs | 76 +++++++++
3 files changed, 226 insertions(+)
create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
create mode 100644 proxmox-http/src/client/rate_limiter.rs
diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs
index b6ee4b0..30e66d5 100644
--- a/proxmox-http/src/client/mod.rs
+++ b/proxmox-http/src/client/mod.rs
@@ -2,6 +2,12 @@
//!
//! Contains a lightweight wrapper around `hyper` with support for TLS connections.
+mod rate_limiter;
+pub use rate_limiter::RateLimiter;
+
+mod rate_limited_stream;
+pub use rate_limited_stream::RateLimitedStream;
+
mod connector;
pub use connector::HttpsConnector;
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
new file mode 100644
index 0000000..434f923
--- /dev/null
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -0,0 +1,144 @@
+use std::pin::Pin;
+use std::marker::Unpin;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::Future;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
+use tokio::time::Sleep;
+
+use std::task::{Context, Poll};
+
+use super::RateLimiter;
+
+/// A rate limited stream using [RateLimiter]
+pub struct RateLimitedStream<S> {
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ read_delay: Option<Pin<Box<Sleep>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_delay: Option<Pin<Box<Sleep>>>,
+ stream: S,
+}
+
+impl <S> RateLimitedStream<S> {
+
+ const MIN_DELAY: Duration = Duration::from_millis(20);
+
+ /// Creates a new instance with reads and writes limited to the same `rate`.
+ pub fn new(stream: S, rate: u64) -> Self {
+ let now = Instant::now();
+ let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, now)));
+ let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, now)));
+ Self::with_limiter(stream, Some(read_limiter), Some(write_limiter))
+ }
+
+ /// Creates a new instance with specified [RateLimiters] for reads and writes.
+ pub fn with_limiter(
+ stream: S,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ ) -> Self {
+ Self {
+ read_limiter,
+ read_delay: None,
+ write_limiter,
+ write_delay: None,
+ stream,
+ }
+ }
+}
+
+impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
+
+ fn poll_write(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.write_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
+
+ if let Some(ref write_limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = &result {
+ let now = Instant::now();
+ let delay = write_limiter.lock().unwrap()
+ .register_traffic(now, *count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.write_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_flush(ctx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_shutdown(ctx)
+ }
+}
+
+impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.read_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.read_delay = None;
+
+ let filled_len = buf.filled().len();
+ let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
+
+ if let Some(ref read_limiter) = this.read_limiter {
+ if let Poll::Ready(Ok(())) = &result {
+ let count = buf.filled().len() - filled_len;
+ let now = Instant::now();
+ let delay = read_limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.read_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+}
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
new file mode 100644
index 0000000..f917f57
--- /dev/null
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -0,0 +1,76 @@
+use std::time::{Duration, Instant};
+use std::convert::TryInto;
+
+/// Token bucket based rate limiter
+pub struct RateLimiter {
+ rate: u64, // tokens/second
+ start_time: Instant,
+ traffic: u64, // overall traffic
+ bucket_size: u64,
+ last_update: Instant,
+ consumed_tokens: u64,
+}
+
+impl RateLimiter {
+
+ const NO_DELAY: Duration = Duration::from_millis(0);
+
+ /// Creates a new instance, using [Instant::now] as start time.
+ pub fn new(rate: u64) -> Self {
+ let start_time = Instant::now();
+ Self::with_start_time(rate, start_time)
+ }
+
+ /// Creates a new instance with specified `rate` and `start_time`.
+ pub fn with_start_time(rate: u64, start_time: Instant) -> Self {
+ let bucket_size = rate * 3;
+ Self {
+ rate,
+ start_time,
+ traffic: 0,
+ bucket_size,
+ last_update: start_time,
+ // start with empty bucket (all tokens consumed)
+ consumed_tokens: bucket_size,
+ }
+ }
+
+ /// Returns the average rate (since `start_time`)
+ pub fn average_rate(&self, current_time: Instant) -> f64 {
+ let time_diff = (current_time - self.start_time).as_secs_f64();
+ if time_diff <= 0.0 {
+ 0.0
+ } else {
+ (self.traffic as f64) / time_diff
+ }
+ }
+
+ fn refill_bucket(&mut self, current_time: Instant) {
+ let time_diff = (current_time - self.last_update).as_nanos();
+
+ if time_diff <= 0 {
+ //log::error!("update_time: got negative time diff");
+ return;
+ }
+
+ self.last_update = current_time;
+
+ let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000)
+ .try_into().unwrap_or(u64::MAX);
+
+ self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
+ }
+
+ /// Register traffic, returning a proposed delay to reach the expected rate.
+ pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
+ self.refill_bucket(current_time);
+
+ self.traffic += data_len;
+ self.consumed_tokens += data_len;
+
+ if self.consumed_tokens <= self.bucket_size {
+ return Self::NO_DELAY;
+ }
+ Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate)
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 3+ messages in thread
* [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
@ 2021-11-03 12:42 ` Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer
1 sibling, 0 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
.../src/client/rate_limited_stream.rs | 92 +++++++++++++------
1 file changed, 62 insertions(+), 30 deletions(-)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 434f923..8b4123f 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::pin::Pin;
use std::marker::Unpin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
+use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
@@ -22,8 +23,6 @@ pub struct RateLimitedStream<S> {
impl <S> RateLimitedStream<S> {
- const MIN_DELAY: Duration = Duration::from_millis(20);
-
/// Creates a new instance with reads and writes limited to the same `rate`.
pub fn new(stream: S, rate: u64) -> Self {
let now = Instant::now();
@@ -48,6 +47,33 @@ impl <S> RateLimitedStream<S> {
}
}
+fn register_traffic(
+ limiter: &Mutex<RateLimiter>,
+ count: usize,
+) -> Option<Pin<Box<Sleep>>>{
+
+ const MIN_DELAY: Duration = Duration::from_millis(10);
+
+ let now = Instant::now();
+ let delay = limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ Some(Box::pin(sleep))
+ } else {
+ None
+ }
+}
+
+fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
+ match delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ }
+}
+
impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
fn poll_write(
@@ -57,12 +83,7 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.write_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -70,15 +91,37 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
- if let Some(ref write_limiter) = this.write_limiter {
- if let Poll::Ready(Ok(count)) = &result {
- let now = Instant::now();
- let delay = write_limiter.lock().unwrap()
- .register_traffic(now, *count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.write_delay = Some(Box::pin(sleep));
- }
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
+ }
+ }
+
+ result
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.stream.is_write_vectored()
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
+
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
}
}
@@ -111,12 +154,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.read_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.read_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -128,13 +166,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
if let Some(ref read_limiter) = this.read_limiter {
if let Poll::Ready(Ok(())) = &result {
let count = buf.filled().len() - filled_len;
- let now = Instant::now();
- let delay = read_limiter.lock().unwrap()
- .register_traffic(now, count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.read_delay = Some(Box::pin(sleep));
- }
+ this.read_delay = register_traffic(read_limiter, count);
}
}
--
2.30.2
^ permalink raw reply [flat|nested] 3+ messages in thread
* [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
@ 2021-11-03 12:42 ` Dietmar Maurer
1 sibling, 0 replies; 3+ messages in thread
From: Dietmar Maurer @ 2021-11-03 12:42 UTC (permalink / raw)
To: pbs-devel
So that we can limit used bandwidth.
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/connector.rs | 51 ++++++++++++++++---
.../src/client/rate_limited_stream.rs | 8 +++
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
use anyhow::{bail, format_err, Error};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::*;
use http::Uri;
use hyper::client::HttpConnector;
use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
use crate::tls::MaybeTlsStream;
use crate::uri::build_authority;
+use super::{RateLimiter, RateLimitedStream};
+
#[derive(Clone)]
pub struct HttpsConnector {
connector: HttpConnector,
ssl_connector: Arc<SslConnector>,
proxy: Option<ProxyConfig>,
tcp_keepalive: u32,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
}
impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
ssl_connector: Arc::new(ssl_connector),
proxy: None,
tcp_keepalive,
+ read_limiter: None,
+ write_limiter: None,
}
}
@@ -45,13 +51,21 @@ impl HttpsConnector {
self.proxy = Some(proxy);
}
- async fn secure_stream(
- tcp_stream: TcpStream,
+ pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.read_limiter = limiter;
+ }
+
+ pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.write_limiter = limiter;
+ }
+
+ async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+ tcp_stream: S,
ssl_connector: &SslConnector,
host: &str,
- ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+ ) -> Result<MaybeTlsStream<S>, Error> {
let config = ssl_connector.configure()?;
- let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+ let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
Pin::new(&mut conn).connect().await?;
Ok(MaybeTlsStream::Secured(conn))
}
@@ -107,7 +121,7 @@ impl HttpsConnector {
}
impl hyper::service::Service<Uri> for HttpsConnector {
- type Response = MaybeTlsStream<TcpStream>;
+ type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
};
let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
let keepalive = self.tcp_keepalive;
+ let read_limiter = self.read_limiter.clone();
+ let write_limiter = self.write_limiter.clone();
+
if let Some(ref proxy) = self.proxy {
let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
if use_connect {
async move {
- let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+ let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
format_err!("error connecting to {} - {}", proxy_authority, err)
})?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let mut tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
if let Some(authorization) = authorization {
connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
Ok(MaybeTlsStream::Proxied(tcp_stream))
}
.boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
if is_https {
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
} else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 8b4123f..d21f55c 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
use std::task::{Context, Poll};
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
}
}
+
+// we need this for the hyper http client
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+ fn connected(&self) -> Connected {
+ self.stream.connected()
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2021-11-03 12:43 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox