* [pbs-devel] [PATCH proxmox 1/9] proxmox/tools/byte_buffer: improve ByteBuffer interface
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 2/9] proxmox/tools/byte_buffer: impl Default Dominik Csapak
` (9 subsequent siblings)
10 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
by implementing Deref and DerefMut, renaming consume to 'remove_data'
adapt the usage inside of websocket (we did not use it anywhere else for now)
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/byte_buffer.rs | 91 ++++++++++++++++++++------------
proxmox/src/tools/websocket.rs | 11 ++--
2 files changed, 64 insertions(+), 38 deletions(-)
diff --git a/proxmox/src/tools/byte_buffer.rs b/proxmox/src/tools/byte_buffer.rs
index abfa6c8..7d4fce4 100644
--- a/proxmox/src/tools/byte_buffer.rs
+++ b/proxmox/src/tools/byte_buffer.rs
@@ -9,14 +9,13 @@
//! fn code<T: Read + ?Sized>(input: &mut T) -> std::io::Result<Box<[u8]>> {
//! let mut buffer = ByteBuffer::new();
//! let amount = buffer.read_from(input)?;
-//! let data = buffer.consume(amount);
+//! let data = buffer.remove_data(amount);
//! assert_eq!(data.len(), amount);
//! Ok(data)
//! }
//! # code(&mut &b"testdata"[..]).expect("byte buffer test failed");
//! ```
-use std::cmp::min;
use std::io::{Read, Result};
use crate::tools::vec;
@@ -49,23 +48,18 @@ impl ByteBuffer {
}
}
- /// Returns the length of the data in the Buffer
- pub fn data_size(&self) -> usize {
- self.data_size
- }
-
pub fn free_size(&self) -> usize {
self.capacity - self.data_size
}
- pub fn is_empty(&self) -> bool {
- self.data_size == 0
- }
-
pub fn is_full(&self) -> bool {
self.data_size >= self.capacity
}
+ pub fn clear(&mut self) {
+ self.data_size = 0
+ }
+
/// Sets the length of the data. Useful if data was manually added
/// with a mutable slice (e.g. from [get_free_mut_slice](#method.get_free_mut_slice)).
///
@@ -92,19 +86,6 @@ impl ByteBuffer {
self.data_size += size;
}
- /// Gets an immutable reference to the data in the buffer
- /// Example:
- /// ```
- /// # use proxmox::tools::byte_buffer::ByteBuffer;
- /// let mut buf = ByteBuffer::new();
- /// buf.get_free_mut_slice()[..2].copy_from_slice(&[1u8, 2u8]);
- /// buf.add_size(2);
- /// assert_eq!(buf.get_data_slice(), &[1u8, 2u8]);
- /// ```
- pub fn get_data_slice(&self) -> &[u8] {
- &self.buf[..self.data_size]
- }
-
/// Returns a mutable reference to the free section of the
/// Buffer. There are no guarantees about the content of the
/// free part of the Buffer (may even be uninitialized).
@@ -113,8 +94,8 @@ impl ByteBuffer {
&mut self.buf[self.data_size..self.capacity]
}
- /// Consumes up to max_amount of data from the front
- /// of the buffer. If there was less than max_amount present,
+ /// Removes up to max_amount of data from the front
+ /// of the buffer and returns. If there was less than max_amount present,
/// it will return as much data as there was in the buffer.
/// The rest of the data will be moved to the front, and
/// the data size will be updated accordingly.
@@ -125,20 +106,50 @@ impl ByteBuffer {
/// let mut buf = ByteBuffer::new();
/// buf.get_free_mut_slice()[..2].copy_from_slice(&[1u8, 2u8]);
/// buf.add_size(2);
- /// assert_eq!(buf.data_size(), 2);
+ /// assert_eq!(buf.len(), 2);
///
- /// let data = buf.consume(100);
+ /// let data = buf.remove_data(100);
/// assert_eq!(&data[..], &[1u8, 2u8]);
/// assert!(buf.is_empty());
/// ```
- pub fn consume(&mut self, max_amount: usize) -> Box<[u8]> {
- let size = min(max_amount, self.data_size);
+ #[must_use]
+ pub fn remove_data(&mut self, max_amount: usize) -> Box<[u8]> {
+ let size = max_amount.min(self.data_size);
let tmp: Box<[u8]> = self.buf[..size].into();
self.buf.copy_within(size..self.capacity, 0);
self.data_size -= size;
tmp
}
+ /// Removes up to max_amount of data from the front and returns
+ /// the amount of data removed. If there was less than max_amount present,
+ /// it will empty out the buffer and return the amount removed.
+ ///
+ /// Example:
+ /// ```
+ /// # use proxmox::tools::byte_buffer::ByteBuffer;
+ /// let mut buf = ByteBuffer::new();
+ /// buf.get_free_mut_slice()[..2].copy_from_slice(&[1u8, 2u8]);
+ /// buf.add_size(2);
+ /// assert_eq!(buf.len(), 2);
+ ///
+ /// let amount = buf.consume(1);
+ /// assert_eq!(amount, 1);
+ /// let amount = buf.consume(100);
+ /// assert_eq!(amount, 1);
+ /// assert!(buf.is_empty());
+ /// ```
+ pub fn consume(&mut self, max_amount: usize) -> usize {
+ let size = max_amount.min(self.data_size);
+ if size < max_amount {
+ self.clear()
+ } else {
+ self.buf.copy_within(size..self.capacity, 0);
+ self.data_size -= size;
+ }
+ size
+ }
+
/// Takes a reader and reads into the back of the buffer (up to the
/// free space in the buffer) and updates its size accordingly.
///
@@ -168,6 +179,20 @@ impl ByteBuffer {
}
}
+impl std::ops::Deref for ByteBuffer {
+ type Target = [u8];
+
+ fn deref(&self) -> &Self::Target {
+ &self.buf[..self.data_size]
+ }
+}
+
+impl std::ops::DerefMut for ByteBuffer {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.buf[..self.data_size]
+ }
+}
+
#[cfg(test)]
mod test {
use crate::tools::byte_buffer::ByteBuffer;
@@ -181,7 +206,7 @@ mod test {
}
buffer.add_size(5);
- let slice2 = buffer.get_data_slice();
+ let slice2 = &buffer[..];
assert_eq!(slice2, &[0, 1, 2, 3, 4]);
}
@@ -190,7 +215,7 @@ mod test {
fn test2() {
let mut buffer = ByteBuffer::with_capacity(1024);
let size = buffer.read_from(&mut std::io::repeat(54)).unwrap();
- assert_eq!(buffer.data_size(), size);
- assert_eq!(buffer.get_data_slice()[0], 54);
+ assert_eq!(buffer.len(), size);
+ assert_eq!(buffer[0], 54);
}
}
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 3877e4e..04173bb 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -485,7 +485,7 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
let mut header = match this.header.take() {
Some(header) => header,
None => {
- let header = match FrameHeader::try_from_bytes(read_buffer.get_data_slice())? {
+ let header = match FrameHeader::try_from_bytes(&read_buffer[..])? {
Ok(header) => header,
Err(_) => {
this.state = ReaderState::NoData;
@@ -500,12 +500,12 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
};
if header.is_control_frame() {
- if read_buffer.data_size() >= header.payload_len {
+ if read_buffer.len() >= header.payload_len {
(this.callback)(
header.frametype,
mask_bytes(
header.mask,
- &mut read_buffer.consume(header.payload_len).into_vec(),
+ &mut read_buffer.remove_data(header.payload_len).into_vec(),
),
);
this.state = if read_buffer.is_empty() {
@@ -523,8 +523,9 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
}
}
- let len = min(buf.len() - offset, min(header.payload_len, read_buffer.data_size()));
- let mut data = read_buffer.consume(len).into_vec();
+ let len = min(buf.len() - offset, min(header.payload_len, read_buffer.len()));
+
+ let mut data = read_buffer.remove_data(len).into_vec();
buf[offset..offset+len].copy_from_slice(mask_bytes(header.mask, &mut data));
offset += len;
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox 2/9] proxmox/tools/byte_buffer: impl Default
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 1/9] proxmox/tools/byte_buffer: improve ByteBuffer interface Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 3/9] proxmox/tools/websocket: use ready macro for WebSocketWriter Dominik Csapak
` (8 subsequent siblings)
10 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/byte_buffer.rs | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/proxmox/src/tools/byte_buffer.rs b/proxmox/src/tools/byte_buffer.rs
index 7d4fce4..f01a6dd 100644
--- a/proxmox/src/tools/byte_buffer.rs
+++ b/proxmox/src/tools/byte_buffer.rs
@@ -179,6 +179,12 @@ impl ByteBuffer {
}
}
+impl Default for ByteBuffer {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
impl std::ops::Deref for ByteBuffer {
type Target = [u8];
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox 3/9] proxmox/tools/websocket: use ready macro for WebSocketWriter
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 1/9] proxmox/tools/byte_buffer: improve ByteBuffer interface Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 2/9] proxmox/tools/byte_buffer: impl Default Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 4/9] proxmox/tools/websocket: correctly return eof Dominik Csapak
` (7 subsequent siblings)
10 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/websocket.rs | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 04173bb..c30293f 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -222,16 +222,19 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for WebSocketWriter<W> {
// we have a frame in any case, so unwrap is ok
let (buf, pos, origsize) = this.frame.as_mut().unwrap();
loop {
- match Pin::new(&mut this.writer).poll_write(cx, &buf[*pos..]) {
- Poll::Ready(Ok(size)) => {
+ match ready!(Pin::new(&mut this.writer).poll_write(cx, &buf[*pos..])) {
+ Ok(size) => {
*pos += size;
if *pos == buf.len() {
let size = *origsize;
this.frame = None;
return Poll::Ready(Ok(size));
}
- }
- other => return other,
+ },
+ Err(err) => {
+ eprintln!("error in writer: {}", err);
+ return Poll::Ready(Err(Error::new(ErrorKind::Other, err)))
+ },
}
}
}
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox 4/9] proxmox/tools/websocket: correctly return eof
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
` (2 preceding siblings ...)
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 3/9] proxmox/tools/websocket: use ready macro for WebSocketWriter Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 5/9] proxmox/tools/websocket: use io::Error and Result explicitely Dominik Csapak
` (6 subsequent siblings)
10 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
only return Ok(0) when the upstream reader did return that, not when
we have no data in the buffer, else the downstream reader believes
we are EOF (even if we are not)
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/websocket.rs | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index c30293f..d59b7ad 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -432,7 +432,7 @@ impl<R: AsyncReadExt> WebSocketReader<R> {
enum ReaderState<R> {
NoData,
- WaitingForData(Pin<Box<dyn Future<Output = Result<(R, ByteBuffer), Error>> + Send + 'static>>),
+ WaitingForData(Pin<Box<dyn Future<Output = Result<(usize, R, ByteBuffer), Error>> + Send + 'static>>),
HaveData,
}
@@ -463,18 +463,20 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
let future = async move {
buffer.read_from_async(&mut reader)
.await
- .map(move |_| (reader, buffer))
+ .map(move |len| (len, reader, buffer))
};
this.state = ReaderState::WaitingForData(future.boxed());
},
ReaderState::WaitingForData(ref mut future) => {
match ready!(future.as_mut().poll(cx)) {
- Ok((reader, buffer)) => {
+ Ok((len, reader, buffer)) => {
this.reader = Some(reader);
this.read_buffer = Some(buffer);
this.state = ReaderState::HaveData;
-
+ if len == 0 {
+ return Poll::Ready(Ok(0));
+ }
},
Err(err) => return Poll::Ready(Err(Error::new(ErrorKind::Other, err))),
}
@@ -545,7 +547,9 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
};
this.read_buffer = Some(read_buffer);
- return Poll::Ready(Ok(offset));
+ if offset > 0 {
+ return Poll::Ready(Ok(offset));
+ }
},
}
}
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox 5/9] proxmox/tools/websocket: use io::Error and Result explicitely
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
` (3 preceding siblings ...)
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 4/9] proxmox/tools/websocket: correctly return eof Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 6/9] proxmox/tools/websocket: improve mask_bytes and create_frame interface Dominik Csapak
` (5 subsequent siblings)
10 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
and add a helper struct for the ReadResult
(so that the types are properly documented)
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/websocket.rs | 46 ++++++++++++++++++++--------------
1 file changed, 27 insertions(+), 19 deletions(-)
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index d59b7ad..078225a 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -7,14 +7,16 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::cmp::min;
-use std::io::{self, Error, ErrorKind};
+use std::io::{self, ErrorKind};
use std::future::Future;
use tokio::io::{AsyncWrite, AsyncRead, AsyncReadExt};
+use anyhow::{bail, format_err, Error};
use futures::future::FutureExt;
use futures::ready;
+use crate::io_format_err;
use crate::tools::byte_buffer::ByteBuffer;
#[repr(u8)]
@@ -129,7 +131,7 @@ pub fn create_frame(
let first_byte = 0b10000000 | (frametype as u8);
let len = data.len();
if (frametype as u8) & 0b00001000 > 0 && len > 125 {
- return Err(Error::new(
+ return Err(io::Error::new(
ErrorKind::InvalidData,
"Control frames cannot have data longer than 125 bytes",
));
@@ -200,7 +202,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for WebSocketWriter<W> {
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
- ) -> Poll<Result<usize, Error>> {
+ ) -> Poll<io::Result<usize>> {
let this = Pin::get_mut(self);
let frametype = match this.text {
@@ -233,18 +235,18 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for WebSocketWriter<W> {
},
Err(err) => {
eprintln!("error in writer: {}", err);
- return Poll::Ready(Err(Error::new(ErrorKind::Other, err)))
+ return Poll::Ready(Err(err))
},
}
}
}
- fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let this = Pin::get_mut(self);
Pin::new(&mut this.writer).poll_flush(cx)
}
- fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let this = Pin::get_mut(self);
Pin::new(&mut this.writer).poll_shutdown(cx)
}
@@ -303,7 +305,7 @@ impl FrameHeader {
/// # Ok(())
/// # }
/// ```
- pub fn try_from_bytes(data: &[u8]) -> Result<Result<FrameHeader, usize>, Error> {
+ pub fn try_from_bytes(data: &[u8]) -> io::Result<Result<FrameHeader, usize>> {
let len = data.len();
if len < 2 {
return Ok(Err(2 - len));
@@ -313,7 +315,7 @@ impl FrameHeader {
// we do not support extensions
if data[0] & 0b01110000 > 0 {
- return Err(Error::new(
+ return Err(io::Error::new(
ErrorKind::InvalidData,
"Extensions not supported",
));
@@ -328,12 +330,12 @@ impl FrameHeader {
9 => OpCode::Ping,
10 => OpCode::Pong,
other => {
- return Err(Error::new(ErrorKind::InvalidData, format!("Unknown OpCode {}", other)));
+ return Err(io::Error::new(ErrorKind::InvalidData, format!("Unknown OpCode {}", other)));
}
};
if !fin && frametype.is_control() {
- return Err(Error::new(
+ return Err(io::Error::new(
ErrorKind::InvalidData,
"Control frames cannot be fragmented",
));
@@ -366,7 +368,7 @@ impl FrameHeader {
}
if payload_len > 125 && frametype.is_control() {
- return Err(Error::new(
+ return Err(io::Error::new(
ErrorKind::InvalidData,
"Control frames cannot carry more than 125 bytes of data",
));
@@ -430,9 +432,15 @@ impl<R: AsyncReadExt> WebSocketReader<R> {
}
}
+struct ReadResult<R> {
+ len: usize,
+ reader: R,
+ buffer: ByteBuffer,
+}
+
enum ReaderState<R> {
NoData,
- WaitingForData(Pin<Box<dyn Future<Output = Result<(usize, R, ByteBuffer), Error>> + Send + 'static>>),
+ WaitingForData(Pin<Box<dyn Future<Output = io::Result<ReadResult<R>>> + Send + 'static>>),
HaveData,
}
@@ -443,7 +451,7 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
- ) -> Poll<Result<usize, Error>> {
+ ) -> Poll<io::Result<usize>> {
let this = Pin::get_mut(self);
let mut offset = 0;
@@ -452,25 +460,25 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
ReaderState::NoData => {
let mut reader = match this.reader.take() {
Some(reader) => reader,
- None => return Poll::Ready(Err(Error::new(ErrorKind::Other, "no reader"))),
+ None => return Poll::Ready(Err(io_format_err!("no reader"))),
};
let mut buffer = match this.read_buffer.take() {
Some(buffer) => buffer,
- None => return Poll::Ready(Err(Error::new(ErrorKind::Other, "no buffer"))),
+ None => return Poll::Ready(Err(io_format_err!("no buffer"))),
};
let future = async move {
buffer.read_from_async(&mut reader)
.await
- .map(move |len| (len, reader, buffer))
+ .map(move |len| ReadResult { len, reader, buffer })
};
this.state = ReaderState::WaitingForData(future.boxed());
},
ReaderState::WaitingForData(ref mut future) => {
match ready!(future.as_mut().poll(cx)) {
- Ok((len, reader, buffer)) => {
+ Ok(ReadResult { len, reader, buffer }) => {
this.reader = Some(reader);
this.read_buffer = Some(buffer);
this.state = ReaderState::HaveData;
@@ -478,13 +486,13 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
return Poll::Ready(Ok(0));
}
},
- Err(err) => return Poll::Ready(Err(Error::new(ErrorKind::Other, err))),
+ Err(err) => return Poll::Ready(Err(err)),
}
},
ReaderState::HaveData => {
let mut read_buffer = match this.read_buffer.take() {
Some(read_buffer) => read_buffer,
- None => return Poll::Ready(Err(Error::new(ErrorKind::Other, "no buffer"))),
+ None => return Poll::Ready(Err(io_format_err!("no buffer"))),
};
let mut header = match this.header.take() {
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox 6/9] proxmox/tools/websocket: improve mask_bytes and create_frame interface
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
` (4 preceding siblings ...)
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 5/9] proxmox/tools/websocket: use io::Error and Result explicitely Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-15 9:08 ` [pbs-devel] [PATCH proxmox v2 " Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 7/9] proxmox/tools/websocket: implement send_control_frame for writer Dominik Csapak
` (4 subsequent siblings)
10 siblings, 1 reply; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
by using a Box<[u8]> instead of a vector (we do not need it)
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/websocket.rs | 33 ++++++++++++++++-----------------
1 file changed, 16 insertions(+), 17 deletions(-)
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 078225a..cd09add 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -44,18 +44,17 @@ impl OpCode {
}
}
-fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
+fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Box<[u8]>) {
let mask = match mask {
- Some([0,0,0,0]) | None => return data,
+ Some([0,0,0,0]) | None => return,
Some(mask) => mask,
};
if data.len() < 32 {
- let mut_data = data.as_mut_slice();
- for i in 0..mut_data.len() {
- mut_data[i] ^= mask[i%4];
+ for i in 0..data.len() {
+ data[i] ^= mask[i%4];
}
- return data;
+ return;
}
let mut newmask: u32 = u32::from_le_bytes(mask);
@@ -75,8 +74,6 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
*s ^= newmask as u8;
newmask = newmask.rotate_right(8);
}
-
- data
}
/// Can be used to create a complete WebSocket Frame.
@@ -125,7 +122,7 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
/// ```
pub fn create_frame(
mask: Option<[u8; 4]>,
- mut data: Vec<u8>,
+ data: &[u8],
frametype: OpCode,
) -> io::Result<Vec<u8>> {
let first_byte = 0b10000000 | (frametype as u8);
@@ -155,8 +152,10 @@ pub fn create_frame(
if let Some(mask) = mask {
buf.extend_from_slice(&mask);
}
+ let mut data = data.to_vec().into_boxed_slice();
+ mask_bytes(mask, &mut data);
- buf.append(&mut mask_bytes(mask, &mut data));
+ buf.append(&mut data.into_vec());
Ok(buf)
}
@@ -212,7 +211,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for WebSocketWriter<W> {
if this.frame.is_none() {
// create frame buf
- let frame = match create_frame(this.mask, buf.to_vec(), frametype) {
+ let frame = match create_frame(this.mask, buf, frametype) {
Ok(f) => f,
Err(e) => {
return Poll::Ready(Err(e));
@@ -514,12 +513,11 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
if header.is_control_frame() {
if read_buffer.len() >= header.payload_len {
+ let mut data = read_buffer.remove_data(header.payload_len);
+ mask_bytes(header.mask, &mut data);
(this.callback)(
header.frametype,
- mask_bytes(
- header.mask,
- &mut read_buffer.remove_data(header.payload_len).into_vec(),
- ),
+ &data,
);
this.state = if read_buffer.is_empty() {
ReaderState::NoData
@@ -538,8 +536,9 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
let len = min(buf.len() - offset, min(header.payload_len, read_buffer.len()));
- let mut data = read_buffer.remove_data(len).into_vec();
- buf[offset..offset+len].copy_from_slice(mask_bytes(header.mask, &mut data));
+ let mut data = read_buffer.remove_data(len);
+ mask_bytes(header.mask, &mut data);
+ buf[offset..offset+len].copy_from_slice(&data);
offset += len;
header.payload_len -= len;
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox v2 6/9] proxmox/tools/websocket: improve mask_bytes and create_frame interface
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 6/9] proxmox/tools/websocket: improve mask_bytes and create_frame interface Dominik Csapak
@ 2020-07-15 9:08 ` Dominik Csapak
0 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-15 9:08 UTC (permalink / raw)
To: pbs-devel
by using a Box<[u8]> instead of a vector (we do not need it)
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
changes from v1:
adapt tests
proxmox/src/tools/websocket.rs | 41 +++++++++++++++++-----------------
1 file changed, 20 insertions(+), 21 deletions(-)
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 078225a..a5065b2 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -44,18 +44,17 @@ impl OpCode {
}
}
-fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
+fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Box<[u8]>) {
let mask = match mask {
- Some([0,0,0,0]) | None => return data,
+ Some([0,0,0,0]) | None => return,
Some(mask) => mask,
};
if data.len() < 32 {
- let mut_data = data.as_mut_slice();
- for i in 0..mut_data.len() {
- mut_data[i] ^= mask[i%4];
+ for i in 0..data.len() {
+ data[i] ^= mask[i%4];
}
- return data;
+ return;
}
let mut newmask: u32 = u32::from_le_bytes(mask);
@@ -75,8 +74,6 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
*s ^= newmask as u8;
newmask = newmask.rotate_right(8);
}
-
- data
}
/// Can be used to create a complete WebSocket Frame.
@@ -91,7 +88,7 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
/// # use std::io;
/// # fn main() -> io::Result<()> {
/// let data = vec![0,1,2,3,4];
-/// let frame = create_frame(None, data, OpCode::Text)?;
+/// let frame = create_frame(None, &data, OpCode::Text)?;
/// assert_eq!(frame, vec![0b10000001, 5, 0, 1, 2, 3, 4]);
/// # Ok(())
/// # }
@@ -104,7 +101,7 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
/// # use std::io;
/// # fn main() -> io::Result<()> {
/// let data = vec![0,1,2,3,4];
-/// let frame = create_frame(Some([0u8, 1u8, 2u8, 3u8]), data, OpCode::Text)?;
+/// let frame = create_frame(Some([0u8, 1u8, 2u8, 3u8]), &data, OpCode::Text)?;
/// assert_eq!(frame, vec![0b10000001, 0b10000101, 0, 1, 2, 3, 0, 0, 0, 0, 4]);
/// # Ok(())
/// # }
@@ -117,7 +114,7 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
/// # use std::io;
/// # fn main() -> io::Result<()> {
/// let data = vec![0,1,2,3,4];
-/// let frame = create_frame(None, data, OpCode::Ping)?;
+/// let frame = create_frame(None, &data, OpCode::Ping)?;
/// assert_eq!(frame, vec![0b10001001, 0b00000101, 0, 1, 2, 3, 4]);
/// # Ok(())
/// # }
@@ -125,7 +122,7 @@ fn mask_bytes(mask: Option<[u8; 4]>, data: &mut Vec<u8>) -> &mut Vec<u8> {
/// ```
pub fn create_frame(
mask: Option<[u8; 4]>,
- mut data: Vec<u8>,
+ data: &[u8],
frametype: OpCode,
) -> io::Result<Vec<u8>> {
let first_byte = 0b10000000 | (frametype as u8);
@@ -155,8 +152,10 @@ pub fn create_frame(
if let Some(mask) = mask {
buf.extend_from_slice(&mask);
}
+ let mut data = data.to_vec().into_boxed_slice();
+ mask_bytes(mask, &mut data);
- buf.append(&mut mask_bytes(mask, &mut data));
+ buf.append(&mut data.into_vec());
Ok(buf)
}
@@ -212,7 +211,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for WebSocketWriter<W> {
if this.frame.is_none() {
// create frame buf
- let frame = match create_frame(this.mask, buf.to_vec(), frametype) {
+ let frame = match create_frame(this.mask, buf, frametype) {
Ok(f) => f,
Err(e) => {
return Poll::Ready(Err(e));
@@ -285,7 +284,7 @@ impl FrameHeader {
/// # use proxmox::tools::websocket::*;
/// # use std::io;
/// # fn main() -> io::Result<()> {
- /// let frame = create_frame(None, vec![0,1,2,3], OpCode::Ping)?;
+ /// let frame = create_frame(None, &[0,1,2,3], OpCode::Ping)?;
/// let header = FrameHeader::try_from_bytes(&frame[..1])?;
/// match header {
/// Ok(_) => unreachable!(),
@@ -514,12 +513,11 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
if header.is_control_frame() {
if read_buffer.len() >= header.payload_len {
+ let mut data = read_buffer.remove_data(header.payload_len);
+ mask_bytes(header.mask, &mut data);
(this.callback)(
header.frametype,
- mask_bytes(
- header.mask,
- &mut read_buffer.remove_data(header.payload_len).into_vec(),
- ),
+ &data,
);
this.state = if read_buffer.is_empty() {
ReaderState::NoData
@@ -538,8 +536,9 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
let len = min(buf.len() - offset, min(header.payload_len, read_buffer.len()));
- let mut data = read_buffer.remove_data(len).into_vec();
- buf[offset..offset+len].copy_from_slice(mask_bytes(header.mask, &mut data));
+ let mut data = read_buffer.remove_data(len);
+ mask_bytes(header.mask, &mut data);
+ buf[offset..offset+len].copy_from_slice(&data);
offset += len;
header.payload_len -= len;
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox 7/9] proxmox/tools/websocket: implement send_control_frame for writer
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
` (5 preceding siblings ...)
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 6/9] proxmox/tools/websocket: improve mask_bytes and create_frame interface Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 8/9] proxmox/tools/websocket: replace CallBack with a channel Dominik Csapak
` (3 subsequent siblings)
10 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
so that we can easily send a control frame to the endpoint
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/websocket.rs | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index cd09add..2a2bfa4 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -194,6 +194,11 @@ impl<W: AsyncWrite + Unpin> WebSocketWriter<W> {
frame: None,
}
}
+
+ pub async fn send_control_frame(&mut self, mask: Option<[u8; 4]>, opcode: OpCode, data: &[u8]) -> Result<(), Error> {
+ let frame = create_frame(mask, data, opcode)?;
+ self.writer.write_all(&frame).await.map_err(Error::from)
+ }
}
impl<W: AsyncWrite + Unpin> AsyncWrite for WebSocketWriter<W> {
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox 8/9] proxmox/tools/websocket: replace CallBack with a channel
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
` (6 preceding siblings ...)
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 7/9] proxmox/tools/websocket: implement send_control_frame for writer Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 9/9] proxmox/tools/websocket: add WebSocket implementation Dominik Csapak
` (2 subsequent siblings)
10 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
instead of having a callback that we call on a control frame,
use a channel to send the data to a receiver
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/Cargo.toml | 2 +-
proxmox/src/tools/websocket.rs | 23 +++++++++++------------
2 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/proxmox/Cargo.toml b/proxmox/Cargo.toml
index e72f358..5b7a4ff 100644
--- a/proxmox/Cargo.toml
+++ b/proxmox/Cargo.toml
@@ -57,7 +57,7 @@ api-macro = ["proxmox-api-macro"]
test-harness = []
cli = [ "router", "hyper", "tokio" ]
router = [ "hyper", "tokio" ]
-websocket = [ "tokio", "futures" ]
+websocket = [ "tokio", "futures", "tokio/sync" ]
# tools:
#valgrind = ["proxmox-tools/valgrind"]
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 2a2bfa4..1ff0927 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -12,6 +12,7 @@ use std::future::Future;
use tokio::io::{AsyncWrite, AsyncRead, AsyncReadExt};
use anyhow::{bail, format_err, Error};
+use tokio::sync::mpsc;
use futures::future::FutureExt;
use futures::ready;
@@ -400,9 +401,6 @@ impl FrameHeader {
}
}
-/// Callback for control frames
-pub type CallBack = fn(frametype: OpCode, payload: &[u8]);
-
/// Wraps a reader that implements AsyncRead and implements it itself.
///
/// On read, reads the underlying reader and tries to decode the frames and
@@ -412,7 +410,7 @@ pub type CallBack = fn(frametype: OpCode, payload: &[u8]);
/// Has an internal Buffer for storing incomplete headers.
pub struct WebSocketReader<R: AsyncRead> {
reader: Option<R>,
- callback: CallBack,
+ sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>,
read_buffer: Option<ByteBuffer>,
header: Option<FrameHeader>,
state: ReaderState<R>,
@@ -421,14 +419,14 @@ pub struct WebSocketReader<R: AsyncRead> {
impl<R: AsyncReadExt> WebSocketReader<R> {
/// Creates a new WebSocketReader with the given CallBack for control frames
/// and a default buffer size of 4096.
- pub fn new(reader: R, callback: CallBack) -> WebSocketReader<R> {
- Self::with_capacity(reader, callback, 4096)
+ pub fn new(reader: R, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
+ Self::with_capacity(reader, 4096, sender)
}
- pub fn with_capacity(reader: R, callback: CallBack, capacity: usize) -> WebSocketReader<R> {
+ pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
WebSocketReader {
reader: Some(reader),
- callback,
+ sender,
read_buffer: Some(ByteBuffer::with_capacity(capacity)),
header: None,
state: ReaderState::NoData,
@@ -518,12 +516,13 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
if header.is_control_frame() {
if read_buffer.len() >= header.payload_len {
+
let mut data = read_buffer.remove_data(header.payload_len);
mask_bytes(header.mask, &mut data);
- (this.callback)(
- header.frametype,
- &data,
- );
+ if let Err(err) = this.sender.send((header.frametype, data)) {
+ eprintln!("error sending control frame: {}", err);
+ }
+
this.state = if read_buffer.is_empty() {
ReaderState::NoData
} else {
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] [PATCH proxmox 9/9] proxmox/tools/websocket: add WebSocket implementation
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
` (7 preceding siblings ...)
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 8/9] proxmox/tools/websocket: replace CallBack with a channel Dominik Csapak
@ 2020-07-14 11:09 ` Dominik Csapak
2020-07-15 8:15 ` [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Fabian Grünbichler
2020-07-15 13:00 ` [pbs-devel] applied: " Wolfgang Bumiller
10 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-14 11:09 UTC (permalink / raw)
To: pbs-devel
uses the existing WebSocketReader and Writer to establish a
two-way communication between an upstream and downstream connection.
The upstream connection sends and receives WebSocket frames, while
the downstream one only receives and sends raw data.
For now we do not support extensions, and only accept the protocol version 13
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/Cargo.toml | 3 +-
proxmox/src/tools/websocket.rs | 167 ++++++++++++++++++++++++++++++++-
2 files changed, 168 insertions(+), 2 deletions(-)
diff --git a/proxmox/Cargo.toml b/proxmox/Cargo.toml
index 5b7a4ff..d9fe53c 100644
--- a/proxmox/Cargo.toml
+++ b/proxmox/Cargo.toml
@@ -37,6 +37,7 @@ futures = { version = "0.3", optional = true }
http = "0.2"
hyper = { version = "0.13", optional = true }
percent-encoding = "2.1"
+openssl = { version = "0.10", optional = true }
rustyline = "6"
serde_derive = "1.0"
textwrap = "0.11"
@@ -57,7 +58,7 @@ api-macro = ["proxmox-api-macro"]
test-harness = []
cli = [ "router", "hyper", "tokio" ]
router = [ "hyper", "tokio" ]
-websocket = [ "tokio", "futures", "tokio/sync" ]
+websocket = [ "tokio", "futures", "tokio/sync", "openssl" ]
# tools:
#valgrind = ["proxmox-tools/valgrind"]
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 1ff0927..f8481f2 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -10,9 +10,21 @@ use std::cmp::min;
use std::io::{self, ErrorKind};
use std::future::Future;
-use tokio::io::{AsyncWrite, AsyncRead, AsyncReadExt};
+use futures::select;
use anyhow::{bail, format_err, Error};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
+use hyper::{Body, Response, StatusCode};
+use hyper::header::{
+ HeaderMap,
+ HeaderValue,
+ UPGRADE,
+ CONNECTION,
+ SEC_WEBSOCKET_KEY,
+ SEC_WEBSOCKET_PROTOCOL,
+ SEC_WEBSOCKET_VERSION,
+ SEC_WEBSOCKET_ACCEPT,
+};
use futures::future::FutureExt;
use futures::ready;
@@ -566,3 +578,156 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
}
}
}
+
+/// Global Identifier for WebSockets, see RFC6455
+pub const MAGIC_WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+
+/// Provides methods for connecting a WebSocket endpoint with another
+pub struct WebSocket {
+ text: bool,
+}
+
+impl WebSocket {
+ /// Returns a new WebSocket instance and the generates the correct
+ /// WebSocket response from request headers
+ pub fn new(headers: HeaderMap<HeaderValue>) -> Result<(Self, Response<Body>), Error> {
+ let protocols = headers
+ .get(UPGRADE)
+ .ok_or_else(|| format_err!("missing Upgrade header"))?
+ .to_str()?;
+
+ let version = headers
+ .get(SEC_WEBSOCKET_VERSION)
+ .ok_or_else(|| format_err!("missing websocket version"))?
+ .to_str()?;
+
+ let key = headers
+ .get(SEC_WEBSOCKET_KEY)
+ .ok_or_else(|| format_err!("missing websocket key"))?
+ .to_str()?;
+
+ let ws_proto = headers
+ .get(SEC_WEBSOCKET_PROTOCOL)
+ .ok_or_else(|| format_err!("missing websocket key"))?
+ .to_str()?;
+
+ let text = ws_proto == "text";
+
+ if protocols != "websocket" {
+ bail!("invalid protocol name");
+ }
+
+ if version != "13" {
+ bail!("invalid websocket version");
+ }
+
+ // we ignore extensions
+
+ let mut sha1 = openssl::sha::Sha1::new();
+ let data = format!("{}{}", key, MAGIC_WEBSOCKET_GUID);
+ sha1.update(data.as_bytes());
+ let response_key = base64::encode(sha1.finish());
+
+ let response = Response::builder()
+ .status(StatusCode::SWITCHING_PROTOCOLS)
+ .header(UPGRADE, HeaderValue::from_static("websocket"))
+ .header(CONNECTION, HeaderValue::from_static("Upgrade"))
+ .header(SEC_WEBSOCKET_ACCEPT, response_key)
+ .header(SEC_WEBSOCKET_PROTOCOL, ws_proto)
+ .body(Body::empty())?;
+
+ Ok((Self { text }, response))
+ }
+
+ async fn copy_to_websocket<R, W>(
+ mut reader: &mut R,
+ writer: &mut WebSocketWriter<W>,
+ receiver: &mut mpsc::UnboundedReceiver<(OpCode, Box<[u8]>)>) -> Result<bool, Error>
+ where
+ R: AsyncRead + Unpin + Send,
+ W: AsyncWrite + Unpin + Send,
+ {
+ let mut buf = ByteBuffer::new();
+ let mut eof = false;
+ loop {
+ if !buf.is_full() {
+ let bytes = select!{
+ res = buf.read_from_async(&mut reader).fuse() => res?,
+ res = receiver.recv().fuse() => {
+ let (opcode, msg) = res.ok_or(format_err!("control channel closed"))?;
+ match opcode {
+ OpCode::Ping => {
+ writer.send_control_frame(None, OpCode::Pong, &msg).await?;
+ continue;
+ }
+ OpCode::Close => {
+ writer.send_control_frame(None, OpCode::Close, &msg).await?;
+ return Ok(true);
+ }
+ _ => {
+ // ignore other frames
+ continue;
+ }
+ }
+ }
+ };
+
+ if bytes == 0 {
+ eof = true;
+ }
+ }
+ if buf.len() > 0 {
+ let bytes = writer.write(&buf).await?;
+ if bytes == 0 {
+ eof = true;
+ }
+ buf.consume(bytes);
+ }
+
+ if eof && buf.is_empty() {
+ return Ok(false);
+ }
+ }
+ }
+
+ /// Takes two endpoints and connects them via a websocket, where the
+ /// 'upstream' endpoint sends and receives WebSocket frames, while
+ /// 'downstream' only expects and sends raw data.
+ /// This method takes care of copying the data between endpoints, and
+ /// sending correct responses for control frames (e.g. a Pont to a Ping).
+ pub async fn serve_connection<S, L>(&self, upstream: S, downstream: L) -> Result<(), Error>
+ where
+ S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ L: AsyncRead + AsyncWrite + Unpin + Send,
+ {
+
+ let (usreader, uswriter) = tokio::io::split(upstream);
+ let (mut dsreader, mut dswriter) = tokio::io::split(downstream);
+
+ let (tx, mut rx) = mpsc::unbounded_channel();
+ let mut wsreader = WebSocketReader::new(usreader, tx);
+ let mut wswriter = WebSocketWriter::new(None, self.text, uswriter);
+
+
+ let ws_future = tokio::io::copy(&mut wsreader, &mut dswriter);
+ let term_future = Self::copy_to_websocket(&mut dsreader, &mut wswriter, &mut rx);
+
+ let res = select!{
+ res = ws_future.fuse() => match res {
+ Ok(_) => Ok(()),
+ Err(err) => Err(Error::from(err)),
+ },
+ res = term_future.fuse() => match res {
+ Ok(sent_close) if !sent_close => {
+ // status code 1000 => 0x03E8
+ wswriter.send_control_frame(None, OpCode::Close, &[0x03, 0xE8]).await?;
+ Ok(())
+ }
+ Ok(_) => Ok(()),
+ Err(err) => Err(err),
+ }
+ };
+
+ res
+ }
+}
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
` (8 preceding siblings ...)
2020-07-14 11:09 ` [pbs-devel] [PATCH proxmox 9/9] proxmox/tools/websocket: add WebSocket implementation Dominik Csapak
@ 2020-07-15 8:15 ` Fabian Grünbichler
2020-07-15 8:31 ` Dominik Csapak
2020-07-15 13:00 ` [pbs-devel] applied: " Wolfgang Bumiller
10 siblings, 1 reply; 14+ messages in thread
From: Fabian Grünbichler @ 2020-07-15 8:15 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
$ cargo test
[...]
test src/api/const_regex.rs - const_regex (line 32) ... ok
test src/tools/vec.rs - tools::vec (line 19) ... ok
test src/api/router.rs - api::router::Router (line 216) ... ok
test src/tools/websocket.rs - tools::websocket::FrameHeader::try_from_bytes (line 301) ... FAILED
test src/tools/websocket.rs - tools::websocket::create_frame (line 112) ... FAILED
test src/tools/websocket.rs - tools::websocket::create_frame (line 125) ... FAILED
test src/tools/websocket.rs - tools::websocket::WebSocketWriter (line 182) ... ok
test src/tools/websocket.rs - tools::websocket::create_frame (line 99) ... FAILED
test src/tools/vec/byte_vec.rs - tools::vec::byte_vec (line 4) ... ok
test src/tools/vec/byte_vec.rs - tools::vec::byte_vec::ByteVecExt (line 15) ... ok
test src/tools/vec/byte_vec.rs - tools::vec::byte_vec::ByteVecExt::grow_uninitialized (line 51) ... ok
[...]
?
On July 14, 2020 1:09 pm, Dominik Csapak wrote:
> this series adds the necessary bits and pieces for using a
> websocket connection in the api
>
> Dominik Csapak (9):
> proxmox/tools/byte_buffer: improve ByteBuffer interface
> proxmox/tools/byte_buffer: impl Default
> proxmox/tools/websocket: use ready macro for WebSocketWriter
> proxmox/tools/websocket: correctly return eof
> proxmox/tools/websocket: use io::Error and Result explicitely
> proxmox/tools/websocket: improve mask_bytes and create_frame interface
> proxmox/tools/websocket: implement send_control_frame for writer
> proxmox/tools/websocket: replace CallBack with a channel
> proxmox/tools/websocket: add WebSocket implementation
>
> proxmox/Cargo.toml | 3 +-
> proxmox/src/tools/byte_buffer.rs | 97 ++++++----
> proxmox/src/tools/websocket.rs | 296 +++++++++++++++++++++++++------
> 3 files changed, 306 insertions(+), 90 deletions(-)
>
> --
> 2.20.1
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api
2020-07-15 8:15 ` [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Fabian Grünbichler
@ 2020-07-15 8:31 ` Dominik Csapak
0 siblings, 0 replies; 14+ messages in thread
From: Dominik Csapak @ 2020-07-15 8:31 UTC (permalink / raw)
To: pbs-devel, Fabian Grünbichler
On 7/15/20 10:15 AM, Fabian Grünbichler wrote:
> $ cargo test
> [...]
> test src/api/const_regex.rs - const_regex (line 32) ... ok
> test src/tools/vec.rs - tools::vec (line 19) ... ok
> test src/api/router.rs - api::router::Router (line 216) ... ok
> test src/tools/websocket.rs - tools::websocket::FrameHeader::try_from_bytes (line 301) ... FAILED
> test src/tools/websocket.rs - tools::websocket::create_frame (line 112) ... FAILED
> test src/tools/websocket.rs - tools::websocket::create_frame (line 125) ... FAILED
> test src/tools/websocket.rs - tools::websocket::WebSocketWriter (line 182) ... ok
> test src/tools/websocket.rs - tools::websocket::create_frame (line 99) ... FAILED
> test src/tools/vec/byte_vec.rs - tools::vec::byte_vec (line 4) ... ok
> test src/tools/vec/byte_vec.rs - tools::vec::byte_vec::ByteVecExt (line 15) ... ok
> test src/tools/vec/byte_vec.rs - tools::vec::byte_vec::ByteVecExt::grow_uninitialized (line 51) ... ok
> [...]
>
> ?
>
yes, sorry. seems i forgot to run the tests after changing the
create_frame interface....
i'll send a v2 of that patch (6/9; or should i send the whole series again?)
^ permalink raw reply [flat|nested] 14+ messages in thread
* [pbs-devel] applied: [PATCH proxmox 0/9] preparation for websocket api
2020-07-14 11:09 [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Dominik Csapak
` (9 preceding siblings ...)
2020-07-15 8:15 ` [pbs-devel] [PATCH proxmox 0/9] preparation for websocket api Fabian Grünbichler
@ 2020-07-15 13:00 ` Wolfgang Bumiller
10 siblings, 0 replies; 14+ messages in thread
From: Wolfgang Bumiller @ 2020-07-15 13:00 UTC (permalink / raw)
To: Dominik Csapak; +Cc: pbs-devel
applied series, added a minor Cargo.toml fixup
(expecting some minor follow-ups)
On Tue, Jul 14, 2020 at 01:09:48PM +0200, Dominik Csapak wrote:
> this series adds the necessary bits and pieces for using a
> websocket connection in the api
>
> Dominik Csapak (9):
> proxmox/tools/byte_buffer: improve ByteBuffer interface
> proxmox/tools/byte_buffer: impl Default
> proxmox/tools/websocket: use ready macro for WebSocketWriter
> proxmox/tools/websocket: correctly return eof
> proxmox/tools/websocket: use io::Error and Result explicitely
> proxmox/tools/websocket: improve mask_bytes and create_frame interface
> proxmox/tools/websocket: implement send_control_frame for writer
> proxmox/tools/websocket: replace CallBack with a channel
> proxmox/tools/websocket: add WebSocket implementation
>
> proxmox/Cargo.toml | 3 +-
> proxmox/src/tools/byte_buffer.rs | 97 ++++++----
> proxmox/src/tools/websocket.rs | 296 +++++++++++++++++++++++++------
> 3 files changed, 306 insertions(+), 90 deletions(-)
>
> --
> 2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread