public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 9/9] proxmox/tools/websocket: add WebSocket implementation
Date: Tue, 14 Jul 2020 13:09:57 +0200	[thread overview]
Message-ID: <20200714110957.31884-10-d.csapak@proxmox.com> (raw)
In-Reply-To: <20200714110957.31884-1-d.csapak@proxmox.com>

uses the existing WebSocketReader and Writer to establish a
two-way communication between an upstream and downstream connection.

The upstream connection sends and receives WebSocket frames, while
the downstream one only receives and sends raw data.

For now we do not support extensions, and only accept the protocol version 13

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox/Cargo.toml             |   3 +-
 proxmox/src/tools/websocket.rs | 167 ++++++++++++++++++++++++++++++++-
 2 files changed, 168 insertions(+), 2 deletions(-)

diff --git a/proxmox/Cargo.toml b/proxmox/Cargo.toml
index 5b7a4ff..d9fe53c 100644
--- a/proxmox/Cargo.toml
+++ b/proxmox/Cargo.toml
@@ -37,6 +37,7 @@ futures = { version = "0.3", optional = true }
 http = "0.2"
 hyper = { version = "0.13", optional = true }
 percent-encoding = "2.1"
+openssl =  { version = "0.10", optional = true }
 rustyline = "6"
 serde_derive = "1.0"
 textwrap = "0.11"
@@ -57,7 +58,7 @@ api-macro = ["proxmox-api-macro"]
 test-harness = []
 cli = [ "router", "hyper", "tokio" ]
 router = [ "hyper", "tokio" ]
-websocket = [ "tokio", "futures", "tokio/sync" ]
+websocket = [ "tokio", "futures", "tokio/sync", "openssl" ]
 
 # tools:
 #valgrind = ["proxmox-tools/valgrind"]
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 1ff0927..f8481f2 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -10,9 +10,21 @@ use std::cmp::min;
 use std::io::{self, ErrorKind};
 use std::future::Future;
 
-use tokio::io::{AsyncWrite, AsyncRead, AsyncReadExt};
+use futures::select;
 use anyhow::{bail, format_err, Error};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc;
+use hyper::{Body, Response, StatusCode};
+use hyper::header::{
+    HeaderMap,
+    HeaderValue,
+    UPGRADE,
+    CONNECTION,
+    SEC_WEBSOCKET_KEY,
+    SEC_WEBSOCKET_PROTOCOL,
+    SEC_WEBSOCKET_VERSION,
+    SEC_WEBSOCKET_ACCEPT,
+};
 
 use futures::future::FutureExt;
 use futures::ready;
@@ -566,3 +578,156 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
         }
     }
 }
