From: Dietmar Maurer <dietmar@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream
Date: Wed, 3 Nov 2021 13:42:47 +0100 [thread overview]
Message-ID: <20211103124247.1727711-3-dietmar@proxmox.com> (raw)
In-Reply-To: <20211103124247.1727711-1-dietmar@proxmox.com>
So that we can limit used bandwidth.
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/connector.rs | 51 ++++++++++++++++---
.../src/client/rate_limited_stream.rs | 8 +++
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
use anyhow::{bail, format_err, Error};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::*;
use http::Uri;
use hyper::client::HttpConnector;
use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
use crate::tls::MaybeTlsStream;
use crate::uri::build_authority;
+use super::{RateLimiter, RateLimitedStream};
+
#[derive(Clone)]
pub struct HttpsConnector {
connector: HttpConnector,
ssl_connector: Arc<SslConnector>,
proxy: Option<ProxyConfig>,
tcp_keepalive: u32,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
}
impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
ssl_connector: Arc::new(ssl_connector),
proxy: None,
tcp_keepalive,
+ read_limiter: None,
+ write_limiter: None,
}
}
@@ -45,13 +51,21 @@ impl HttpsConnector {
self.proxy = Some(proxy);
}
- async fn secure_stream(
- tcp_stream: TcpStream,
+ pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.read_limiter = limiter;
+ }
+
+ pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.write_limiter = limiter;
+ }
+
+ async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+ tcp_stream: S,
ssl_connector: &SslConnector,
host: &str,
- ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+ ) -> Result<MaybeTlsStream<S>, Error> {
let config = ssl_connector.configure()?;
- let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+ let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
Pin::new(&mut conn).connect().await?;
Ok(MaybeTlsStream::Secured(conn))
}
@@ -107,7 +121,7 @@ impl HttpsConnector {
}
impl hyper::service::Service<Uri> for HttpsConnector {
- type Response = MaybeTlsStream<TcpStream>;
+ type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
};
let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
let keepalive = self.tcp_keepalive;
+ let read_limiter = self.read_limiter.clone();
+ let write_limiter = self.write_limiter.clone();
+
if let Some(ref proxy) = self.proxy {
let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
if use_connect {
async move {
- let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+ let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
format_err!("error connecting to {} - {}", proxy_authority, err)
})?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let mut tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
if let Some(authorization) = authorization {
connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
Ok(MaybeTlsStream::Proxied(tcp_stream))
}
.boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
if is_https {
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
} else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 8b4123f..d21f55c 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
use std::task::{Context, Poll};
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
}
}
+
+// we need this for the hyper http client
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+ fn connected(&self) -> Connected {
+ self.stream.connected()
+ }
+}
--
2.30.2
prev parent reply other threads:[~2021-11-03 12:42 UTC|newest]
Thread overview: 3+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-03 12:42 ` Dietmar Maurer [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20211103124247.1727711-3-dietmar@proxmox.com \
--to=dietmar@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.