From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id CD4557DA61 for ; Tue, 9 Nov 2021 13:54:13 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id BDC38D986 for ; Tue, 9 Nov 2021 13:54:13 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 23D19D979 for ; Tue, 9 Nov 2021 13:54:11 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id E997346059; Tue, 9 Nov 2021 13:54:10 +0100 (CET) Message-ID: <54793fe7-f8fe-1b08-2ecb-c1a758b255fa@proxmox.com> Date: Tue, 9 Nov 2021 13:54:07 +0100 MIME-Version: 1.0 User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:95.0) Gecko/20100101 Thunderbird/95.0 Content-Language: en-US To: Proxmox VE development discussion , =?UTF-8?Q?Fabian_Gr=c3=bcnbichler?= References: <20211105130359.40803-1-f.gruenbichler@proxmox.com> <20211105130359.40803-4-f.gruenbichler@proxmox.com> From: Dominik Csapak In-Reply-To: <20211105130359.40803-4-f.gruenbichler@proxmox.com> Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 1.765 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment NICE_REPLY_A -3.06 Looks like a legit reply (A) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [main.rs] Subject: Re: [pve-devel] [PATCH proxmox-websocket-tunnel 2/4] add tunnel implementation X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 09 Nov 2021 12:54:13 -0000 looks mostly fine, some comments inline On 11/5/21 14:03, Fabian Grünbichler wrote: > the websocket tunnel helper accepts control commands (encoded as > single-line JSON) on stdin, and prints responses on stdout. > > the following commands are available: > - "connect" a 'control' tunnel via a websocket > - "forward" a local unix socket to a remote socket via a websocket > -- if requested, this will ask for a ticket via the control tunnel after > accepting a new connection on the unix socket > - "close" the control tunnel and any forwarded socket > > any other json input (without the 'control' flag set) is forwarded as-is > to the remote end of the control tunnel. > > internally, the tunnel helper will spawn tokio tasks for > - handling the control tunnel connection (new commands are passed in via > an mpsc channel together with a oneshot channel for the response) > - handling accepting new connections on each forwarded unix socket > - handling forwarding data over accepted forwarded connections > > Signed-off-by: Fabian Grünbichler > --- > > Notes: > requires proxmox-http with changes and bumped version > > Cargo.toml | 13 ++ > src/main.rs | 410 ++++++++++++++++++++++++++++++++++++++++++++++++++++ > 2 files changed, 423 insertions(+) > create mode 100644 src/main.rs > > diff --git a/Cargo.toml b/Cargo.toml > index 939184c..9d2a8c6 100644 > --- a/Cargo.toml > +++ b/Cargo.toml > @@ -9,3 +9,16 @@ description = "Proxmox websocket tunneling helper" > exclude = ["debian"] > > [dependencies] > +anyhow = "1.0" > +base64 = "0.12" > +futures = "0.3" > +futures-util = "0.3" > +hyper = { version = "0.14" } > +openssl = "0.10" > +percent-encoding = "2" > +proxmox-http = { version = "0.5.2", path = "../proxmox/proxmox-http", features = ["websocket", "client"] } > +serde = { version = "1.0", features = ["derive"] } > +serde_json = "1.0" > +tokio = { version = "1", features = ["io-std", "io-util", "macros", "rt-multi-thread", "sync"] } > +tokio-stream = { version = "0.1", features = ["io-util"] } > +tokio-util = "0.6" > diff --git a/src/main.rs b/src/main.rs > new file mode 100644 > index 0000000..150c1cf > --- /dev/null > +++ b/src/main.rs > @@ -0,0 +1,410 @@ > +use anyhow::{bail, format_err, Error}; > + > +use std::collections::VecDeque; > +use std::sync::{Arc, Mutex}; > + > +use futures::future::FutureExt; > +use futures::select; > + > +use hyper::client::{Client, HttpConnector}; > +use hyper::header::{SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE}; > +use hyper::upgrade::Upgraded; > +use hyper::{Body, Request, StatusCode}; > + > +use openssl::ssl::{SslConnector, SslMethod}; > +use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; > + > +use serde::{Deserialize, Serialize}; > +use serde_json::{Map, Value}; > +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; > +use tokio::net::{UnixListener, UnixStream}; > +use tokio::sync::{mpsc, oneshot}; > +use tokio_stream::wrappers::LinesStream; > +use tokio_stream::StreamExt; > + > +use proxmox_http::client::HttpsConnector; > +use proxmox_http::websocket::{OpCode, WebSocket, WebSocketReader, WebSocketWriter}; > + > +#[derive(Serialize, Deserialize, Debug)] > +#[serde(rename_all = "kebab-case")] > +enum CmdType { > + Connect, > + Forward, > + CloseCmd, this is never used > + NonControl, > +} > + > +type CmdData = Map; > + > +#[derive(Serialize, Deserialize, Debug)] > +#[serde(rename_all = "kebab-case")] > +struct ConnectCmdData { > + // target URL for WS connection > + url: String, > + // fingerprint of TLS certificate > + fingerprint: Option, > + // addition headers such as authorization > + headers: Option>, > +} > + > +#[derive(Serialize, Deserialize, Debug, Clone)] > +#[serde(rename_all = "kebab-case")] > +struct ForwardCmdData { > + // target URL for WS connection > + url: String, > + // addition headers such as authorization > + headers: Option>, > + // fingerprint of TLS certificate > + fingerprint: Option, > + // local UNIX socket path for forwarding > + unix: String, > + // request ticket using these parameters > + ticket: Option>, > +} > + > +struct CtrlTunnel { > + sender: Option)>>, > + forwarded: Arc>>>, for now, this is really not used (see my comments further below) > +} > + > +impl CtrlTunnel { > + async fn read_cmd_loop(mut self) -> Result<(), Error> { > + let mut stdin_stream = LinesStream::new(BufReader::new(tokio::io::stdin()).lines()); > + while let Some(res) = stdin_stream.next().await { > + match res { > + Ok(line) => { > + let (cmd_type, data) = Self::parse_cmd(&line)?; > + match cmd_type { > + CmdType::Connect => self.handle_connect_cmd(data).await, > + CmdType::Forward => { > + let res = self.handle_forward_cmd(data).await; > + match &res { > + Ok(()) => println!("{}", serde_json::json!({"success": true})), > + Err(msg) => println!( > + "{}", > + serde_json::json!({"success": false, "msg": msg.to_string()}) > + ), > + }; > + res > + } > + CmdType::NonControl => self > + .handle_tunnel_cmd(data) > + .await > + .map(|res| println!("{}", res)), > + _ => unimplemented!(), > + } > + } > + Err(err) => bail!("Failed to read from STDIN - {}", err), > + }?; > + } > + > + Ok(()) > + } > + > + fn parse_cmd(line: &str) -> Result<(CmdType, CmdData), Error> { > + let mut json: Map = serde_json::from_str(line)?; > + match json.remove("control") { > + Some(Value::Bool(true)) => { > + match json.remove("cmd").map(serde_json::from_value::) { > + None => bail!("input has 'control' flag, but no control 'cmd' set.."), > + Some(Err(e)) => bail!("failed to parse control cmd - {}", e), > + Some(Ok(cmd_type)) => Ok((cmd_type, json)), > + } > + } > + _ => Ok((CmdType::NonControl, json)), > + } > + } > + > + async fn websocket_connect( > + url: String, > + extra_headers: Vec<(String, String)>, > + fingerprint: Option, > + ) -> Result { > + let ws_key = proxmox::sys::linux::random_data(16)?; > + let ws_key = base64::encode(&ws_key); > + let mut req = Request::builder() > + .uri(url) > + .header(UPGRADE, "websocket") > + .header(SEC_WEBSOCKET_VERSION, "13") > + .header(SEC_WEBSOCKET_KEY, ws_key) > + .body(Body::empty()) > + .unwrap(); > + > + let headers = req.headers_mut(); > + for (name, value) in extra_headers { > + let name = hyper::header::HeaderName::from_bytes(name.as_bytes())?; > + let value = hyper::header::HeaderValue::from_str(&value)?; > + headers.insert(name, value); > + } > + > + let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); not sure if this unwrap cannot fail though? > + if fingerprint.is_some() { > + // FIXME actually verify fingerprint via callback! > + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); > + } else { > + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::PEER); > + } > + > + let mut httpc = HttpConnector::new(); > + httpc.enforce_http(false); // we want https... > + httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0))); > + let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), 120); > + > + let client = Client::builder().build::<_, Body>(https); > + let res = client.request(req).await?; > + if res.status() != StatusCode::SWITCHING_PROTOCOLS { > + bail!("server didn't upgrade: {}", res.status()); > + } > + > + hyper::upgrade::on(res) > + .await > + .map_err(|err| format_err!("failed to upgrade - {}", err)) > + } > + > + async fn handle_connect_cmd(&mut self, mut data: CmdData) -> Result<(), Error> { > + let mut data: ConnectCmdData = data > + .remove("data") > + .ok_or_else(|| format_err!("'connect' command missing 'data'")) > + .map(serde_json::from_value)??; > + > + if self.sender.is_some() { > + bail!("already connected!"); > + } > + > + let upgraded = Self::websocket_connect( > + data.url.clone(), > + data.headers.take().unwrap_or_else(Vec::new), > + data.fingerprint.take(), > + ) > + .await?; > + > + let (tx, rx) = mpsc::unbounded_channel(); > + self.sender = Some(tx); > + tokio::spawn(async move { > + if let Err(err) = Self::handle_ctrl_tunnel(upgraded, rx).await { > + eprintln!("Tunnel to {} failed - {}", data.url, err); > + } > + }); > + > + Ok(()) > + } > + > + async fn handle_forward_cmd(&mut self, mut data: CmdData) -> Result<(), Error> { > + let data: ForwardCmdData = data > + .remove("data") > + .ok_or_else(|| format_err!("'forward' command missing 'data'")) > + .map(serde_json::from_value)??; > + > + if self.sender.is_none() && data.ticket.is_some() { > + bail!("dynamically requesting ticket requires cmd tunnel connection!"); > + } > + > + let unix_listener = UnixListener::bind(data.unix.clone()).unwrap(); it would be better to bubble the error up instead of unwrapping here (AFAICS, the rest of the unwraps are ok, since they cannot really fail?) > + let (tx, rx) = oneshot::channel(); > + let data = Arc::new(data); > + > + self.forwarded.lock().unwrap().push(tx); we push the 'tx' here into the forwarded vec, but never use it again (no other 'lock()' call in the file) > + let cmd_sender = self.sender.clone(); > + > + tokio::spawn(async move { > + let mut rx = rx.fuse(); > + let mut tasks: Vec> = Vec::new(); > + loop { > + let accept = unix_listener.accept().fuse(); > + tokio::pin!(accept); > + let data2 = data.clone(); > + select! { > + _ = rx => { > + eprintln!("received shutdown signal, closing unix listener stream and forwarding handlers"); > + for task in tasks { > + task.abort(); > + } > + break; > + }, which makes this branch dead code so i'd drop the forwarded part and simplify this to match unix_listener.accept().await { ... } > + res = accept => match res { > + Ok((unix_stream, _)) => { > + eprintln!("accepted new connection on '{}'", data2.unix); > + let data3: Result, Error> = match (&cmd_sender, &data2.ticket) { > + (Some(cmd_sender), Some(_)) => Self::get_ticket(cmd_sender, data2.clone()).await,\ the get_ticket could probably be inside the 'handle_forward_tunnel' this way, another client could connect while the first ticket is checked. not necessary for now though, since we do not connect in parallel atm > + _ => Ok(data2.clone()), > + }; > + > + match data3 { > + Ok(data3) => { > + let task = tokio::spawn(async move { > + if let Err(err) = Self::handle_forward_tunnel(data3.clone(), unix_stream).await { > + eprintln!("Tunnel for {} failed - {}", data3.unix, err); > + } > + }); > + tasks.push(task); > + }, > + Err(err) => { > + eprintln!("Failed to accept unix connection - {}", err); > + }, > + }; > + }, > + Err(err) => eprintln!("Failed to accept unix connection on {} - {}", data2.unix, err), > + }, > + }; > + } > + }); > + > + Ok(()) > + } > + > + async fn handle_tunnel_cmd(&mut self, data: CmdData) -> Result { > + match &mut self.sender { > + None => bail!("not connected!"), > + Some(sender) => { > + let data: Value = data.into(); > + let (tx, rx) = oneshot::channel::(); > + if let Some(cmd) = data.get("cmd") { > + eprintln!("-> sending command {} to remote", cmd); > + } else { > + eprintln!("-> sending data line to remote"); > + } > + sender.send((data, tx))?; > + let res = rx.await?; > + eprintln!("<- got reply"); > + Ok(res) > + } > + } > + } > + > + async fn handle_ctrl_tunnel( > + websocket: Upgraded, > + mut cmd_receiver: mpsc::UnboundedReceiver<(Value, oneshot::Sender)>, > + ) -> Result<(), Error> { > + let (tunnel_reader, tunnel_writer) = tokio::io::split(websocket); > + let (ws_close_tx, mut ws_close_rx) = mpsc::unbounded_channel(); > + let ws_reader = WebSocketReader::new(tunnel_reader, ws_close_tx); > + let mut ws_writer = WebSocketWriter::new(Some([0, 0, 0, 0]), tunnel_writer); > + > + let mut framed_reader = > + tokio_util::codec::FramedRead::new(ws_reader, tokio_util::codec::LinesCodec::new()); > + > + let mut resp_tx_queue: VecDeque> = VecDeque::new(); > + let mut shutting_down = false; > + > + loop { > + let mut close_future = ws_close_rx.recv().boxed().fuse(); > + let mut frame_future = framed_reader.next().boxed().fuse(); > + let mut cmd_future = cmd_receiver.recv().boxed().fuse(); > + > + select! { > + res = close_future => { > + let res = res.ok_or_else(|| format_err!("WS control channel closed"))?; > + eprintln!("WS: received control message: '{:?}'", res); > + shutting_down = true; > + }, > + res = frame_future => { > + match res { > + None if shutting_down => { > + eprintln!("WS closed"); > + break; > + }, > + None => bail!("WS closed unexpectedly"), > + Some(Ok(res)) => { > + resp_tx_queue > + .pop_front() > + .ok_or_else(|| format_err!("no response handler"))? > + .send(res) > + .map_err(|msg| format_err!("failed to send tunnel response '{}' back to requester - receiver already closed?", msg))?; > + }, > + Some(Err(err)) => { > + bail!("reading from control tunnel failed - WS receive failed: {}", err); > + }, > + } > + }, > + res = cmd_future => { > + if shutting_down { continue }; > + match res { > + None => { > + eprintln!("CMD channel closed, shutting down"); > + ws_writer.send_control_frame(Some([1,2,3,4]), OpCode::Close, &[]).await?; > + shutting_down = true; > + }, > + Some((msg, resp_tx)) => { > + resp_tx_queue.push_back(resp_tx); > + > + let line = format!("{}\n", msg); > + ws_writer.write_all(line.as_bytes()).await?; > + ws_writer.flush().await?; > + }, > + } > + }, > + }; > + } > + > + Ok(()) > + } > + > + async fn handle_forward_tunnel( > + data: Arc, > + unix: UnixStream, > + ) -> Result<(), Error> { > + let upgraded = Self::websocket_connect( > + data.url.clone(), > + data.headers.clone().unwrap_or_else(Vec::new), > + data.fingerprint.clone(), > + ) > + .await?; > + > + let ws = WebSocket { > + mask: Some([0, 0, 0, 0]), > + }; > + eprintln!("established new WS for forwarding '{}'", data.unix); > + ws.serve_connection(upgraded, unix).await?; > + > + eprintln!("done handling forwarded connection from '{}'", data.unix); > + > + Ok(()) > + } > + > + async fn get_ticket( > + cmd_sender: &mpsc::UnboundedSender<(Value, oneshot::Sender)>, > + cmd_data: Arc, > + ) -> Result, Error> { > + eprintln!("requesting WS ticket via tunnel"); > + let ticket_cmd = match cmd_data.ticket.clone() { > + Some(mut ticket_cmd) => { > + ticket_cmd.insert("cmd".to_string(), serde_json::json!("ticket")); > + ticket_cmd > + } > + None => bail!("can't get ticket without ticket parameters"), > + }; > + let (tx, rx) = oneshot::channel::(); > + cmd_sender.send((serde_json::json!(ticket_cmd), tx))?; > + let ticket = rx.await?; > + let mut ticket: Map = serde_json::from_str(&ticket)?; > + let ticket = ticket > + .remove("ticket") > + .ok_or_else(|| format_err!("failed to retrieve ticket via tunnel"))?; > + > + let ticket = ticket > + .as_str() > + .ok_or_else(|| format_err!("failed to format received ticket"))?; > + let ticket = utf8_percent_encode(&ticket, NON_ALPHANUMERIC).to_string(); > + > + let mut data = cmd_data.clone(); > + let mut url = data.url.clone(); > + url.push_str("ticket="); > + url.push_str(&ticket); > + let mut d = Arc::make_mut(&mut data); > + d.url = url; > + Ok(data) > + } > +} > + > +#[tokio::main] > +async fn main() -> Result<(), Error> { > + do_main().await > +} > + > +async fn do_main() -> Result<(), Error> { > + let tunnel = CtrlTunnel { > + sender: None, > + forwarded: Arc::new(Mutex::new(Vec::new())), > + }; > + tunnel.read_cmd_loop().await > +} >