From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 6507C62E34 for ; Tue, 14 Jul 2020 13:10:33 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 3581D261BD for ; Tue, 14 Jul 2020 13:10:03 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 1BA2D26157 for ; Tue, 14 Jul 2020 13:09:59 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id DBE6644141 for ; Tue, 14 Jul 2020 13:09:58 +0200 (CEST) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Tue, 14 Jul 2020 13:09:57 +0200 Message-Id: <20200714110957.31884-10-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20200714110957.31884-1-d.csapak@proxmox.com> References: <20200714110957.31884-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_LAZY_DOMAIN_SECURITY 1 Sending domain does not have any anti-forgery methods NO_DNS_FOR_FROM 0.379 Envelope sender has no MX or A DNS records RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_NONE 0.001 SPF: sender does not publish an SPF Record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [websocket.rs] Subject: [pbs-devel] [PATCH proxmox 9/9] proxmox/tools/websocket: add WebSocket implementation X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 14 Jul 2020 11:10:33 -0000 uses the existing WebSocketReader and Writer to establish a two-way communication between an upstream and downstream connection. The upstream connection sends and receives WebSocket frames, while the downstream one only receives and sends raw data. For now we do not support extensions, and only accept the protocol version 13 Signed-off-by: Dominik Csapak --- proxmox/Cargo.toml | 3 +- proxmox/src/tools/websocket.rs | 167 ++++++++++++++++++++++++++++++++- 2 files changed, 168 insertions(+), 2 deletions(-) diff --git a/proxmox/Cargo.toml b/proxmox/Cargo.toml index 5b7a4ff..d9fe53c 100644 --- a/proxmox/Cargo.toml +++ b/proxmox/Cargo.toml @@ -37,6 +37,7 @@ futures = { version = "0.3", optional = true } http = "0.2" hyper = { version = "0.13", optional = true } percent-encoding = "2.1" +openssl = { version = "0.10", optional = true } rustyline = "6" serde_derive = "1.0" textwrap = "0.11" @@ -57,7 +58,7 @@ api-macro = ["proxmox-api-macro"] test-harness = [] cli = [ "router", "hyper", "tokio" ] router = [ "hyper", "tokio" ] -websocket = [ "tokio", "futures", "tokio/sync" ] +websocket = [ "tokio", "futures", "tokio/sync", "openssl" ] # tools: #valgrind = ["proxmox-tools/valgrind"] diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs index 1ff0927..f8481f2 100644 --- a/proxmox/src/tools/websocket.rs +++ b/proxmox/src/tools/websocket.rs @@ -10,9 +10,21 @@ use std::cmp::min; use std::io::{self, ErrorKind}; use std::future::Future; -use tokio::io::{AsyncWrite, AsyncRead, AsyncReadExt}; +use futures::select; use anyhow::{bail, format_err, Error}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc; +use hyper::{Body, Response, StatusCode}; +use hyper::header::{ + HeaderMap, + HeaderValue, + UPGRADE, + CONNECTION, + SEC_WEBSOCKET_KEY, + SEC_WEBSOCKET_PROTOCOL, + SEC_WEBSOCKET_VERSION, + SEC_WEBSOCKET_ACCEPT, +}; use futures::future::FutureExt; use futures::ready; @@ -566,3 +578,156 @@ impl AsyncRead for WebSocketReader } } } + +/// Global Identifier for WebSockets, see RFC6455 +pub const MAGIC_WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + +/// Provides methods for connecting a WebSocket endpoint with another +pub struct WebSocket { + text: bool, +} + +impl WebSocket { + /// Returns a new WebSocket instance and the generates the correct + /// WebSocket response from request headers + pub fn new(headers: HeaderMap) -> Result<(Self, Response), Error> { + let protocols = headers + .get(UPGRADE) + .ok_or_else(|| format_err!("missing Upgrade header"))? + .to_str()?; + + let version = headers + .get(SEC_WEBSOCKET_VERSION) + .ok_or_else(|| format_err!("missing websocket version"))? + .to_str()?; + + let key = headers + .get(SEC_WEBSOCKET_KEY) + .ok_or_else(|| format_err!("missing websocket key"))? + .to_str()?; + + let ws_proto = headers + .get(SEC_WEBSOCKET_PROTOCOL) + .ok_or_else(|| format_err!("missing websocket key"))? + .to_str()?; + + let text = ws_proto == "text"; + + if protocols != "websocket" { + bail!("invalid protocol name"); + } + + if version != "13" { + bail!("invalid websocket version"); + } + + // we ignore extensions + + let mut sha1 = openssl::sha::Sha1::new(); + let data = format!("{}{}", key, MAGIC_WEBSOCKET_GUID); + sha1.update(data.as_bytes()); + let response_key = base64::encode(sha1.finish()); + + let response = Response::builder() + .status(StatusCode::SWITCHING_PROTOCOLS) + .header(UPGRADE, HeaderValue::from_static("websocket")) + .header(CONNECTION, HeaderValue::from_static("Upgrade")) + .header(SEC_WEBSOCKET_ACCEPT, response_key) + .header(SEC_WEBSOCKET_PROTOCOL, ws_proto) + .body(Body::empty())?; + + Ok((Self { text }, response)) + } + + async fn copy_to_websocket( + mut reader: &mut R, + writer: &mut WebSocketWriter, + receiver: &mut mpsc::UnboundedReceiver<(OpCode, Box<[u8]>)>) -> Result + where + R: AsyncRead + Unpin + Send, + W: AsyncWrite + Unpin + Send, + { + let mut buf = ByteBuffer::new(); + let mut eof = false; + loop { + if !buf.is_full() { + let bytes = select!{ + res = buf.read_from_async(&mut reader).fuse() => res?, + res = receiver.recv().fuse() => { + let (opcode, msg) = res.ok_or(format_err!("control channel closed"))?; + match opcode { + OpCode::Ping => { + writer.send_control_frame(None, OpCode::Pong, &msg).await?; + continue; + } + OpCode::Close => { + writer.send_control_frame(None, OpCode::Close, &msg).await?; + return Ok(true); + } + _ => { + // ignore other frames + continue; + } + } + } + }; + + if bytes == 0 { + eof = true; + } + } + if buf.len() > 0 { + let bytes = writer.write(&buf).await?; + if bytes == 0 { + eof = true; + } + buf.consume(bytes); + } + + if eof && buf.is_empty() { + return Ok(false); + } + } + } + + /// Takes two endpoints and connects them via a websocket, where the + /// 'upstream' endpoint sends and receives WebSocket frames, while + /// 'downstream' only expects and sends raw data. + /// This method takes care of copying the data between endpoints, and + /// sending correct responses for control frames (e.g. a Pont to a Ping). + pub async fn serve_connection(&self, upstream: S, downstream: L) -> Result<(), Error> + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + L: AsyncRead + AsyncWrite + Unpin + Send, + { + + let (usreader, uswriter) = tokio::io::split(upstream); + let (mut dsreader, mut dswriter) = tokio::io::split(downstream); + + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut wsreader = WebSocketReader::new(usreader, tx); + let mut wswriter = WebSocketWriter::new(None, self.text, uswriter); + + + let ws_future = tokio::io::copy(&mut wsreader, &mut dswriter); + let term_future = Self::copy_to_websocket(&mut dsreader, &mut wswriter, &mut rx); + + let res = select!{ + res = ws_future.fuse() => match res { + Ok(_) => Ok(()), + Err(err) => Err(Error::from(err)), + }, + res = term_future.fuse() => match res { + Ok(sent_close) if !sent_close => { + // status code 1000 => 0x03E8 + wswriter.send_control_frame(None, OpCode::Close, &[0x03, 0xE8]).await?; + Ok(()) + } + Ok(_) => Ok(()), + Err(err) => Err(err), + } + }; + + res + } +} -- 2.20.1