From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 1532572A86 for ; Tue, 13 Apr 2021 14:16:51 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1101B2A6C7 for ; Tue, 13 Apr 2021 14:16:51 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id D81202A676 for ; Tue, 13 Apr 2021 14:16:46 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id A369345A72 for ; Tue, 13 Apr 2021 14:16:46 +0200 (CEST) From: =?UTF-8?q?Fabian=20Gr=C3=BCnbichler?= To: pve-devel@lists.proxmox.com Date: Tue, 13 Apr 2021 14:16:22 +0200 Message-Id: <20210413121640.3602975-5-f.gruenbichler@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210413121640.3602975-1-f.gruenbichler@proxmox.com> References: <20210413121640.3602975-1-f.gruenbichler@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.027 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [main.rs] Subject: [pve-devel] [PATCH proxmox-websocket-tunnel 2/2] add tunnel implementation X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 13 Apr 2021 12:16:51 -0000 the websocket tunnel helper accepts control commands (encoded as single-line JSON) on stdin, and prints responses on stdout. the following commands are available: - "connect" a 'control' tunnel via a websocket - "forward" a local unix socket to a remote socket via a websocket -- if requested, this will ask for a ticket via the control tunnel after accepting a new connection on the unix socket - "close" the control tunnel and any forwarded socket any other json input (without the 'control' flag set) is forwarded as-is to the remote end of the control tunnel. internally, the tunnel helper will spawn tokio tasks for - handling the control tunnel connection (new commands are passed in via an mpsc channel together with a oneshot channel for the response) - handling accepting new connections on each forwarded unix socket - handling forwarding data over accepted forwarded connections Signed-off-by: Fabian Grünbichler --- Notes: proxmox_backup dependency is just there for HTTP client code that should be extracted somewhere else proxmox dependency requires websocket patches exposing a bit more stuff for client usage full repo available on my staff git.. Cargo.toml | 14 ++ src/main.rs | 396 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 410 insertions(+) create mode 100644 src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 939184c..18ba297 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,17 @@ description = "Proxmox websocket tunneling helper" exclude = ["debian"] [dependencies] +anyhow = "1.0" +futures = "0.3" +futures-util = "0.3" +hyper = { version = "0.14" } +openssl = "0.10" +percent-encoding = "2" +proxmox = { version = "0.11", path = "../proxmox/proxmox", features = ["websocket"] } +# just for tools::http and tools::async_io::EitherStream, need to move them somewhere else +proxmox-backup = { version = "1.0.8", path = "../proxmox-backup" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["io-std", "io-util", "macros", "rt-multi-thread", "sync"] } +tokio-stream = { version = "0.1", features = ["io-util"] } +tokio-util = "0.6" diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..6d21352 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,396 @@ +use anyhow::{bail, format_err, Error}; + +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; + +use futures::future::FutureExt; +use futures::select; + +use hyper::client::{Client, HttpConnector}; +use hyper::header::{SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_PROTOCOL, SEC_WEBSOCKET_VERSION, UPGRADE}; +use hyper::upgrade::Upgraded; +use hyper::{Body, Request, StatusCode}; + +use openssl::ssl::{SslConnector, SslMethod}; +use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; + +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::LinesStream; +use tokio_stream::StreamExt; + +use proxmox::tools::websocket::{OpCode, WebSocket, WebSocketReader, WebSocketWriter}; +use proxmox_backup::tools::http::HttpsConnector; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "kebab-case")] +enum CmdType { + Connect, + Forward, + CloseCmd, + NonControl, +} + +type CmdData = Map; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "kebab-case")] +struct ConnectCmdData { + // target URL for WS connection + url: String, + // fingerprint of TLS certificate + fingerprint: Option, + // addition headers such as authorization + headers: Option>, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "kebab-case")] +struct ForwardCmdData { + // target URL for WS connection + url: String, + // addition headers such as authorization + headers: Option>, + // fingerprint of TLS certificate + fingerprint: Option, + // local UNIX socket path for forwarding + unix: String, + // request ticket using these parameters + ticket: Option>, +} + +struct CtrlTunnel { + sender: Option)>>, + forwarded: Arc>>>, +} + +impl CtrlTunnel { + async fn read_cmd_loop(mut self) -> Result<(), Error> { + let mut stdin_stream = LinesStream::new(BufReader::new(tokio::io::stdin()).lines()); + while let Some(res) = stdin_stream.next().await { + match res { + Ok(line) => { + let (cmd_type, data) = Self::parse_cmd(&line)?; + match cmd_type { + CmdType::Connect => self.handle_connect_cmd(data).await, + CmdType::Forward => { + let res = self.handle_forward_cmd(data).await; + match &res { + Ok(()) => println!("{}", serde_json::json!({"success": true})), + Err(msg) => println!( + "{}", + serde_json::json!({"success": false, "msg": msg.to_string()}) + ), + }; + res + } + CmdType::NonControl => self + .handle_tunnel_cmd(data) + .await + .map(|res| println!("{}", res)), + _ => unimplemented!(), + } + } + Err(err) => bail!("Failed to read from STDIN - {}", err), + }?; + } + + Ok(()) + } + + fn parse_cmd(line: &str) -> Result<(CmdType, CmdData), Error> { + let mut json: Map = serde_json::from_str(line)?; + match json.remove("control") { + Some(Value::Bool(true)) => { + match json.remove("cmd").map(serde_json::from_value::) { + None => bail!("input has 'control' flag, but no control 'cmd' set.."), + Some(Err(e)) => bail!("failed to parse control cmd - {}", e), + Some(Ok(cmd_type)) => Ok((cmd_type, json)), + } + } + _ => Ok((CmdType::NonControl, json)), + } + } + + async fn websocket_connect( + url: String, + text: bool, + extra_headers: Vec<(String, String)>, + fingerprint: Option, + ) -> Result { + let mut req = Request::builder() + .uri(url) + .header(UPGRADE, "websocket") + .header(SEC_WEBSOCKET_VERSION, "13") + .header(SEC_WEBSOCKET_KEY, "foobar") // TODO + .header(SEC_WEBSOCKET_PROTOCOL, if text { "text" } else { "binary" }) + .body(Body::empty()) + .unwrap(); + + let headers = req.headers_mut(); + for (name, value) in extra_headers { + let name = hyper::header::HeaderName::from_bytes(name.as_bytes())?; + let value = hyper::header::HeaderValue::from_str(&value)?; + headers.insert(name, value); + } + + let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); + if fingerprint.is_some() { + // FIXME actually verify fingerprint via callback! + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); + } else { + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::PEER); + } + + let mut httpc = HttpConnector::new(); + httpc.enforce_http(false); // we want https... + httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0))); + let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build()); + + let client = Client::builder().build::<_, Body>(https); + let res = client.request(req).await?; + if res.status() != StatusCode::SWITCHING_PROTOCOLS { + bail!("server didn't upgrade: {}", res.status()); + } + + hyper::upgrade::on(res) + .await + .map_err(|err| format_err!("failed to upgrade - {}", err)) + } + + async fn handle_connect_cmd(&mut self, mut data: CmdData) -> Result<(), Error> { + let mut data: ConnectCmdData = data + .remove("data") + .ok_or_else(|| format_err!("'connect' command missing 'data'")) + .map(serde_json::from_value)??; + + if self.sender.is_some() { + bail!("already connected!"); + } + + let upgraded = Self::websocket_connect( + data.url.clone(), + false, + data.headers.take().unwrap_or_else(Vec::new), + data.fingerprint.take(), + ) + .await?; + + let (tx, rx) = mpsc::unbounded_channel(); + self.sender = Some(tx); + tokio::spawn(async move { + if let Err(err) = Self::handle_ctrl_tunnel(upgraded, rx).await { + eprintln!("Tunnel to {} failed - {}", data.url, err); + } + }); + + Ok(()) + } + + async fn handle_forward_cmd(&mut self, mut data: CmdData) -> Result<(), Error> { + let data: ForwardCmdData = data + .remove("data") + .ok_or_else(|| format_err!("'forward' command missing 'data'")) + .map(serde_json::from_value)??; + + if self.sender.is_none() && data.ticket.is_some() { + bail!("dynamically requesting ticket requires cmd tunnel connection!"); + } + + let unix_listener = UnixListener::bind(data.unix.clone()).unwrap(); + let (tx, rx) = oneshot::channel(); + let data = Arc::new(data); + + self.forwarded.lock().unwrap().push(tx); + let cmd_sender = self.sender.clone(); + + tokio::spawn(async move { + let mut rx = rx.fuse(); + loop { + let accept = unix_listener.accept().fuse(); + tokio::pin!(accept); + let data2 = data.clone(); + select! { + _ = rx => { + eprintln!("received shutdown signal, closing unix listener stream"); + }, + res = accept => match res { + Ok((unix_stream, _)) => { + eprintln!("accepted new connection on '{}'", data2.unix); + let data3: Result, Error> = match (&cmd_sender, &data2.ticket) { + (Some(cmd_sender), Some(_)) => Self::get_ticket(cmd_sender, data2.clone()).await, + _ => Ok(data2.clone()), + }; + + match data3 { + Ok(data3) => { + tokio::spawn(async move { + if let Err(err) = Self::handle_forward_tunnel(data3.clone(), unix_stream).await { + eprintln!("Tunnel for {} failed - {}", data3.unix, err); + } + }); + }, + Err(err) => { + eprintln!("Failed to accept unix connection - {}", err); + }, + }; + }, + Err(err) => eprintln!("Failed to accept unix connection on {} - {}", data2.unix, err), + }, + }; + } + }); + + Ok(()) + } + + async fn handle_tunnel_cmd(&mut self, data: CmdData) -> Result { + match &mut self.sender { + None => bail!("not connected!"), + Some(sender) => { + let data: Value = data.into(); + let (tx, rx) = oneshot::channel::(); + eprintln!("tunnel request: '{}'", data.to_string()); + sender.send((data, tx))?; + let res = rx.await?; + eprintln!("tunnel response: '{}'", res); + Ok(res) + } + } + } + + async fn handle_ctrl_tunnel( + websocket: Upgraded, + mut cmd_receiver: mpsc::UnboundedReceiver<(Value, oneshot::Sender)>, + ) -> Result<(), Error> { + let (tunnel_reader, tunnel_writer) = tokio::io::split(websocket); + let (ws_close_tx, mut ws_close_rx) = mpsc::unbounded_channel(); + let ws_reader = WebSocketReader::new(tunnel_reader, ws_close_tx); + let mut ws_writer = WebSocketWriter::new(Some([0, 0, 0, 0]), false, tunnel_writer); + + let mut framed_reader = + tokio_util::codec::FramedRead::new(ws_reader, tokio_util::codec::LinesCodec::new()); + + let mut resp_tx_queue: VecDeque> = VecDeque::new(); + let mut shutting_down = false; + + loop { + select! { + res = ws_close_rx.recv().fuse() => { + let res = res.ok_or_else(|| format_err!("WS control channel closed"))?; + eprintln!("WS: received control message: '{:?}'", res); + }, + res = framed_reader.next().fuse() => { + match res { + None if shutting_down => { + eprintln!("WS closed"); + break; + }, + None => bail!("WS closed unexpectedly"), + Some(Ok(res)) => { + resp_tx_queue + .pop_front() + .ok_or_else(|| format_err!("no response handler"))? + .send(res) + .map_err(|msg| format_err!("failed to send tunnel response '{}' back to requester - receiver already closed?", msg))?; + }, + Some(Err(err)) => { + eprintln!("WS: received failed - {}", err); + // TODO handle? + }, + } + }, + res = cmd_receiver.recv().fuse() => { + if shutting_down { continue }; + match res { + None => { + eprintln!("CMD channel closed, shutting down"); + ws_writer.send_control_frame(Some([1,2,3,4]), OpCode::Close, &[]).await?; + shutting_down = true; + }, + Some((msg, resp_tx)) => { + resp_tx_queue.push_back(resp_tx); + + let line = format!("{}\n", msg); + ws_writer.write_all(line.as_bytes()).await?; + ws_writer.flush().await?; + }, + } + }, + }; + } + + Ok(()) + } + + async fn handle_forward_tunnel( + data: Arc, + unix: UnixStream, + ) -> Result<(), Error> { + let upgraded = Self::websocket_connect( + data.url.clone(), + false, + data.headers.clone().unwrap_or_else(Vec::new), + data.fingerprint.clone(), + ) + .await?; + + let ws = WebSocket { text: false, mask: Some([0, 0, 0, 0]) }; + eprintln!("established new WS for forwarding '{}'", data.unix); + ws.serve_connection(upgraded, unix).await?; + + eprintln!( + "done handling forwarded connection from '{}'", + data.unix + ); + + Ok(()) + } + + async fn get_ticket( + cmd_sender: &mpsc::UnboundedSender<(Value, oneshot::Sender)>, + cmd_data: Arc, + ) -> Result, Error> { + eprintln!("requesting WS ticket via tunnel"); + let ticket_cmd = match cmd_data.ticket.clone() { + Some(mut ticket_cmd) => { + ticket_cmd.insert("cmd".to_string(), serde_json::json!("ticket")); + ticket_cmd + }, + None => bail!("can't get ticket without ticket parameters"), + }; + let (tx, rx) = oneshot::channel::(); + cmd_sender.send((serde_json::json!(ticket_cmd), tx))?; + let ticket = rx.await?; + let mut ticket: Map = serde_json::from_str(&ticket)?; + let ticket = ticket.remove("ticket") + .ok_or_else(|| format_err!("failed to retrieve ticket via tunnel"))?; + + let ticket = ticket.as_str().ok_or_else(|| format_err!("failed to format received ticket"))?; + let ticket = utf8_percent_encode(&ticket, NON_ALPHANUMERIC).to_string(); + + let mut data = cmd_data.clone(); + let mut url = data.url.clone(); + url.push_str("ticket="); + url.push_str(&ticket); + let mut d = Arc::make_mut(&mut data); + d.url = url; + Ok(data) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + do_main().await +} + +async fn do_main() -> Result<(), Error> { + let tunnel = CtrlTunnel { + sender: None, + forwarded: Arc::new(Mutex::new(Vec::new())), + }; + tunnel.read_cmd_loop().await +} -- 2.20.1