From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 201277B16B for ; Tue, 11 May 2021 15:54:08 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1DB1122CAD for ; Tue, 11 May 2021 15:54:08 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 04D2422C72 for ; Tue, 11 May 2021 15:54:05 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id CB47742D85 for ; Tue, 11 May 2021 15:54:04 +0200 (CEST) From: Wolfgang Bumiller To: pbs-devel@lists.proxmox.com Date: Tue, 11 May 2021 15:53:54 +0200 Message-Id: <20210511135400.32406-2-w.bumiller@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210511135400.32406-1-w.bumiller@proxmox.com> References: <20210511135400.32406-1-w.bumiller@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.018 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [proxmox-backup-proxy.rs] Subject: [pbs-devel] [PATCH backup 1/7] proxy: factor out accept_connection X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 11 May 2021 13:54:08 -0000 no functional changes, moved code and named the channel's type for more readability Signed-off-by: Wolfgang Bumiller --- diff -w for a quicker view: diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 31dc8332..27d1cbeb 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -170,19 +170,31 @@ async fn run() -> Result<(), Error> { Ok(()) } +type ClientStreamResult = + Result>>, Error>; +const MAX_PENDING_ACCEPTS: usize = 1024; + fn accept_connections( listener: tokio::net::TcpListener, acceptor: Arc, debug: bool, -) -> tokio::sync::mpsc::Receiver>>, Error>> { - - const MAX_PENDING_ACCEPTS: usize = 1024; +) -> tokio::sync::mpsc::Receiver { let (sender, receiver) = tokio::sync::mpsc::channel(MAX_PENDING_ACCEPTS); + tokio::spawn(accept_connection(listener, acceptor, debug, sender)); + + receiver +} + +async fn accept_connection( + listener: tokio::net::TcpListener, + acceptor: Arc, + debug: bool, + sender: tokio::sync::mpsc::Sender, +) { let accept_counter = Arc::new(()); - tokio::spawn(async move { loop { match listener.accept().await { Err(err) => { @@ -246,9 +258,6 @@ fn accept_connections( } } } - }); - - receiver } src/bin/proxmox-backup-proxy.rs | 127 +++++++++++++++++--------------- 1 file changed, 68 insertions(+), 59 deletions(-) diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 31dc8332..27d1cbeb 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -170,85 +170,94 @@ async fn run() -> Result<(), Error> { Ok(()) } +type ClientStreamResult = + Result>>, Error>; +const MAX_PENDING_ACCEPTS: usize = 1024; + fn accept_connections( listener: tokio::net::TcpListener, acceptor: Arc, debug: bool, -) -> tokio::sync::mpsc::Receiver>>, Error>> { - - const MAX_PENDING_ACCEPTS: usize = 1024; +) -> tokio::sync::mpsc::Receiver { let (sender, receiver) = tokio::sync::mpsc::channel(MAX_PENDING_ACCEPTS); + tokio::spawn(accept_connection(listener, acceptor, debug, sender)); + + receiver +} + +async fn accept_connection( + listener: tokio::net::TcpListener, + acceptor: Arc, + debug: bool, + sender: tokio::sync::mpsc::Sender, +) { let accept_counter = Arc::new(()); - tokio::spawn(async move { - loop { - match listener.accept().await { - Err(err) => { - eprintln!("error accepting tcp connection: {}", err); - } - Ok((sock, _addr)) => { - sock.set_nodelay(true).unwrap(); - let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); - let acceptor = Arc::clone(&acceptor); - - let ssl = match openssl::ssl::Ssl::new(acceptor.context()) { - Ok(ssl) => ssl, - Err(err) => { - eprintln!("failed to create Ssl object from Acceptor context - {}", err); - continue; - }, - }; - let stream = match tokio_openssl::SslStream::new(ssl, sock) { - Ok(stream) => stream, - Err(err) => { - eprintln!("failed to create SslStream using ssl and connection socket - {}", err); - continue; - }, - }; - - let mut stream = Box::pin(stream); - let sender = sender.clone(); - - if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS { - eprintln!("connection rejected - to many open connections"); + loop { + match listener.accept().await { + Err(err) => { + eprintln!("error accepting tcp connection: {}", err); + } + Ok((sock, _addr)) => { + sock.set_nodelay(true).unwrap(); + let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); + let acceptor = Arc::clone(&acceptor); + + let ssl = match openssl::ssl::Ssl::new(acceptor.context()) { + Ok(ssl) => ssl, + Err(err) => { + eprintln!("failed to create Ssl object from Acceptor context - {}", err); continue; - } + }, + }; + let stream = match tokio_openssl::SslStream::new(ssl, sock) { + Ok(stream) => stream, + Err(err) => { + eprintln!("failed to create SslStream using ssl and connection socket - {}", err); + continue; + }, + }; + + let mut stream = Box::pin(stream); + let sender = sender.clone(); + + if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS { + eprintln!("connection rejected - to many open connections"); + continue; + } - let accept_counter = accept_counter.clone(); - tokio::spawn(async move { - let accept_future = tokio::time::timeout( - Duration::new(10, 0), stream.as_mut().accept()); + let accept_counter = accept_counter.clone(); + tokio::spawn(async move { + let accept_future = tokio::time::timeout( + Duration::new(10, 0), stream.as_mut().accept()); - let result = accept_future.await; + let result = accept_future.await; - match result { - Ok(Ok(())) => { - if sender.send(Ok(stream)).await.is_err() && debug { - eprintln!("detect closed connection channel"); - } + match result { + Ok(Ok(())) => { + if sender.send(Ok(stream)).await.is_err() && debug { + eprintln!("detect closed connection channel"); } - Ok(Err(err)) => { - if debug { - eprintln!("https handshake failed - {}", err); - } + } + Ok(Err(err)) => { + if debug { + eprintln!("https handshake failed - {}", err); } - Err(_) => { - if debug { - eprintln!("https handshake timeout"); - } + } + Err(_) => { + if debug { + eprintln!("https handshake timeout"); } } + } - drop(accept_counter); // decrease reference count - }); - } + drop(accept_counter); // decrease reference count + }); } } - }); - - receiver + } } fn start_stat_generator() { -- 2.20.1