From: Stefan Reiter <s.reiter@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 14/22] client: add VsockClient to connect to virtio-vsock VMs
Date: Tue, 16 Feb 2021 18:07:02 +0100 [thread overview]
Message-ID: <20210216170710.31767-15-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210216170710.31767-1-s.reiter@proxmox.com>
Currently useful only for single file restore, but kept generic enough
to use any compatible API endpoint over a virtio-vsock[0,1] interface.
VsockClient is adapted and slimmed down from HttpClient.
A tower-compatible VsockConnector is implemented, using a wrapped
UnixStream as transport. The UnixStream has to be wrapped in a custom
struct to implement 'Connection'; Async{Read,Write} are simply forwarded
directly to the underlying stream.
[0] https://www.man7.org/linux/man-pages/man7/vsock.7.html
[1] https://wiki.qemu.org/Features/VirtioVsock
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/client.rs | 3 +
src/client/vsock_client.rs | 259 +++++++++++++++++++++++++++++++++++++
2 files changed, 262 insertions(+)
create mode 100644 src/client/vsock_client.rs
diff --git a/src/client.rs b/src/client.rs
index d50c26c2..1eae7dd1 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -19,6 +19,9 @@ pub mod pipe_to_stream;
mod http_client;
pub use http_client::*;
+mod vsock_client;
+pub use vsock_client::*;
+
mod task_log;
pub use task_log::*;
diff --git a/src/client/vsock_client.rs b/src/client/vsock_client.rs
new file mode 100644
index 00000000..ce3f7bc7
--- /dev/null
+++ b/src/client/vsock_client.rs
@@ -0,0 +1,259 @@
+use anyhow::{bail, format_err, Error};
+use futures::*;
+
+use core::task::Context;
+use std::pin::Pin;
+use std::task::Poll;
+
+use http::Uri;
+use http::{Request, Response};
+use hyper::client::connect::{Connected, Connection};
+use hyper::client::Client;
+use hyper::Body;
+use pin_project::pin_project;
+use serde_json::Value;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite, AsyncWriteExt};
+use tokio::net::UnixStream;
+
+use crate::tools;
+use proxmox::api::error::HttpError;
+
+/// Default vsock port.
+///
+/// Ports below 1024 are privileged; this value is chosen intentionally so that only root
+/// (on the host) can connect.
+pub const DEFAULT_VSOCK_PORT: u16 = 807;
+
+/// Stateless connector handed to hyper's `Client`.
+///
+/// Its `tower_service::Service<Uri>` implementation turns `vsock://<cid>:<port>` URIs into
+/// connected `UnixConnection`s.
+#[derive(Clone)]
+struct VsockConnector;
+
+#[pin_project]
+/// Wrapper around UnixStream so we can implement hyper::client::connect::Connection.
+///
+/// Despite the type name, the wrapped descriptor is the AF_VSOCK stream socket created by
+/// `VsockConnector::call`; `UnixStream` only serves as tokio's async wrapper around that fd.
+struct UnixConnection {
+ // pinned so the AsyncRead/AsyncWrite impls can forward to it through `project()`
+ #[pin]
+ stream: UnixStream,
+}
+
+/// Connector implementation: resolves `vsock://<cid>:<port>` URIs to connected vsock streams.
+impl tower_service::Service<Uri> for VsockConnector {
+    type Response = UnixConnection;
+    type Error = Error;
+    type Future = Pin<Box<dyn Future<Output = Result<UnixConnection, Error>> + Send>>;
+
+    /// The connector holds no state, so it is always ready.
+    fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    /// Connect to the vsock endpoint described by `dst`.
+    ///
+    /// `dst` must use the `vsock` scheme, carry the numeric CID as host and an explicit port.
+    /// The returned future fails if the URI is malformed, if creating or connecting the socket
+    /// fails, or if the blocking task could not be joined.
+    fn call(&mut self, dst: Uri) -> Self::Future {
+        use nix::sys::socket::*;
+        use std::os::unix::io::{AsRawFd, FromRawFd};
+
+        // connect can block, so run in blocking task (though in reality it seems to immediately
+        // return with either ENODEV or ETIMEDOUT in case of error)
+        tokio::task::spawn_blocking(move || {
+            if dst.scheme_str().unwrap_or_default() != "vsock" {
+                bail!("invalid URI (scheme) for vsock connector: {}", dst);
+            }
+
+            let cid = match dst.host() {
+                Some(host) => host.parse().map_err(|err| {
+                    format_err!(
+                        "invalid URI (host not a number) for vsock connector: {} ({})",
+                        dst,
+                        err
+                    )
+                })?,
+                None => bail!("invalid URI (no host) for vsock connector: {}", dst),
+            };
+
+            let port = match dst.port_u16() {
+                Some(port) => port,
+                None => bail!("invalid URI (bad port) for vsock connector: {}", dst),
+            };
+
+            let sock_fd = socket(
+                AddressFamily::Vsock,
+                SockType::Stream,
+                SockFlag::empty(),
+                None,
+            )?;
+
+            // Take ownership of the fd *before* connecting, so it gets closed on drop if any
+            // of the following steps fail (previously a failed connect() leaked the socket).
+            // SAFETY: sock_fd was just returned by socket() and is owned by nothing else.
+            let std_stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(sock_fd) };
+
+            let sock_addr = VsockAddr::new(cid, u32::from(port));
+            connect(std_stream.as_raw_fd(), &SockAddr::Vsock(sock_addr))?;
+
+            // connect sync, but set nonblock after (tokio requires it)
+            std_stream.set_nonblocking(true)?;
+
+            let stream = tokio::net::UnixStream::from_std(std_stream)?;
+            let connection = UnixConnection { stream };
+
+            Ok(connection)
+        })
+        // unravel the thread JoinHandle to a usable future
+        .map(|res| match res {
+            Ok(res) => res,
+            Err(err) => Err(format_err!("thread join error on vsock connect: {}", err)),
+        })
+        .boxed()
+    }
+}
+
+/// Marker implementation required by hyper for connector output types.
+impl Connection for UnixConnection {
+ /// Returns a freshly constructed `Connected`; nothing about it is customized here.
+ fn connected(&self) -> Connected {
+ Connected::new()
+ }
+}
+
+/// Reads are passed straight through to the wrapped stream.
+impl AsyncRead for UnixConnection {
+    /// Forward the read to the pinned inner `stream`.
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf,
+    ) -> Poll<Result<(), std::io::Error>> {
+        self.project().stream.poll_read(cx, buf)
+    }
+}
+
+/// Writes, flushes and shutdowns are passed straight through to the wrapped stream.
+impl AsyncWrite for UnixConnection {
+    /// Forward the write to the pinned inner `stream`.
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<tokio::io::Result<usize>> {
+        self.project().stream.poll_write(cx, buf)
+    }
+
+    /// Forward the flush to the pinned inner `stream`.
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<tokio::io::Result<()>> {
+        self.project().stream.poll_flush(cx)
+    }
+
+    /// Forward the shutdown to the pinned inner `stream`.
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<tokio::io::Result<()>> {
+        self.project().stream.poll_shutdown(cx)
+    }
+}
+
+/// Slimmed down version of HttpClient for virtio-vsock connections (file restore daemon)
+///
+/// Every request is addressed as `vsock://<cid>:<port>/<path>` and sent through a hyper
+/// client that uses `VsockConnector` to establish connections.
+pub struct VsockClient {
+ // hyper client using the vsock connector
+ client: Client<VsockConnector>,
+ // context ID of the target, used as the host part of request URIs
+ cid: i32,
+ // vsock port of the target, used as the port part of request URIs
+ port: u16,
+}
+
+impl VsockClient {
+    /// Create a client that addresses the vsock endpoint `cid`:`port`.
+    ///
+    /// This only constructs the underlying hyper client around a `VsockConnector`.
+    pub fn new(cid: i32, port: u16) -> Self {
+        let conn = VsockConnector {};
+        let client = Client::builder().build::<_, Body>(conn);
+        Self { client, cid, port }
+    }
+
+    /// Send a GET request to `path` and return the parsed JSON response.
+    ///
+    /// `data`, if given, is encoded into the query string of the request URI.
+    pub async fn get(&self, path: &str, data: Option<Value>) -> Result<Value, Error> {
+        let req = Self::request_builder(self.cid, self.port, "GET", path, data)?;
+        self.api_request(req).await
+    }
+
+    /// Send a POST request to `path` and return the parsed JSON response.
+    ///
+    /// `data`, if given, is sent as JSON request body.
+    pub async fn post(&mut self, path: &str, data: Option<Value>) -> Result<Value, Error> {
+        let req = Self::request_builder(self.cid, self.port, "POST", path, data)?;
+        self.api_request(req).await
+    }
+
+    /// Send a GET request to `path` and stream the response body into `output`.
+    ///
+    /// `data`, if given, is encoded into the query string. If the server answers with a
+    /// non-success status, the response body is returned as the message of an `HttpError`
+    /// instead of being written to `output`.
+    pub async fn download(
+        &mut self,
+        path: &str,
+        data: Option<Value>,
+        output: &mut (dyn AsyncWrite + Send + Unpin),
+    ) -> Result<(), Error> {
+        let req = Self::request_builder(self.cid, self.port, "GET", path, data)?;
+
+        // No timeout is applied here, so report the actual cause instead of claiming one.
+        let resp = self
+            .client
+            .request(req)
+            .await
+            .map_err(|err| format_err!("vsock download request failed: {}", err))?;
+
+        if !resp.status().is_success() {
+            // api_response() converts every non-success status into an error
+            Self::api_response(resp).await?;
+            return Ok(());
+        }
+
+        resp.into_body()
+            .map_err(Error::from)
+            .try_fold(output, move |acc, chunk| async move {
+                acc.write_all(&chunk).await?;
+                Ok::<_, Error>(acc)
+            })
+            .await?;
+
+        Ok(())
+    }
+
+    /// Collect the whole response body and convert it into a result.
+    ///
+    /// On a success status the body is parsed as JSON, with an empty body mapping to
+    /// `Value::Null`. Any other status yields an `HttpError` carrying the status and the body
+    /// text. A body that is not valid UTF-8 is reported as an error rather than panicking.
+    async fn api_response(response: Response<Body>) -> Result<Value, Error> {
+        let status = response.status();
+        let data = hyper::body::to_bytes(response.into_body()).await?;
+
+        // The body comes from the remote side, so never unwrap() the conversion.
+        let text = String::from_utf8(data.to_vec())
+            .map_err(|err| format_err!("vsock response is not valid UTF-8: {}", err))?;
+
+        if !status.is_success() {
+            return Err(Error::from(HttpError::new(status, text)));
+        }
+
+        if text.is_empty() {
+            Ok(Value::Null)
+        } else {
+            Ok(serde_json::from_str(&text)?)
+        }
+    }
+
+    /// Send `req` and evaluate the answer with `api_response`.
+    async fn api_request(&self, req: Request<Body>) -> Result<Value, Error> {
+        self.client
+            .request(req)
+            .map_err(Error::from)
+            .and_then(Self::api_response)
+            .await
+    }
+
+    /// Build a request for `vsock://<cid>:<port>/<path>`.
+    ///
+    /// Leading and trailing slashes of `path` are stripped. For `POST`, `data` becomes a
+    /// JSON body (`application/json`); for any other method it is encoded into the query
+    /// string. Requests without a JSON body are declared as
+    /// `application/x-www-form-urlencoded` and carry an empty body.
+    ///
+    /// Fails if the resulting URI is invalid, if `data` cannot be converted to a query
+    /// string, or if the request cannot be assembled.
+    pub fn request_builder(
+        cid: i32,
+        port: u16,
+        method: &str,
+        path: &str,
+        data: Option<Value>,
+    ) -> Result<Request<Body>, Error> {
+        let path = path.trim_matches('/');
+
+        // Work out the parts that differ between the cases, then build the request once.
+        let (url, content_type, body) = match data {
+            Some(data) if method == "POST" => (
+                format!("vsock://{}:{}/{}", cid, port, path),
+                "application/json",
+                Body::from(data.to_string()),
+            ),
+            Some(data) => {
+                let query = tools::json_object_to_query(data)?;
+                (
+                    format!("vsock://{}:{}/{}?{}", cid, port, path, query),
+                    "application/x-www-form-urlencoded",
+                    Body::empty(),
+                )
+            }
+            None => (
+                format!("vsock://{}:{}/{}", cid, port, path),
+                "application/x-www-form-urlencoded",
+                Body::empty(),
+            ),
+        };
+
+        let url: Uri = url.parse()?;
+
+        let request = Request::builder()
+            .method(method)
+            .uri(url)
+            .header(hyper::header::CONTENT_TYPE, content_type)
+            .body(body)?;
+
+        Ok(request)
+    }
+}
--
2.20.1
next prev parent reply other threads:[~2021-02-16 17:08 UTC|newest]
Thread overview: 50+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-02-16 17:06 [pbs-devel] [PATCH 00/22] Single file restore for VM images Stefan Reiter
2021-02-16 17:06 ` [pbs-devel] [PATCH pxar 01/22] decoder/aio: add contents() and content_size() calls Stefan Reiter
2021-02-17 7:56 ` Wolfgang Bumiller
2021-02-16 17:06 ` [pbs-devel] [PATCH pxar 02/22] decoder: add peek() Stefan Reiter
2021-02-17 8:20 ` Wolfgang Bumiller
2021-02-17 8:38 ` Stefan Reiter
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-restore-vm-data 03/22] initial commit Stefan Reiter
2021-03-15 18:35 ` [pbs-devel] applied: " Thomas Lamprecht
2021-03-16 15:33 ` Stefan Reiter
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-backup 04/22] api2/admin/datastore: refactor list_dir_content in catalog_reader Stefan Reiter
2021-02-17 7:50 ` [pbs-devel] applied: " Thomas Lamprecht
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-backup 05/22] api2/admin/datastore: accept "/" as path for root Stefan Reiter
2021-02-17 7:50 ` [pbs-devel] applied: " Thomas Lamprecht
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-backup 06/22] api2/admin/datastore: refactor create_zip into pxar/extract Stefan Reiter
2021-02-17 7:50 ` [pbs-devel] applied: " Thomas Lamprecht
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-backup 07/22] pxar/extract: add extract_sub_dir Stefan Reiter
2021-02-17 7:51 ` [pbs-devel] applied: " Thomas Lamprecht
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-backup 08/22] pxar/extract: add sequential variants to create_zip, extract_sub_dir Stefan Reiter
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-backup 09/22] client: extract common functions to proxmox_client_tools module Stefan Reiter
2021-02-17 6:49 ` Dietmar Maurer
2021-02-17 7:58 ` Stefan Reiter
2021-02-17 8:50 ` Dietmar Maurer
2021-02-17 9:47 ` Stefan Reiter
2021-02-17 10:12 ` Dietmar Maurer
2021-02-17 9:13 ` [pbs-devel] applied: " Dietmar Maurer
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-backup 10/22] proxmox_client_tools: extract 'key' from client module Stefan Reiter
2021-02-17 9:11 ` Dietmar Maurer
2021-02-16 17:06 ` [pbs-devel] [PATCH proxmox-backup 11/22] file-restore: add binary and basic commands Stefan Reiter
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 12/22] file-restore: allow specifying output-format Stefan Reiter
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 13/22] rest: implement tower service for UnixStream Stefan Reiter
2021-02-17 6:52 ` [pbs-devel] applied: " Dietmar Maurer
2021-02-16 17:07 ` Stefan Reiter [this message]
2021-02-17 7:24 ` [pbs-devel] applied: [PATCH proxmox-backup 14/22] client: add VsockClient to connect to virtio-vsock VMs Dietmar Maurer
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 15/22] file-restore-daemon: add binary with virtio-vsock API server Stefan Reiter
2021-02-17 10:17 ` Dietmar Maurer
2021-02-17 10:25 ` Dietmar Maurer
2021-02-17 10:30 ` Stefan Reiter
2021-02-17 11:13 ` Dietmar Maurer
2021-02-17 11:26 ` Dietmar Maurer
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 16/22] file-restore-daemon: add watchdog module Stefan Reiter
2021-02-17 10:52 ` Wolfgang Bumiller
2021-02-17 11:14 ` Stefan Reiter
2021-02-17 11:29 ` Wolfgang Bumiller
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 17/22] file-restore-daemon: add disk module Stefan Reiter
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 18/22] file-restore: add basic VM/block device support Stefan Reiter
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 19/22] file-restore: improve logging of VM with logrotate Stefan Reiter
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 20/22] debian/client: add postinst hook to rebuild file-restore initramfs Stefan Reiter
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 21/22] file-restore(-daemon): implement list API Stefan Reiter
2021-02-16 17:07 ` [pbs-devel] [PATCH proxmox-backup 22/22] file-restore: add 'extract' command for VM file restore Stefan Reiter
2021-02-16 17:11 ` [pbs-devel] [PATCH 00/22] Single file restore for VM images Stefan Reiter
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210216170710.31767-15-s.reiter@proxmox.com \
--to=s.reiter@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.