public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it
@ 2020-07-17  6:34 Dominik Csapak
  2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 2/4] proxmox/tools/websocket: improve error handling Dominik Csapak
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Dominik Csapak @ 2020-07-17  6:34 UTC (permalink / raw)
  To: pbs-devel

this patch introduces a custom WebSocketError, so that we can
detect errors in the websocket protocol and react to them in the
right way

the channel now sends a Result<(OpCode, Box<[u8]>), WebSocketError>
so that we can either react to a control frame, or react to an
erroneous data stream (which means sending a close frame
with the appropriate error code)

while at it, change FrameHeader::try_from_bytes to return
Result<Option<FrameHeader>, WebSocketError> instead of a nested
Result with the *guessed* remaining size. This was neither used by
us, nor was it really meaningful, since the size can change during
decoding every time new data is read (extensions, mask, payload length, etc.),
so simply returning an Option is enough information for us

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox/src/tools/websocket.rs | 191 ++++++++++++++++++++++++---------
 1 file changed, 142 insertions(+), 49 deletions(-)

diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index fc9a0c5..c1b0066 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -7,7 +7,7 @@
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::cmp::min;
-use std::io::{self, ErrorKind};
+use std::io;
 use std::future::Future;
 
 use futures::select;
@@ -30,8 +30,75 @@ use futures::future::FutureExt;
 use futures::ready;
 
 use crate::io_format_err;
+use crate::sys::error::io_err_other;
 use crate::tools::byte_buffer::ByteBuffer;
 
+// see RFC6455 section 7.4.1
+const CLOSE_NORMAL: &[u8] =         &[0x03, 0xE8]; // 1000
+const CLOSE_PROTOCOL_ERROR: &[u8] = &[0x03, 0xEA]; // 1002
+const CLOSE_INVALID_DATA: &[u8] =   &[0x03, 0xEB]; // 1003
+//const CLOSE_UTF8_ERROR: &[u8] =     &[0x03, 0xEF]; // 1007
+const CLOSE_POLICY_ERROR: &[u8] =   &[0x03, 0xF0]; // 1008
+//const CLOSE_TOOBIG_ERROR: &[u8] =   &[0x03, 0xF1]; // 1009
+const CLOSE_UNEXPECTED: &[u8] =     &[0x03, 0xF3]; // 1011
+
+#[derive(Debug, Clone, Copy)]
+#[repr(u16)]
+pub enum WebSocketErrorKind {
+    ProtocolError = 1002,
+    InvalidData = 1003,
+    Other = 1008,
+    Unexpected = 1011,
+}
+
+impl std::fmt::Display for WebSocketErrorKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
+        write!(f, "{}", *self as u16)
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct WebSocketError{
+    kind: WebSocketErrorKind,
+    message: String,
+}
+
+impl WebSocketError {
+    pub fn new(kind: WebSocketErrorKind, message: &str) -> Self {
+        Self{
+            kind,
+            message: message.to_string()
+        }
+    }
+
+    /// Builds the close frame payload: the 2-byte status code followed by the truncated message.
+    pub fn generate_frame_payload(&self) -> Vec<u8> {
+        let msglen = self.message.len().min(123); // control payload max is 125, 2 bytes are the code
+        let code = match self.kind {
+            WebSocketErrorKind::ProtocolError => CLOSE_PROTOCOL_ERROR,
+            WebSocketErrorKind::InvalidData => CLOSE_INVALID_DATA,
+            WebSocketErrorKind::Unexpected => CLOSE_UNEXPECTED,
+            WebSocketErrorKind::Other => CLOSE_POLICY_ERROR,
+        };
+        let mut data = Vec::with_capacity(msglen + 2);
+        data.extend_from_slice(code);
+        data.extend_from_slice(&self.message.as_bytes()[..msglen]); // append (copy_from_slice panics)
+        data
+    }
+}
+
+impl std::fmt::Display for WebSocketError {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
+        write!(f, "{} (Code: {})", self.message, self.kind)
+    }
+}
+
+impl From<WebSocketError> for Error {
+    fn from(error: WebSocketError) -> Self {
+        Error::new(io_format_err!("{}", error))
+    }
+}
+
 #[repr(u8)]
 #[derive(Debug, PartialEq, PartialOrd, Copy, Clone)]
 /// Represents an OpCode of a websocket frame
@@ -293,25 +360,23 @@ impl FrameHeader {
     /// Tries to parse a FrameHeader from bytes.
     ///
     /// When there are not enough bytes to completely parse the header,
-    /// returns Ok(Err(size)) where size determines how many bytes
-    /// are missing to parse further (this amount can change when more
-    /// information is available)
+    /// returns Ok(None)
     ///
     /// Example:
     /// ```
     /// # use proxmox::tools::websocket::*;
     /// # use std::io;
-    /// # fn main() -> io::Result<()> {
+    /// # fn main() -> Result<(), WebSocketError> {
     /// let frame = create_frame(None, &[0,1,2,3], OpCode::Ping)?;
     /// let header = FrameHeader::try_from_bytes(&frame[..1])?;
     /// match header {
-    ///     Ok(_) => unreachable!(),
-    ///     Err(x) => assert_eq!(x, 1),
+    ///     Some(_) => unreachable!(),
+    ///     None => {},
     /// }
     /// let header = FrameHeader::try_from_bytes(&frame[..2])?;
     /// match header {
-    ///     Err(x) => unreachable!(),
-    ///     Ok(header) => assert_eq!(header, FrameHeader{
+    ///     None => unreachable!(),
+    ///     Some(header) => assert_eq!(header, FrameHeader{
     ///         fin: true,
     ///         mask: None,
     ///         frametype: OpCode::Ping,
@@ -322,19 +387,19 @@ impl FrameHeader {
     /// # Ok(())
     /// # }
     /// ```
-    pub fn try_from_bytes(data: &[u8]) -> io::Result<Result<FrameHeader, usize>> {
+    pub fn try_from_bytes(data: &[u8]) -> Result<Option<FrameHeader>, WebSocketError> {
         let len = data.len();
         if len < 2 {
-            return Ok(Err(2 - len));
+            return Ok(None);
         }
 
         let data = data;
 
         // we do not support extensions
         if data[0] & 0b01110000 > 0 {
-            return Err(io::Error::new(
-                ErrorKind::InvalidData,
-                "Extensions not supported",
+            return Err(WebSocketError::new(
+                    WebSocketErrorKind::ProtocolError,
+                    "Extensions not supported",
             ));
         }
 
@@ -347,14 +412,17 @@ impl FrameHeader {
             9 => OpCode::Ping,
             10 => OpCode::Pong,
             other => {
-                return Err(io::Error::new(ErrorKind::InvalidData, format!("Unknown OpCode {}", other)));
+                return Err(WebSocketError::new(
+                        WebSocketErrorKind::ProtocolError,
+                        &format!("Unknown OpCode {}", other),
+                ));
             }
         };
 
         if !fin && frametype.is_control() {
-            return Err(io::Error::new(
-                ErrorKind::InvalidData,
-                "Control frames cannot be fragmented",
+            return Err(WebSocketError::new(
+                    WebSocketErrorKind::ProtocolError,
+                    "Control frames cannot be fragmented",
             ));
         }
 
@@ -368,14 +436,14 @@ impl FrameHeader {
         let mut payload_len: usize = (data[1] & 0b01111111).into();
         if payload_len == 126 {
             if len < 4 {
-                return Ok(Err(4 - len));
+                return Ok(None);
             }
             payload_len = u16::from_be_bytes([data[2], data[3]]) as usize;
             mask_offset += 2;
             payload_offset += 2;
         } else if payload_len == 127 {
             if len < 10 {
-                return Ok(Err(10 - len));
+                return Ok(None);
             }
             payload_len = u64::from_be_bytes([
                 data[2], data[3], data[4], data[5], data[6], data[7], data[8], data[9],
@@ -385,9 +453,9 @@ impl FrameHeader {
         }
 
         if payload_len > 125 && frametype.is_control() {
-            return Err(io::Error::new(
-                ErrorKind::InvalidData,
-                "Control frames cannot carry more than 125 bytes of data",
+            return Err(WebSocketError::new(
+                    WebSocketErrorKind::ProtocolError,
+                    "Control frames cannot carry more than 125 bytes of data",
             ));
         }
 
@@ -403,7 +471,7 @@ impl FrameHeader {
             false => None,
         };
 
-        Ok(Ok(FrameHeader {
+        Ok(Some(FrameHeader {
             fin,
             mask,
             frametype,
@@ -413,6 +481,8 @@ impl FrameHeader {
     }
 }
 
+type WebSocketReadResult = Result<(OpCode, Box<[u8]>), WebSocketError>;
+
 /// Wraps a reader that implements AsyncRead and implements it itself.
 ///
 /// On read, reads the underlying reader and tries to decode the frames and
@@ -422,7 +492,7 @@ impl FrameHeader {
 /// Has an internal Buffer for storing incomplete headers.
 pub struct WebSocketReader<R: AsyncRead> {
     reader: Option<R>,
-    sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>,
+    sender: mpsc::UnboundedSender<WebSocketReadResult>,
     read_buffer: Option<ByteBuffer>,
     header: Option<FrameHeader>,
     state: ReaderState<R>,
@@ -431,11 +501,11 @@ pub struct WebSocketReader<R: AsyncRead> {
 impl<R: AsyncReadExt> WebSocketReader<R> {
     /// Creates a new WebSocketReader with the given CallBack for control frames
     /// and a default buffer size of 4096.
-    pub fn new(reader: R, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
+    pub fn new(reader: R, sender: mpsc::UnboundedSender<WebSocketReadResult>) -> WebSocketReader<R> {
         Self::with_capacity(reader, 4096, sender)
     }
 
-    pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
+    pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<WebSocketReadResult>) -> WebSocketReader<R> {
         WebSocketReader {
             reader: Some(reader),
             sender,
@@ -512,13 +582,19 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
                     let mut header = match this.header.take() {
                         Some(header) => header,
                         None => {
-                            let header = match FrameHeader::try_from_bytes(&read_buffer[..])? {
-                                Ok(header) => header,
-                                Err(_) => {
+                            let header = match FrameHeader::try_from_bytes(&read_buffer[..]) {
+                                Ok(Some(header)) => header,
+                                Ok(None) => {
                                     this.state = ReaderState::NoData;
                                     this.read_buffer = Some(read_buffer);
                                     continue;
-                                }
+                                },
+                                Err(err) => {
+                                    if let Err(err) = this.sender.send(Err(err.clone())) {
+                                        return Poll::Ready(Err(io_err_other(err)));
+                                    }
+                                    return Poll::Ready(Err(io_err_other(err)));
+                                },
                             };
 
                             read_buffer.consume(header.header_len as usize);
@@ -531,7 +607,7 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
 
                             let mut data = read_buffer.remove_data(header.payload_len);
                             mask_bytes(header.mask, &mut data);
-                            if let Err(err) = this.sender.send((header.frametype, data)) {
+                            if let Err(err) = this.sender.send(Ok((header.frametype, data))) {
                                 eprintln!("error sending control frame: {}", err);
                             }
 
@@ -639,10 +715,37 @@ impl WebSocket {
         Ok((Self { text }, response))
     }
 
+    async fn handle_channel_message<W>(
+        result: WebSocketReadResult,
+        writer: &mut WebSocketWriter<W>
+    ) -> Result<OpCode, Error>
+    where
+        W: AsyncWrite + Unpin + Send,
+    {
+        match result {
+            Ok((OpCode::Ping, msg)) => {
+                writer.send_control_frame(None, OpCode::Pong, &msg).await?;
+                Ok(OpCode::Pong)
+            }
+            Ok((OpCode::Close, msg)) => {
+                writer.send_control_frame(None, OpCode::Close, &msg).await?;
+                Ok(OpCode::Close)
+            }
+            Ok((opcode, _)) => {
+                // ignore other frames
+                Ok(opcode)
+            },
+            Err(err) => {
+                writer.send_control_frame(None, OpCode::Close, &err.generate_frame_payload()).await?;
+                Err(Error::from(err))
+            }
+        }
+    }
+
     async fn copy_to_websocket<R, W>(
         mut reader: &mut R,
-        writer: &mut WebSocketWriter<W>,
-        receiver: &mut mpsc::UnboundedReceiver<(OpCode, Box<[u8]>)>) -> Result<bool, Error>
+        mut writer: &mut WebSocketWriter<W>,
+        receiver: &mut mpsc::UnboundedReceiver<WebSocketReadResult>) -> Result<bool, Error>
     where
         R: AsyncRead + Unpin + Send,
         W: AsyncWrite + Unpin + Send,
@@ -654,20 +757,10 @@ impl WebSocket {
                 let bytes = select!{
                     res = buf.read_from_async(&mut reader).fuse() => res?,
                     res = receiver.recv().fuse() => {
-                        let (opcode, msg) = res.ok_or(format_err!("control channel closed"))?;
-                        match opcode {
-                            OpCode::Ping => {
-                                writer.send_control_frame(None, OpCode::Pong, &msg).await?;
-                                continue;
-                            }
-                            OpCode::Close => {
-                                writer.send_control_frame(None, OpCode::Close, &msg).await?;
-                                return Ok(true);
-                            }
-                            _ => {
-                                // ignore other frames
-                                continue;
-                            }
+                        let res = res.ok_or_else(|| format_err!("control channel closed"))?;
+                        match Self::handle_channel_message(res, &mut writer).await? {
+                            OpCode::Close => return Ok(true),
+                            _ => { continue; },
                         }
                     }
                 };
@@ -720,7 +813,7 @@ impl WebSocket {
             res = term_future.fuse() => match res {
                 Ok(sent_close) if !sent_close => {
                     // status code 1000 => 0x03E8
-                    wswriter.send_control_frame(None, OpCode::Close, &[0x03, 0xE8]).await?;
+                    wswriter.send_control_frame(None, OpCode::Close, CLOSE_NORMAL).await?;
                     Ok(())
                 }
                 Ok(_) => Ok(()),
-- 
2.20.1





^ permalink raw reply	[flat|nested] 5+ messages in thread

* [pbs-devel] [PATCH proxmox 2/4] proxmox/tools/websocket: improve error handling
  2020-07-17  6:34 [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it Dominik Csapak
@ 2020-07-17  6:34 ` Dominik Csapak
  2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 3/4] proxmox/tools/websocket: fix some clippy warnings Dominik Csapak
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2020-07-17  6:34 UTC (permalink / raw)
  To: pbs-devel

use io_err_other instead of io_format_err and change the Error type
of create_frame from io::Error to WebSocketError

it is not good to redefine the ErrorKinds of io::Error, since
the caller probably is not aware of that

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox/src/tools/websocket.rs | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index c1b0066..b548b47 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -166,7 +166,7 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Box<[u8]>) {
 /// ```
 /// # use proxmox::tools::websocket::*;
 /// # use std::io;
-/// # fn main() -> io::Result<()> {
+/// # fn main() -> Result<(), WebSocketError> {
 /// let data = vec![0,1,2,3,4];
 /// let frame = create_frame(None, &data, OpCode::Text)?;
 /// assert_eq!(frame, vec![0b10000001, 5, 0, 1, 2, 3, 4]);
@@ -179,7 +179,7 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Box<[u8]>) {
 /// ```
 /// # use proxmox::tools::websocket::*;
 /// # use std::io;
-/// # fn main() -> io::Result<()> {
+/// # fn main() -> Result<(), WebSocketError> {
 /// let data = vec![0,1,2,3,4];
 /// let frame = create_frame(Some([0u8, 1u8, 2u8, 3u8]), &data, OpCode::Text)?;
 /// assert_eq!(frame, vec![0b10000001, 0b10000101, 0, 1, 2, 3, 0, 0, 0, 0, 4]);
@@ -192,7 +192,7 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Box<[u8]>) {
 /// ```
 /// # use proxmox::tools::websocket::*;
 /// # use std::io;
-/// # fn main() -> io::Result<()> {
+/// # fn main() -> Result<(), WebSocketError> {
 /// let data = vec![0,1,2,3,4];
 /// let frame = create_frame(None, &data, OpCode::Ping)?;
 /// assert_eq!(frame, vec![0b10001001, 0b00000101, 0, 1, 2, 3, 4]);
@@ -204,13 +204,13 @@ pub fn create_frame(
     mask: Option<[u8; 4]>,
     data: &[u8],
     frametype: OpCode,
-) -> io::Result<Vec<u8>> {
+) -> Result<Vec<u8>, WebSocketError> {
     let first_byte = 0b10000000 | (frametype as u8);
     let len = data.len();
     if (frametype as u8) & 0b00001000 > 0 && len > 125 {
-        return Err(io::Error::new(
-            ErrorKind::InvalidData,
-            "Control frames cannot have data longer than 125 bytes",
+        return Err(WebSocketError::new(
+                WebSocketErrorKind::Unexpected,
+                "Control frames cannot have data longer than 125 bytes",
         ));
     }
 
@@ -276,7 +276,7 @@ impl<W: AsyncWrite + Unpin> WebSocketWriter<W> {
     }
 
     pub async fn send_control_frame(&mut self, mask: Option<[u8; 4]>, opcode: OpCode, data: &[u8]) -> Result<(), Error> {
-        let frame = create_frame(mask, data, opcode)?;
+        let frame = create_frame(mask, data, opcode).map_err(Error::from)?;
         self.writer.write_all(&frame).await.map_err(Error::from)
     }
 }
@@ -299,7 +299,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for WebSocketWriter<W> {
             let frame = match create_frame(this.mask, buf, frametype) {
                 Ok(f) => f,
                 Err(e) => {
-                    return Poll::Ready(Err(e));
+                    return Poll::Ready(Err(io_err_other(e)));
                 }
             };
             this.frame = Some((frame, 0, buf.len()));
@@ -544,12 +544,12 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
                 ReaderState::NoData => {
                     let mut reader = match this.reader.take() {
                         Some(reader) => reader,
-                        None => return Poll::Ready(Err(io_format_err!("no reader"))),
+                        None => return Poll::Ready(Err(io_err_other("no reader"))),
                     };
 
                     let mut buffer = match this.read_buffer.take() {
                         Some(buffer) => buffer,
-                        None => return Poll::Ready(Err(io_format_err!("no buffer"))),
+                        None => return Poll::Ready(Err(io_err_other("no buffer"))),
                     };
 
                     let future = async move {
@@ -576,7 +576,7 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
                 ReaderState::HaveData => {
                     let mut read_buffer = match this.read_buffer.take() {
                         Some(read_buffer) => read_buffer,
-                        None => return Poll::Ready(Err(io_format_err!("no buffer"))),
+                        None => return Poll::Ready(Err(io_err_other("no buffer"))),
                     };
 
                     let mut header = match this.header.take() {
-- 
2.20.1





^ permalink raw reply	[flat|nested] 5+ messages in thread

* [pbs-devel] [PATCH proxmox 3/4] proxmox/tools/websocket: fix some clippy warnings
  2020-07-17  6:34 [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it Dominik Csapak
  2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 2/4] proxmox/tools/websocket: improve error handling Dominik Csapak
@ 2020-07-17  6:34 ` Dominik Csapak
  2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 4/4] proxmox/tools/byte_buffer: improve read_from example Dominik Csapak
  2020-07-17 10:12 ` [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it Wolfgang Bumiller
  3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2020-07-17  6:34 UTC (permalink / raw)
  To: pbs-devel

* reformat docs
* use &mut [u8] instead of Box<[u8]> (conversion can be done automatically)
* use:
    let foo = if condition { A } else { B };
  instead of matching on a boolean condition
* rename WaitingForData to WaitingForFuture so that not all
  variants of the enum end with Data

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox/src/tools/websocket.rs | 35 +++++++++++++++-------------------
 1 file changed, 15 insertions(+), 20 deletions(-)

diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index b548b47..80312f3 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -1,8 +1,7 @@
 //! Websocket helpers
 //!
-//! Provides methods to read and write from websockets
-//! The reader and writer take a reader/writer with AsyncRead/AsyncWrite
-//! respectively and provides the same
+//! Provides methods to read and write from websockets. The reader and writer take a reader/writer
+//! with AsyncRead/AsyncWrite respectively and provide the same.
 
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -124,7 +123,7 @@ impl OpCode {
     }
 }
 
-fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Box<[u8]>) {
+fn mask_bytes(mask: Option<[u8; 4]>, data: &mut [u8]) {
     let mask = match mask {
         Some([0,0,0,0]) | None => return,
         Some(mask) => mask,
@@ -289,10 +288,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for WebSocketWriter<W> {
     ) -> Poll<io::Result<usize>> {
         let this = Pin::get_mut(self);
 
-        let frametype = match this.text {
-            true => OpCode::Text,
-            false => OpCode::Binary,
-        };
+        let frametype = if this.text { OpCode::Text } else { OpCode::Binary };
 
         if this.frame.is_none() {
             // create frame buf
@@ -459,16 +455,15 @@ impl FrameHeader {
             ));
         }
 
-        let mask = match mask_bit {
-            true => {
-                if len < mask_offset + 4 {
-                    return Ok(Err(mask_offset + 4 - len));
-                }
-                let mut mask = [0u8; 4];
-                mask.copy_from_slice(&data[mask_offset as usize..payload_offset as usize]);
-                Some(mask)
+        let mask = if mask_bit {
+            if len < mask_offset + 4 {
+                return Ok(None);
             }
-            false => None,
+            let mut mask = [0u8; 4];
+            mask.copy_from_slice(&data[mask_offset as usize..payload_offset as usize]);
+            Some(mask)
+        } else {
+            None
         };
 
         Ok(Some(FrameHeader {
@@ -524,7 +519,7 @@ struct ReadResult<R> {
 
 enum ReaderState<R> {
     NoData,
-    WaitingForData(Pin<Box<dyn Future<Output = io::Result<ReadResult<R>>> + Send + 'static>>),
+    WaitingForFuture(Pin<Box<dyn Future<Output = io::Result<ReadResult<R>>> + Send + 'static>>),
     HaveData,
 }
 
@@ -558,9 +553,9 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
                             .map(move |len| ReadResult { len, reader, buffer })
                     };
 
-                    this.state = ReaderState::WaitingForData(future.boxed());
+                    this.state = ReaderState::WaitingForFuture(future.boxed());
                 },
-                ReaderState::WaitingForData(ref mut future) => {
+                ReaderState::WaitingForFuture(ref mut future) => {
                     match ready!(future.as_mut().poll(cx)) {
                         Ok(ReadResult { len, reader, buffer }) => {
                             this.reader = Some(reader);
-- 
2.20.1





^ permalink raw reply	[flat|nested] 5+ messages in thread

* [pbs-devel] [PATCH proxmox 4/4] proxmox/tools/byte_buffer: improve read_from example
  2020-07-17  6:34 [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it Dominik Csapak
  2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 2/4] proxmox/tools/websocket: improve error handling Dominik Csapak
  2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 3/4] proxmox/tools/websocket: fix some clippy warnings Dominik Csapak
@ 2020-07-17  6:34 ` Dominik Csapak
  2020-07-17 10:12 ` [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it Wolfgang Bumiller
  3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2020-07-17  6:34 UTC (permalink / raw)
  To: pbs-devel

'norun' was not a valid attribute, so cargo doc ignored it completely

instead, write a proper example that can be compiled

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox/src/tools/byte_buffer.rs | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/proxmox/src/tools/byte_buffer.rs b/proxmox/src/tools/byte_buffer.rs
index f01a6dd..24aec11 100644
--- a/proxmox/src/tools/byte_buffer.rs
+++ b/proxmox/src/tools/byte_buffer.rs
@@ -154,14 +154,15 @@ impl ByteBuffer {
     /// free space in the buffer) and updates its size accordingly.
     ///
     /// Example:
-    /// ```norun
-    /// // create some reader
-    /// let reader = ...;
-    ///
-    /// let mut buf = ByteBuffer::new();
-    /// let res = buf.read_from(reader);
-    /// // do something with the buffer
-    /// ...
+    /// ```
+    /// # use std::io;
+    /// # use proxmox::tools::byte_buffer::ByteBuffer;
+    /// fn code<R: io::Read>(mut reader: R) -> io::Result<()> {
+    ///     let mut buf = ByteBuffer::new();
+    ///     let res = buf.read_from(&mut reader)?;
+    ///     // do something with the buffer
+    ///     Ok(())
+    /// }
     /// ```
     pub fn read_from<T: Read + ?Sized>(&mut self, input: &mut T) -> Result<usize> {
         let amount = input.read(self.get_free_mut_slice())?;
-- 
2.20.1





^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it
  2020-07-17  6:34 [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it Dominik Csapak
                   ` (2 preceding siblings ...)
  2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 4/4] proxmox/tools/byte_buffer: improve read_from example Dominik Csapak
@ 2020-07-17 10:12 ` Wolfgang Bumiller
  3 siblings, 0 replies; 5+ messages in thread
From: Wolfgang Bumiller @ 2020-07-17 10:12 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

On Fri, Jul 17, 2020 at 08:34:48AM +0200, Dominik Csapak wrote:
> this patch introduces a custom WebSocketError, so that we can
> detect errors in the websocket protocol and react to it in the
> right way
> 
> the channel now sends a Result<(OpCode, Box<[u8]>), WebSocketError>
> so that we can either react to a control frame, or react to an
> errornous data stream (which means sending a close frame
> with the appropriate error code)
> 
> while at it, change FrameHeader::try_from_bytes to return
> Result<Option<FrameHeader>, WebSocketError> instead of a nested
> Result with the *guessed* remaining size. This was neither used by
> us, nor was it really meaningful, since this can change during the
> decode every time new data is read (extensions, mask, payload length, etc.)
> so simply returning an Option is enough information for us
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
>  proxmox/src/tools/websocket.rs | 191 ++++++++++++++++++++++++---------
>  1 file changed, 142 insertions(+), 49 deletions(-)
> 
> diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
> index fc9a0c5..c1b0066 100644
> --- a/proxmox/src/tools/websocket.rs
> +++ b/proxmox/src/tools/websocket.rs
> @@ -7,7 +7,7 @@
>  use std::pin::Pin;
>  use std::task::{Context, Poll};
>  use std::cmp::min;
> -use std::io::{self, ErrorKind};
> +use std::io;
>  use std::future::Future;
>  
>  use futures::select;
> @@ -30,8 +30,75 @@ use futures::future::FutureExt;
>  use futures::ready;
>  
>  use crate::io_format_err;
> +use crate::sys::error::io_err_other;
>  use crate::tools::byte_buffer::ByteBuffer;
>  
> +// see RFC6455 section 7.4.1
> +const CLOSE_NORMAL: &[u8] =         &[0x03, 0xE8]; // 1000
> +const CLOSE_PROTOCOL_ERROR: &[u8] = &[0x03, 0xEA]; // 1002
> +const CLOSE_INVALID_DATA: &[u8] =   &[0x03, 0xEB]; // 1003
> +//const CLOSE_UTF8_ERROR: &[u8] =     &[0x03, 0xEF]; // 1007
> +const CLOSE_POLICY_ERROR: &[u8] =   &[0x03, 0xF0]; // 1008
> +//const CLOSE_TOOBIG_ERROR: &[u8] =   &[0x03, 0xF1]; // 1009
> +const CLOSE_UNEXPECTED: &[u8] =     &[0x03, 0xF3]; // 1011

I'd just drop the above constants so you don't need to keep the values
twice in different representations. You can cast the enum below to u16
and use u16's `.to_be_bytes()` method
...

> +
> +#[derive(Debug, Clone, Copy)]
> +#[repr(u16)]
> +pub enum WebSocketErrorKind {
> +    ProtocolError = 1002,
> +    InvalidData = 1003,
> +    Other = 1008,
> +    Unexpected = 1011,
> +}

...
You could give this a `.to_be_bytes()` method directly which just does
    (self as u16).to_be_bytes()
...

> +
> +impl std::fmt::Display for WebSocketErrorKind {
> +    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
> +        write!(f, "{}", *self as u16)
> +    }
> +}
> +
> +#[derive(Debug, Clone)]
> +pub struct WebSocketError{
> +    kind: WebSocketErrorKind,
> +    message: String,
> +}
> +
> +impl WebSocketError {
> +    pub fn new(kind: WebSocketErrorKind, message: &str) -> Self {
> +        Self{
> +            kind,
> +            message: message.to_string()
> +        }
> +    }
> +
> +    pub fn generate_frame_payload(&self) -> Vec<u8> {
> +        let msglen = self.message.len().min(125);
> +        let code = match self.kind {

...
then this entire match can just use `self.kind.to_be_bytes()`
...


> +            WebSocketErrorKind::ProtocolError => CLOSE_PROTOCOL_ERROR,
> +            WebSocketErrorKind::InvalidData => CLOSE_INVALID_DATA,
> +            WebSocketErrorKind::Unexpected => CLOSE_UNEXPECTED,
> +            WebSocketErrorKind::Other => CLOSE_POLICY_ERROR,
> +        };
> +        let mut data = Vec::with_capacity(msglen + 2);
> +        data.push(code[0]);
> +        data.push(code[1]);
> +        data.copy_from_slice(&self.message.as_bytes()[..msglen]);

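This looks wrong: `copy_from_slice` requires both slices to have the same
length, and `data` only holds the 2 code bytes at this point
(`with_capacity` does not change the length), so this panics unless
msglen == 2 - and then it overwrites the code. Did you mean `.extend()`?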

> +        data
> +    }
> +}
> +
> +impl std::fmt::Display for WebSocketError {
> +    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
> +        write!(f, "{} (Code: {})", self.message, self.kind)
> +    }
> +}
> +
> +impl From<WebSocketError> for Error {

That's an anyhow::Error afaict? You don't need to implement this.
Just

    impl std::error::Error for WebSocketError {}

With Debug, Display and std::error::Error implemented you're all set for
anyhow's auto-magic `From` conversion.

> +    fn from(error: WebSocketError) -> Self {
> +        Error::new(io_format_err!("{}", error))
> +    }
> +}
> +
>  #[repr(u8)]
>  #[derive(Debug, PartialEq, PartialOrd, Copy, Clone)]
>  /// Represents an OpCode of a websocket frame
> @@ -293,25 +360,23 @@ impl FrameHeader {
>      /// Tries to parse a FrameHeader from bytes.
>      ///
>      /// When there are not enough bytes to completely parse the header,
> -    /// returns Ok(Err(size)) where size determines how many bytes
> -    /// are missing to parse further (this amount can change when more
> -    /// information is available)
> +    /// returns Ok(None)
>      ///
>      /// Example:
>      /// ```
>      /// # use proxmox::tools::websocket::*;
>      /// # use std::io;
> -    /// # fn main() -> io::Result<()> {
> +    /// # fn main() -> Result<(), WebSocketError> {
>      /// let frame = create_frame(None, &[0,1,2,3], OpCode::Ping)?;
>      /// let header = FrameHeader::try_from_bytes(&frame[..1])?;
>      /// match header {
> -    ///     Ok(_) => unreachable!(),
> -    ///     Err(x) => assert_eq!(x, 1),
> +    ///     Some(_) => unreachable!(),
> +    ///     None => {},
>      /// }
>      /// let header = FrameHeader::try_from_bytes(&frame[..2])?;
>      /// match header {
> -    ///     Err(x) => unreachable!(),
> -    ///     Ok(header) => assert_eq!(header, FrameHeader{
> +    ///     None => unreachable!(),
> +    ///     Some(header) => assert_eq!(header, FrameHeader{
>      ///         fin: true,
>      ///         mask: None,
>      ///         frametype: OpCode::Ping,
> @@ -322,19 +387,19 @@ impl FrameHeader {
>      /// # Ok(())
>      /// # }
>      /// ```
> -    pub fn try_from_bytes(data: &[u8]) -> io::Result<Result<FrameHeader, usize>> {
> +    pub fn try_from_bytes(data: &[u8]) -> Result<Option<FrameHeader>, WebSocketError> {
>          let len = data.len();
>          if len < 2 {
> -            return Ok(Err(2 - len));
> +            return Ok(None);
>          }
>  
>          let data = data;
>  
>          // we do not support extensions
>          if data[0] & 0b01110000 > 0 {
> -            return Err(io::Error::new(
> -                ErrorKind::InvalidData,
> -                "Extensions not supported",
> +            return Err(WebSocketError::new(
> +                    WebSocketErrorKind::ProtocolError,
> +                    "Extensions not supported",
>              ));
>          }
>  
> @@ -347,14 +412,17 @@ impl FrameHeader {
>              9 => OpCode::Ping,
>              10 => OpCode::Pong,
>              other => {
> -                return Err(io::Error::new(ErrorKind::InvalidData, format!("Unknown OpCode {}", other)));
> +                return Err(WebSocketError::new(
> +                        WebSocketErrorKind::ProtocolError,
> +                        &format!("Unknown OpCode {}", other),
> +                ));
>              }
>          };
>  
>          if !fin && frametype.is_control() {
> -            return Err(io::Error::new(
> -                ErrorKind::InvalidData,
> -                "Control frames cannot be fragmented",
> +            return Err(WebSocketError::new(
> +                    WebSocketErrorKind::ProtocolError,
> +                    "Control frames cannot be fragmented",

This is the third place constructing a ProtocolError, maybe consider a
`WebSocketError::protocol(message: impl Display)` convenience method ;-)

>              ));
>          }
>  
> @@ -368,14 +436,14 @@ impl FrameHeader {
>          let mut payload_len: usize = (data[1] & 0b01111111).into();
>          if payload_len == 126 {
>              if len < 4 {
> -                return Ok(Err(4 - len));
> +                return Ok(None);
>              }
>              payload_len = u16::from_be_bytes([data[2], data[3]]) as usize;
>              mask_offset += 2;
>              payload_offset += 2;
>          } else if payload_len == 127 {
>              if len < 10 {
> -                return Ok(Err(10 - len));
> +                return Ok(None);
>              }
>              payload_len = u64::from_be_bytes([
>                  data[2], data[3], data[4], data[5], data[6], data[7], data[8], data[9],
> @@ -385,9 +453,9 @@ impl FrameHeader {
>          }
>  
>          if payload_len > 125 && frametype.is_control() {
> -            return Err(io::Error::new(
> -                ErrorKind::InvalidData,
> -                "Control frames cannot carry more than 125 bytes of data",
> +            return Err(WebSocketError::new(
> +                    WebSocketErrorKind::ProtocolError,
> +                    "Control frames cannot carry more than 125 bytes of data",
>              ));
>          }
>  
> @@ -403,7 +471,7 @@ impl FrameHeader {
>              false => None,
>          };
>  
> -        Ok(Ok(FrameHeader {
> +        Ok(Some(FrameHeader {
>              fin,
>              mask,
>              frametype,
> @@ -413,6 +481,8 @@ impl FrameHeader {
>      }
>  }
>  
> +type WebSocketReadResult = Result<(OpCode, Box<[u8]>), WebSocketError>;
> +
>  /// Wraps a reader that implements AsyncRead and implements it itself.
>  ///
>  /// On read, reads the underlying reader and tries to decode the frames and
> @@ -422,7 +492,7 @@ impl FrameHeader {
>  /// Has an internal Buffer for storing incomplete headers.
>  pub struct WebSocketReader<R: AsyncRead> {
>      reader: Option<R>,
> -    sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>,
> +    sender: mpsc::UnboundedSender<WebSocketReadResult>,
>      read_buffer: Option<ByteBuffer>,
>      header: Option<FrameHeader>,
>      state: ReaderState<R>,
> @@ -431,11 +501,11 @@ pub struct WebSocketReader<R: AsyncRead> {
>  impl<R: AsyncReadExt> WebSocketReader<R> {
>      /// Creates a new WebSocketReader with the given CallBack for control frames
>      /// and a default buffer size of 4096.
> -    pub fn new(reader: R, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
> +    pub fn new(reader: R, sender: mpsc::UnboundedSender<WebSocketReadResult>) -> WebSocketReader<R> {
>          Self::with_capacity(reader, 4096, sender)
>      }
>  
> -    pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
> +    pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<WebSocketReadResult>) -> WebSocketReader<R> {
>          WebSocketReader {
>              reader: Some(reader),
>              sender,
> @@ -512,13 +582,19 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
>                      let mut header = match this.header.take() {
>                          Some(header) => header,
>                          None => {
> -                            let header = match FrameHeader::try_from_bytes(&read_buffer[..])? {
> -                                Ok(header) => header,
> -                                Err(_) => {
> +                            let header = match FrameHeader::try_from_bytes(&read_buffer[..]) {
> +                                Ok(Some(header)) => header,
> +                                Ok(None) => {
>                                      this.state = ReaderState::NoData;
>                                      this.read_buffer = Some(read_buffer);
>                                      continue;
> -                                }
> +                                },
> +                                Err(err) => {
> +                                    if let Err(err) = this.sender.send(Err(err.clone())) {
> +                                        return Poll::Ready(Err(io_err_other(err)));
> +                                    }
> +                                    return Poll::Ready(Err(io_err_other(err)));
> +                                },
>                              };
>  
>                              read_buffer.consume(header.header_len as usize);
> @@ -531,7 +607,7 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
>  
>                              let mut data = read_buffer.remove_data(header.payload_len);
>                              mask_bytes(header.mask, &mut data);
> -                            if let Err(err) = this.sender.send((header.frametype, data)) {
> +                            if let Err(err) = this.sender.send(Ok((header.frametype, data))) {
>                                  eprintln!("error sending control frame: {}", err);
>                              }
>  
> @@ -639,10 +715,37 @@ impl WebSocket {
>          Ok((Self { text }, response))
>      }
>  
> +    async fn handle_channel_message<W>(
> +        result: WebSocketReadResult,
> +        writer: &mut WebSocketWriter<W>
> +    ) -> Result<OpCode, Error>
> +    where
> +        W: AsyncWrite + Unpin + Send,
> +    {
> +        match result {
> +            Ok((OpCode::Ping, msg)) => {
> +                writer.send_control_frame(None, OpCode::Pong, &msg).await?;
> +                Ok(OpCode::Pong)
> +            }
> +            Ok((OpCode::Close, msg)) => {
> +                writer.send_control_frame(None, OpCode::Close, &msg).await?;
> +                Ok(OpCode::Close)
> +            }
> +            Ok((opcode, _)) => {
> +                // ignore other frames
> +                Ok(opcode)
> +            },
> +            Err(err) => {
> +                writer.send_control_frame(None, OpCode::Close, &err.generate_frame_payload()).await?;
> +                Err(Error::from(err))
> +            }
> +        }
> +    }
> +
>      async fn copy_to_websocket<R, W>(
>          mut reader: &mut R,
> -        writer: &mut WebSocketWriter<W>,
> -        receiver: &mut mpsc::UnboundedReceiver<(OpCode, Box<[u8]>)>) -> Result<bool, Error>
> +        mut writer: &mut WebSocketWriter<W>,
> +        receiver: &mut mpsc::UnboundedReceiver<WebSocketReadResult>) -> Result<bool, Error>
>      where
>          R: AsyncRead + Unpin + Send,
>          W: AsyncWrite + Unpin + Send,
> @@ -654,20 +757,10 @@ impl WebSocket {
>                  let bytes = select!{
>                      res = buf.read_from_async(&mut reader).fuse() => res?,
>                      res = receiver.recv().fuse() => {
> -                        let (opcode, msg) = res.ok_or(format_err!("control channel closed"))?;
> -                        match opcode {
> -                            OpCode::Ping => {
> -                                writer.send_control_frame(None, OpCode::Pong, &msg).await?;
> -                                continue;
> -                            }
> -                            OpCode::Close => {
> -                                writer.send_control_frame(None, OpCode::Close, &msg).await?;
> -                                return Ok(true);
> -                            }
> -                            _ => {
> -                                // ignore other frames
> -                                continue;
> -                            }
> +                        let res = res.ok_or_else(|| format_err!("control channel closed"))?;
> +                        match Self::handle_channel_message(res, &mut writer).await? {
> +                            OpCode::Close => return Ok(true),
> +                            _ => { continue; },

you can drop the braces here ^

>                          }
>                      }
>                  };
> @@ -720,7 +813,7 @@ impl WebSocket {
>              res = term_future.fuse() => match res {
>                  Ok(sent_close) if !sent_close => {
>                      // status code 1000 => 0x03E8
> -                    wswriter.send_control_frame(None, OpCode::Close, &[0x03, 0xE8]).await?;
> +                    wswriter.send_control_frame(None, OpCode::Close, CLOSE_NORMAL).await?;
>                      Ok(())
>                  }
>                  Ok(_) => Ok(()),
> -- 
> 2.20.1




^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2020-07-17 10:12 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-07-17  6:34 [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it Dominik Csapak
2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 2/4] proxmox/tools/websocket: improve error handling Dominik Csapak
2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 3/4] proxmox/tools/websocket: fix some clippy warnings Dominik Csapak
2020-07-17  6:34 ` [pbs-devel] [PATCH proxmox 4/4] proxmox/tools/byte_buffer: improve read_from example Dominik Csapak
2020-07-17 10:12 ` [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it Wolfgang Bumiller

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal