* [pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections
@ 2020-11-02 15:10 Dominik Csapak
2020-11-03 9:16 ` Wolfgang Bumiller
0 siblings, 1 reply; 2+ messages in thread
From: Dominik Csapak @ 2020-11-02 15:10 UTC (permalink / raw)
To: pbs-devel
For incoming connections, we mapped the results from TcpListeners
accept with 'try_filter_map', where we awaited tokio_openssl::accept
this resulted in blocking the incoming connection stream
to circumvent this, we accept the openssl connection in a seperate
tokio task (with timeout) and send the resulting connection to a
channel
hyper gets the wrapped receiver end of this channel
the tokio task accepting in a loop gets selected with the shutdown
future, to handle the shutdown gracefully
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
i am not sure if we need the select! at all, since on a shutdown, all
open futures get canceled anyway...
also, not sure here about logging, timeouts and channel size
i chose values that seemed sensible, but if anyone has suggestions
with actual reasoning, please say so
also the indentation seems weird, but rustfmt said this is the way..
src/bin/proxmox-backup-proxy.rs | 65 +++++++++++++++++++++++++--------
1 file changed, 49 insertions(+), 16 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 39254504..3eb92cbb 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -113,23 +113,56 @@ async fn run() -> Result<(), Error> {
let server = daemon::create_daemon(
([0,0,0,0,0,0,0,0], 8007).into(),
- |listener, ready| {
- let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
- .map_err(Error::from)
- .try_filter_map(move |(sock, _addr)| {
- let acceptor = Arc::clone(&acceptor);
- async move {
- sock.set_nodelay(true).unwrap();
-
- let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
-
- Ok(tokio_openssl::accept(&acceptor, sock)
- .await
- .ok() // handshake errors aren't be fatal, so return None to filter
- )
+ |mut listener, ready| {
+ let (sender, receiver) = tokio::sync::mpsc::channel(100);
+
+ let accept_future = async move {
+ loop {
+ match listener.accept().await {
+ Ok((sock, _)) => {
+ let mut sender2 = sender.clone();
+ let acceptor = Arc::clone(&acceptor);
+ tokio::spawn(async move {
+ sock.set_nodelay(true).unwrap();
+ let _ = set_tcp_keepalive(
+ sock.as_raw_fd(),
+ PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
+ );
+
+ if let Ok(connection) = tokio::time::timeout(
+ Duration::new(60, 0),
+ tokio_openssl::accept(&acceptor, sock),
+ )
+ .await
+ {
+ if connection.is_err() {
+ // ignore ssl connection errors
+ return;
+ }
+ if let Err(err) =
+ sender2.send_timeout(connection, Duration::new(60, 0)).await
+ {
+ eprintln!("send error: {}", err);
+ }
+ } // ignore ssl timeout errors
+ });
+ }
+ Err(err) => {
+ eprintln!("error accepting tcp connection: {}", err);
+ }
}
- });
- let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
+ }
+ };
+
+ // select with shutdown future for graceful shutdown
+ tokio::spawn(async move {
+ select! {
+ _ = accept_future.fuse() => {},
+ _ = server::shutdown_future().fuse() => {},
+ };
+ });
+
+ let connections = hyper::server::accept::from_stream(receiver);
Ok(ready
.and_then(|_| hyper::Server::builder(connections)
--
2.20.1
^ permalink raw reply [flat|nested] 2+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections
2020-11-02 15:10 [pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections Dominik Csapak
@ 2020-11-03 9:16 ` Wolfgang Bumiller
0 siblings, 0 replies; 2+ messages in thread
From: Wolfgang Bumiller @ 2020-11-03 9:16 UTC (permalink / raw)
To: Dominik Csapak; +Cc: pbs-devel
On Mon, Nov 02, 2020 at 04:10:05PM +0100, Dominik Csapak wrote:
> For incoming connections, we mapped the results from TcpListeners
> accept with 'try_filter_map', where we awaited tokio_openssl::accept
>
> this resulted in blocking the incoming connection stream
>
> to circumvent this, we accept the openssl connection in a seperate
> tokio task (with timeout) and send the resulting connection to a
> channel
>
> hyper gets the wrapped receiver end of this channel
>
> the tokio task accepting in a loop gets selected with the shutdown
> future, to handle the shutdown gracefully
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> i am not sure if we need the select! at all, since on a shutdown, all
> open futures get canceled anyway...
>
> also, not sure here about logging, timeouts and channel size
> i chose values that seemed sensible, but if anyone has suggestions
> with actual reasoning, please say so
>
> also the indentation seems weird, but rustfmt said this is the way..
>
> src/bin/proxmox-backup-proxy.rs | 65 +++++++++++++++++++++++++--------
> 1 file changed, 49 insertions(+), 16 deletions(-)
>
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 39254504..3eb92cbb 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -113,23 +113,56 @@ async fn run() -> Result<(), Error> {
>
> let server = daemon::create_daemon(
> ([0,0,0,0,0,0,0,0], 8007).into(),
> - |listener, ready| {
> - let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
> - .map_err(Error::from)
> - .try_filter_map(move |(sock, _addr)| {
> - let acceptor = Arc::clone(&acceptor);
> - async move {
> - sock.set_nodelay(true).unwrap();
> -
> - let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
> -
> - Ok(tokio_openssl::accept(&acceptor, sock)
> - .await
> - .ok() // handshake errors aren't be fatal, so return None to filter
> - )
> + |mut listener, ready| {
> + let (sender, receiver) = tokio::sync::mpsc::channel(100);
> +
please factorize the below code out into functions, this is too much
indentation
> + let accept_future = async move {
> + loop {
> + match listener.accept().await {
> + Ok((sock, _)) => {
> + let mut sender2 = sender.clone();
> + let acceptor = Arc::clone(&acceptor);
> + tokio::spawn(async move {
> + sock.set_nodelay(true).unwrap();
> + let _ = set_tcp_keepalive(
> + sock.as_raw_fd(),
> + PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
> + );
> +
> + if let Ok(connection) = tokio::time::timeout(
> + Duration::new(60, 0),
> + tokio_openssl::accept(&acceptor, sock),
> + )
> + .await
> + {
^ as that just gets too long
> + if connection.is_err() {
> + // ignore ssl connection errors
> + return;
> + }
> + if let Err(err) =
> + sender2.send_timeout(connection, Duration::new(60, 0)).await
> + {
> + eprintln!("send error: {}", err);
> + }
> + } // ignore ssl timeout errors
> + });
> + }
> + Err(err) => {
> + eprintln!("error accepting tcp connection: {}", err);
> + }
> }
> - });
> - let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
> + }
> + };
> +
> + // select with shutdown future for graceful shutdown
> + tokio::spawn(async move {
> + select! {
> + _ = accept_future.fuse() => {},
> + _ = server::shutdown_future().fuse() => {},
> + };
> + });
> +
> + let connections = hyper::server::accept::from_stream(receiver);
>
> Ok(ready
> .and_then(|_| hyper::Server::builder(connections)
> --
> 2.20.1
^ permalink raw reply [flat|nested] 2+ messages in thread
end of thread, other threads:[~2020-11-03 9:16 UTC | newest]
Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-11-02 15:10 [pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections Dominik Csapak
2020-11-03 9:16 ` Wolfgang Bumiller
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox