* [pbs-devel] [PATCH v3 proxmox, proxmox-backup 0/3] Add support for HTTP to HTTPS redirection
@ 2023-10-31 18:47 Max Carrara
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS Max Carrara
` (2 more replies)
0 siblings, 3 replies; 9+ messages in thread
From: Max Carrara @ 2023-10-31 18:47 UTC (permalink / raw)
To: pbs-devel
This v3 updates the patch series so it applies again on master.
See v2 [0] for notable changes.
[0]: https://lists.proxmox.com/pipermail/pbs-devel/2023-July/006312.html
proxmox:
Max Carrara (2):
rest-server: Refactor `AcceptBuilder`, provide support for optional
TLS
rest-server: Add `Redirector`
proxmox-rest-server/src/connection.rs | 373 ++++++++++++++++++++------
proxmox-rest-server/src/lib.rs | 2 +-
proxmox-rest-server/src/rest.rs | 71 +++++
3 files changed, 359 insertions(+), 87 deletions(-)
proxmox-backup:
Max Carrara (1):
proxy: redirect HTTP requests to HTTPS
src/bin/proxmox-backup-proxy.rs | 46 ++++++++++++++++++++++++++++-----
1 file changed, 39 insertions(+), 7 deletions(-)
--
2.39.2
* [pbs-devel] [PATCH v3 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS
2023-10-31 18:47 [pbs-devel] [PATCH v3 proxmox, proxmox-backup 0/3] Add support for HTTP to HTTPS redirection Max Carrara
@ 2023-10-31 18:47 ` Max Carrara
2023-11-16 7:35 ` Wolfgang Bumiller
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox 2/3] rest-server: Add `Redirector` Max Carrara
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS Max Carrara
2 siblings, 1 reply; 9+ messages in thread
From: Max Carrara @ 2023-10-31 18:47 UTC (permalink / raw)
To: pbs-devel
The new public function `accept_tls_optional()` is added, which
accepts both plain TCP streams and TCP streams running TLS. Plain TCP
streams are sent along via a separate channel in order to clearly
distinguish between "secure" and "insecure" connections.
Furthermore, instead of `AcceptBuilder` itself holding a reference to
an `SslAcceptor`, its public functions now take the acceptor as an
argument. The public functions are also renamed to make their respective
functionality more explicit:
* `accept()` --> `accept_tls()`
* NEW --> `accept_tls_optional()`
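A caller then ends up wiring this up roughly as follows (sketch only; it
assumes a `listener: TcpListener` and a shared `acceptor: Arc<Mutex<SslAcceptor>>`
as before, see patch 3/3 for the actual usage):

    let builder = proxmox_rest_server::connection::AcceptBuilder::new()
        .tcp_keepalive_time(120);

    // one listener, two accept streams: TLS connections for the API server,
    // plain TCP connections e.g. for an HTTP -> HTTPS redirector
    let (secure_connections, insecure_connections) =
        builder.accept_tls_optional(listener, acceptor);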
Signed-off-by: Max Carrara <m.carrara@proxmox.com>
---
Changes v1 --> v2:
* No more `BiAcceptBuilder`, `AcceptBuilder` is refactored instead
* `AcceptBuilder` doesn't hold a reference to `SslAcceptor` anymore
* Avoid unnecessary `#[cfg]`s
* Avoid unnecessarily duplicated code (already mostly done by getting
rid of `BiAcceptBuilder`)
* Some clippy stuff
Changes v2 --> v3:
* Incorporate previously applied clippy fixes
proxmox-rest-server/src/connection.rs | 373 ++++++++++++++++++++------
1 file changed, 287 insertions(+), 86 deletions(-)
diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 1bec28d..ab8c7db 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -8,15 +8,16 @@ use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use anyhow::Context as _;
-use anyhow::Error;
+use anyhow::{format_err, Context as _, Error};
use futures::FutureExt;
+use hyper::server::accept;
use openssl::ec::{EcGroup, EcKey};
use openssl::nid::Nid;
use openssl::pkey::{PKey, Private};
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use openssl::x509::X509;
use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::mpsc;
use tokio_openssl::SslStream;
use tokio_stream::wrappers::ReceiverStream;
@@ -133,10 +134,14 @@ impl TlsAcceptorBuilder {
}
}
-#[cfg(feature = "rate-limited-stream")]
-type ClientStreamResult = Pin<Box<SslStream<RateLimitedStream<TcpStream>>>>;
#[cfg(not(feature = "rate-limited-stream"))]
-type ClientStreamResult = Pin<Box<SslStream<TcpStream>>>;
+type InsecureClientStream = TcpStream;
+#[cfg(feature = "rate-limited-stream")]
+type InsecureClientStream = RateLimitedStream<TcpStream>;
+
+type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
+
+type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
#[cfg(feature = "rate-limited-stream")]
type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
@@ -145,7 +150,6 @@ type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit
+ 'static;
pub struct AcceptBuilder {
- acceptor: Arc<Mutex<SslAcceptor>>,
debug: bool,
tcp_keepalive_time: u32,
max_pending_accepts: usize,
@@ -154,16 +158,9 @@ pub struct AcceptBuilder {
lookup_rate_limiter: Option<Arc<LookupRateLimiter>>,
}
-impl AcceptBuilder {
- pub fn new() -> Result<Self, Error> {
- Ok(Self::with_acceptor(Arc::new(Mutex::new(
- TlsAcceptorBuilder::new().build()?,
- ))))
- }
-
- pub fn with_acceptor(acceptor: Arc<Mutex<SslAcceptor>>) -> Self {
+impl Default for AcceptBuilder {
+ fn default() -> Self {
Self {
- acceptor,
debug: false,
tcp_keepalive_time: 120,
max_pending_accepts: 1024,
@@ -172,6 +169,12 @@ impl AcceptBuilder {
lookup_rate_limiter: None,
}
}
+}
+
+impl AcceptBuilder {
+ pub fn new() -> Self {
+ Default::default()
+ }
pub fn debug(mut self, debug: bool) -> Self {
self.debug = debug;
@@ -193,114 +196,312 @@ impl AcceptBuilder {
self.lookup_rate_limiter = Some(lookup_rate_limiter);
self
}
+}
- pub fn accept(
+impl AcceptBuilder {
+ pub fn accept_tls(
self,
listener: TcpListener,
- ) -> impl hyper::server::accept::Accept<Conn = ClientStreamResult, Error = Error> {
- let (sender, receiver) = tokio::sync::mpsc::channel(self.max_pending_accepts);
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
+ let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
+
+ tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
+
+ accept::from_stream(ReceiverStream::new(secure_receiver))
+ }
+
+ pub fn accept_tls_optional(
+ self,
+ listener: TcpListener,
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ ) -> (
+ impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
+ impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
+ ) {
+ let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
+ let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
+
+ tokio::spawn(self.accept_connections(
+ listener,
+ acceptor,
+ (secure_sender, insecure_sender).into(),
+ ));
+
+ (
+ accept::from_stream(ReceiverStream::new(secure_receiver)),
+ accept::from_stream(ReceiverStream::new(insecure_receiver)),
+ )
+ }
+}
+
+type ClientSender = mpsc::Sender<Result<ClientStreamResult, Error>>;
+type InsecureClientSender = mpsc::Sender<Result<InsecureClientStreamResult, Error>>;
- tokio::spawn(self.accept_connections(listener, sender));
+enum Sender {
+ Secure(ClientSender),
+ SecureAndInsecure(ClientSender, InsecureClientSender),
+}
- //receiver
- hyper::server::accept::from_stream(ReceiverStream::new(receiver))
+impl From<ClientSender> for Sender {
+ fn from(sender: ClientSender) -> Self {
+ Sender::Secure(sender)
}
+}
+
+impl From<(ClientSender, InsecureClientSender)> for Sender {
+ fn from(senders: (ClientSender, InsecureClientSender)) -> Self {
+ Sender::SecureAndInsecure(senders.0, senders.1)
+ }
+}
+impl AcceptBuilder {
async fn accept_connections(
self,
listener: TcpListener,
- sender: tokio::sync::mpsc::Sender<Result<ClientStreamResult, Error>>,
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ sender: Sender,
) {
let accept_counter = Arc::new(());
let mut shutdown_future = crate::shutdown_future().fuse();
loop {
- let (sock, peer) = futures::select! {
- res = listener.accept().fuse() => match res {
- Ok(conn) => conn,
+ let socket = futures::select! {
+ res = self.try_setup_socket(&listener).fuse() => match res {
+ Ok(socket) => socket,
Err(err) => {
- eprintln!("error accepting tcp connection: {err}");
+ log::error!("couldn't set up TCP socket: {err}");
continue;
}
},
- _ = shutdown_future => break,
+ _ = shutdown_future => break,
};
- #[cfg(not(feature = "rate-limited-stream"))]
- {
- let _ = &peer;
- }
- sock.set_nodelay(true).unwrap();
- let _ = proxmox_sys::linux::socket::set_tcp_keepalive(
- sock.as_raw_fd(),
- self.tcp_keepalive_time,
- );
+ let acceptor = Arc::clone(&acceptor);
+ let accept_counter = Arc::clone(&accept_counter);
- #[cfg(feature = "rate-limited-stream")]
- let sock = match self.lookup_rate_limiter.clone() {
- Some(lookup) => {
- RateLimitedStream::with_limiter_update_cb(sock, move || lookup(peer))
+ if Arc::strong_count(&accept_counter) > self.max_pending_accepts {
+ log::error!("connection rejected - too many open connections");
+ continue;
+ }
+
+ match sender {
+ Sender::Secure(ref secure_sender) => {
+ let accept_future = Self::do_accept_tls(
+ socket,
+ acceptor,
+ accept_counter,
+ self.debug,
+ secure_sender.clone(),
+ );
+
+ tokio::spawn(accept_future);
+ }
+ Sender::SecureAndInsecure(ref secure_sender, ref insecure_sender) => {
+ let accept_future = Self::do_accept_tls_optional(
+ socket,
+ acceptor,
+ accept_counter,
+ self.debug,
+ secure_sender.clone(),
+ insecure_sender.clone(),
+ );
+
+ tokio::spawn(accept_future);
}
- None => RateLimitedStream::with_limiter(sock, None, None),
};
+ }
+ }
- let ssl = {
- // limit acceptor_guard scope
- // Acceptor can be reloaded using the command socket "reload-certificate" command
- let acceptor_guard = self.acceptor.lock().unwrap();
+ async fn try_setup_socket(
+ &self,
+ listener: &TcpListener,
+ ) -> Result<InsecureClientStream, Error> {
+ let (socket, peer) = match listener.accept().await {
+ Ok(connection) => connection,
+ Err(error) => {
+ return Err(format_err!(error)).context("error while accepting tcp stream")
+ }
+ };
- match openssl::ssl::Ssl::new(acceptor_guard.context()) {
- Ok(ssl) => ssl,
- Err(err) => {
- eprintln!("failed to create Ssl object from Acceptor context - {err}");
- continue;
- }
- }
- };
+ socket
+ .set_nodelay(true)
+ .context("error while setting TCP_NODELAY on socket")?;
+
+ proxmox_sys::linux::socket::set_tcp_keepalive(socket.as_raw_fd(), self.tcp_keepalive_time)
+ .context("error while setting SO_KEEPALIVE on socket")?;
- let stream = match tokio_openssl::SslStream::new(ssl, sock) {
- Ok(stream) => stream,
+ #[cfg(feature = "rate-limited-stream")]
+ let socket = match self.lookup_rate_limiter.clone() {
+ Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
+ None => RateLimitedStream::with_limiter(socket, None, None),
+ };
+
+ #[cfg(not(feature = "rate-limited-stream"))]
+ let _peer = peer;
+
+ Ok(socket)
+ }
+
+ async fn do_accept_tls(
+ socket: InsecureClientStream,
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ accept_counter: Arc<()>,
+ debug: bool,
+ secure_sender: ClientSender,
+ ) {
+ let ssl = {
+ // limit acceptor_guard scope
+ // Acceptor can be reloaded using the command socket "reload-certificate" command
+ let acceptor_guard = acceptor.lock().unwrap();
+
+ match openssl::ssl::Ssl::new(acceptor_guard.context()) {
+ Ok(ssl) => ssl,
Err(err) => {
- eprintln!("failed to create SslStream using ssl and connection socket - {err}");
- continue;
+ log::error!("failed to create Ssl object from Acceptor context - {err}");
+ return;
}
- };
+ }
+ };
- let mut stream = Box::pin(stream);
- let sender = sender.clone();
+ let secure_stream = match tokio_openssl::SslStream::new(ssl, socket) {
+ Ok(stream) => stream,
+ Err(err) => {
+ log::error!("failed to create SslStream using ssl and connection socket - {err}");
+ return;
+ }
+ };
- if Arc::strong_count(&accept_counter) > self.max_pending_accepts {
- eprintln!("connection rejected - too many open connections");
- continue;
+ let mut secure_stream = Box::pin(secure_stream);
+
+ let accept_future =
+ tokio::time::timeout(Duration::new(10, 0), secure_stream.as_mut().accept());
+
+ let result = accept_future.await;
+
+ match result {
+ Ok(Ok(())) => {
+ if secure_sender.send(Ok(secure_stream)).await.is_err() && debug {
+ log::error!("detected closed connection channel");
+ }
+ }
+ Ok(Err(err)) => {
+ if debug {
+ log::error!("https handshake failed - {err}");
+ }
}
+ Err(_) => {
+ if debug {
+ log::error!("https handshake timeout");
+ }
+ }
+ }
- let accept_counter = Arc::clone(&accept_counter);
- tokio::spawn(async move {
- let accept_future =
- tokio::time::timeout(Duration::new(10, 0), stream.as_mut().accept());
+ drop(accept_counter); // decrease reference count
+ }
- let result = accept_future.await;
+ async fn do_accept_tls_optional(
+ socket: InsecureClientStream,
+ acceptor: Arc<Mutex<SslAcceptor>>,
+ accept_counter: Arc<()>,
+ debug: bool,
+ secure_sender: ClientSender,
+ insecure_sender: InsecureClientSender,
+ ) {
+ let client_initiates_handshake = {
+ #[cfg(feature = "rate-limited-stream")]
+ let socket = socket.inner();
- match result {
- Ok(Ok(())) => {
- if sender.send(Ok(stream)).await.is_err() && self.debug {
- log::error!("detect closed connection channel");
- }
- }
- Ok(Err(err)) => {
- if self.debug {
- log::error!("https handshake failed - {err}");
- }
- }
- Err(_) => {
- if self.debug {
- log::error!("https handshake timeout");
- }
- }
+ #[cfg(not(feature = "rate-limited-stream"))]
+ let socket = &socket;
+
+ match Self::wait_for_client_tls_handshake(socket).await {
+ Ok(initiates_handshake) => initiates_handshake,
+ Err(err) => {
+ log::error!("error checking for TLS handshake: {err}");
+ return;
}
+ }
+ };
+
+ if !client_initiates_handshake {
+ let insecure_stream = Box::pin(socket);
- drop(accept_counter); // decrease reference count
- });
+ if insecure_sender.send(Ok(insecure_stream)).await.is_err() && debug {
+ log::error!("detected closed connection channel")
+ }
+
+ return;
}
+
+ Self::do_accept_tls(socket, acceptor, accept_counter, debug, secure_sender).await
}
+
+ async fn wait_for_client_tls_handshake(incoming_stream: &TcpStream) -> Result<bool, Error> {
+ const MS_TIMEOUT: u64 = 1000;
+ const BYTES_BUF_SIZE: usize = 128;
+
+ let mut buf = [0; BYTES_BUF_SIZE];
+ let mut last_peek_size = 0;
+
+ let future = async {
+ loop {
+ let peek_size = incoming_stream
+ .peek(&mut buf)
+ .await
+ .context("couldn't peek into incoming tcp stream")?;
+
+ if contains_tls_handshake_fragment(&buf) {
+ return Ok(true);
+ }
+
+ // No more new data came in
+ if peek_size == last_peek_size {
+ return Ok(false);
+ }
+
+ last_peek_size = peek_size;
+
+ // yields to event loop; this future blocks otherwise ad infinitum
+ tokio::time::sleep(Duration::from_millis(0)).await;
+ }
+ };
+
+ tokio::time::timeout(Duration::from_millis(MS_TIMEOUT), future)
+ .await
+ .unwrap_or(Ok(false))
+ }
+}
+
+/// Checks whether an [SSL 3.0 / TLS plaintext fragment][0] being part of a
+/// SSL / TLS handshake is contained in the given buffer.
+///
+/// Such a fragment might look as follows:
+/// ```ignore
+/// [0x16, 0x3, 0x1, 0x02, 0x00, ...]
+/// // | | | |_____|
+/// // | | | \__ content length interpreted as u16
+/// // | | | must not exceed 0x4000 (2^14) bytes
+/// // | | |
+/// // | | \__ any minor version
+/// // | |
+/// // | \__ major version 3
+/// // |
+/// // \__ content type is handshake(22)
+/// ```
+///
+/// If a slice like this is detected at the beginning of the given buffer,
+/// a TLS handshake is most definitely being made.
+///
+/// [0]: https://datatracker.ietf.org/doc/html/rfc6101#section-5.2
+#[inline]
+fn contains_tls_handshake_fragment(buf: &[u8]) -> bool {
+ const SLICE_LENGTH: usize = 5;
+ const CONTENT_SIZE: u16 = 1 << 14; // max length of a TLS plaintext fragment
+
+ if buf.len() < SLICE_LENGTH {
+ return false;
+ }
+
+ buf[0] == 0x16 && buf[1] == 0x3 && (((buf[3] as u16) << 8) + buf[4] as u16) <= CONTENT_SIZE
}
--
2.39.2
* [pbs-devel] [PATCH v3 proxmox 2/3] rest-server: Add `Redirector`
2023-10-31 18:47 [pbs-devel] [PATCH v3 proxmox, proxmox-backup 0/3] Add support for HTTP to HTTPS redirection Max Carrara
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS Max Carrara
@ 2023-10-31 18:47 ` Max Carrara
2023-11-03 10:17 ` Wolfgang Bumiller
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS Max Carrara
2 siblings, 1 reply; 9+ messages in thread
From: Max Carrara @ 2023-10-31 18:47 UTC (permalink / raw)
To: pbs-devel
The `Redirector` is a simple `Service` that redirects HTTP requests
to HTTPS and can be served by a `hyper::Server`.
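For context, it is meant to be wired up roughly like this (sketch based on
patch 3/3, assuming `insecure_connections` comes from
`AcceptBuilder::accept_tls_optional()`):

    let redirector = Redirector::new();

    hyper::Server::builder(insecure_connections)
        .serve(redirector)
        .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
        .await?;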
Signed-off-by: Max Carrara <m.carrara@proxmox.com>
---
Changes v1 --> v2:
* `RedirectService` is now a ZST
* Drop constraint on `PeerAddress` trait from `Service` `impl` of
`Redirector`
Changes v2 --> v3:
* Implement `Default` for `Redirector`
proxmox-rest-server/src/lib.rs | 2 +-
proxmox-rest-server/src/rest.rs | 77 +++++++++++++++++++++++++++++++++
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
index bc5be01..1c64ffb 100644
--- a/proxmox-rest-server/src/lib.rs
+++ b/proxmox-rest-server/src/lib.rs
@@ -48,7 +48,7 @@ mod api_config;
pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
mod rest;
-pub use rest::RestServer;
+pub use rest::{Redirector, RestServer};
pub mod connection;
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 2ccd4d5..74e9474 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -97,6 +97,83 @@ impl<T: PeerAddress> Service<&T> for RestServer {
}
}
+pub struct Redirector;
+
+impl Default for Redirector {
+ fn default() -> Self {
+ Redirector::new()
+ }
+}
+
+impl Redirector {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl<T> Service<&T> for Redirector {
+ type Response = RedirectService;
+ type Error = Error;
+ type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, _ctx: &T) -> Self::Future {
+ std::future::ready(Ok(RedirectService {}))
+ }
+}
+
+pub struct RedirectService;
+
+impl Service<Request<Body>> for RedirectService {
+ type Response = Response<Body>;
+ type Error = anyhow::Error;
+ type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ lazy_static! {
+ static ref RE: Regex = Regex::new(r"((http(s)?://)|^)").unwrap();
+ }
+
+ let future = async move {
+ let header_host_value = req
+ .headers()
+ .get("host")
+ .and_then(|value| value.to_str().ok());
+
+ let response = if let Some(value) = header_host_value {
+ let location_value = RE.replace(value, "https://");
+
+ let status_code = if matches!(*req.method(), http::Method::GET | http::Method::HEAD)
+ {
+ StatusCode::MOVED_PERMANENTLY
+ } else {
+ StatusCode::PERMANENT_REDIRECT
+ };
+
+ Response::builder()
+ .status(status_code)
+ .header("Location", String::from(location_value))
+ .body(Body::empty())?
+ } else {
+ Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::empty())?
+ };
+
+ Ok(response)
+ };
+
+ future.boxed()
+ }
+}
+
pub trait PeerAddress {
fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
}
--
2.39.2
* [pbs-devel] [PATCH v3 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS
2023-10-31 18:47 [pbs-devel] [PATCH v3 proxmox, proxmox-backup 0/3] Add support for HTTP to HTTPS redirection Max Carrara
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS Max Carrara
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox 2/3] rest-server: Add `Redirector` Max Carrara
@ 2023-10-31 18:47 ` Max Carrara
2023-11-03 10:24 ` Wolfgang Bumiller
2 siblings, 1 reply; 9+ messages in thread
From: Max Carrara @ 2023-10-31 18:47 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Max Carrara <m.carrara@proxmox.com>
---
Changes v1 --> v2:
* Incorporate changes of the previous two patches correspondingly
Changes v2 --> v3:
* None
src/bin/proxmox-backup-proxy.rs | 46 ++++++++++++++++++++++++++++-----
1 file changed, 39 insertions(+), 7 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index f38a02bd..f69f5bfc 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -23,8 +23,8 @@ use proxmox_sys::{task_log, task_warn};
use pbs_datastore::DataStore;
use proxmox_rest_server::{
- cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, RestEnvironment,
- RestServer, WorkerTask,
+ cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
+ RestEnvironment, RestServer, WorkerTask,
};
use proxmox_backup::rrd_cache::{
@@ -253,6 +253,7 @@ async fn run() -> Result<(), Error> {
)?;
let rest_server = RestServer::new(config);
+ let redirector = Redirector::new();
proxmox_rest_server::init_worker_tasks(
pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(),
file_opts.clone(),
@@ -288,23 +289,54 @@ async fn run() -> Result<(), Error> {
Ok(Value::Null)
})?;
- let connections = proxmox_rest_server::connection::AcceptBuilder::with_acceptor(acceptor)
+ let connections = proxmox_rest_server::connection::AcceptBuilder::new()
.debug(debug)
.rate_limiter_lookup(Arc::new(lookup_rate_limiter))
.tcp_keepalive_time(PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
let server = daemon::create_daemon(
([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
move |listener| {
- let connections = connections.accept(listener);
+ let (secure_connections, insecure_connections) =
+ connections.accept_tls_optional(listener, acceptor);
Ok(async {
daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
- hyper::Server::builder(connections)
+ let secure_server = hyper::Server::builder(secure_connections)
.serve(rest_server)
.with_graceful_shutdown(proxmox_rest_server::shutdown_future())
- .map_err(Error::from)
- .await
+ .map_err(Error::from);
+
+ let insecure_server = hyper::Server::builder(insecure_connections)
+ .serve(redirector)
+ .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
+ .map_err(Error::from);
+
+ let handles = vec![tokio::spawn(secure_server), tokio::spawn(insecure_server)];
+
+ let mut results: Vec<Result<(), Error>> = vec![];
+
+ for res_handle in futures::future::join_all(handles).await.into_iter() {
+ let flattened_res = match res_handle {
+ Ok(inner) => inner,
+ Err(err) => Err(format_err!(err)),
+ };
+
+ results.push(flattened_res);
+ }
+
+ if results.iter().any(Result::is_err) {
+ let cat_errors = results
+ .into_iter()
+ .filter_map(|res| res.err().map(|err| err.to_string()))
+ .collect::<Vec<_>>()
+ .join("\n");
+
+ return Err(format_err!(cat_errors));
+ }
+
+ Ok(())
})
},
Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
--
2.39.2
* Re: [pbs-devel] [PATCH v3 proxmox 2/3] rest-server: Add `Redirector`
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox 2/3] rest-server: Add `Redirector` Max Carrara
@ 2023-11-03 10:17 ` Wolfgang Bumiller
0 siblings, 0 replies; 9+ messages in thread
From: Wolfgang Bumiller @ 2023-11-03 10:17 UTC (permalink / raw)
To: Max Carrara; +Cc: pbs-devel
On Tue, Oct 31, 2023 at 07:47:04PM +0100, Max Carrara wrote:
> The `Redirector` is a simple `Service` that redirects HTTP requests
> to HTTPS and can be served by a `hyper::Server`.
>
> Signed-off-by: Max Carrara <m.carrara@proxmox.com>
> ---
> Changes v1 --> v2:
> * `RedirectService` is now a ZST
> * Drop constraint on `PeerAddress` trait from `Service` `impl` of
> `Redirector`
>
> Changes v2 --> v3:
> * Implement `Default` for `Redirector`
>
> proxmox-rest-server/src/lib.rs | 2 +-
> proxmox-rest-server/src/rest.rs | 77 +++++++++++++++++++++++++++++++++
> 2 files changed, 78 insertions(+), 1 deletion(-)
>
> diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
> index bc5be01..1c64ffb 100644
> --- a/proxmox-rest-server/src/lib.rs
> +++ b/proxmox-rest-server/src/lib.rs
> @@ -48,7 +48,7 @@ mod api_config;
> pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
>
> mod rest;
> -pub use rest::RestServer;
> +pub use rest::{Redirector, RestServer};
>
> pub mod connection;
>
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index 2ccd4d5..74e9474 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -97,6 +97,83 @@ impl<T: PeerAddress> Service<&T> for RestServer {
> }
> }
>
> +pub struct Redirector;
> +
> +impl Default for Redirector {
> + fn default() -> Self {
> + Redirector::new()
> + }
> +}
> +
> +impl Redirector {
> + pub fn new() -> Self {
> + Self {}
> + }
> +}
> +
> +impl<T> Service<&T> for Redirector {
> + type Response = RedirectService;
> + type Error = Error;
> + type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
> +
> + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
> + Poll::Ready(Ok(()))
> + }
> +
> + fn call(&mut self, _ctx: &T) -> Self::Future {
> + std::future::ready(Ok(RedirectService {}))
> + }
> +}
> +
> +pub struct RedirectService;
> +
> +impl Service<Request<Body>> for RedirectService {
> + type Response = Response<Body>;
> + type Error = anyhow::Error;
> + type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
> +
> + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
> + Poll::Ready(Ok(()))
> + }
> +
> + fn call(&mut self, req: Request<Body>) -> Self::Future {
> + lazy_static! {
> + static ref RE: Regex = Regex::new(r"((http(s)?://)|^)").unwrap();
Shouldn't the `http://` part be anchored, too?
And does this really warrant a regex? Seems too simple for complex
machinery.
But wait a second, why would the Host header even contain a protocol?
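For illustration, a regex-free variant could look roughly like this untested,
hypothetical helper, which only strips a scheme defensively in case one ever
shows up in the Host value:

    fn https_location(host_header: &str) -> String {
        // hypothetical helper - Host normally only carries host[:port]
        let host = host_header
            .strip_prefix("https://")
            .or_else(|| host_header.strip_prefix("http://"))
            .unwrap_or(host_header);
        format!("https://{}", host)
    }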
Otherwise, patches 1 & 2 LGTM.
> + }
> +
> + let future = async move {
> + let header_host_value = req
> + .headers()
> + .get("host")
> + .and_then(|value| value.to_str().ok());
> +
> + let response = if let Some(value) = header_host_value {
> + let location_value = RE.replace(value, "https://");
> +
> + let status_code = if matches!(*req.method(), http::Method::GET | http::Method::HEAD)
> + {
> + StatusCode::MOVED_PERMANENTLY
> + } else {
> + StatusCode::PERMANENT_REDIRECT
> + };
> +
> + Response::builder()
> + .status(status_code)
> + .header("Location", String::from(location_value))
> + .body(Body::empty())?
> + } else {
> + Response::builder()
> + .status(StatusCode::BAD_REQUEST)
> + .body(Body::empty())?
> + };
> +
> + Ok(response)
> + };
> +
> + future.boxed()
> + }
> +}
> +
> pub trait PeerAddress {
> fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
> }
> --
> 2.39.2
>
>
>
* Re: [pbs-devel] [PATCH v3 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS Max Carrara
@ 2023-11-03 10:24 ` Wolfgang Bumiller
2023-11-15 15:22 ` Max Carrara
0 siblings, 1 reply; 9+ messages in thread
From: Wolfgang Bumiller @ 2023-11-03 10:24 UTC (permalink / raw)
To: Max Carrara; +Cc: pbs-devel
On Tue, Oct 31, 2023 at 07:47:05PM +0100, Max Carrara wrote:
> Signed-off-by: Max Carrara <m.carrara@proxmox.com>
> ---
> Changes v1 --> v2:
> * Incorporate changes of the previous two patches correspondingly
>
> Changes v2 --> v3:
> * None
>
> src/bin/proxmox-backup-proxy.rs | 46 ++++++++++++++++++++++++++++-----
> 1 file changed, 39 insertions(+), 7 deletions(-)
>
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index f38a02bd..f69f5bfc 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -23,8 +23,8 @@ use proxmox_sys::{task_log, task_warn};
> use pbs_datastore::DataStore;
>
> use proxmox_rest_server::{
> - cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, RestEnvironment,
> - RestServer, WorkerTask,
> + cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
> + RestEnvironment, RestServer, WorkerTask,
> };
>
> use proxmox_backup::rrd_cache::{
> @@ -253,6 +253,7 @@ async fn run() -> Result<(), Error> {
> )?;
>
> let rest_server = RestServer::new(config);
> + let redirector = Redirector::new();
> proxmox_rest_server::init_worker_tasks(
> pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(),
> file_opts.clone(),
> @@ -288,23 +289,54 @@ async fn run() -> Result<(), Error> {
> Ok(Value::Null)
> })?;
>
> - let connections = proxmox_rest_server::connection::AcceptBuilder::with_acceptor(acceptor)
> + let connections = proxmox_rest_server::connection::AcceptBuilder::new()
> .debug(debug)
> .rate_limiter_lookup(Arc::new(lookup_rate_limiter))
> .tcp_keepalive_time(PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
> +
> let server = daemon::create_daemon(
> ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
> move |listener| {
> - let connections = connections.accept(listener);
> + let (secure_connections, insecure_connections) =
> + connections.accept_tls_optional(listener, acceptor);
>
> Ok(async {
> daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
>
> - hyper::Server::builder(connections)
> + let secure_server = hyper::Server::builder(secure_connections)
> .serve(rest_server)
> .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
> - .map_err(Error::from)
> - .await
> + .map_err(Error::from);
> +
> + let insecure_server = hyper::Server::builder(insecure_connections)
> + .serve(redirector)
> + .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
> + .map_err(Error::from);
> +
> + let handles = vec![tokio::spawn(secure_server), tokio::spawn(insecure_server)];
Maybe we should just detach the redirection-handler and potentially give
it a retry logic and finally fail it with a log message.
Otherwise, this shouldn't need to be a Vec; a regular array should work and
skips the extra allocation.
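E.g. (untested sketch - an array works with `futures::future::join_all` via
IntoIterator):

    let handles = [tokio::spawn(secure_server), tokio::spawn(insecure_server)];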
> +
> + let mut results: Vec<Result<(), Error>> = vec![];
> +
> + for res_handle in futures::future::join_all(handles).await.into_iter() {
> + let flattened_res = match res_handle {
> + Ok(inner) => inner,
> + Err(err) => Err(format_err!(err)),
> + };
> +
> + results.push(flattened_res);
> + }
> +
> + if results.iter().any(Result::is_err) {
> + let cat_errors = results
> + .into_iter()
> + .filter_map(|res| res.err().map(|err| err.to_string()))
> + .collect::<Vec<_>>()
> + .join("\n");
> +
> + return Err(format_err!(cat_errors));
> + }
> +
> + Ok(())
> })
> },
> Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
> --
> 2.39.2
* Re: [pbs-devel] [PATCH v3 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS
2023-11-03 10:24 ` Wolfgang Bumiller
@ 2023-11-15 15:22 ` Max Carrara
2023-11-16 7:35 ` Wolfgang Bumiller
0 siblings, 1 reply; 9+ messages in thread
From: Max Carrara @ 2023-11-15 15:22 UTC (permalink / raw)
To: Wolfgang Bumiller; +Cc: pbs-devel
On 11/3/23 11:24, Wolfgang Bumiller wrote:
> On Tue, Oct 31, 2023 at 07:47:05PM +0100, Max Carrara wrote:
>> Signed-off-by: Max Carrara <m.carrara@proxmox.com>
>> ---
>> Changes v1 --> v2:
>> * Incorporate changes of the previous two patches correspondingly
>>
>> Changes v2 --> v3:
>> * None
>>
>> src/bin/proxmox-backup-proxy.rs | 46 ++++++++++++++++++++++++++++-----
>> 1 file changed, 39 insertions(+), 7 deletions(-)
>>
>> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
>> index f38a02bd..f69f5bfc 100644
>> --- a/src/bin/proxmox-backup-proxy.rs
>> +++ b/src/bin/proxmox-backup-proxy.rs
>> @@ -23,8 +23,8 @@ use proxmox_sys::{task_log, task_warn};
>> use pbs_datastore::DataStore;
>>
>> use proxmox_rest_server::{
>> - cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, RestEnvironment,
>> - RestServer, WorkerTask,
>> + cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
>> + RestEnvironment, RestServer, WorkerTask,
>> };
>>
>> use proxmox_backup::rrd_cache::{
>> @@ -253,6 +253,7 @@ async fn run() -> Result<(), Error> {
>> )?;
>>
>> let rest_server = RestServer::new(config);
>> + let redirector = Redirector::new();
>> proxmox_rest_server::init_worker_tasks(
>> pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(),
>> file_opts.clone(),
>> @@ -288,23 +289,54 @@ async fn run() -> Result<(), Error> {
>> Ok(Value::Null)
>> })?;
>>
>> - let connections = proxmox_rest_server::connection::AcceptBuilder::with_acceptor(acceptor)
>> + let connections = proxmox_rest_server::connection::AcceptBuilder::new()
>> .debug(debug)
>> .rate_limiter_lookup(Arc::new(lookup_rate_limiter))
>> .tcp_keepalive_time(PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
>> +
>> let server = daemon::create_daemon(
>> ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
>> move |listener| {
>> - let connections = connections.accept(listener);
>> + let (secure_connections, insecure_connections) =
>> + connections.accept_tls_optional(listener, acceptor);
>>
>> Ok(async {
>> daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
>>
>> - hyper::Server::builder(connections)
>> + let secure_server = hyper::Server::builder(secure_connections)
>> .serve(rest_server)
>> .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
>> - .map_err(Error::from)
>> - .await
>> + .map_err(Error::from);
>> +
>> + let insecure_server = hyper::Server::builder(insecure_connections)
>> + .serve(redirector)
>> + .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
>> + .map_err(Error::from);
>> +
>> + let handles = vec![tokio::spawn(secure_server), tokio::spawn(insecure_server)];
>
> Maybe we should just detach the redirection-handler and potentially give
> it a retry logic and finally fail it with a log message.
>
Did you have anything in particular in mind for the retry logic? I agree with
detaching the redirection-handler, but I don't quite understand what needs
to be retried; if something goes wrong, the entire daemon probably would need
to be recreated, AFAIU.
> > Otherwise, this shouldn't need to be a Vec; a regular array should work and
> > skips the extra allocation.
>
>> +
>> + let mut results: Vec<Result<(), Error>> = vec![];
>> +
>> + for res_handle in futures::future::join_all(handles).await.into_iter() {
>> + let flattened_res = match res_handle {
>> + Ok(inner) => inner,
>> + Err(err) => Err(format_err!(err)),
>> + };
>> +
>> + results.push(flattened_res);
>> + }
>> +
>> + if results.iter().any(Result::is_err) {
>> + let cat_errors = results
>> + .into_iter()
>> + .filter_map(|res| res.err().map(|err| err.to_string()))
>> + .collect::<Vec<_>>()
>> + .join("\n");
>> +
>> + return Err(format_err!(cat_errors));
>> + }
>> +
>> + Ok(())
>> })
>> },
>> Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
>> --
>> 2.39.2
* Re: [pbs-devel] [PATCH v3 proxmox-backup 3/3] proxy: redirect HTTP requests to HTTPS
2023-11-15 15:22 ` Max Carrara
@ 2023-11-16 7:35 ` Wolfgang Bumiller
0 siblings, 0 replies; 9+ messages in thread
From: Wolfgang Bumiller @ 2023-11-16 7:35 UTC (permalink / raw)
To: Max Carrara; +Cc: pbs-devel
On Wed, Nov 15, 2023 at 04:22:45PM +0100, Max Carrara wrote:
> On 11/3/23 11:24, Wolfgang Bumiller wrote:
> > On Tue, Oct 31, 2023 at 07:47:05PM +0100, Max Carrara wrote:
> >> Signed-off-by: Max Carrara <m.carrara@proxmox.com>
> >> ---
> >> Changes v1 --> v2:
> >> * Incorporate changes of the previous two patches correspondingly
> >>
> >> Changes v2 --> v3:
> >> * None
> >>
> >> src/bin/proxmox-backup-proxy.rs | 46 ++++++++++++++++++++++++++++-----
> >> 1 file changed, 39 insertions(+), 7 deletions(-)
> >>
> >> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> >> index f38a02bd..f69f5bfc 100644
> >> --- a/src/bin/proxmox-backup-proxy.rs
> >> +++ b/src/bin/proxmox-backup-proxy.rs
> >> @@ -23,8 +23,8 @@ use proxmox_sys::{task_log, task_warn};
> >> use pbs_datastore::DataStore;
> >>
> >> use proxmox_rest_server::{
> >> - cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, RestEnvironment,
> >> - RestServer, WorkerTask,
> >> + cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
> >> + RestEnvironment, RestServer, WorkerTask,
> >> };
> >>
> >> use proxmox_backup::rrd_cache::{
> >> @@ -253,6 +253,7 @@ async fn run() -> Result<(), Error> {
> >> )?;
> >>
> >> let rest_server = RestServer::new(config);
> >> + let redirector = Redirector::new();
> >> proxmox_rest_server::init_worker_tasks(
> >> pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(),
> >> file_opts.clone(),
> >> @@ -288,23 +289,54 @@ async fn run() -> Result<(), Error> {
> >> Ok(Value::Null)
> >> })?;
> >>
> >> - let connections = proxmox_rest_server::connection::AcceptBuilder::with_acceptor(acceptor)
> >> + let connections = proxmox_rest_server::connection::AcceptBuilder::new()
> >> .debug(debug)
> >> .rate_limiter_lookup(Arc::new(lookup_rate_limiter))
> >> .tcp_keepalive_time(PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
> >> +
> >> let server = daemon::create_daemon(
> >> ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
> >> move |listener| {
> >> - let connections = connections.accept(listener);
> >> + let (secure_connections, insecure_connections) =
> >> + connections.accept_tls_optional(listener, acceptor);
> >>
> >> Ok(async {
> >> daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
> >>
> >> - hyper::Server::builder(connections)
> >> + let secure_server = hyper::Server::builder(secure_connections)
> >> .serve(rest_server)
> >> .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
> >> - .map_err(Error::from)
> >> - .await
> >> + .map_err(Error::from);
> >> +
> >> + let insecure_server = hyper::Server::builder(insecure_connections)
> >> + .serve(redirector)
> >> + .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
> >> + .map_err(Error::from);
> >> +
> >> + let handles = vec![tokio::spawn(secure_server), tokio::spawn(insecure_server)];
> >
> > Maybe we should just detach the redirection-handler and potentially give
> > it a retry logic and finally fail it with a log message.
> >
>
> Did you have anything in particular in mind for the retry logic? I agree with
> detaching the redirection-handler, but I don't quite understand what needs
> to be retried; if something goes wrong, the entire daemon probably would need
> to be recreated, AFAIU.
Since they come from the same underlying listener, you're probably
right. The parts in between are even less likely to fail.
Considering this, it might be just fine this way as it'll probably
behave the same (I'd expect socket errors to fail both anyway), but
then this should at least use a *simple* 2-way join, e.g.
`futures::future::try_join()` - to join exactly 2 futures we really
don't need to manage a Vec and iterate through 2 elements ;-)
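Something along these lines (untested sketch, using the two spawned server
futures from the patch):

    let (secure_res, insecure_res) = futures::future::try_join(
        tokio::spawn(secure_server),
        tokio::spawn(insecure_server),
    )
    .await?; // JoinError if either task panics or is cancelled

    secure_res?;   // error from the TLS server, if any
    insecure_res?; // ... or from the redirector

    Ok(())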
* Re: [pbs-devel] [PATCH v3 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS
2023-10-31 18:47 ` [pbs-devel] [PATCH v3 proxmox 1/3] rest-server: Refactor `AcceptBuilder`, provide support for optional TLS Max Carrara
@ 2023-11-16 7:35 ` Wolfgang Bumiller
0 siblings, 0 replies; 9+ messages in thread
From: Wolfgang Bumiller @ 2023-11-16 7:35 UTC (permalink / raw)
To: Max Carrara; +Cc: pbs-devel
On Tue, Oct 31, 2023 at 07:47:03PM +0100, Max Carrara wrote:
> The new public function `accept_tls_optional()` is added, which
> accepts both plain TCP streams and TCP streams running TLS. Plain TCP
> streams are sent along via a separate channel in order to clearly
> distinguish between "secure" and "insecure" connections.
>
> Furthermore, instead of `AcceptBuilder` itself holding a reference to
> an `SslAcceptor`, its public functions now take the acceptor as an
> argument. The public functions' names are changed to distinguish
> between their functionality in a more explicit manner:
>
> * `accept()` --> `accept_tls()`
> * NEW --> `accept_tls_optional()`
>
> Signed-off-by: Max Carrara <m.carrara@proxmox.com>
> ---
> Changes v1 --> v2:
> * No more `BiAcceptBuilder`, `AcceptBuilder` is refactored instead
> * `AcceptBuilder` doesn't hold a reference to `SslAcceptor` anymore
> * Avoid unnecessary `#[cfg]`s
> * Avoid unnecessarily duplicated code (already mostly done by getting
> rid of `BiAcceptBuilder`)
> * Some clippy stuff
>
> Changes v2 --> v3:
> * Incorporate previously applied clippy fixes
>
> proxmox-rest-server/src/connection.rs | 373 ++++++++++++++++++++------
> 1 file changed, 287 insertions(+), 86 deletions(-)
>
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 1bec28d..ab8c7db 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -8,15 +8,16 @@ use std::pin::Pin;
> use std::sync::{Arc, Mutex};
> use std::time::Duration;
>
> -use anyhow::Context as _;
> -use anyhow::Error;
> +use anyhow::{format_err, Context as _, Error};
> use futures::FutureExt;
> +use hyper::server::accept;
> use openssl::ec::{EcGroup, EcKey};
> use openssl::nid::Nid;
> use openssl::pkey::{PKey, Private};
> use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
> use openssl::x509::X509;
> use tokio::net::{TcpListener, TcpStream};
> +use tokio::sync::mpsc;
> use tokio_openssl::SslStream;
> use tokio_stream::wrappers::ReceiverStream;
>
> @@ -133,10 +134,14 @@ impl TlsAcceptorBuilder {
> }
> }
>
> -#[cfg(feature = "rate-limited-stream")]
> -type ClientStreamResult = Pin<Box<SslStream<RateLimitedStream<TcpStream>>>>;
> #[cfg(not(feature = "rate-limited-stream"))]
> -type ClientStreamResult = Pin<Box<SslStream<TcpStream>>>;
> +type InsecureClientStream = TcpStream;
> +#[cfg(feature = "rate-limited-stream")]
> +type InsecureClientStream = RateLimitedStream<TcpStream>;
> +
> +type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
> +
> +type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
>
> #[cfg(feature = "rate-limited-stream")]
> type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> @@ -145,7 +150,6 @@ type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit
> + 'static;
>
> pub struct AcceptBuilder {
> - acceptor: Arc<Mutex<SslAcceptor>>,
> debug: bool,
> tcp_keepalive_time: u32,
> max_pending_accepts: usize,
> @@ -154,16 +158,9 @@ pub struct AcceptBuilder {
> lookup_rate_limiter: Option<Arc<LookupRateLimiter>>,
> }
>
> -impl AcceptBuilder {
> - pub fn new() -> Result<Self, Error> {
> - Ok(Self::with_acceptor(Arc::new(Mutex::new(
> - TlsAcceptorBuilder::new().build()?,
> - ))))
> - }
> -
> - pub fn with_acceptor(acceptor: Arc<Mutex<SslAcceptor>>) -> Self {
> +impl Default for AcceptBuilder {
> + fn default() -> Self {
> Self {
> - acceptor,
> debug: false,
> tcp_keepalive_time: 120,
> max_pending_accepts: 1024,
> @@ -172,6 +169,12 @@ impl AcceptBuilder {
> lookup_rate_limiter: None,
> }
> }
> +}
> +
> +impl AcceptBuilder {
> + pub fn new() -> Self {
> + Default::default()
> + }
>
> pub fn debug(mut self, debug: bool) -> Self {
> self.debug = debug;
> @@ -193,114 +196,312 @@ impl AcceptBuilder {
> self.lookup_rate_limiter = Some(lookup_rate_limiter);
> self
> }
> +}
>
> - pub fn accept(
> +impl AcceptBuilder {
> + pub fn accept_tls(
> self,
> listener: TcpListener,
> - ) -> impl hyper::server::accept::Accept<Conn = ClientStreamResult, Error = Error> {
> - let (sender, receiver) = tokio::sync::mpsc::channel(self.max_pending_accepts);
> + acceptor: Arc<Mutex<SslAcceptor>>,
> + ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
> + let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
> +
> + tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
> +
> + accept::from_stream(ReceiverStream::new(secure_receiver))
> + }
> +
> + pub fn accept_tls_optional(
> + self,
> + listener: TcpListener,
> + acceptor: Arc<Mutex<SslAcceptor>>,
> + ) -> (
> + impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
> + impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
> + ) {
> + let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
> + let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
> +
> + tokio::spawn(self.accept_connections(
> + listener,
> + acceptor,
> + (secure_sender, insecure_sender).into(),
> + ));
> +
> + (
> + accept::from_stream(ReceiverStream::new(secure_receiver)),
> + accept::from_stream(ReceiverStream::new(insecure_receiver)),
> + )
> + }
> +}
> +
> +type ClientSender = mpsc::Sender<Result<ClientStreamResult, Error>>;
> +type InsecureClientSender = mpsc::Sender<Result<InsecureClientStreamResult, Error>>;
>
> - tokio::spawn(self.accept_connections(listener, sender));
> +enum Sender {
> + Secure(ClientSender),
> + SecureAndInsecure(ClientSender, InsecureClientSender),
> +}
>
> - //receiver
> - hyper::server::accept::from_stream(ReceiverStream::new(receiver))
> +impl From<ClientSender> for Sender {
> + fn from(sender: ClientSender) -> Self {
> + Sender::Secure(sender)
> }
> +}
> +
> +impl From<(ClientSender, InsecureClientSender)> for Sender {
> + fn from(senders: (ClientSender, InsecureClientSender)) -> Self {
> + Sender::SecureAndInsecure(senders.0, senders.1)
> + }
> +}
>
> +impl AcceptBuilder {
> async fn accept_connections(
> self,
> listener: TcpListener,
> - sender: tokio::sync::mpsc::Sender<Result<ClientStreamResult, Error>>,
> + acceptor: Arc<Mutex<SslAcceptor>>,
> + sender: Sender,
> ) {
> let accept_counter = Arc::new(());
> let mut shutdown_future = crate::shutdown_future().fuse();
>
> loop {
> - let (sock, peer) = futures::select! {
> - res = listener.accept().fuse() => match res {
> - Ok(conn) => conn,
> + let socket = futures::select! {
> + res = self.try_setup_socket(&listener).fuse() => match res {
> + Ok(socket) => socket,
> Err(err) => {
> - eprintln!("error accepting tcp connection: {err}");
> + log::error!("couldn't set up TCP socket: {err}");
> continue;
> }
> },
> - _ = shutdown_future => break,
> + _ = shutdown_future => break,
> };
> - #[cfg(not(feature = "rate-limited-stream"))]
> - {
> - let _ = &peer;
> - }
>
> - sock.set_nodelay(true).unwrap();
> - let _ = proxmox_sys::linux::socket::set_tcp_keepalive(
> - sock.as_raw_fd(),
> - self.tcp_keepalive_time,
> - );
> + let acceptor = Arc::clone(&acceptor);
> + let accept_counter = Arc::clone(&accept_counter);
>
> - #[cfg(feature = "rate-limited-stream")]
> - let sock = match self.lookup_rate_limiter.clone() {
> - Some(lookup) => {
> - RateLimitedStream::with_limiter_update_cb(sock, move || lookup(peer))
> + if Arc::strong_count(&accept_counter) > self.max_pending_accepts {
> + log::error!("connection rejected - too many open connections");
> + continue;
> + }
> +
> + match sender {
> + Sender::Secure(ref secure_sender) => {
> + let accept_future = Self::do_accept_tls(
> + socket,
> + acceptor,
> + accept_counter,
> + self.debug,
> + secure_sender.clone(),
> + );
> +
> + tokio::spawn(accept_future);
> + }
> + Sender::SecureAndInsecure(ref secure_sender, ref insecure_sender) => {
> + let accept_future = Self::do_accept_tls_optional(
> + socket,
> + acceptor,
> + accept_counter,
> + self.debug,
> + secure_sender.clone(),
> + insecure_sender.clone(),
> + );
> +
> + tokio::spawn(accept_future);
> }
> - None => RateLimitedStream::with_limiter(sock, None, None),
> };
> + }
> + }
>
> - let ssl = {
> - // limit acceptor_guard scope
> - // Acceptor can be reloaded using the command socket "reload-certificate" command
> - let acceptor_guard = self.acceptor.lock().unwrap();
> + async fn try_setup_socket(
> + &self,
> + listener: &TcpListener,
> + ) -> Result<InsecureClientStream, Error> {
> + let (socket, peer) = match listener.accept().await {
> + Ok(connection) => connection,
> + Err(error) => {
> + return Err(format_err!(error)).context("error while accepting tcp stream")
> + }
> + };
>
> - match openssl::ssl::Ssl::new(acceptor_guard.context()) {
> - Ok(ssl) => ssl,
> - Err(err) => {
> - eprintln!("failed to create Ssl object from Acceptor context - {err}");
> - continue;
> - }
> - }
> - };
> + socket
> + .set_nodelay(true)
> + .context("error while setting TCP_NODELAY on socket")?;
> +
> + proxmox_sys::linux::socket::set_tcp_keepalive(socket.as_raw_fd(), self.tcp_keepalive_time)
> + .context("error while setting SO_KEEPALIVE on socket")?;
>
> - let stream = match tokio_openssl::SslStream::new(ssl, sock) {
> - Ok(stream) => stream,
> + #[cfg(feature = "rate-limited-stream")]
> + let socket = match self.lookup_rate_limiter.clone() {
> + Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
> + None => RateLimitedStream::with_limiter(socket, None, None),
> + };
> +
> + #[cfg(not(feature = "rate-limited-stream"))]
> + let _peer = peer;
> +
> + Ok(socket)
> + }
> +
> + async fn do_accept_tls(
> + socket: InsecureClientStream,
> + acceptor: Arc<Mutex<SslAcceptor>>,
> + accept_counter: Arc<()>,
> + debug: bool,
> + secure_sender: ClientSender,
> + ) {
> + let ssl = {
> + // limit acceptor_guard scope
> + // Acceptor can be reloaded using the command socket "reload-certificate" command
> + let acceptor_guard = acceptor.lock().unwrap();
> +
> + match openssl::ssl::Ssl::new(acceptor_guard.context()) {
> + Ok(ssl) => ssl,
> Err(err) => {
> - eprintln!("failed to create SslStream using ssl and connection socket - {err}");
> - continue;
> + log::error!("failed to create Ssl object from Acceptor context - {err}");
> + return;
> }
> - };
> + }
> + };
>
> - let mut stream = Box::pin(stream);
> - let sender = sender.clone();
> + let secure_stream = match tokio_openssl::SslStream::new(ssl, socket) {
> + Ok(stream) => stream,
> + Err(err) => {
> + log::error!("failed to create SslStream using ssl and connection socket - {err}");
> + return;
> + }
> + };
>
> - if Arc::strong_count(&accept_counter) > self.max_pending_accepts {
> - eprintln!("connection rejected - too many open connections");
> - continue;
> + let mut secure_stream = Box::pin(secure_stream);
> +
> + let accept_future =
> + tokio::time::timeout(Duration::new(10, 0), secure_stream.as_mut().accept());
> +
> + let result = accept_future.await;
> +
> + match result {
> + Ok(Ok(())) => {
> + if secure_sender.send(Ok(secure_stream)).await.is_err() && debug {
> + log::error!("detected closed connection channel");
> + }
> + }
> + Ok(Err(err)) => {
> + if debug {
> + log::error!("https handshake failed - {err}");
> + }
> }
> + Err(_) => {
> + if debug {
> + log::error!("https handshake timeout");
> + }
> + }
> + }
>
> - let accept_counter = Arc::clone(&accept_counter);
> - tokio::spawn(async move {
> - let accept_future =
> - tokio::time::timeout(Duration::new(10, 0), stream.as_mut().accept());
> + drop(accept_counter); // decrease reference count
> + }
>
> - let result = accept_future.await;
> + async fn do_accept_tls_optional(
> + socket: InsecureClientStream,
> + acceptor: Arc<Mutex<SslAcceptor>>,
> + accept_counter: Arc<()>,
> + debug: bool,
> + secure_sender: ClientSender,
> + insecure_sender: InsecureClientSender,
> + ) {
> + let client_initiates_handshake = {
> + #[cfg(feature = "rate-limited-stream")]
> + let socket = socket.inner();
>
> - match result {
> - Ok(Ok(())) => {
> - if sender.send(Ok(stream)).await.is_err() && self.debug {
> - log::error!("detect closed connection channel");
> - }
> - }
> - Ok(Err(err)) => {
> - if self.debug {
> - log::error!("https handshake failed - {err}");
> - }
> - }
> - Err(_) => {
> - if self.debug {
> - log::error!("https handshake timeout");
> - }
> - }
> + #[cfg(not(feature = "rate-limited-stream"))]
> + let socket = &socket;
> +
> + match Self::wait_for_client_tls_handshake(socket).await {
> + Ok(initiates_handshake) => initiates_handshake,
> + Err(err) => {
> + log::error!("error checking for TLS handshake: {err}");
> + return;
> }
> + }
> + };
> +
> + if !client_initiates_handshake {
> + let insecure_stream = Box::pin(socket);
>
> - drop(accept_counter); // decrease reference count
> - });
> + if insecure_sender.send(Ok(insecure_stream)).await.is_err() && debug {
> + log::error!("detected closed connection channel")
> + }
> +
> + return;
> }
> +
> + Self::do_accept_tls(socket, acceptor, accept_counter, debug, secure_sender).await
> }
> +
> + async fn wait_for_client_tls_handshake(incoming_stream: &TcpStream) -> Result<bool, Error> {
> + const MS_TIMEOUT: u64 = 1000;
> + const BYTES_BUF_SIZE: usize = 128;
> +
> + let mut buf = [0; BYTES_BUF_SIZE];
> + let mut last_peek_size = 0;
> +
> + let future = async {
> + loop {
> + let peek_size = incoming_stream
> + .peek(&mut buf)
> + .await
> + .context("couldn't peek into incoming tcp stream")?;
> +
> + if contains_tls_handshake_fragment(&buf) {
> + return Ok(true);
> + }
> +
> + // No more new data came in
> + if peek_size == last_peek_size {
> + return Ok(false);
> + }
> +
> + last_peek_size = peek_size;
> +
> + // yields to event loop; this future blocks otherwise ad infinitum
> + tokio::time::sleep(Duration::from_millis(0)).await;
Just noticed this - how about tokio::task::yield_now()?
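I.e. the sleep(0) line would then simply become (untested, but should behave
the same):

    tokio::task::yield_now().await;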
Also, this makes me wish for epoll to have a flag for edge triggering to
also trigger on additional data, not just when going up from zero.