From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 9EE209B8BF for ; Tue, 21 Nov 2023 11:08:55 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 84FFB6136 for ; Tue, 21 Nov 2023 11:08:55 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Tue, 21 Nov 2023 11:08:54 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id DB37940C70 for ; Tue, 21 Nov 2023 11:08:53 +0100 (CET) From: Max Carrara To: pbs-devel@lists.proxmox.com Date: Tue, 21 Nov 2023 11:08:44 +0100 Message-Id: <20231121100846.216207-2-m.carrara@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20231121100846.216207-1-m.carrara@proxmox.com> References: <20231121100846.216207-1-m.carrara@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.217 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_2 0.1 random spam to be learned in bayes POISEN_SPAM_PILL_4 0.1 random spam to be learned in bayes SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [PATCH v4 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 21 Nov 2023 10:08:55 -0000 The new public function `accept_tls_optional()` is added, which accepts both plain TCP streams and TCP streams running TLS. Plain TCP streams are sent along via a separate channel in order to clearly distinguish between "secure" and "insecure" connections. Furthermore, instead of `AcceptBuilder` itself holding a reference to an `SslAcceptor`, its public functions now take the acceptor as an argument. The public functions' names are changed to distinguish between their functionality in a more explicit manner: * `accept()` --> `accept_tls()` * NEW --> `accept_tls_optional()` Signed-off-by: Max Carrara --- Changes v1 --> v2: * No more `BiAcceptBuilder`, `AcceptBuilder` is refactored instead * `AcceptBuilder` doesn't hold a reference to `SslAcceptor` anymore * Avoid unnecessary `#[cfg]`s * Avoid unnecessarily duplicated code (already mostly done by getting rid of `BiAcceptBuilder`) * Some clippy stuff Changes v2 --> v3: * Incorporate previously applied clippy fixes Changes v3 --> v4: * use `tokio::task::yield_now()` instead of `tokio::time::sleep()` to yield to executor proxmox-rest-server/src/connection.rs | 373 ++++++++++++++++++++------ 1 file changed, 287 insertions(+), 86 deletions(-) diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs index 1bec28d..34b585c 100644 --- a/proxmox-rest-server/src/connection.rs +++ b/proxmox-rest-server/src/connection.rs @@ -8,15 +8,16 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::time::Duration; -use anyhow::Context as _; -use anyhow::Error; +use anyhow::{format_err, Context as _, Error}; use futures::FutureExt; +use hyper::server::accept; use openssl::ec::{EcGroup, EcKey}; use openssl::nid::Nid; use openssl::pkey::{PKey, Private}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use openssl::x509::X509; use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::mpsc; use tokio_openssl::SslStream; use tokio_stream::wrappers::ReceiverStream; @@ -133,10 +134,14 @@ impl TlsAcceptorBuilder { } } -#[cfg(feature = "rate-limited-stream")] -type ClientStreamResult = Pin>>>; #[cfg(not(feature = "rate-limited-stream"))] -type ClientStreamResult = Pin>>; +type InsecureClientStream = TcpStream; +#[cfg(feature = "rate-limited-stream")] +type InsecureClientStream = RateLimitedStream; + +type InsecureClientStreamResult = Pin>; + +type ClientStreamResult = Pin>>; #[cfg(feature = "rate-limited-stream")] type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option, Option) @@ -145,7 +150,6 @@ type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option>, debug: bool, tcp_keepalive_time: u32, max_pending_accepts: usize, @@ -154,16 +158,9 @@ pub struct AcceptBuilder { lookup_rate_limiter: Option>, } -impl AcceptBuilder { - pub fn new() -> Result { - Ok(Self::with_acceptor(Arc::new(Mutex::new( - TlsAcceptorBuilder::new().build()?, - )))) - } - - pub fn with_acceptor(acceptor: Arc>) -> Self { +impl Default for AcceptBuilder { + fn default() -> Self { Self { - acceptor, debug: false, tcp_keepalive_time: 120, max_pending_accepts: 1024, @@ -172,6 +169,12 @@ impl AcceptBuilder { lookup_rate_limiter: None, } } +} + +impl AcceptBuilder { + pub fn new() -> Self { + Default::default() + } pub fn debug(mut self, debug: bool) -> Self { self.debug = debug; @@ -193,114 +196,312 @@ impl AcceptBuilder { self.lookup_rate_limiter = Some(lookup_rate_limiter); self } +} - pub fn accept( +impl AcceptBuilder { + pub fn accept_tls( self, listener: TcpListener, - ) -> impl hyper::server::accept::Accept { - let (sender, receiver) = tokio::sync::mpsc::channel(self.max_pending_accepts); + acceptor: Arc>, + ) -> impl accept::Accept { + let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts); + + tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into())); + + accept::from_stream(ReceiverStream::new(secure_receiver)) + } + + pub fn accept_tls_optional( + self, + listener: TcpListener, + acceptor: Arc>, + ) -> ( + impl accept::Accept, + impl accept::Accept, + ) { + let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts); + let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts); + + tokio::spawn(self.accept_connections( + listener, + acceptor, + (secure_sender, insecure_sender).into(), + )); + + ( + accept::from_stream(ReceiverStream::new(secure_receiver)), + accept::from_stream(ReceiverStream::new(insecure_receiver)), + ) + } +} + +type ClientSender = mpsc::Sender>; +type InsecureClientSender = mpsc::Sender>; - tokio::spawn(self.accept_connections(listener, sender)); +enum Sender { + Secure(ClientSender), + SecureAndInsecure(ClientSender, InsecureClientSender), +} - //receiver - hyper::server::accept::from_stream(ReceiverStream::new(receiver)) +impl From for Sender { + fn from(sender: ClientSender) -> Self { + Sender::Secure(sender) } +} + +impl From<(ClientSender, InsecureClientSender)> for Sender { + fn from(senders: (ClientSender, InsecureClientSender)) -> Self { + Sender::SecureAndInsecure(senders.0, senders.1) + } +} +impl AcceptBuilder { async fn accept_connections( self, listener: TcpListener, - sender: tokio::sync::mpsc::Sender>, + acceptor: Arc>, + sender: Sender, ) { let accept_counter = Arc::new(()); let mut shutdown_future = crate::shutdown_future().fuse(); loop { - let (sock, peer) = futures::select! { - res = listener.accept().fuse() => match res { - Ok(conn) => conn, + let socket = futures::select! { + res = self.try_setup_socket(&listener).fuse() => match res { + Ok(socket) => socket, Err(err) => { - eprintln!("error accepting tcp connection: {err}"); + log::error!("couldn't set up TCP socket: {err}"); continue; } }, - _ = shutdown_future => break, + _ = shutdown_future => break, }; - #[cfg(not(feature = "rate-limited-stream"))] - { - let _ = &peer; - } - sock.set_nodelay(true).unwrap(); - let _ = proxmox_sys::linux::socket::set_tcp_keepalive( - sock.as_raw_fd(), - self.tcp_keepalive_time, - ); + let acceptor = Arc::clone(&acceptor); + let accept_counter = Arc::clone(&accept_counter); - #[cfg(feature = "rate-limited-stream")] - let sock = match self.lookup_rate_limiter.clone() { - Some(lookup) => { - RateLimitedStream::with_limiter_update_cb(sock, move || lookup(peer)) + if Arc::strong_count(&accept_counter) > self.max_pending_accepts { + log::error!("connection rejected - too many open connections"); + continue; + } + + match sender { + Sender::Secure(ref secure_sender) => { + let accept_future = Self::do_accept_tls( + socket, + acceptor, + accept_counter, + self.debug, + secure_sender.clone(), + ); + + tokio::spawn(accept_future); + } + Sender::SecureAndInsecure(ref secure_sender, ref insecure_sender) => { + let accept_future = Self::do_accept_tls_optional( + socket, + acceptor, + accept_counter, + self.debug, + secure_sender.clone(), + insecure_sender.clone(), + ); + + tokio::spawn(accept_future); } - None => RateLimitedStream::with_limiter(sock, None, None), }; + } + } - let ssl = { - // limit acceptor_guard scope - // Acceptor can be reloaded using the command socket "reload-certificate" command - let acceptor_guard = self.acceptor.lock().unwrap(); + async fn try_setup_socket( + &self, + listener: &TcpListener, + ) -> Result { + let (socket, peer) = match listener.accept().await { + Ok(connection) => connection, + Err(error) => { + return Err(format_err!(error)).context("error while accepting tcp stream") + } + }; - match openssl::ssl::Ssl::new(acceptor_guard.context()) { - Ok(ssl) => ssl, - Err(err) => { - eprintln!("failed to create Ssl object from Acceptor context - {err}"); - continue; - } - } - }; + socket + .set_nodelay(true) + .context("error while setting TCP_NODELAY on socket")?; + + proxmox_sys::linux::socket::set_tcp_keepalive(socket.as_raw_fd(), self.tcp_keepalive_time) + .context("error while setting SO_KEEPALIVE on socket")?; - let stream = match tokio_openssl::SslStream::new(ssl, sock) { - Ok(stream) => stream, + #[cfg(feature = "rate-limited-stream")] + let socket = match self.lookup_rate_limiter.clone() { + Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)), + None => RateLimitedStream::with_limiter(socket, None, None), + }; + + #[cfg(not(feature = "rate-limited-stream"))] + let _peer = peer; + + Ok(socket) + } + + async fn do_accept_tls( + socket: InsecureClientStream, + acceptor: Arc>, + accept_counter: Arc<()>, + debug: bool, + secure_sender: ClientSender, + ) { + let ssl = { + // limit acceptor_guard scope + // Acceptor can be reloaded using the command socket "reload-certificate" command + let acceptor_guard = acceptor.lock().unwrap(); + + match openssl::ssl::Ssl::new(acceptor_guard.context()) { + Ok(ssl) => ssl, Err(err) => { - eprintln!("failed to create SslStream using ssl and connection socket - {err}"); - continue; + log::error!("failed to create Ssl object from Acceptor context - {err}"); + return; } - }; + } + }; - let mut stream = Box::pin(stream); - let sender = sender.clone(); + let secure_stream = match tokio_openssl::SslStream::new(ssl, socket) { + Ok(stream) => stream, + Err(err) => { + log::error!("failed to create SslStream using ssl and connection socket - {err}"); + return; + } + }; - if Arc::strong_count(&accept_counter) > self.max_pending_accepts { - eprintln!("connection rejected - too many open connections"); - continue; + let mut secure_stream = Box::pin(secure_stream); + + let accept_future = + tokio::time::timeout(Duration::new(10, 0), secure_stream.as_mut().accept()); + + let result = accept_future.await; + + match result { + Ok(Ok(())) => { + if secure_sender.send(Ok(secure_stream)).await.is_err() && debug { + log::error!("detected closed connection channel"); + } + } + Ok(Err(err)) => { + if debug { + log::error!("https handshake failed - {err}"); + } } + Err(_) => { + if debug { + log::error!("https handshake timeout"); + } + } + } - let accept_counter = Arc::clone(&accept_counter); - tokio::spawn(async move { - let accept_future = - tokio::time::timeout(Duration::new(10, 0), stream.as_mut().accept()); + drop(accept_counter); // decrease reference count + } - let result = accept_future.await; + async fn do_accept_tls_optional( + socket: InsecureClientStream, + acceptor: Arc>, + accept_counter: Arc<()>, + debug: bool, + secure_sender: ClientSender, + insecure_sender: InsecureClientSender, + ) { + let client_initiates_handshake = { + #[cfg(feature = "rate-limited-stream")] + let socket = socket.inner(); - match result { - Ok(Ok(())) => { - if sender.send(Ok(stream)).await.is_err() && self.debug { - log::error!("detect closed connection channel"); - } - } - Ok(Err(err)) => { - if self.debug { - log::error!("https handshake failed - {err}"); - } - } - Err(_) => { - if self.debug { - log::error!("https handshake timeout"); - } - } + #[cfg(not(feature = "rate-limited-stream"))] + let socket = &socket; + + match Self::wait_for_client_tls_handshake(socket).await { + Ok(initiates_handshake) => initiates_handshake, + Err(err) => { + log::error!("error checking for TLS handshake: {err}"); + return; } + } + }; + + if !client_initiates_handshake { + let insecure_stream = Box::pin(socket); - drop(accept_counter); // decrease reference count - }); + if insecure_sender.send(Ok(insecure_stream)).await.is_err() && debug { + log::error!("detected closed connection channel") + } + + return; } + + Self::do_accept_tls(socket, acceptor, accept_counter, debug, secure_sender).await } + + async fn wait_for_client_tls_handshake(incoming_stream: &TcpStream) -> Result { + const MS_TIMEOUT: u64 = 1000; + const BYTES_BUF_SIZE: usize = 128; + + let mut buf = [0; BYTES_BUF_SIZE]; + let mut last_peek_size = 0; + + let future = async { + loop { + let peek_size = incoming_stream + .peek(&mut buf) + .await + .context("couldn't peek into incoming tcp stream")?; + + if contains_tls_handshake_fragment(&buf) { + return Ok(true); + } + + // No more new data came in + if peek_size == last_peek_size { + return Ok(false); + } + + last_peek_size = peek_size; + + // explicitly yield to event loop; this future otherwise blocks ad infinitum + tokio::task::yield_now().await; + } + }; + + tokio::time::timeout(Duration::from_millis(MS_TIMEOUT), future) + .await + .unwrap_or(Ok(false)) + } +} + +/// Checks whether an [SSL 3.0 / TLS plaintext fragment][0] being part of a +/// SSL / TLS handshake is contained in the given buffer. +/// +/// Such a fragment might look as follows: +/// ```ignore +/// [0x16, 0x3, 0x1, 0x02, 0x00, ...] +/// // | | | |_____| +/// // | | | \__ content length interpreted as u16 +/// // | | | must not exceed 0x4000 (2^14) bytes +/// // | | | +/// // | | \__ any minor version +/// // | | +/// // | \__ major version 3 +/// // | +/// // \__ content type is handshake(22) +/// ``` +/// +/// If a slice like this is detected at the beginning of the given buffer, +/// a TLS handshake is most definitely being made. +/// +/// [0]: https://datatracker.ietf.org/doc/html/rfc6101#section-5.2 +#[inline] +fn contains_tls_handshake_fragment(buf: &[u8]) -> bool { + const SLICE_LENGTH: usize = 5; + const CONTENT_SIZE: u16 = 1 << 14; // max length of a TLS plaintext fragment + + if buf.len() < SLICE_LENGTH { + return false; + } + + buf[0] == 0x16 && buf[1] == 0x3 && (((buf[3] as u16) << 8) + buf[4] as u16) <= CONTENT_SIZE } -- 2.39.2