all lists on lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections
@ 2020-11-02 15:10 Dominik Csapak
  2020-11-03  9:16 ` Wolfgang Bumiller
  0 siblings, 1 reply; 2+ messages in thread
From: Dominik Csapak @ 2020-11-02 15:10 UTC (permalink / raw)
  To: pbs-devel

For incoming connections, we mapped the results from TcpListeners
accept with 'try_filter_map', where we awaited tokio_openssl::accept

this resulted in blocking the incoming connection stream

to circumvent this, we accept the openssl connection in a seperate
tokio task (with timeout) and send the resulting connection to a
channel

hyper gets the wrapped receiver end of this channel

the tokio task accepting in a loop gets selected with the shutdown
future, to handle the shutdown gracefully

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
i am not sure if we need the select! at all, since on a shutdown, all
open futures get canceled anyway...

also, not sure here about logging, timeouts and channel size
i chose values that seemed sensible, but if anyone has suggestions
with actual reasoning, please say so

also the indentation seems weird, but rustfmt said this is the way..

 src/bin/proxmox-backup-proxy.rs | 65 +++++++++++++++++++++++++--------
 1 file changed, 49 insertions(+), 16 deletions(-)

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 39254504..3eb92cbb 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -113,23 +113,56 @@ async fn run() -> Result<(), Error> {
 
     let server = daemon::create_daemon(
         ([0,0,0,0,0,0,0,0], 8007).into(),
-        |listener, ready| {
-            let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
-                .map_err(Error::from)
-                .try_filter_map(move |(sock, _addr)| {
-                    let acceptor = Arc::clone(&acceptor);
-                    async move {
-                        sock.set_nodelay(true).unwrap();
-
-                        let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
-
-                        Ok(tokio_openssl::accept(&acceptor, sock)
-                            .await
-                            .ok() // handshake errors aren't be fatal, so return None to filter
-                        )
+        |mut listener, ready| {
+            let (sender, receiver) = tokio::sync::mpsc::channel(100);
+
+            let accept_future = async move {
+                loop {
+                    match listener.accept().await {
+                        Ok((sock, _)) => {
+                            let mut sender2 = sender.clone();
+                            let acceptor = Arc::clone(&acceptor);
+                            tokio::spawn(async move {
+                                sock.set_nodelay(true).unwrap();
+                                let _ = set_tcp_keepalive(
+                                    sock.as_raw_fd(),
+                                    PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
+                                );
+
+                                if let Ok(connection) = tokio::time::timeout(
+                                    Duration::new(60, 0),
+                                    tokio_openssl::accept(&acceptor, sock),
+                                )
+                                .await
+                                {
+                                    if connection.is_err() {
+                                        // ignore ssl connection errors
+                                        return;
+                                    }
+                                    if let Err(err) =
+                                        sender2.send_timeout(connection, Duration::new(60, 0)).await
+                                    {
+                                        eprintln!("send error: {}", err);
+                                    }
+                                } // ignore ssl timeout errors
+                            });
+                        }
+                        Err(err) => {
+                            eprintln!("error accepting tcp connection: {}", err);
+                        }
                     }
-                });
-            let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
+                }
+            };
+
+            // select with shutdown future for graceful shutdown
+            tokio::spawn(async move {
+                select! {
+                    _ = accept_future.fuse() => {},
+                    _ = server::shutdown_future().fuse() => {},
+                };
+            });
+
+            let connections = hyper::server::accept::from_stream(receiver);
 
             Ok(ready
                 .and_then(|_| hyper::Server::builder(connections)
-- 
2.20.1





^ permalink raw reply	[flat|nested] 2+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections
  2020-11-02 15:10 [pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections Dominik Csapak
@ 2020-11-03  9:16 ` Wolfgang Bumiller
  0 siblings, 0 replies; 2+ messages in thread
From: Wolfgang Bumiller @ 2020-11-03  9:16 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

On Mon, Nov 02, 2020 at 04:10:05PM +0100, Dominik Csapak wrote:
> For incoming connections, we mapped the results from TcpListeners
> accept with 'try_filter_map', where we awaited tokio_openssl::accept
> 
> this resulted in blocking the incoming connection stream
> 
> to circumvent this, we accept the openssl connection in a seperate
> tokio task (with timeout) and send the resulting connection to a
> channel
> 
> hyper gets the wrapped receiver end of this channel
> 
> the tokio task accepting in a loop gets selected with the shutdown
> future, to handle the shutdown gracefully
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> i am not sure if we need the select! at all, since on a shutdown, all
> open futures get canceled anyway...
> 
> also, not sure here about logging, timeouts and channel size
> i chose values that seemed sensible, but if anyone has suggestions
> with actual reasoning, please say so
> 
> also the indentation seems weird, but rustfmt said this is the way..
> 
>  src/bin/proxmox-backup-proxy.rs | 65 +++++++++++++++++++++++++--------
>  1 file changed, 49 insertions(+), 16 deletions(-)
> 
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 39254504..3eb92cbb 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -113,23 +113,56 @@ async fn run() -> Result<(), Error> {
>  
>      let server = daemon::create_daemon(
>          ([0,0,0,0,0,0,0,0], 8007).into(),
> -        |listener, ready| {
> -            let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
> -                .map_err(Error::from)
> -                .try_filter_map(move |(sock, _addr)| {
> -                    let acceptor = Arc::clone(&acceptor);
> -                    async move {
> -                        sock.set_nodelay(true).unwrap();
> -
> -                        let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
> -
> -                        Ok(tokio_openssl::accept(&acceptor, sock)
> -                            .await
> -                            .ok() // handshake errors aren't be fatal, so return None to filter
> -                        )
> +        |mut listener, ready| {
> +            let (sender, receiver) = tokio::sync::mpsc::channel(100);
> +

please factorize the below code out into functions, this is too much
indentation

> +            let accept_future = async move {
> +                loop {
> +                    match listener.accept().await {
> +                        Ok((sock, _)) => {
> +                            let mut sender2 = sender.clone();
> +                            let acceptor = Arc::clone(&acceptor);
> +                            tokio::spawn(async move {
> +                                sock.set_nodelay(true).unwrap();
> +                                let _ = set_tcp_keepalive(
> +                                    sock.as_raw_fd(),
> +                                    PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
> +                                );
> +
> +                                if let Ok(connection) = tokio::time::timeout(
> +                                    Duration::new(60, 0),
> +                                    tokio_openssl::accept(&acceptor, sock),
> +                                )
> +                                .await
> +                                {

^ as that just gets too long

> +                                    if connection.is_err() {
> +                                        // ignore ssl connection errors
> +                                        return;
> +                                    }
> +                                    if let Err(err) =
> +                                        sender2.send_timeout(connection, Duration::new(60, 0)).await
> +                                    {
> +                                        eprintln!("send error: {}", err);
> +                                    }
> +                                } // ignore ssl timeout errors
> +                            });
> +                        }
> +                        Err(err) => {
> +                            eprintln!("error accepting tcp connection: {}", err);
> +                        }
>                      }
> -                });
> -            let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
> +                }
> +            };
> +
> +            // select with shutdown future for graceful shutdown
> +            tokio::spawn(async move {
> +                select! {
> +                    _ = accept_future.fuse() => {},
> +                    _ = server::shutdown_future().fuse() => {},
> +                };
> +            });
> +
> +            let connections = hyper::server::accept::from_stream(receiver);
>  
>              Ok(ready
>                  .and_then(|_| hyper::Server::builder(connections)
> -- 
> 2.20.1




^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2020-11-03  9:16 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-11-02 15:10 [pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections Dominik Csapak
2020-11-03  9:16 ` Wolfgang Bumiller

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal