From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 0CB8863DD5 for ; Fri, 17 Jul 2020 12:12:30 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id F0E111B54D for ; Fri, 17 Jul 2020 12:12:29 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 5A9491B542 for ; Fri, 17 Jul 2020 12:12:28 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 1E04942ECE for ; Fri, 17 Jul 2020 12:12:28 +0200 (CEST) Date: Fri, 17 Jul 2020 12:12:26 +0200 From: Wolfgang Bumiller To: Dominik Csapak Cc: pbs-devel@lists.proxmox.com Message-ID: <20200717101226.s37262265zgi27h4@olga.proxmox.com> References: <20200717063451.19207-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20200717063451.19207-1-d.csapak@proxmox.com> User-Agent: NeoMutt/20180716 X-SPAM-LEVEL: Spam detection results: 0 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [websocket.rs] Subject: Re: [pbs-devel] [PATCH proxmox 1/4] proxmox/tools/websocket: introduce WebSocketError and use it X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 17 Jul 2020 10:12:30 -0000 On Fri, Jul 17, 2020 at 08:34:48AM +0200, Dominik Csapak wrote: > this patch introduces a custom WebSocketError, so that we can > detect errors in the websocket protocol and react to it in the > right way > > the channel now sends a Result<(OpCode, Box<[u8]>), WebSocketError> > so that we can either react to a control frame, or react to an > errornous data stream (which means sending a close frame > with the appropriate error code) > > while at it, change FrameHeader::try_from_bytes to return > Result, WebSocketError> instead of a nested > Result with the *guessed* remaining size. This was neither used by > us, nor was it really meaningful, since this can change during the > decode every time new data is read (extensions, mask, payload length, etc.) > so simply returning an Option is enough information for us > > Signed-off-by: Dominik Csapak > --- > proxmox/src/tools/websocket.rs | 191 ++++++++++++++++++++++++--------- > 1 file changed, 142 insertions(+), 49 deletions(-) > > diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs > index fc9a0c5..c1b0066 100644 > --- a/proxmox/src/tools/websocket.rs > +++ b/proxmox/src/tools/websocket.rs > @@ -7,7 +7,7 @@ > use std::pin::Pin; > use std::task::{Context, Poll}; > use std::cmp::min; > -use std::io::{self, ErrorKind}; > +use std::io; > use std::future::Future; > > use futures::select; > @@ -30,8 +30,75 @@ use futures::future::FutureExt; > use futures::ready; > > use crate::io_format_err; > +use crate::sys::error::io_err_other; > use crate::tools::byte_buffer::ByteBuffer; > > +// see RFC6455 section 7.4.1 > +const CLOSE_NORMAL: &[u8] = &[0x03, 0xE8]; // 1000 > +const CLOSE_PROTOCOL_ERROR: &[u8] = &[0x03, 0xEA]; // 1002 > +const CLOSE_INVALID_DATA: &[u8] = &[0x03, 0xEB]; // 1003 > +//const CLOSE_UTF8_ERROR: &[u8] = &[0x03, 0xEF]; // 1007 > +const CLOSE_POLICY_ERROR: &[u8] = &[0x03, 0xF0]; // 1008 > +//const CLOSE_TOOBIG_ERROR: &[u8] = &[0x03, 0xF1]; // 1009 > +const CLOSE_UNEXPECTED: &[u8] = &[0x03, 0xF3]; // 1011 I'd just drop the above constants so you don't need to have the values twice in different representations, you can cast the enum below to u16 and use u16's `.to_be_bytes()` method ... > + > +#[derive(Debug, Clone, Copy)] > +#[repr(u16)] > +pub enum WebSocketErrorKind { > + ProtocolError = 1002, > + InvalidData = 1003, > + Other = 1008, > + Unexpected = 1011, > +} ... You could give this a `.to_be_bytes()` method directly which just does (self as u16).to_be_bytes() ... > + > +impl std::fmt::Display for WebSocketErrorKind { > + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { > + write!(f, "{}", *self as u16) > + } > +} > + > +#[derive(Debug, Clone)] > +pub struct WebSocketError{ > + kind: WebSocketErrorKind, > + message: String, > +} > + > +impl WebSocketError { > + pub fn new(kind: WebSocketErrorKind, message: &str) -> Self { > + Self{ > + kind, > + message: message.to_string() > + } > + } > + > + pub fn generate_frame_payload(&self) -> Vec { > + let msglen = self.message.len().min(125); > + let code = match self.kind { ... then this entire match can just use `self.to_be_bytes()` ... > + WebSocketErrorKind::ProtocolError => CLOSE_PROTOCOL_ERROR, > + WebSocketErrorKind::InvalidData => CLOSE_INVALID_DATA, > + WebSocketErrorKind::Unexpected => CLOSE_UNEXPECTED, > + WebSocketErrorKind::Other => CLOSE_POLICY_ERROR, > + }; > + let mut data = Vec::with_capacity(msglen + 2); > + data.push(code[0]); > + data.push(code[1]); > + data.copy_from_slice(&self.message.as_bytes()[..msglen]); This looks wrong, did you mean `.extend()`? > + data > + } > +} > + > +impl std::fmt::Display for WebSocketError { > + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { > + write!(f, "{} (Code: {})", self.message, self.kind) > + } > +} > + > +impl From for Error { That's an anyhow::Error afaict? You don't need to implement this. Just impl std::error::Error for WebSocketError {} with Debug, Display and std::error::Error you're all set for anyhow auto-magic. > + fn from(error: WebSocketError) -> Self { > + Error::new(io_format_err!("{}", error)) > + } > +} > + > #[repr(u8)] > #[derive(Debug, PartialEq, PartialOrd, Copy, Clone)] > /// Represents an OpCode of a websocket frame > @@ -293,25 +360,23 @@ impl FrameHeader { > /// Tries to parse a FrameHeader from bytes. > /// > /// When there are not enough bytes to completely parse the header, > - /// returns Ok(Err(size)) where size determines how many bytes > - /// are missing to parse further (this amount can change when more > - /// information is available) > + /// returns Ok(None) > /// > /// Example: > /// ``` > /// # use proxmox::tools::websocket::*; > /// # use std::io; > - /// # fn main() -> io::Result<()> { > + /// # fn main() -> Result<(), WebSocketError> { > /// let frame = create_frame(None, &[0,1,2,3], OpCode::Ping)?; > /// let header = FrameHeader::try_from_bytes(&frame[..1])?; > /// match header { > - /// Ok(_) => unreachable!(), > - /// Err(x) => assert_eq!(x, 1), > + /// Some(_) => unreachable!(), > + /// None => {}, > /// } > /// let header = FrameHeader::try_from_bytes(&frame[..2])?; > /// match header { > - /// Err(x) => unreachable!(), > - /// Ok(header) => assert_eq!(header, FrameHeader{ > + /// None => unreachable!(), > + /// Some(header) => assert_eq!(header, FrameHeader{ > /// fin: true, > /// mask: None, > /// frametype: OpCode::Ping, > @@ -322,19 +387,19 @@ impl FrameHeader { > /// # Ok(()) > /// # } > /// ``` > - pub fn try_from_bytes(data: &[u8]) -> io::Result> { > + pub fn try_from_bytes(data: &[u8]) -> Result, WebSocketError> { > let len = data.len(); > if len < 2 { > - return Ok(Err(2 - len)); > + return Ok(None); > } > > let data = data; > > // we do not support extensions > if data[0] & 0b01110000 > 0 { > - return Err(io::Error::new( > - ErrorKind::InvalidData, > - "Extensions not supported", > + return Err(WebSocketError::new( > + WebSocketErrorKind::ProtocolError, > + "Extensions not supported", > )); > } > > @@ -347,14 +412,17 @@ impl FrameHeader { > 9 => OpCode::Ping, > 10 => OpCode::Pong, > other => { > - return Err(io::Error::new(ErrorKind::InvalidData, format!("Unknown OpCode {}", other))); > + return Err(WebSocketError::new( > + WebSocketErrorKind::ProtocolError, > + &format!("Unknown OpCode {}", other), > + )); > } > }; > > if !fin && frametype.is_control() { > - return Err(io::Error::new( > - ErrorKind::InvalidData, > - "Control frames cannot be fragmented", > + return Err(WebSocketError::new( > + WebSocketErrorKind::ProtocolError, > + "Control frames cannot be fragmented", protocol error number 3, maybe consider a `WebSocketError::protocol(message: impl Display)` convenience method ;-) > )); > } > > @@ -368,14 +436,14 @@ impl FrameHeader { > let mut payload_len: usize = (data[1] & 0b01111111).into(); > if payload_len == 126 { > if len < 4 { > - return Ok(Err(4 - len)); > + return Ok(None); > } > payload_len = u16::from_be_bytes([data[2], data[3]]) as usize; > mask_offset += 2; > payload_offset += 2; > } else if payload_len == 127 { > if len < 10 { > - return Ok(Err(10 - len)); > + return Ok(None); > } > payload_len = u64::from_be_bytes([ > data[2], data[3], data[4], data[5], data[6], data[7], data[8], data[9], > @@ -385,9 +453,9 @@ impl FrameHeader { > } > > if payload_len > 125 && frametype.is_control() { > - return Err(io::Error::new( > - ErrorKind::InvalidData, > - "Control frames cannot carry more than 125 bytes of data", > + return Err(WebSocketError::new( > + WebSocketErrorKind::ProtocolError, > + "Control frames cannot carry more than 125 bytes of data", > )); > } > > @@ -403,7 +471,7 @@ impl FrameHeader { > false => None, > }; > > - Ok(Ok(FrameHeader { > + Ok(Some(FrameHeader { > fin, > mask, > frametype, > @@ -413,6 +481,8 @@ impl FrameHeader { > } > } > > +type WebSocketReadResult = Result<(OpCode, Box<[u8]>), WebSocketError>; > + > /// Wraps a reader that implements AsyncRead and implements it itself. > /// > /// On read, reads the underlying reader and tries to decode the frames and > @@ -422,7 +492,7 @@ impl FrameHeader { > /// Has an internal Buffer for storing incomplete headers. > pub struct WebSocketReader { > reader: Option, > - sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>, > + sender: mpsc::UnboundedSender, > read_buffer: Option, > header: Option, > state: ReaderState, > @@ -431,11 +501,11 @@ pub struct WebSocketReader { > impl WebSocketReader { > /// Creates a new WebSocketReader with the given CallBack for control frames > /// and a default buffer size of 4096. > - pub fn new(reader: R, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader { > + pub fn new(reader: R, sender: mpsc::UnboundedSender) -> WebSocketReader { > Self::with_capacity(reader, 4096, sender) > } > > - pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader { > + pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender) -> WebSocketReader { > WebSocketReader { > reader: Some(reader), > sender, > @@ -512,13 +582,19 @@ impl AsyncRead for WebSocketReader > let mut header = match this.header.take() { > Some(header) => header, > None => { > - let header = match FrameHeader::try_from_bytes(&read_buffer[..])? { > - Ok(header) => header, > - Err(_) => { > + let header = match FrameHeader::try_from_bytes(&read_buffer[..]) { > + Ok(Some(header)) => header, > + Ok(None) => { > this.state = ReaderState::NoData; > this.read_buffer = Some(read_buffer); > continue; > - } > + }, > + Err(err) => { > + if let Err(err) = this.sender.send(Err(err.clone())) { > + return Poll::Ready(Err(io_err_other(err))); > + } > + return Poll::Ready(Err(io_err_other(err))); > + }, > }; > > read_buffer.consume(header.header_len as usize); > @@ -531,7 +607,7 @@ impl AsyncRead for WebSocketReader > > let mut data = read_buffer.remove_data(header.payload_len); > mask_bytes(header.mask, &mut data); > - if let Err(err) = this.sender.send((header.frametype, data)) { > + if let Err(err) = this.sender.send(Ok((header.frametype, data))) { > eprintln!("error sending control frame: {}", err); > } > > @@ -639,10 +715,37 @@ impl WebSocket { > Ok((Self { text }, response)) > } > > + async fn handle_channel_message( > + result: WebSocketReadResult, > + writer: &mut WebSocketWriter > + ) -> Result > + where > + W: AsyncWrite + Unpin + Send, > + { > + match result { > + Ok((OpCode::Ping, msg)) => { > + writer.send_control_frame(None, OpCode::Pong, &msg).await?; > + Ok(OpCode::Pong) > + } > + Ok((OpCode::Close, msg)) => { > + writer.send_control_frame(None, OpCode::Close, &msg).await?; > + Ok(OpCode::Close) > + } > + Ok((opcode, _)) => { > + // ignore other frames > + Ok(opcode) > + }, > + Err(err) => { > + writer.send_control_frame(None, OpCode::Close, &err.generate_frame_payload()).await?; > + Err(Error::from(err)) > + } > + } > + } > + > async fn copy_to_websocket( > mut reader: &mut R, > - writer: &mut WebSocketWriter, > - receiver: &mut mpsc::UnboundedReceiver<(OpCode, Box<[u8]>)>) -> Result > + mut writer: &mut WebSocketWriter, > + receiver: &mut mpsc::UnboundedReceiver) -> Result > where > R: AsyncRead + Unpin + Send, > W: AsyncWrite + Unpin + Send, > @@ -654,20 +757,10 @@ impl WebSocket { > let bytes = select!{ > res = buf.read_from_async(&mut reader).fuse() => res?, > res = receiver.recv().fuse() => { > - let (opcode, msg) = res.ok_or(format_err!("control channel closed"))?; > - match opcode { > - OpCode::Ping => { > - writer.send_control_frame(None, OpCode::Pong, &msg).await?; > - continue; > - } > - OpCode::Close => { > - writer.send_control_frame(None, OpCode::Close, &msg).await?; > - return Ok(true); > - } > - _ => { > - // ignore other frames > - continue; > - } > + let res = res.ok_or_else(|| format_err!("control channel closed"))?; > + match Self::handle_channel_message(res, &mut writer).await? { > + OpCode::Close => return Ok(true), > + _ => { continue; }, you can drop the braces here ^ > } > } > }; > @@ -720,7 +813,7 @@ impl WebSocket { > res = term_future.fuse() => match res { > Ok(sent_close) if !sent_close => { > // status code 1000 => 0x03E8 > - wswriter.send_control_frame(None, OpCode::Close, &[0x03, 0xE8]).await?; > + wswriter.send_control_frame(None, OpCode::Close, CLOSE_NORMAL).await?; > Ok(()) > } > Ok(_) => Ok(()), > -- > 2.20.1