public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Dietmar Maurer <dietmar@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream
Date: Wed,  3 Nov 2021 13:42:47 +0100	[thread overview]
Message-ID: <20211103124247.1727711-3-dietmar@proxmox.com> (raw)
In-Reply-To: <20211103124247.1727711-1-dietmar@proxmox.com>

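Wrap the TCP streams created by HttpsConnector in a RateLimitedStream,
so that the bandwidth used can be limited via the new
set_read_limiter() and set_write_limiter() methods.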

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-http/src/client/connector.rs          | 51 ++++++++++++++++---
 .../src/client/rate_limited_stream.rs         |  8 +++
 2 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
 use anyhow::{bail, format_err, Error};
 use std::os::unix::io::AsRawFd;
 use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
 use futures::*;
 use http::Uri;
 use hyper::client::HttpConnector;
 use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::net::TcpStream;
 use tokio_openssl::SslStream;
 
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
 use crate::tls::MaybeTlsStream;
 use crate::uri::build_authority;
 
+use super::{RateLimiter, RateLimitedStream};
+
 #[derive(Clone)]
 pub struct HttpsConnector {
     connector: HttpConnector,
     ssl_connector: Arc<SslConnector>,
     proxy: Option<ProxyConfig>,
     tcp_keepalive: u32,
+    read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    write_limiter: Option<Arc<Mutex<RateLimiter>>>,
 }
 
 impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
             ssl_connector: Arc::new(ssl_connector),
             proxy: None,
             tcp_keepalive,
+            read_limiter: None,
+            write_limiter: None,
         }
     }
 
@@ -45,13 +51,21 @@ impl HttpsConnector {
         self.proxy = Some(proxy);
     }
 
-    async fn secure_stream(
-        tcp_stream: TcpStream,
+    pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+        self.read_limiter = limiter;
+    }
+
+    pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+        self.write_limiter = limiter;
+    }
+
+    async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+        tcp_stream: S,
         ssl_connector: &SslConnector,
         host: &str,
-    ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+    ) -> Result<MaybeTlsStream<S>, Error> {
         let config = ssl_connector.configure()?;
-        let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+        let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
         Pin::new(&mut conn).connect().await?;
         Ok(MaybeTlsStream::Secured(conn))
     }
@@ -107,7 +121,7 @@ impl HttpsConnector {
 }
 
 impl hyper::service::Service<Uri> for HttpsConnector {
-    type Response = MaybeTlsStream<TcpStream>;
+    type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
     type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
         };
         let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
         let keepalive = self.tcp_keepalive;
+        let read_limiter = self.read_limiter.clone();
+        let write_limiter = self.write_limiter.clone();
+
 
         if let Some(ref proxy) = self.proxy {
             let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
             if use_connect {
                 async move {
-                    let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
                         format_err!("error connecting to {} - {}", proxy_authority, err)
                     })?;
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                    let mut tcp_stream = RateLimitedStream::with_limiter(
+                        tcp_stream,
+                        read_limiter,
+                        write_limiter,
+                    );
+
                     let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
                     if let Some(authorization) = authorization {
                         connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                    let tcp_stream = RateLimitedStream::with_limiter(
+                        tcp_stream,
+                        read_limiter,
+                        write_limiter,
+                    );
+
                     Ok(MaybeTlsStream::Proxied(tcp_stream))
                 }
                 .boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
                 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                let tcp_stream = RateLimitedStream::with_limiter(
+                    tcp_stream,
+                    read_limiter,
+                    write_limiter,
+                );
+
                 if is_https {
                     Self::secure_stream(tcp_stream, &ssl_connector, &host).await
                 } else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 8b4123f..d21f55c 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
 use futures::Future;
 use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
 use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
 
 use std::task::{Context, Poll};
 
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
     }
 
 }
+
+// hyper's http client requires its transport to implement `Connection`; delegate to the inner stream
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+    fn connected(&self) -> Connected {
+        self.stream.connected()
+    }
+}
-- 
2.30.2





      parent reply	other threads:[~2021-11-03 12:42 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-03 12:42 ` Dietmar Maurer [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20211103124247.1727711-3-dietmar@proxmox.com \
    --to=dietmar@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal