* [pbs-devel] [PATCH v2 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS
2023-07-18 13:28 [pbs-devel] [PATCH v2 proxmox, proxmox-backup 0/3] Add support for HTTP to HTTPS redirection Max Carrara
@ 2023-07-18 13:28 ` Max Carrara
2023-07-18 13:28 ` [pbs-devel] [PATCH v2 proxmox 2/3] rest-server: Add `Redirector` Max Carrara
2023-07-18 13:28 ` [pbs-devel] [PATCH v2 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS Max Carrara
2 siblings, 0 replies; 4+ messages in thread
From: Max Carrara @ 2023-07-18 13:28 UTC (permalink / raw)
To: pbs-devel
The new public function `accept_tls_optional()` is added, which
accepts both plain TCP streams and TCP streams running TLS. Plain TCP
streams are sent along via a separate channel in order to clearly
distinguish between "secure" and "insecure" connections.
Furthermore, instead of `AcceptBuilder` itself holding a reference to
an `SslAcceptor`, its public functions now take the acceptor as an
argument. As a consequence, `AcceptBuilder::new()` is now infallible
and `AcceptBuilder::with_acceptor()` is removed. The public functions
are renamed to make their functionality more explicit:
* `accept()` --> `accept_tls()`
* NEW --> `accept_tls_optional()`
Signed-off-by: Max Carrara <m.carrara@proxmox.com>
---
Changes v1 --> v2:
* No more `BiAcceptBuilder`, `AcceptBuilder` is refactored instead
* `AcceptBuilder` doesn't hold a reference to `SslAcceptor` anymore
* Avoid unnecessary `#[cfg]`s
* Avoid unnecessarily duplicated code (already mostly done by getting
rid of `BiAcceptBuilder`)
* Address some clippy lints
proxmox-rest-server/src/connection.rs | 369 ++++++++++++++++++++------
1 file changed, 285 insertions(+), 84 deletions(-)
diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 7681f00..f498e46 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -8,15 +8,16 @@ use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use anyhow::Context as _;
-use anyhow::Error;
+use anyhow::{format_err, Context as _, Error};
use futures::FutureExt;
+use hyper::server::accept;
use openssl::ec::{EcGroup, EcKey};
use openssl::nid::Nid;
use openssl::pkey::{PKey, Private};
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use openssl::x509::X509;
use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::mpsc;
use tokio_openssl::SslStream;
use tokio_stream::wrappers::ReceiverStream;
@@ -133,10 +134,14 @@ impl TlsAcceptorBuilder {
}
}
-#[cfg(feature = "rate-limited-stream")]
-type ClientStreamResult = Pin<Box<SslStream<RateLimitedStream<TcpStream>>>>;
#[cfg(not(feature = "rate-limited-stream"))]
-type ClientStreamResult = Pin<Box<SslStream<TcpStream>>>;
+type InsecureClientStream = TcpStream;
+#[cfg(feature = "rate-limited-stream")]
+type InsecureClientStream = RateLimitedStream<TcpStream>;
+
+type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
+
+type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
#[cfg(feature = "rate-limited-stream")]
type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
@@ -145,7 +150,6 @@ type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit
+ 'static;
pub struct AcceptBuilder {
- acceptor: Arc<Mutex<SslAcceptor>>,
debug: bool,
tcp_keepalive_time: u32,
max_pending_accepts: usize,
@@ -154,16 +158,9 @@ pub struct AcceptBuilder {
lookup_rate_limiter: Option<Arc<LookupRateLimiter>>,
}
-impl AcceptBuilder {
- pub fn new() -> Result<Self, Error> {
- Ok(Self::with_acceptor(Arc::new(Mutex::new(
- TlsAcceptorBuilder::new().build()?,
- ))))
- }
-
- pub fn with_acceptor(acceptor: Arc<Mutex<SslAcceptor>>) -> Self {
+impl Default for AcceptBuilder {
+ fn default() -> Self {
Self {
- acceptor,
debug: false,
tcp_keepalive_time: 120,
max_pending_accepts: 1024,
@@ -172,6 +169,12 @@ impl AcceptBuilder {
lookup_rate_limiter: None,
}
}
+}
+
+impl AcceptBuilder {
+ pub fn new() -> Self {
+ Default::default()
+ }
pub fn debug(mut self, debug: bool) -> Self {
self.debug = debug;
@@ -193,112 +196,310 @@ impl AcceptBuilder {
self.lookup_rate_limiter = Some(lookup_rate_limiter);
self
}
+}
- pub fn accept(
+impl AcceptBuilder {
+ pub fn accept_tls(
self,
listener: TcpListener,
- ) -> impl hyper::server::accept::Accept<Conn = ClientStreamResult, Error = Error> {
- let (sender, receiver) = tokio::sync::mpsc::channel(self.max_pending_accepts);
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
+ let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
- tokio::spawn(self.accept_connections(listener, sender));
+ tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
- //receiver
- hyper::server::accept::from_stream(ReceiverStream::new(receiver))
+ accept::from_stream(ReceiverStream::new(secure_receiver))
}
+ pub fn accept_tls_optional(
+ self,
+ listener: TcpListener,
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ ) -> (
+ impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
+ impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
+ ) {
+ let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
+ let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
+
+ tokio::spawn(self.accept_connections(
+ listener,
+ acceptor,
+ (secure_sender, insecure_sender).into(),
+ ));
+
+ (
+ accept::from_stream(ReceiverStream::new(secure_receiver)),
+ accept::from_stream(ReceiverStream::new(insecure_receiver)),
+ )
+ }
+}
+
+type ClientSender = mpsc::Sender<Result<ClientStreamResult, Error>>;
+type InsecureClientSender = mpsc::Sender<Result<InsecureClientStreamResult, Error>>;
+
+enum Sender {
+ Secure(ClientSender),
+ SecureAndInsecure(ClientSender, InsecureClientSender),
+}
+
+impl From<ClientSender> for Sender {
+ fn from(sender: ClientSender) -> Self {
+ Sender::Secure(sender)
+ }
+}
+
+impl From<(ClientSender, InsecureClientSender)> for Sender {
+ fn from(senders: (ClientSender, InsecureClientSender)) -> Self {
+ Sender::SecureAndInsecure(senders.0, senders.1)
+ }
+}
+
+impl AcceptBuilder {
async fn accept_connections(
self,
listener: TcpListener,
- sender: tokio::sync::mpsc::Sender<Result<ClientStreamResult, Error>>,
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ sender: Sender,
) {
let accept_counter = Arc::new(());
let mut shutdown_future = crate::shutdown_future().fuse();
loop {
- let (sock, peer) = futures::select! {
- res = listener.accept().fuse() => match res {
- Ok(conn) => conn,
+ let socket = futures::select! {
+ res = self.try_setup_socket(&listener).fuse() => match res {
+ Ok(socket) => socket,
Err(err) => {
- eprintln!("error accepting tcp connection: {err}");
+ log::error!("couldn't set up TCP socket: {err}");
continue;
}
},
- _ = shutdown_future => break,
+ _ = shutdown_future => break,
};
- #[cfg(not(feature = "rate-limited-stream"))]
- drop(peer);
- sock.set_nodelay(true).unwrap();
- let _ = proxmox_sys::linux::socket::set_tcp_keepalive(
- sock.as_raw_fd(),
- self.tcp_keepalive_time,
- );
+ let acceptor = Arc::clone(&acceptor);
+ let accept_counter = Arc::clone(&accept_counter);
- #[cfg(feature = "rate-limited-stream")]
- let sock = match self.lookup_rate_limiter.clone() {
- Some(lookup) => {
- RateLimitedStream::with_limiter_update_cb(sock, move || lookup(peer))
+ if Arc::strong_count(&accept_counter) > self.max_pending_accepts {
+ log::error!("connection rejected - too many open connections");
+ continue;
+ }
+
+ match sender {
+ Sender::Secure(ref secure_sender) => {
+ let accept_future = Self::do_accept_tls(
+ socket,
+ acceptor,
+ accept_counter,
+ self.debug,
+ secure_sender.clone(),
+ );
+
+ tokio::spawn(accept_future);
+ }
+ Sender::SecureAndInsecure(ref secure_sender, ref insecure_sender) => {
+ let accept_future = Self::do_accept_tls_optional(
+ socket,
+ acceptor,
+ accept_counter,
+ self.debug,
+ secure_sender.clone(),
+ insecure_sender.clone(),
+ );
+
+ tokio::spawn(accept_future);
}
- None => RateLimitedStream::with_limiter(sock, None, None),
};
+ }
+ }
+
+ async fn try_setup_socket(
+ &self,
+ listener: &TcpListener,
+ ) -> Result<InsecureClientStream, Error> {
+ #[allow(unused_variables)]
+ let (socket, peer) = match listener.accept().await {
+ Ok(connection) => connection,
+ Err(error) => {
+ return Err(format_err!(error)).context("error while accepting tcp stream")
+ }
+ };
- let ssl = {
- // limit acceptor_guard scope
- // Acceptor can be reloaded using the command socket "reload-certificate" command
- let acceptor_guard = self.acceptor.lock().unwrap();
+ socket
+ .set_nodelay(true)
+ .context("error while setting TCP_NODELAY on socket")?;
- match openssl::ssl::Ssl::new(acceptor_guard.context()) {
- Ok(ssl) => ssl,
- Err(err) => {
- eprintln!("failed to create Ssl object from Acceptor context - {err}");
- continue;
- }
- }
- };
+ proxmox_sys::linux::socket::set_tcp_keepalive(socket.as_raw_fd(), self.tcp_keepalive_time)
+ .context("error while setting SO_KEEPALIVE on socket")?;
- let stream = match tokio_openssl::SslStream::new(ssl, sock) {
- Ok(stream) => stream,
+ #[cfg(feature = "rate-limited-stream")]
+ let socket = match self.lookup_rate_limiter.clone() {
+ Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
+ None => RateLimitedStream::with_limiter(socket, None, None),
+ };
+
+ Ok(socket)
+ }
+
+ async fn do_accept_tls(
+ socket: InsecureClientStream,
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ accept_counter: Arc<()>,
+ debug: bool,
+ secure_sender: ClientSender,
+ ) {
+ let ssl = {
+ // limit acceptor_guard scope
+ // Acceptor can be reloaded using the command socket "reload-certificate" command
+ let acceptor_guard = acceptor.lock().unwrap();
+
+ match openssl::ssl::Ssl::new(acceptor_guard.context()) {
+ Ok(ssl) => ssl,
Err(err) => {
- eprintln!("failed to create SslStream using ssl and connection socket - {err}");
- continue;
+ log::error!("failed to create Ssl object from Acceptor context - {err}");
+ return;
}
- };
+ }
+ };
- let mut stream = Box::pin(stream);
- let sender = sender.clone();
+ let secure_stream = match tokio_openssl::SslStream::new(ssl, socket) {
+ Ok(stream) => stream,
+ Err(err) => {
+ log::error!("failed to create SslStream using ssl and connection socket - {err}");
+ return;
+ }
+ };
- if Arc::strong_count(&accept_counter) > self.max_pending_accepts {
- eprintln!("connection rejected - too many open connections");
- continue;
+ let mut secure_stream = Box::pin(secure_stream);
+
+ let accept_future =
+ tokio::time::timeout(Duration::new(10, 0), secure_stream.as_mut().accept());
+
+ let result = accept_future.await;
+
+ match result {
+ Ok(Ok(())) => {
+ if secure_sender.send(Ok(secure_stream)).await.is_err() && debug {
+ log::error!("detected closed connection channel");
+ }
+ }
+ Ok(Err(err)) => {
+ if debug {
+ log::error!("https handshake failed - {err}");
+ }
}
+ Err(_) => {
+ if debug {
+ log::error!("https handshake timeout");
+ }
+ }
+ }
- let accept_counter = Arc::clone(&accept_counter);
- tokio::spawn(async move {
- let accept_future =
- tokio::time::timeout(Duration::new(10, 0), stream.as_mut().accept());
+ drop(accept_counter); // decrease reference count
+ }
- let result = accept_future.await;
+ async fn do_accept_tls_optional(
+ socket: InsecureClientStream,
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ accept_counter: Arc<()>,
+ debug: bool,
+ secure_sender: ClientSender,
+ insecure_sender: InsecureClientSender,
+ ) {
+ let client_initiates_handshake = {
+ #[cfg(feature = "rate-limited-stream")]
+ let socket = socket.inner();
- match result {
- Ok(Ok(())) => {
- if sender.send(Ok(stream)).await.is_err() && self.debug {
- log::error!("detect closed connection channel");
- }
- }
- Ok(Err(err)) => {
- if self.debug {
- log::error!("https handshake failed - {err}");
- }
- }
- Err(_) => {
- if self.debug {
- log::error!("https handshake timeout");
- }
- }
+ #[cfg(not(feature = "rate-limited-stream"))]
+ let socket = &socket;
+
+ match Self::wait_for_client_tls_handshake(socket).await {
+ Ok(initiates_handshake) => initiates_handshake,
+ Err(err) => {
+ log::error!("error checking for TLS handshake: {err}");
+ return;
}
+ }
+ };
+
+ if !client_initiates_handshake {
+ let insecure_stream = Box::pin(socket);
- drop(accept_counter); // decrease reference count
- });
+ if insecure_sender.send(Ok(insecure_stream)).await.is_err() && debug {
+ log::error!("detected closed connection channel")
+ }
+
+ return;
}
+
+ Self::do_accept_tls(socket, acceptor, accept_counter, debug, secure_sender).await
}
+
+ async fn wait_for_client_tls_handshake(incoming_stream: &TcpStream) -> Result<bool, Error> {
+ const MS_TIMEOUT: u64 = 1000;
+ const BYTES_BUF_SIZE: usize = 128;
+
+ let mut buf = [0; BYTES_BUF_SIZE];
+ let mut last_peek_size = 0;
+
+ let future = async {
+ loop {
+ let peek_size = incoming_stream
+ .peek(&mut buf)
+ .await
+ .context("couldn't peek into incoming tcp stream")?;
+
+ if contains_tls_handshake_fragment(&buf) {
+ return Ok(true);
+ }
+
+ // No more new data came in
+ if peek_size == last_peek_size {
+ return Ok(false);
+ }
+
+ last_peek_size = peek_size;
+
+ // yields to event loop; this future blocks otherwise ad infinitum
+ tokio::time::sleep(Duration::from_millis(0)).await;
+ }
+ };
+
+ tokio::time::timeout(Duration::from_millis(MS_TIMEOUT), future)
+ .await
+ .unwrap_or(Ok(false))
+ }
+}
+
+/// Checks whether the given buffer starts with an
+/// [SSL 3.0 / TLS plaintext fragment][0] that is part of an SSL / TLS handshake.
+///
+/// Such a fragment might look as follows:
+/// ```ignore
+/// [0x16, 0x3, 0x1, 0x02, 0x00, ...]
+/// // | | | |_____|
+/// // | | | \__ content length interpreted as u16
+/// // | | | must not exceed 0x4000 (2^14) bytes
+/// // | | |
+/// // | | \__ any minor version
+/// // | |
+/// // | \__ major version 3
+/// // |
+/// // \__ content type is handshake(22)
+/// ```
+///
+/// If a slice like this is detected at the beginning of the given buffer,
+/// a TLS handshake is most likely being made.
+///
+/// [0]: https://datatracker.ietf.org/doc/html/rfc6101#section-5.2
+#[inline]
+fn contains_tls_handshake_fragment(buf: &[u8]) -> bool {
+ const SLICE_LENGTH: usize = 5;
+ const CONTENT_SIZE: u16 = 1 << 14; // max length of a TLS plaintext fragment
+
+ if buf.len() < SLICE_LENGTH {
+ return false;
+ }
+
+ buf[0] == 0x16 && buf[1] == 0x3 && (((buf[3] as u16) << 8) + buf[4] as u16) <= CONTENT_SIZE
}
--
2.39.2
^ permalink raw reply [flat|nested] 4+ messages in thread
* [pbs-devel] [PATCH v2 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS
2023-07-18 13:28 [pbs-devel] [PATCH v2 proxmox, proxmox-backup 0/3] Add support for HTTP to HTTPS redirection Max Carrara
2023-07-18 13:28 ` [pbs-devel] [PATCH v2 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS Max Carrara
2023-07-18 13:28 ` [pbs-devel] [PATCH v2 proxmox 2/3] rest-server: Add `Redirector` Max Carrara
@ 2023-07-18 13:28 ` Max Carrara
2 siblings, 0 replies; 4+ messages in thread
From: Max Carrara @ 2023-07-18 13:28 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Max Carrara <m.carrara@proxmox.com>
---
Changes v1 --> v2:
* Incorporate changes of the previous two patches correspondingly
src/bin/proxmox-backup-proxy.rs | 46 ++++++++++++++++++++++++++++-----
1 file changed, 39 insertions(+), 7 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index f38a02bd..f69f5bfc 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -23,8 +23,8 @@ use proxmox_sys::{task_log, task_warn};
use pbs_datastore::DataStore;
use proxmox_rest_server::{
- cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, RestEnvironment,
- RestServer, WorkerTask,
+ cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
+ RestEnvironment, RestServer, WorkerTask,
};
use proxmox_backup::rrd_cache::{
@@ -253,6 +253,7 @@ async fn run() -> Result<(), Error> {
)?;
let rest_server = RestServer::new(config);
+ let redirector = Redirector::new();
proxmox_rest_server::init_worker_tasks(
pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(),
file_opts.clone(),
@@ -288,23 +289,54 @@ async fn run() -> Result<(), Error> {
Ok(Value::Null)
})?;
- let connections = proxmox_rest_server::connection::AcceptBuilder::with_acceptor(acceptor)
+ let connections = proxmox_rest_server::connection::AcceptBuilder::new()
.debug(debug)
.rate_limiter_lookup(Arc::new(lookup_rate_limiter))
.tcp_keepalive_time(PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
let server = daemon::create_daemon(
([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
move |listener| {
- let connections = connections.accept(listener);
+ let (secure_connections, insecure_connections) =
+ connections.accept_tls_optional(listener, acceptor);
Ok(async {
daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
- hyper::Server::builder(connections)
+ let secure_server = hyper::Server::builder(secure_connections)
.serve(rest_server)
.with_graceful_shutdown(proxmox_rest_server::shutdown_future())
- .map_err(Error::from)
- .await
+ .map_err(Error::from);
+
+ let insecure_server = hyper::Server::builder(insecure_connections)
+ .serve(redirector)
+ .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
+ .map_err(Error::from);
+
+ let handles = vec![tokio::spawn(secure_server), tokio::spawn(insecure_server)];
+
+ let mut results: Vec<Result<(), Error>> = vec![];
+
+ for res_handle in futures::future::join_all(handles).await.into_iter() {
+ let flattened_res = match res_handle {
+ Ok(inner) => inner,
+ Err(err) => Err(format_err!(err)),
+ };
+
+ results.push(flattened_res);
+ }
+
+ if results.iter().any(Result::is_err) {
+ let cat_errors = results
+ .into_iter()
+ .filter_map(|res| res.err().map(|err| err.to_string()))
+ .collect::<Vec<_>>()
+ .join("\n");
+
+ return Err(format_err!(cat_errors));
+ }
+
+ Ok(())
})
},
Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
--
2.39.2
^ permalink raw reply [flat|nested] 4+ messages in thread