From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 89C7F99142 for ; Thu, 16 Nov 2023 08:35:35 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 70A08F71C for ; Thu, 16 Nov 2023 08:35:35 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 16 Nov 2023 08:35:34 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 062FA435DC for ; Thu, 16 Nov 2023 08:35:34 +0100 (CET) Date: Thu, 16 Nov 2023 08:35:31 +0100 From: Wolfgang Bumiller To: Max Carrara Cc: pbs-devel@lists.proxmox.com Message-ID: References: <20231031184705.1142244-1-m.carrara@proxmox.com> <20231031184705.1142244-2-m.carrara@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20231031184705.1142244-2-m.carrara@proxmox.com> X-SPAM-LEVEL: Spam detection results: 0 AWL -0.049 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_2 0.1 random spam to be learned in bayes POISEN_SPAM_PILL_4 0.1 random spam to be learned in bayes SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: Re: [pbs-devel] [PATCH v3 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 16 Nov 2023 07:35:35 -0000 On Tue, Oct 31, 2023 at 07:47:03PM +0100, Max Carrara wrote: > The new public function `accept_tls_optional()` is added, which > accepts both plain TCP streams and TCP streams running TLS. Plain TCP > streams are sent along via a separate channel in order to clearly > distinguish between "secure" and "insecure" connections. > > Furthermore, instead of `AcceptBuilder` itself holding a reference to > an `SslAcceptor`, its public functions now take the acceptor as an > argument. The public functions' names are changed to distinguish > between their functionality in a more explicit manner: > > * `accept()` --> `accept_tls()` > * NEW --> `accept_tls_optional()` > > Signed-off-by: Max Carrara > --- > Changes v1 --> v2: > * No more `BiAcceptBuilder`, `AcceptBuilder` is refactored instead > * `AcceptBuilder` doesn't hold a reference to `SslAcceptor` anymore > * Avoid unnecessary `#[cfg]`s > * Avoid unnecessarily duplicated code (already mostly done by getting > rid of `BiAcceptBuilder`) > * Some clippy stuff > > Changes v2 --> v3: > * Incorporate previously applied clippy fixes > > proxmox-rest-server/src/connection.rs | 373 ++++++++++++++++++++------ > 1 file changed, 287 insertions(+), 86 deletions(-) > > diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs > index 1bec28d..ab8c7db 100644 > --- a/proxmox-rest-server/src/connection.rs > +++ b/proxmox-rest-server/src/connection.rs > @@ -8,15 +8,16 @@ use std::pin::Pin; > use std::sync::{Arc, Mutex}; > use std::time::Duration; > > -use anyhow::Context as _; > -use anyhow::Error; > +use anyhow::{format_err, Context as _, Error}; > use futures::FutureExt; > +use hyper::server::accept; > use openssl::ec::{EcGroup, EcKey}; > use openssl::nid::Nid; > use openssl::pkey::{PKey, Private}; > use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; > use openssl::x509::X509; > use tokio::net::{TcpListener, TcpStream}; > +use tokio::sync::mpsc; > use tokio_openssl::SslStream; > use tokio_stream::wrappers::ReceiverStream; > > @@ -133,10 +134,14 @@ impl TlsAcceptorBuilder { > } > } > > -#[cfg(feature = "rate-limited-stream")] > -type ClientStreamResult = Pin>>>; > #[cfg(not(feature = "rate-limited-stream"))] > -type ClientStreamResult = Pin>>; > +type InsecureClientStream = TcpStream; > +#[cfg(feature = "rate-limited-stream")] > +type InsecureClientStream = RateLimitedStream; > + > +type InsecureClientStreamResult = Pin>; > + > +type ClientStreamResult = Pin>>; > > #[cfg(feature = "rate-limited-stream")] > type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option, Option) > @@ -145,7 +150,6 @@ type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option + 'static; > > pub struct AcceptBuilder { > - acceptor: Arc>, > debug: bool, > tcp_keepalive_time: u32, > max_pending_accepts: usize, > @@ -154,16 +158,9 @@ pub struct AcceptBuilder { > lookup_rate_limiter: Option>, > } > > -impl AcceptBuilder { > - pub fn new() -> Result { > - Ok(Self::with_acceptor(Arc::new(Mutex::new( > - TlsAcceptorBuilder::new().build()?, > - )))) > - } > - > - pub fn with_acceptor(acceptor: Arc>) -> Self { > +impl Default for AcceptBuilder { > + fn default() -> Self { > Self { > - acceptor, > debug: false, > tcp_keepalive_time: 120, > max_pending_accepts: 1024, > @@ -172,6 +169,12 @@ impl AcceptBuilder { > lookup_rate_limiter: None, > } > } > +} > + > +impl AcceptBuilder { > + pub fn new() -> Self { > + Default::default() > + } > > pub fn debug(mut self, debug: bool) -> Self { > self.debug = debug; > @@ -193,114 +196,312 @@ impl AcceptBuilder { > self.lookup_rate_limiter = Some(lookup_rate_limiter); > self > } > +} > > - pub fn accept( > +impl AcceptBuilder { > + pub fn accept_tls( > self, > listener: TcpListener, > - ) -> impl hyper::server::accept::Accept { > - let (sender, receiver) = tokio::sync::mpsc::channel(self.max_pending_accepts); > + acceptor: Arc>, > + ) -> impl accept::Accept { > + let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts); > + > + tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into())); > + > + accept::from_stream(ReceiverStream::new(secure_receiver)) > + } > + > + pub fn accept_tls_optional( > + self, > + listener: TcpListener, > + acceptor: Arc>, > + ) -> ( > + impl accept::Accept, > + impl accept::Accept, > + ) { > + let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts); > + let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts); > + > + tokio::spawn(self.accept_connections( > + listener, > + acceptor, > + (secure_sender, insecure_sender).into(), > + )); > + > + ( > + accept::from_stream(ReceiverStream::new(secure_receiver)), > + accept::from_stream(ReceiverStream::new(insecure_receiver)), > + ) > + } > +} > + > +type ClientSender = mpsc::Sender>; > +type InsecureClientSender = mpsc::Sender>; > > - tokio::spawn(self.accept_connections(listener, sender)); > +enum Sender { > + Secure(ClientSender), > + SecureAndInsecure(ClientSender, InsecureClientSender), > +} > > - //receiver > - hyper::server::accept::from_stream(ReceiverStream::new(receiver)) > +impl From for Sender { > + fn from(sender: ClientSender) -> Self { > + Sender::Secure(sender) > } > +} > + > +impl From<(ClientSender, InsecureClientSender)> for Sender { > + fn from(senders: (ClientSender, InsecureClientSender)) -> Self { > + Sender::SecureAndInsecure(senders.0, senders.1) > + } > +} > > +impl AcceptBuilder { > async fn accept_connections( > self, > listener: TcpListener, > - sender: tokio::sync::mpsc::Sender>, > + acceptor: Arc>, > + sender: Sender, > ) { > let accept_counter = Arc::new(()); > let mut shutdown_future = crate::shutdown_future().fuse(); > > loop { > - let (sock, peer) = futures::select! { > - res = listener.accept().fuse() => match res { > - Ok(conn) => conn, > + let socket = futures::select! { > + res = self.try_setup_socket(&listener).fuse() => match res { > + Ok(socket) => socket, > Err(err) => { > - eprintln!("error accepting tcp connection: {err}"); > + log::error!("couldn't set up TCP socket: {err}"); > continue; > } > }, > - _ = shutdown_future => break, > + _ = shutdown_future => break, > }; > - #[cfg(not(feature = "rate-limited-stream"))] > - { > - let _ = &peer; > - } > > - sock.set_nodelay(true).unwrap(); > - let _ = proxmox_sys::linux::socket::set_tcp_keepalive( > - sock.as_raw_fd(), > - self.tcp_keepalive_time, > - ); > + let acceptor = Arc::clone(&acceptor); > + let accept_counter = Arc::clone(&accept_counter); > > - #[cfg(feature = "rate-limited-stream")] > - let sock = match self.lookup_rate_limiter.clone() { > - Some(lookup) => { > - RateLimitedStream::with_limiter_update_cb(sock, move || lookup(peer)) > + if Arc::strong_count(&accept_counter) > self.max_pending_accepts { > + log::error!("connection rejected - too many open connections"); > + continue; > + } > + > + match sender { > + Sender::Secure(ref secure_sender) => { > + let accept_future = Self::do_accept_tls( > + socket, > + acceptor, > + accept_counter, > + self.debug, > + secure_sender.clone(), > + ); > + > + tokio::spawn(accept_future); > + } > + Sender::SecureAndInsecure(ref secure_sender, ref insecure_sender) => { > + let accept_future = Self::do_accept_tls_optional( > + socket, > + acceptor, > + accept_counter, > + self.debug, > + secure_sender.clone(), > + insecure_sender.clone(), > + ); > + > + tokio::spawn(accept_future); > } > - None => RateLimitedStream::with_limiter(sock, None, None), > }; > + } > + } > > - let ssl = { > - // limit acceptor_guard scope > - // Acceptor can be reloaded using the command socket "reload-certificate" command > - let acceptor_guard = self.acceptor.lock().unwrap(); > + async fn try_setup_socket( > + &self, > + listener: &TcpListener, > + ) -> Result { > + let (socket, peer) = match listener.accept().await { > + Ok(connection) => connection, > + Err(error) => { > + return Err(format_err!(error)).context("error while accepting tcp stream") > + } > + }; > > - match openssl::ssl::Ssl::new(acceptor_guard.context()) { > - Ok(ssl) => ssl, > - Err(err) => { > - eprintln!("failed to create Ssl object from Acceptor context - {err}"); > - continue; > - } > - } > - }; > + socket > + .set_nodelay(true) > + .context("error while setting TCP_NODELAY on socket")?; > + > + proxmox_sys::linux::socket::set_tcp_keepalive(socket.as_raw_fd(), self.tcp_keepalive_time) > + .context("error while setting SO_KEEPALIVE on socket")?; > > - let stream = match tokio_openssl::SslStream::new(ssl, sock) { > - Ok(stream) => stream, > + #[cfg(feature = "rate-limited-stream")] > + let socket = match self.lookup_rate_limiter.clone() { > + Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)), > + None => RateLimitedStream::with_limiter(socket, None, None), > + }; > + > + #[cfg(not(feature = "rate-limited-stream"))] > + let _peer = peer; > + > + Ok(socket) > + } > + > + async fn do_accept_tls( > + socket: InsecureClientStream, > + acceptor: Arc>, > + accept_counter: Arc<()>, > + debug: bool, > + secure_sender: ClientSender, > + ) { > + let ssl = { > + // limit acceptor_guard scope > + // Acceptor can be reloaded using the command socket "reload-certificate" command > + let acceptor_guard = acceptor.lock().unwrap(); > + > + match openssl::ssl::Ssl::new(acceptor_guard.context()) { > + Ok(ssl) => ssl, > Err(err) => { > - eprintln!("failed to create SslStream using ssl and connection socket - {err}"); > - continue; > + log::error!("failed to create Ssl object from Acceptor context - {err}"); > + return; > } > - }; > + } > + }; > > - let mut stream = Box::pin(stream); > - let sender = sender.clone(); > + let secure_stream = match tokio_openssl::SslStream::new(ssl, socket) { > + Ok(stream) => stream, > + Err(err) => { > + log::error!("failed to create SslStream using ssl and connection socket - {err}"); > + return; > + } > + }; > > - if Arc::strong_count(&accept_counter) > self.max_pending_accepts { > - eprintln!("connection rejected - too many open connections"); > - continue; > + let mut secure_stream = Box::pin(secure_stream); > + > + let accept_future = > + tokio::time::timeout(Duration::new(10, 0), secure_stream.as_mut().accept()); > + > + let result = accept_future.await; > + > + match result { > + Ok(Ok(())) => { > + if secure_sender.send(Ok(secure_stream)).await.is_err() && debug { > + log::error!("detected closed connection channel"); > + } > + } > + Ok(Err(err)) => { > + if debug { > + log::error!("https handshake failed - {err}"); > + } > } > + Err(_) => { > + if debug { > + log::error!("https handshake timeout"); > + } > + } > + } > > - let accept_counter = Arc::clone(&accept_counter); > - tokio::spawn(async move { > - let accept_future = > - tokio::time::timeout(Duration::new(10, 0), stream.as_mut().accept()); > + drop(accept_counter); // decrease reference count > + } > > - let result = accept_future.await; > + async fn do_accept_tls_optional( > + socket: InsecureClientStream, > + acceptor: Arc>, > + accept_counter: Arc<()>, > + debug: bool, > + secure_sender: ClientSender, > + insecure_sender: InsecureClientSender, > + ) { > + let client_initiates_handshake = { > + #[cfg(feature = "rate-limited-stream")] > + let socket = socket.inner(); > > - match result { > - Ok(Ok(())) => { > - if sender.send(Ok(stream)).await.is_err() && self.debug { > - log::error!("detect closed connection channel"); > - } > - } > - Ok(Err(err)) => { > - if self.debug { > - log::error!("https handshake failed - {err}"); > - } > - } > - Err(_) => { > - if self.debug { > - log::error!("https handshake timeout"); > - } > - } > + #[cfg(not(feature = "rate-limited-stream"))] > + let socket = &socket; > + > + match Self::wait_for_client_tls_handshake(socket).await { > + Ok(initiates_handshake) => initiates_handshake, > + Err(err) => { > + log::error!("error checking for TLS handshake: {err}"); > + return; > } > + } > + }; > + > + if !client_initiates_handshake { > + let insecure_stream = Box::pin(socket); > > - drop(accept_counter); // decrease reference count > - }); > + if insecure_sender.send(Ok(insecure_stream)).await.is_err() && debug { > + log::error!("detected closed connection channel") > + } > + > + return; > } > + > + Self::do_accept_tls(socket, acceptor, accept_counter, debug, secure_sender).await > } > + > + async fn wait_for_client_tls_handshake(incoming_stream: &TcpStream) -> Result { > + const MS_TIMEOUT: u64 = 1000; > + const BYTES_BUF_SIZE: usize = 128; > + > + let mut buf = [0; BYTES_BUF_SIZE]; > + let mut last_peek_size = 0; > + > + let future = async { > + loop { > + let peek_size = incoming_stream > + .peek(&mut buf) > + .await > + .context("couldn't peek into incoming tcp stream")?; > + > + if contains_tls_handshake_fragment(&buf) { > + return Ok(true); > + } > + > + // No more new data came in > + if peek_size == last_peek_size { > + return Ok(false); > + } > + > + last_peek_size = peek_size; > + > + // yields to event loop; this future blocks otherwise ad infinitum > + tokio::time::sleep(Duration::from_millis(0)).await; Just noticed this - how about tokio::task::yield_now()? Also this makes me wish for epoll to have a flag for edge triggering to also trigger on additional data not just when going up from zero.