From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id E9EF762D7E for ; Tue, 14 Jul 2020 13:10:01 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id E814B26192 for ; Tue, 14 Jul 2020 13:10:01 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id C7D9626143 for ; Tue, 14 Jul 2020 13:09:58 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 91C004421C for ; Tue, 14 Jul 2020 13:09:58 +0200 (CEST) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Tue, 14 Jul 2020 13:09:56 +0200 Message-Id: <20200714110957.31884-9-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20200714110957.31884-1-d.csapak@proxmox.com> References: <20200714110957.31884-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_LAZY_DOMAIN_SECURITY 1 Sending domain does not have any anti-forgery methods NO_DNS_FOR_FROM 0.379 Envelope sender has no MX or A DNS records RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_NONE 0.001 SPF: sender does not publish an SPF Record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [websocket.rs] Subject: [pbs-devel] [PATCH proxmox 8/9] proxmox/tools/websocket: replace CallBack with a channel X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 14 Jul 2020 11:10:02 -0000 instead of having a callback that we call on a control frame, use a channel to send the data to a receiver Signed-off-by: Dominik Csapak --- proxmox/Cargo.toml | 2 +- proxmox/src/tools/websocket.rs | 23 +++++++++++------------ 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/proxmox/Cargo.toml b/proxmox/Cargo.toml index e72f358..5b7a4ff 100644 --- a/proxmox/Cargo.toml +++ b/proxmox/Cargo.toml @@ -57,7 +57,7 @@ api-macro = ["proxmox-api-macro"] test-harness = [] cli = [ "router", "hyper", "tokio" ] router = [ "hyper", "tokio" ] -websocket = [ "tokio", "futures" ] +websocket = [ "tokio", "futures", "tokio/sync" ] # tools: #valgrind = ["proxmox-tools/valgrind"] diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs index 2a2bfa4..1ff0927 100644 --- a/proxmox/src/tools/websocket.rs +++ b/proxmox/src/tools/websocket.rs @@ -12,6 +12,7 @@ use std::future::Future; use tokio::io::{AsyncWrite, AsyncRead, AsyncReadExt}; use anyhow::{bail, format_err, Error}; +use tokio::sync::mpsc; use futures::future::FutureExt; use futures::ready; @@ -400,9 +401,6 @@ impl FrameHeader { } } -/// Callback for control frames -pub type CallBack = fn(frametype: OpCode, payload: &[u8]); - /// Wraps a reader that implements AsyncRead and implements it itself. /// /// On read, reads the underlying reader and tries to decode the frames and @@ -412,7 +410,7 @@ pub type CallBack = fn(frametype: OpCode, payload: &[u8]); /// Has an internal Buffer for storing incomplete headers. pub struct WebSocketReader { reader: Option, - callback: CallBack, + sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>, read_buffer: Option, header: Option, state: ReaderState, @@ -421,14 +419,14 @@ pub struct WebSocketReader { impl WebSocketReader { /// Creates a new WebSocketReader with the given CallBack for control frames /// and a default buffer size of 4096. - pub fn new(reader: R, callback: CallBack) -> WebSocketReader { - Self::with_capacity(reader, callback, 4096) + pub fn new(reader: R, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader { + Self::with_capacity(reader, 4096, sender) } - pub fn with_capacity(reader: R, callback: CallBack, capacity: usize) -> WebSocketReader { + pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader { WebSocketReader { reader: Some(reader), - callback, + sender, read_buffer: Some(ByteBuffer::with_capacity(capacity)), header: None, state: ReaderState::NoData, @@ -518,12 +516,13 @@ impl AsyncRead for WebSocketReader if header.is_control_frame() { if read_buffer.len() >= header.payload_len { + let mut data = read_buffer.remove_data(header.payload_len); mask_bytes(header.mask, &mut data); - (this.callback)( - header.frametype, - &data, - ); + if let Err(err) = this.sender.send((header.frametype, data)) { + eprintln!("error sending control frame: {}", err); + } + this.state = if read_buffer.is_empty() { ReaderState::NoData } else { -- 2.20.1