From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 7A3577E9C9 for ; Thu, 11 Nov 2021 10:58:20 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 6F5F28480 for ; Thu, 11 Nov 2021 10:58:20 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id E48CA8474 for ; Thu, 11 Nov 2021 10:58:18 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id BD5B142F4F for ; Thu, 11 Nov 2021 10:58:18 +0100 (CET) Date: Thu, 11 Nov 2021 10:58:10 +0100 From: Fabian =?iso-8859-1?q?Gr=FCnbichler?= To: Dominik Csapak , Proxmox VE development discussion References: <20211105130359.40803-1-f.gruenbichler@proxmox.com> <20211105130359.40803-4-f.gruenbichler@proxmox.com> <54793fe7-f8fe-1b08-2ecb-c1a758b255fa@proxmox.com> In-Reply-To: <54793fe7-f8fe-1b08-2ecb-c1a758b255fa@proxmox.com> MIME-Version: 1.0 User-Agent: astroid/0.15.0 (https://github.com/astroidmail/astroid) Message-Id: <1636622373.ca0eujna56.astroid@nora.none> Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-SPAM-LEVEL: Spam detection results: 0 AWL 0.282 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 
SPF: sender matches SPF record Subject: Re: [pve-devel] [PATCH proxmox-websocket-tunnel 2/4] add tunnel implementation X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 11 Nov 2021 09:58:20 -0000 On November 9, 2021 1:54 pm, Dominik Csapak wrote: > looks mostly fine, some comments inline thanks! > On 11/5/21 14:03, Fabian Gr=C3=BCnbichler wrote: >> the websocket tunnel helper accepts control commands (encoded as >> single-line JSON) on stdin, and prints responses on stdout. >>=20 >> the following commands are available: >> - "connect" a 'control' tunnel via a websocket >> - "forward" a local unix socket to a remote socket via a websocket >> -- if requested, this will ask for a ticket via the control tunnel after >> accepting a new connection on the unix socket >> - "close" the control tunnel and any forwarded socket >>=20 >> any other json input (without the 'control' flag set) is forwarded as-is >> to the remote end of the control tunnel. 
>>=20 >> internally, the tunnel helper will spawn tokio tasks for >> - handling the control tunnel connection (new commands are passed in via >> an mpsc channel together with a oneshot channel for the response) >> - handling accepting new connections on each forwarded unix socket >> - handling forwarding data over accepted forwarded connections >>=20 >> Signed-off-by: Fabian Gr=C3=BCnbichler >> --- >>=20 >> Notes: >> requires proxmox-http with changes and bumped version >>=20 >> Cargo.toml | 13 ++ >> src/main.rs | 410 ++++++++++++++++++++++++++++++++++++++++++++++++++++ >> 2 files changed, 423 insertions(+) >> create mode 100644 src/main.rs >>=20 >> diff --git a/Cargo.toml b/Cargo.toml >> index 939184c..9d2a8c6 100644 >> --- a/Cargo.toml >> +++ b/Cargo.toml >> @@ -9,3 +9,16 @@ description =3D "Proxmox websocket tunneling helper" >> exclude =3D ["debian"] >> =20 >> [dependencies] >> +anyhow =3D "1.0" >> +base64 =3D "0.12" >> +futures =3D "0.3" >> +futures-util =3D "0.3" >> +hyper =3D { version =3D "0.14" } >> +openssl =3D "0.10" >> +percent-encoding =3D "2" >> +proxmox-http =3D { version =3D "0.5.2", path =3D "../proxmox/proxmox-ht= tp", features =3D ["websocket", "client"] } >> +serde =3D { version =3D "1.0", features =3D ["derive"] } >> +serde_json =3D "1.0" >> +tokio =3D { version =3D "1", features =3D ["io-std", "io-util", "macros= ", "rt-multi-thread", "sync"] } >> +tokio-stream =3D { version =3D "0.1", features =3D ["io-util"] } >> +tokio-util =3D "0.6" >> diff --git a/src/main.rs b/src/main.rs >> new file mode 100644 >> index 0000000..150c1cf >> --- /dev/null >> +++ b/src/main.rs >> @@ -0,0 +1,410 @@ >> +use anyhow::{bail, format_err, Error}; >> + >> +use std::collections::VecDeque; >> +use std::sync::{Arc, Mutex}; >> + >> +use futures::future::FutureExt; >> +use futures::select; >> + >> +use hyper::client::{Client, HttpConnector}; >> +use hyper::header::{SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE}; >> +use hyper::upgrade::Upgraded; >> +use 
hyper::{Body, Request, StatusCode}; >> + >> +use openssl::ssl::{SslConnector, SslMethod}; >> +use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; >> + >> +use serde::{Deserialize, Serialize}; >> +use serde_json::{Map, Value}; >> +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; >> +use tokio::net::{UnixListener, UnixStream}; >> +use tokio::sync::{mpsc, oneshot}; >> +use tokio_stream::wrappers::LinesStream; >> +use tokio_stream::StreamExt; >> + >> +use proxmox_http::client::HttpsConnector; >> +use proxmox_http::websocket::{OpCode, WebSocket, WebSocketReader, WebSo= cketWriter}; >> + >> +#[derive(Serialize, Deserialize, Debug)] >> +#[serde(rename_all =3D "kebab-case")] >> +enum CmdType { >> + Connect, >> + Forward, >> + CloseCmd, >=20 > this is never used >=20 dropped in v2; we can always re-introduce this if we want to do a more elaborate closing dance. >> + NonControl, >> +} >> + >> +type CmdData =3D Map; >> + >> +#[derive(Serialize, Deserialize, Debug)] >> +#[serde(rename_all =3D "kebab-case")] >> +struct ConnectCmdData { >> + // target URL for WS connection >> + url: String, >> + // fingerprint of TLS certificate >> + fingerprint: Option, >> + // addition headers such as authorization >> + headers: Option>, >> +} >> + >> +#[derive(Serialize, Deserialize, Debug, Clone)] >> +#[serde(rename_all =3D "kebab-case")] >> +struct ForwardCmdData { >> + // target URL for WS connection >> + url: String, >> + // addition headers such as authorization >> + headers: Option>, >> + // fingerprint of TLS certificate >> + fingerprint: Option, >> + // local UNIX socket path for forwarding >> + unix: String, >> + // request ticket using these parameters >> + ticket: Option>, >> +} >> + >> +struct CtrlTunnel { >> + sender: Option)>>, >> + forwarded: Arc>>>, >=20 > for now, this is really not used (see my comments further below) >=20 same >> +} >> + >> +impl CtrlTunnel { >> + async fn read_cmd_loop(mut self) -> Result<(), Error> { >> + let mut stdin_stream =3D 
LinesStream::new(BufReader::new(tokio:= :io::stdin()).lines()); >> + while let Some(res) =3D stdin_stream.next().await { >> + match res { >> + Ok(line) =3D> { >> + let (cmd_type, data) =3D Self::parse_cmd(&line)?; >> + match cmd_type { >> + CmdType::Connect =3D> self.handle_connect_cmd(d= ata).await, >> + CmdType::Forward =3D> { >> + let res =3D self.handle_forward_cmd(data).a= wait; >> + match &res { >> + Ok(()) =3D> println!("{}", serde_json::= json!({"success": true})), >> + Err(msg) =3D> println!( >> + "{}", >> + serde_json::json!({"success": false= , "msg": msg.to_string()}) >> + ), >> + }; >> + res >> + } >> + CmdType::NonControl =3D> self >> + .handle_tunnel_cmd(data) >> + .await >> + .map(|res| println!("{}", res)), >> + _ =3D> unimplemented!(), >> + } >> + } >> + Err(err) =3D> bail!("Failed to read from STDIN - {}", e= rr), >> + }?; >> + } >> + >> + Ok(()) >> + } >> + >> + fn parse_cmd(line: &str) -> Result<(CmdType, CmdData), Error> { >> + let mut json: Map =3D serde_json::from_str(line)= ?; >> + match json.remove("control") { >> + Some(Value::Bool(true)) =3D> { >> + match json.remove("cmd").map(serde_json::from_value::) { >> + None =3D> bail!("input has 'control' flag, but no c= ontrol 'cmd' set.."), >> + Some(Err(e)) =3D> bail!("failed to parse control cm= d - {}", e), >> + Some(Ok(cmd_type)) =3D> Ok((cmd_type, json)), >> + } >> + } >> + _ =3D> Ok((CmdType::NonControl, json)), >> + } >> + } >> + >> + async fn websocket_connect( >> + url: String, >> + extra_headers: Vec<(String, String)>, >> + fingerprint: Option, >> + ) -> Result { >> + let ws_key =3D proxmox::sys::linux::random_data(16)?; >> + let ws_key =3D base64::encode(&ws_key); >> + let mut req =3D Request::builder() >> + .uri(url) >> + .header(UPGRADE, "websocket") >> + .header(SEC_WEBSOCKET_VERSION, "13") >> + .header(SEC_WEBSOCKET_KEY, ws_key) >> + .body(Body::empty()) >> + .unwrap(); >> + >> + let headers =3D req.headers_mut(); >> + for (name, value) in extra_headers { >> + let name =3D 
hyper::header::HeaderName::from_bytes(name.as_= bytes())?; >> + let value =3D hyper::header::HeaderValue::from_str(&value)?= ; >> + headers.insert(name, value); >> + } >> + >> + let mut ssl_connector_builder =3D SslConnector::builder(SslMeth= od::tls()).unwrap(); >=20 > not sure if this unwrap cannot fail though? >=20 yes, it can fail if the libssl methods it uses fail. changed to bubble up the error (which will probably be rather cryptic, but better than nothing ;)) >> + if fingerprint.is_some() { >> + // FIXME actually verify fingerprint via callback! >> + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMod= e::NONE); >> + } else { >> + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMod= e::PEER); >> + } >> + >> + let mut httpc =3D HttpConnector::new(); >> + httpc.enforce_http(false); // we want https... >> + httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0))= ); >> + let https =3D HttpsConnector::with_connector(httpc, ssl_connect= or_builder.build(), 120); >> + >> + let client =3D Client::builder().build::<_, Body>(https); >> + let res =3D client.request(req).await?; >> + if res.status() !=3D StatusCode::SWITCHING_PROTOCOLS { >> + bail!("server didn't upgrade: {}", res.status()); >> + } >> + >> + hyper::upgrade::on(res) >> + .await >> + .map_err(|err| format_err!("failed to upgrade - {}", err)) >> + } >> + >> + async fn handle_connect_cmd(&mut self, mut data: CmdData) -> Result= <(), Error> { >> + let mut data: ConnectCmdData =3D data >> + .remove("data") >> + .ok_or_else(|| format_err!("'connect' command missing 'data= '")) >> + .map(serde_json::from_value)??; >> + >> + if self.sender.is_some() { >> + bail!("already connected!"); >> + } >> + >> + let upgraded =3D Self::websocket_connect( >> + data.url.clone(), >> + data.headers.take().unwrap_or_else(Vec::new), >> + data.fingerprint.take(), >> + ) >> + .await?; >> + >> + let (tx, rx) =3D mpsc::unbounded_channel(); >> + self.sender =3D Some(tx); >> + tokio::spawn(async move { >> + 
if let Err(err) =3D Self::handle_ctrl_tunnel(upgraded, rx).= await { >> + eprintln!("Tunnel to {} failed - {}", data.url, err); >> + } >> + }); >> + >> + Ok(()) >> + } >> + >> + async fn handle_forward_cmd(&mut self, mut data: CmdData) -> Result= <(), Error> { >> + let data: ForwardCmdData =3D data >> + .remove("data") >> + .ok_or_else(|| format_err!("'forward' command missing 'data= '")) >> + .map(serde_json::from_value)??; >> + >> + if self.sender.is_none() && data.ticket.is_some() { >> + bail!("dynamically requesting ticket requires cmd tunnel co= nnection!"); >> + } >> + >> + let unix_listener =3D UnixListener::bind(data.unix.clone()).unw= rap(); >=20 > it would be better to bubble the error up instead of unwrapping here > (AFAICS, the rest of the unwraps are ok, since they cannot really fail?) >=20 changed as well (in addition to the other one you noted, and a third one) - now only unwrap_or_else calls remain :) >> + let (tx, rx) =3D oneshot::channel(); >> + let data =3D Arc::new(data); >> + >> + self.forwarded.lock().unwrap().push(tx); >=20 > we push the 'tx' here into the forwarded vec, but never use it again > (no other 'lock()' call in the file) >=20 yeah, dropped together with the rest of the close cmd >> + let cmd_sender =3D self.sender.clone(); >> + >> + tokio::spawn(async move { >> + let mut rx =3D rx.fuse(); >> + let mut tasks: Vec> =3D Vec::ne= w(); >> + loop { >> + let accept =3D unix_listener.accept().fuse(); >> + tokio::pin!(accept); >> + let data2 =3D data.clone(); >> + select! { >> + _ =3D rx =3D> { >> + eprintln!("received shutdown signal, closing un= ix listener stream and forwarding handlers"); >> + for task in tasks { >> + task.abort(); >> + } >> + break; >> + }, >=20 > which makes this branch dead code >=20 > so i'd drop the forwarded part and simplify this to >=20 > match unix_listener.accept().await { > ... 
done > } >=20 >> + res =3D accept =3D> match res { >> + Ok((unix_stream, _)) =3D> { >> + eprintln!("accepted new connection on '{}'"= , data2.unix); >> + let data3: Result, Erro= r> =3D match (&cmd_sender, &data2.ticket) { >> + (Some(cmd_sender), Some(_)) =3D> Self::= get_ticket(cmd_sender, data2.clone()).await,\ >=20 > the get_ticket could probably be inside the 'handle_forward_tunnel' this=20 > way, another client could connect while the first ticket is checked. > not necessary for now though, since we do not connect in parallel atm >=20 I did it anyway while I was there >> + _ =3D> Ok(data2.clone()), >> + }; >> + >> + match data3 { >> + Ok(data3) =3D> { >> + let task =3D tokio::spawn(async mov= e { >> + if let Err(err) =3D Self::handl= e_forward_tunnel(data3.clone(), unix_stream).await { >> + eprintln!("Tunnel for {} fa= iled - {}", data3.unix, err); >> + } >> + }); >> + tasks.push(task); >> + }, >> + Err(err) =3D> { >> + eprintln!("Failed to accept unix co= nnection - {}", err); >> + }, >> + }; >> + }, >> + Err(err) =3D> eprintln!("Failed to accept unix = connection on {} - {}", data2.unix, err), >> + }, >> + }; >> + } >> + }); >> + >> + Ok(()) >> + } >> + >> + async fn handle_tunnel_cmd(&mut self, data: CmdData) -> Result { >> + match &mut self.sender { >> + None =3D> bail!("not connected!"), >> + Some(sender) =3D> { >> + let data: Value =3D data.into(); >> + let (tx, rx) =3D oneshot::channel::(); >> + if let Some(cmd) =3D data.get("cmd") { >> + eprintln!("-> sending command {} to remote", cmd); >> + } else { >> + eprintln!("-> sending data line to remote"); >> + } >> + sender.send((data, tx))?; >> + let res =3D rx.await?; >> + eprintln!("<- got reply"); >> + Ok(res) >> + } >> + } >> + } >> + >> + async fn handle_ctrl_tunnel( >> + websocket: Upgraded, >> + mut cmd_receiver: mpsc::UnboundedReceiver<(Value, oneshot::Send= er)>, >> + ) -> Result<(), Error> { >> + let (tunnel_reader, tunnel_writer) =3D tokio::io::split(websock= et); >> + let (ws_close_tx, mut 
ws_close_rx) =3D mpsc::unbounded_channel(= ); >> + let ws_reader =3D WebSocketReader::new(tunnel_reader, ws_close_= tx); >> + let mut ws_writer =3D WebSocketWriter::new(Some([0, 0, 0, 0]), = tunnel_writer); >> + >> + let mut framed_reader =3D >> + tokio_util::codec::FramedRead::new(ws_reader, tokio_util::c= odec::LinesCodec::new()); >> + >> + let mut resp_tx_queue: VecDeque> =3D Ve= cDeque::new(); >> + let mut shutting_down =3D false; >> + >> + loop { >> + let mut close_future =3D ws_close_rx.recv().boxed().fuse(); >> + let mut frame_future =3D framed_reader.next().boxed().fuse(= ); >> + let mut cmd_future =3D cmd_receiver.recv().boxed().fuse(); >> + >> + select! { >> + res =3D close_future =3D> { >> + let res =3D res.ok_or_else(|| format_err!("WS contr= ol channel closed"))?; >> + eprintln!("WS: received control message: '{:?}'", r= es); >> + shutting_down =3D true; >> + }, >> + res =3D frame_future =3D> { >> + match res { >> + None if shutting_down =3D> { >> + eprintln!("WS closed"); >> + break; >> + }, >> + None =3D> bail!("WS closed unexpectedly"), >> + Some(Ok(res)) =3D> { >> + resp_tx_queue >> + .pop_front() >> + .ok_or_else(|| format_err!("no response= handler"))? 
>> + .send(res) >> + .map_err(|msg| format_err!("failed to s= end tunnel response '{}' back to requester - receiver already closed?", msg= ))?; >> + }, >> + Some(Err(err)) =3D> { >> + bail!("reading from control tunnel failed -= WS receive failed: {}", err); >> + }, >> + } >> + }, >> + res =3D cmd_future =3D> { >> + if shutting_down { continue }; >> + match res { >> + None =3D> { >> + eprintln!("CMD channel closed, shutting dow= n"); >> + ws_writer.send_control_frame(Some([1,2,3,4]= ), OpCode::Close, &[]).await?; >> + shutting_down =3D true; >> + }, >> + Some((msg, resp_tx)) =3D> { >> + resp_tx_queue.push_back(resp_tx); >> + >> + let line =3D format!("{}\n", msg); >> + ws_writer.write_all(line.as_bytes()).await?= ; >> + ws_writer.flush().await?; >> + }, >> + } >> + }, >> + }; >> + } >> + >> + Ok(()) >> + } >> + >> + async fn handle_forward_tunnel( >> + data: Arc, >> + unix: UnixStream, >> + ) -> Result<(), Error> { >> + let upgraded =3D Self::websocket_connect( >> + data.url.clone(), >> + data.headers.clone().unwrap_or_else(Vec::new), >> + data.fingerprint.clone(), >> + ) >> + .await?; >> + >> + let ws =3D WebSocket { >> + mask: Some([0, 0, 0, 0]), >> + }; >> + eprintln!("established new WS for forwarding '{}'", data.unix); >> + ws.serve_connection(upgraded, unix).await?; >> + >> + eprintln!("done handling forwarded connection from '{}'", data.= unix); >> + >> + Ok(()) >> + } >> + >> + async fn get_ticket( >> + cmd_sender: &mpsc::UnboundedSender<(Value, oneshot::Sender)>, >> + cmd_data: Arc, >> + ) -> Result, Error> { >> + eprintln!("requesting WS ticket via tunnel"); >> + let ticket_cmd =3D match cmd_data.ticket.clone() { >> + Some(mut ticket_cmd) =3D> { >> + ticket_cmd.insert("cmd".to_string(), serde_json::json!(= "ticket")); >> + ticket_cmd >> + } >> + None =3D> bail!("can't get ticket without ticket parameters= "), >> + }; >> + let (tx, rx) =3D oneshot::channel::(); >> + cmd_sender.send((serde_json::json!(ticket_cmd), tx))?; >> + let ticket =3D rx.await?; >> + 
let mut ticket: Map =3D serde_json::from_str(&ti= cket)?; >> + let ticket =3D ticket >> + .remove("ticket") >> + .ok_or_else(|| format_err!("failed to retrieve ticket via t= unnel"))?; >> + >> + let ticket =3D ticket >> + .as_str() >> + .ok_or_else(|| format_err!("failed to format received ticke= t"))?; >> + let ticket =3D utf8_percent_encode(&ticket, NON_ALPHANUMERIC).t= o_string(); >> + >> + let mut data =3D cmd_data.clone(); >> + let mut url =3D data.url.clone(); >> + url.push_str("ticket=3D"); >> + url.push_str(&ticket); >> + let mut d =3D Arc::make_mut(&mut data); >> + d.url =3D url; >> + Ok(data) >> + } >> +} >> + >> +#[tokio::main] >> +async fn main() -> Result<(), Error> { >> + do_main().await >> +} >> + >> +async fn do_main() -> Result<(), Error> { >> + let tunnel =3D CtrlTunnel { >> + sender: None, >> + forwarded: Arc::new(Mutex::new(Vec::new())), >> + }; >> + tunnel.read_cmd_loop().await >> +} >>=20 >=20 >=20 >=20