+
+/// Global Identifier for WebSockets, see RFC6455
+pub const MAGIC_WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+
+/// Provides methods for connecting a WebSocket endpoint with another
+pub struct WebSocket {
+    text: bool,
+}
+
+impl WebSocket {
+    /// Returns a new WebSocket instance and the generates the correct
+    /// WebSocket response from request headers
+    pub fn new(headers: HeaderMap<HeaderValue>) -> Result<(Self, Response<Body>), Error> {
+        let protocols = headers
+            .get(UPGRADE)
+            .ok_or_else(|| format_err!("missing Upgrade header"))?
+            .to_str()?;
+
+        let version = headers
+            .get(SEC_WEBSOCKET_VERSION)
+            .ok_or_else(|| format_err!("missing websocket version"))?
+            .to_str()?;
+
+        let key = headers
+            .get(SEC_WEBSOCKET_KEY)
+            .ok_or_else(|| format_err!("missing websocket key"))?
+            .to_str()?;
+
+        let ws_proto = headers
+            .get(SEC_WEBSOCKET_PROTOCOL)
+            .ok_or_else(|| format_err!("missing websocket key"))?
+            .to_str()?;
+
+        let text = ws_proto == "text";
+
+        if protocols != "websocket" {
+            bail!("invalid protocol name");
+        }
+
+        if version != "13" {
+            bail!("invalid websocket version");
+        }
+
+        // we ignore extensions
+
+        let mut sha1 = openssl::sha::Sha1::new();
+        let data = format!("{}{}", key, MAGIC_WEBSOCKET_GUID);
+        sha1.update(data.as_bytes());
+        let response_key = base64::encode(sha1.finish());
+
+        let response = Response::builder()
+            .status(StatusCode::SWITCHING_PROTOCOLS)
+            .header(UPGRADE, HeaderValue::from_static("websocket"))
+            .header(CONNECTION, HeaderValue::from_static("Upgrade"))
+            .header(SEC_WEBSOCKET_ACCEPT, response_key)
+            .header(SEC_WEBSOCKET_PROTOCOL, ws_proto)
+            .body(Body::empty())?;
+
+        Ok((Self { text }, response))
+    }
+
+    async fn copy_to_websocket<R, W>(
+        mut reader: &mut R,
+        writer: &mut WebSocketWriter<W>,
+        receiver: &mut mpsc::UnboundedReceiver<(OpCode, Box<[u8]>)>) -> Result<bool, Error>
+    where
+        R: AsyncRead + Unpin + Send,
+        W: AsyncWrite + Unpin + Send,
+    {
+        let mut buf = ByteBuffer::new();
+        let mut eof = false;
+        loop {
+            if !buf.is_full() {
+                let bytes = select!{
+                    res = buf.read_from_async(&mut reader).fuse() => res?,
+                    res = receiver.recv().fuse() => {
+                        let (opcode, msg) = res.ok_or(format_err!("control channel closed"))?;
+                        match opcode {
+                            OpCode::Ping => {
+                                writer.send_control_frame(None, OpCode::Pong, &msg).await?;
+                                continue;
+                            }
+                            OpCode::Close => {
+                                writer.send_control_frame(None, OpCode::Close, &msg).await?;
+                                return Ok(true);
+                            }
+                            _ => {
+                                // ignore other frames
+                                continue;
+                            }
+                        }
+                    }
+                };
+
+                if bytes == 0 {
+                    eof = true;
+                }
+            }
+            if buf.len() > 0 {
+                let bytes = writer.write(&buf).await?;
+                if bytes == 0 {
+                    eof = true;
+                }
+                buf.consume(bytes);
+            }
+
+            if eof && buf.is_empty() {
+                return Ok(false);
+            }
+        }
+    }
+
+    /// Takes two endpoints and connects them via a websocket, where the
+    /// 'upstream' endpoint sends and receives WebSocket frames, while
+    /// 'downstream' only expects and sends raw data.
+    /// This method takes care of copying the data between endpoints, and
+    /// sending correct responses for control frames (e.g. a Pont to a Ping).
+    pub async fn serve_connection<S, L>(&self, upstream: S, downstream: L) -> Result<(), Error>
+    where
+        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+        L: AsyncRead + AsyncWrite + Unpin + Send,
+    {
+
+        let (usreader, uswriter) = tokio::io::split(upstream);
+        let (mut dsreader, mut dswriter) = tokio::io::split(downstream);
+
+        let (tx, mut rx) = mpsc::unbounded_channel();
+        let mut wsreader = WebSocketReader::new(usreader, tx);
+        let mut wswriter = WebSocketWriter::new(None, self.text, uswriter);
+
+
+        let ws_future = tokio::io::copy(&mut wsreader, &mut dswriter);
+        let term_future = Self::copy_to_websocket(&mut dsreader, &mut wswriter, &mut rx);
+
+        let res = select!{
+            res = ws_future.fuse() => match res {
+                Ok(_) => Ok(()),
+                Err(err) => Err(Error::from(err)),
+            },
+            res = term_future.fuse() => match res {
+                Ok(sent_close) if !sent_close => {
+                    // status code 1000 => 0x03E8
+                    wswriter.send_control_frame(None, OpCode::Close, &[0x03, 0xE8]).await?;
+                    Ok(())
+                }
+                Ok(_) => Ok(()),
+                Err(err) => Err(err),
+            }
+        };
+
+        res
+    }
+}
-- 
2.20.1





  parent reply	other threads:[~2020-07-14 11:10 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 1/9] proxmox/tools/byte_buffer: improve ByteBuffer interface Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 2/9] proxmox/tools/byte_buffer: impl Default Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 3/9] proxmox/tools/websocket: use ready macro for WebSocketWriter Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 4/9] proxmox/tools/websocket: correctly return eof Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 5/9] proxmox/tools/websocket: use io::Error and Result explicitely Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 6/9] proxmox/tools/websocket: improve mask_bytes and create_frame interface Dominik Csapak
2020-07-15  9:08   ` [pbs-devel] [PATCH proxmox v2 " Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 7/9] proxmox/tools/websocket: implement send_control_frame for writer Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 8/9] proxmox/tools/websocket: replace CallBack with a channel Dominik Csapak
2020-07-14 11:09 ` Dominik Csapak [this message]
2020-07-15  8:15 ` [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Fabian Grünbichler
2020-07-15  8:31   ` Dominik Csapak
2020-07-15 13:00 ` [pbs-devel] applied: " Wolfgang Bumiller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200714110957.31884-10-d.csapak@proxmox.com \
    --to=d.csapak@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal