* [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper
2022-04-08 9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
@ 2022-04-08 9:56 ` Dominik Csapak
2022-04-12 12:29 ` [pbs-devel] applied-series: " Wolfgang Bumiller
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait Dominik Csapak
` (5 subsequent siblings)
6 siblings, 1 reply; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08 9:56 UTC (permalink / raw)
To: pbs-devel
this wraps around a tokio Sender for Vec<u8>, but implements a blocking
write. We can use thas as an adapter for something that only takes a
writer, and can read from it asynchonously
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-async/src/blocking/mod.rs | 3 ++
proxmox-async/src/blocking/sender_writer.rs | 47 +++++++++++++++++++++
2 files changed, 50 insertions(+)
create mode 100644 proxmox-async/src/blocking/sender_writer.rs
diff --git a/proxmox-async/src/blocking/mod.rs b/proxmox-async/src/blocking/mod.rs
index 28247b3..06f821a 100644
--- a/proxmox-async/src/blocking/mod.rs
+++ b/proxmox-async/src/blocking/mod.rs
@@ -9,3 +9,6 @@ pub use tokio_writer_adapter::TokioWriterAdapter;
mod wrapped_reader_stream;
pub use wrapped_reader_stream::WrappedReaderStream;
+
+mod sender_writer;
+pub use sender_writer::SenderWriter;
diff --git a/proxmox-async/src/blocking/sender_writer.rs b/proxmox-async/src/blocking/sender_writer.rs
new file mode 100644
index 0000000..62682e5
--- /dev/null
+++ b/proxmox-async/src/blocking/sender_writer.rs
@@ -0,0 +1,47 @@
+use std::io;
+
+use anyhow::Error;
+use tokio::sync::mpsc::Sender;
+
+/// Wrapper struct around [`tokio::sync::mpsc::Sender`] for `Result<Vec<u8>, Error>` that implements [`std::io::Write`]
+pub struct SenderWriter {
+ sender: Sender<Result<Vec<u8>, Error>>,
+}
+
+impl SenderWriter {
+ pub fn from_sender(sender: tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>) -> Self {
+ Self { sender }
+ }
+
+ fn write_impl(&mut self, buf: &[u8]) -> io::Result<usize> {
+ if let Err(err) = self.sender.blocking_send(Ok(buf.to_vec())) {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ format!("could not send: {}", err),
+ ));
+ }
+
+ Ok(buf.len())
+ }
+
+ fn flush_impl(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl io::Write for SenderWriter {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.write_impl(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.flush_impl()
+ }
+}
+
+impl Drop for SenderWriter {
+ fn drop(&mut self) {
+ // ignore errors
+ let _ = self.flush_impl();
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pbs-devel] [PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait
2022-04-08 9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
@ 2022-04-08 9:56 ` Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization Dominik Csapak
` (4 subsequent siblings)
6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08 9:56 UTC (permalink / raw)
To: pbs-devel
this will be useful as a generic return type for api calls which
must implement Serialize.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-router/Cargo.toml | 2 +
proxmox-router/src/lib.rs | 2 +
proxmox-router/src/serializable_return.rs | 62 +++++++++++++++++++++++
3 files changed, 66 insertions(+)
create mode 100644 proxmox-router/src/serializable_return.rs
diff --git a/proxmox-router/Cargo.toml b/proxmox-router/Cargo.toml
index d9f47ea..f042f31 100644
--- a/proxmox-router/Cargo.toml
+++ b/proxmox-router/Cargo.toml
@@ -15,6 +15,7 @@ hyper = { version = "0.14", features = [ "full" ] }
nix = "0.19.1"
percent-encoding = "2.1"
serde_json = "1.0"
+serde = "1.0"
unicode-width ="0.1.8"
# cli:
@@ -24,6 +25,7 @@ libc = { version = "0.2", optional = true }
proxmox-lang = { path = "../proxmox-lang", version = "1.1" }
proxmox-schema = { path = "../proxmox-schema", version = "1.1" }
+proxmox-async = { path = "../proxmox-async", version = "0.4" }
[features]
default = [ "cli" ]
diff --git a/proxmox-router/src/lib.rs b/proxmox-router/src/lib.rs
index dadb917..84f39f8 100644
--- a/proxmox-router/src/lib.rs
+++ b/proxmox-router/src/lib.rs
@@ -12,6 +12,7 @@ pub mod error;
mod permission;
mod router;
mod rpc_environment;
+mod serializable_return;
#[doc(inline)]
pub use error::HttpError;
@@ -19,6 +20,7 @@ pub use error::HttpError;
pub use permission::*;
pub use router::*;
pub use rpc_environment::{RpcEnvironment, RpcEnvironmentType};
+pub use serializable_return::SerializableReturn;
// make list_subdirs_api_method! work without an explicit proxmox-schema dependency:
#[doc(hidden)]
diff --git a/proxmox-router/src/serializable_return.rs b/proxmox-router/src/serializable_return.rs
new file mode 100644
index 0000000..51f6c3c
--- /dev/null
+++ b/proxmox-router/src/serializable_return.rs
@@ -0,0 +1,62 @@
+use serde::Serializer;
+use serde_json::Value;
+
+/// This defines a *fixed* serializer (iow. also where/how to write out the data).
+///
+/// (Note that `serde::Serializer` is implemented for `__&mut__ serde_json::Serializer`.
+type SenderSerializer<'a> = &'a mut serde_json::Serializer<
+ &'a mut std::io::BufWriter<proxmox_async::blocking::SenderWriter>,
+>;
+
+/// This is an object-safe trait which requires the ability to serialize into particular
+/// Serializer instances.
+pub trait SerializableReturn {
+ /// Serializes self into a [`proxmox_async::blocking::SenderWriter`] wrapped
+ /// into a [`std::io::BufWriter`]
+ ///
+ /// If `value` is an Object/Map, serializes that first and puts the value of
+ /// `self` into the `data` property.
+ fn sender_serialize(
+ &self,
+ serializer: SenderSerializer,
+ value: Value,
+ ) -> Result<
+ <SenderSerializer as serde::Serializer>::Ok,
+ <SenderSerializer as serde::Serializer>::Error,
+ >;
+
+ /// Returns a value again from self
+ fn to_value(&self) -> Result<Value, serde_json::error::Error>;
+}
+
+impl<T> SerializableReturn for T
+where
+ T: serde::Serialize,
+{
+ fn sender_serialize(
+ &self,
+ serializer: SenderSerializer,
+ value: Value,
+ ) -> Result<
+ <SenderSerializer as serde::Serializer>::Ok,
+ <SenderSerializer as serde::Serializer>::Error,
+ > {
+ use serde::ser::SerializeMap;
+ if let Some(original) = value.as_object() {
+ let mut map = serializer.serialize_map(None)?;
+ for (k, v) in original {
+ map.serialize_entry(k, v)?;
+ }
+
+ map.serialize_key("data")?;
+ map.serialize_value(&self)?;
+ map.end()
+ } else {
+ self.serialize(serializer)
+ }
+ }
+
+ fn to_value(&self) -> Result<Value, serde_json::error::Error> {
+ serde_json::to_value(self)
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization
2022-04-08 9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait Dominik Csapak
@ 2022-04-08 9:56 ` Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option Dominik Csapak
` (3 subsequent siblings)
6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08 9:56 UTC (permalink / raw)
To: pbs-devel
they should behave like their normal variants, but return a
`Box<dyn SerializableReturn + Send>` instead of a value. This is useful
since we do not have to generate the `Value` in-memory, but can
stream the serialization to the client.
We cannot simply use a `Box<dyn serde::Serialize>`, because that trait
is not object-safe and thus cannot be used as a trait-object.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-router/src/cli/command.rs | 45 ++++++++++++++++++
proxmox-router/src/router.rs | 78 +++++++++++++++++++++++++++++++
2 files changed, 123 insertions(+)
diff --git a/proxmox-router/src/cli/command.rs b/proxmox-router/src/cli/command.rs
index 1b05078..76ecddc 100644
--- a/proxmox-router/src/cli/command.rs
+++ b/proxmox-router/src/cli/command.rs
@@ -72,6 +72,18 @@ async fn handle_simple_command_future(
return Err(err);
}
},
+ ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) {
+ Ok(value) => {
+ let value = value.to_value()?;
+ if value != Value::Null {
+ println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+ }
+ }
+ Err(err) => {
+ eprintln!("Error: {}", err);
+ return Err(err);
+ }
+ },
ApiHandler::Async(handler) => {
let future = (handler)(params, cli_cmd.info, &mut rpcenv);
@@ -87,6 +99,22 @@ async fn handle_simple_command_future(
}
}
}
+ ApiHandler::StreamingAsync(handler) => {
+ let future = (handler)(params, cli_cmd.info, &mut rpcenv);
+
+ match future.await {
+ Ok(value) => {
+ let value = value.to_value()?;
+ if value != Value::Null {
+ println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+ }
+ }
+ Err(err) => {
+ eprintln!("Error: {}", err);
+ return Err(err);
+ }
+ }
+ }
ApiHandler::AsyncHttp(_) => {
let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
print_simple_usage_error(prefix, cli_cmd, err_msg);
@@ -118,6 +146,18 @@ fn handle_simple_command(
return Err(err);
}
},
+ ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) {
+ Ok(value) => {
+ let value = value.to_value()?;
+ if value != Value::Null {
+ println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+ }
+ }
+ Err(err) => {
+ eprintln!("Error: {}", err);
+ return Err(err);
+ }
+ },
ApiHandler::Async(handler) => {
let future = (handler)(params, cli_cmd.info, &mut rpcenv);
if let Some(run) = run {
@@ -138,6 +178,11 @@ fn handle_simple_command(
return Err(format_err!("{}", err_msg));
}
}
+ ApiHandler::StreamingAsync(_handler) => {
+ let err_msg = "CliHandler does not support ApiHandler::StreamingAsync - internal error";
+ print_simple_usage_error(prefix, cli_cmd, err_msg);
+ return Err(format_err!("{}", err_msg));
+ }
ApiHandler::AsyncHttp(_) => {
let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
print_simple_usage_error(prefix, cli_cmd, err_msg);
diff --git a/proxmox-router/src/router.rs b/proxmox-router/src/router.rs
index a469891..3fa0033 100644
--- a/proxmox-router/src/router.rs
+++ b/proxmox-router/src/router.rs
@@ -14,6 +14,7 @@ use proxmox_schema::{ObjectSchema, ParameterSchema, ReturnType, Schema};
use super::Permission;
use crate::RpcEnvironment;
+use crate::SerializableReturn;
/// A synchronous API handler gets a json Value as input and returns a json Value as output.
///
@@ -42,6 +43,37 @@ pub type ApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironm
+ Sync
+ 'static);
+/// A synchronous API handler gets a json Value as input and returns a serializable return value as output.
+///
+/// ```
+/// # use anyhow::Error;
+/// # use serde_json::{json, Value};
+/// use proxmox_router::{ApiHandler, ApiMethod, RpcEnvironment, SerializableReturn};
+/// use proxmox_schema::ObjectSchema;
+///
+/// fn hello(
+/// param: Value,
+/// info: &ApiMethod,
+/// rpcenv: &mut dyn RpcEnvironment,
+/// ) -> Result<Box<dyn SerializableReturn + Send>, Error> {
+/// let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
+/// Ok(res)
+/// }
+///
+/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new(
+/// &ApiHandler::StreamingSync(&hello),
+/// &ObjectSchema::new("Hello World Example", &[])
+/// );
+/// ```
+pub type StreamingApiHandlerFn = &'static (dyn Fn(
+ Value,
+ &ApiMethod,
+ &mut dyn RpcEnvironment,
+) -> Result<Box<dyn SerializableReturn + Send>, Error>
+ + Send
+ + Sync
+ + 'static);
+
/// Asynchronous API handlers
///
/// Returns a future Value.
@@ -74,6 +106,44 @@ pub type ApiAsyncHandlerFn = &'static (dyn for<'a> Fn(Value, &'static ApiMethod,
pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error>> + Send + 'a>>;
+/// Streaming asynchronous API handlers
+///
+/// Returns a future Value.
+/// ```
+/// # use serde_json::{json, Value};
+/// #
+/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment, StreamingApiFuture, SerializableReturn};
+/// use proxmox_schema::ObjectSchema;
+///
+///
+/// fn hello_future<'a>(
+/// param: Value,
+/// info: &ApiMethod,
+/// rpcenv: &'a mut dyn RpcEnvironment,
+/// ) -> StreamingApiFuture<'a> {
+/// Box::pin(async move {
+/// let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
+/// Ok(res)
+/// })
+/// }
+///
+/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new(
+/// &ApiHandler::StreamingAsync(&hello_future),
+/// &ObjectSchema::new("Hello World Example (async)", &[])
+/// );
+/// ```
+pub type StreamingApiAsyncHandlerFn = &'static (dyn for<'a> Fn(
+ Value,
+ &'static ApiMethod,
+ &'a mut dyn RpcEnvironment,
+) -> StreamingApiFuture<'a>
+ + Send
+ + Sync);
+
+pub type StreamingApiFuture<'a> = Pin<
+ Box<dyn Future<Output = Result<Box<dyn SerializableReturn + Send>, anyhow::Error>> + Send + 'a>,
+>;
+
/// Asynchronous HTTP API handlers
///
/// They get low level access to request and response data. Use this
@@ -124,7 +194,9 @@ pub type ApiResponseFuture =
/// Enum for different types of API handler functions.
pub enum ApiHandler {
Sync(ApiHandlerFn),
+ StreamingSync(StreamingApiHandlerFn),
Async(ApiAsyncHandlerFn),
+ StreamingAsync(StreamingApiAsyncHandlerFn),
AsyncHttp(ApiAsyncHttpHandlerFn),
}
@@ -139,9 +211,15 @@ impl PartialEq for ApiHandler {
(ApiHandler::Sync(l), ApiHandler::Sync(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
}
+ (ApiHandler::StreamingSync(l), ApiHandler::StreamingSync(r)) => {
+ core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
+ }
(ApiHandler::Async(l), ApiHandler::Async(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
}
+ (ApiHandler::StreamingAsync(l), ApiHandler::StreamingAsync(r)) => {
+ core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
+ }
(ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
}
--
2.30.2
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pbs-devel] [PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option
2022-04-08 9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
` (2 preceding siblings ...)
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization Dominik Csapak
@ 2022-04-08 9:56 ` Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
` (2 subsequent siblings)
6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08 9:56 UTC (permalink / raw)
To: pbs-devel
to generate the `Streaming` variants of the ApiHandler
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-api-macro/src/api/method.rs | 127 +++++++++++++++++++---------
proxmox-api-macro/tests/api1.rs | 16 ++++
2 files changed, 104 insertions(+), 39 deletions(-)
diff --git a/proxmox-api-macro/src/api/method.rs b/proxmox-api-macro/src/api/method.rs
index f82cc53..1db6124 100644
--- a/proxmox-api-macro/src/api/method.rs
+++ b/proxmox-api-macro/src/api/method.rs
@@ -169,6 +169,12 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
.transpose()?
.unwrap_or(false);
+ let streaming: bool = attribs
+ .remove("streaming")
+ .map(TryFrom::try_from)
+ .transpose()?
+ .unwrap_or(false);
+
if !attribs.is_empty() {
error!(
attribs.span(),
@@ -195,6 +201,7 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
&mut func,
&mut wrapper_ts,
&mut default_consts,
+ streaming,
)?;
// input schema is done, let's give the method body a chance to extract default parameters:
@@ -217,10 +224,11 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
returns_schema_setter = quote! { .returns(#inner) };
}
- let api_handler = if is_async {
- quote! { ::proxmox_router::ApiHandler::Async(&#api_func_name) }
- } else {
- quote! { ::proxmox_router::ApiHandler::Sync(&#api_func_name) }
+ let api_handler = match (streaming, is_async) {
+ (true, true) => quote! { ::proxmox_router::ApiHandler::StreamingAsync(&#api_func_name) },
+ (true, false) => quote! { ::proxmox_router::ApiHandler::StreamingSync(&#api_func_name) },
+ (false, true) => quote! { ::proxmox_router::ApiHandler::Async(&#api_func_name) },
+ (false, false) => quote! { ::proxmox_router::ApiHandler::Sync(&#api_func_name) },
};
Ok(quote_spanned! { func.sig.span() =>
@@ -279,6 +287,7 @@ fn handle_function_signature(
func: &mut syn::ItemFn,
wrapper_ts: &mut TokenStream,
default_consts: &mut TokenStream,
+ streaming: bool,
) -> Result<Ident, Error> {
let sig = &func.sig;
let is_async = sig.asyncness.is_some();
@@ -414,6 +423,7 @@ fn handle_function_signature(
wrapper_ts,
default_consts,
is_async,
+ streaming,
)
}
@@ -471,6 +481,7 @@ fn create_wrapper_function(
wrapper_ts: &mut TokenStream,
default_consts: &mut TokenStream,
is_async: bool,
+ streaming: bool,
) -> Result<Ident, Error> {
let api_func_name = Ident::new(
&format!("api_function_{}", &func.sig.ident),
@@ -512,45 +523,83 @@ fn create_wrapper_function(
_ => Some(quote!(?)),
};
- let body = quote! {
- if let ::serde_json::Value::Object(ref mut input_map) = &mut input_params {
- #body
- Ok(::serde_json::to_value(#func_name(#args) #await_keyword #question_mark)?)
- } else {
- ::anyhow::bail!("api function wrapper called with a non-object json value");
- }
- };
-
- if is_async {
- wrapper_ts.extend(quote! {
- fn #api_func_name<'a>(
- mut input_params: ::serde_json::Value,
- api_method_param: &'static ::proxmox_router::ApiMethod,
- rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
- ) -> ::proxmox_router::ApiFuture<'a> {
- //async fn func<'a>(
- // mut input_params: ::serde_json::Value,
- // api_method_param: &'static ::proxmox_router::ApiMethod,
- // rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
- //) -> ::std::result::Result<::serde_json::Value, ::anyhow::Error> {
- // #body
- //}
- //::std::boxed::Box::pin(async move {
- // func(input_params, api_method_param, rpc_env_param).await
- //})
- ::std::boxed::Box::pin(async move { #body })
+ let body = if streaming {
+ quote! {
+ if let ::serde_json::Value::Object(ref mut input_map) = &mut input_params {
+ #body
+ let res = #func_name(#args) #await_keyword #question_mark;
+ let res: ::std::boxed::Box<dyn ::proxmox_router::SerializableReturn + Send> = ::std::boxed::Box::new(res);
+ Ok(res)
+ } else {
+ ::anyhow::bail!("api function wrapper called with a non-object json value");
}
- });
+ }
} else {
- wrapper_ts.extend(quote! {
- fn #api_func_name(
- mut input_params: ::serde_json::Value,
- api_method_param: &::proxmox_router::ApiMethod,
- rpc_env_param: &mut dyn ::proxmox_router::RpcEnvironment,
- ) -> ::std::result::Result<::serde_json::Value, ::anyhow::Error> {
+ quote! {
+ if let ::serde_json::Value::Object(ref mut input_map) = &mut input_params {
#body
+ Ok(::serde_json::to_value(#func_name(#args) #await_keyword #question_mark)?)
+ } else {
+ ::anyhow::bail!("api function wrapper called with a non-object json value");
}
- });
+ }
+ };
+
+ match (streaming, is_async) {
+ (true, true) => {
+ wrapper_ts.extend(quote! {
+ fn #api_func_name<'a>(
+ mut input_params: ::serde_json::Value,
+ api_method_param: &'static ::proxmox_router::ApiMethod,
+ rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
+ ) -> ::proxmox_router::StreamingApiFuture<'a> {
+ ::std::boxed::Box::pin(async move { #body })
+ }
+ });
+ }
+ (true, false) => {
+ wrapper_ts.extend(quote! {
+ fn #api_func_name(
+ mut input_params: ::serde_json::Value,
+ api_method_param: &::proxmox_router::ApiMethod,
+ rpc_env_param: &mut dyn ::proxmox_router::RpcEnvironment,
+ ) -> ::std::result::Result<::std::boxed::Box<dyn ::proxmox_router::SerializableReturn + Send>, ::anyhow::Error> {
+ #body
+ }
+ });
+ }
+ (false, true) => {
+ wrapper_ts.extend(quote! {
+ fn #api_func_name<'a>(
+ mut input_params: ::serde_json::Value,
+ api_method_param: &'static ::proxmox_router::ApiMethod,
+ rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
+ ) -> ::proxmox_router::ApiFuture<'a> {
+ //async fn func<'a>(
+ // mut input_params: ::serde_json::Value,
+ // api_method_param: &'static ::proxmox_router::ApiMethod,
+ // rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
+ //) -> ::std::result::Result<::serde_json::Value, ::anyhow::Error> {
+ // #body
+ //}
+ //::std::boxed::Box::pin(async move {
+ // func(input_params, api_method_param, rpc_env_param).await
+ //})
+ ::std::boxed::Box::pin(async move { #body })
+ }
+ });
+ }
+ (false, false) => {
+ wrapper_ts.extend(quote! {
+ fn #api_func_name(
+ mut input_params: ::serde_json::Value,
+ api_method_param: &::proxmox_router::ApiMethod,
+ rpc_env_param: &mut dyn ::proxmox_router::RpcEnvironment,
+ ) -> ::std::result::Result<::serde_json::Value, ::anyhow::Error> {
+ #body
+ }
+ });
+ }
}
Ok(api_func_name)
diff --git a/proxmox-api-macro/tests/api1.rs b/proxmox-api-macro/tests/api1.rs
index fd9a338..ef60370 100644
--- a/proxmox-api-macro/tests/api1.rs
+++ b/proxmox-api-macro/tests/api1.rs
@@ -235,6 +235,22 @@ pub fn basic_function() -> Result<(), Error> {
Ok(())
}
+#[api(
+ streaming: true,
+)]
+/// streaming async call
+pub async fn streaming_async_call() -> Result<(), Error> {
+ Ok(())
+}
+
+#[api(
+ streaming: true,
+)]
+/// streaming sync call
+pub fn streaming_sync_call() -> Result<(), Error> {
+ Ok(())
+}
+
#[api(
input: {
properties: {
--
2.30.2
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method
2022-04-08 9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
` (3 preceding siblings ...)
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option Dominik Csapak
@ 2022-04-08 9:56 ` Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak
6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08 9:56 UTC (permalink / raw)
To: pbs-devel
that takes the data in form of a `Box<dyn SerializableReturn + Send>`
instead of a Value.
Implement it in json and extjs formatter, by starting a thread and
stream the serialized data via a `BufWriter<SenderWriter>` and use
the Receiver side as a stream for the response body.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
2 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 56aa91e8..a957f8df 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
tokio = { version = "1.6", features = ["signal", "process"] }
tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
tower-service = "0.3.0"
url = "2.1"
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index e31df5b8..fd7b6c69 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
use hyper::header;
use hyper::{Body, Response, StatusCode};
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
use proxmox_schema::ParameterError;
/// Extension to set error message for server side logging
@@ -18,6 +18,13 @@ pub trait OutputFormatter: Send + Sync {
/// Transform json data into a http response
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
+ /// Transform serializable data into a streaming http response
+ fn format_data_streaming(
+ &self,
+ data: Box<dyn SerializableReturn + Send>,
+ rpcenv: &dyn RpcEnvironment,
+ ) -> Result<Response<Body>, Error>;
+
/// Transform errors into a http response
fn format_error(&self, err: Error) -> Response<Body>;
@@ -50,6 +57,16 @@ fn json_data_response(data: Value) -> Response<Body> {
response
}
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+ let response = Response::builder()
+ .header(
+ header::CONTENT_TYPE,
+ header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+ )
+ .body(body)?;
+ Ok(response)
+}
+
fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
let attributes = match rpcenv.result_attrib().as_object() {
Some(attr) => attr,
@@ -61,6 +78,22 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
}
}
+fn start_data_streaming(
+ value: Value,
+ data: Box<dyn SerializableReturn + Send>,
+) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+ let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+ tokio::task::spawn_blocking(move || {
+ let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+ let mut output = std::io::BufWriter::new(output);
+ let mut serializer = serde_json::Serializer::new(&mut output);
+ let _ = data.sender_serialize(&mut serializer, value);
+ });
+
+ reader
+}
+
struct JsonFormatter();
/// Format data as ``application/json``
@@ -84,6 +117,21 @@ impl OutputFormatter for JsonFormatter {
json_data_response(result)
}
+ fn format_data_streaming(
+ &self,
+ data: Box<dyn SerializableReturn + Send>,
+ rpcenv: &dyn RpcEnvironment,
+ ) -> Result<Response<Body>, Error> {
+ let mut value = json!({});
+
+ add_result_attributes(&mut value, rpcenv);
+
+ let reader = start_data_streaming(value, data);
+ let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+ json_data_response_streaming(Body::wrap_stream(stream))
+ }
+
fn format_error(&self, err: Error) -> Response<Body> {
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone()));
@@ -140,6 +188,23 @@ impl OutputFormatter for ExtJsFormatter {
json_data_response(result)
}
+ fn format_data_streaming(
+ &self,
+ data: Box<dyn SerializableReturn + Send>,
+ rpcenv: &dyn RpcEnvironment,
+ ) -> Result<Response<Body>, Error> {
+ let mut value = json!({
+ "success": true,
+ });
+
+ add_result_attributes(&mut value, rpcenv);
+
+ let reader = start_data_streaming(value, data);
+ let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+ json_data_response_streaming(Body::wrap_stream(stream))
+ }
+
fn format_error(&self, err: Error) -> Response<Body> {
let message: String;
let mut errors = HashMap::new();
--
2.30.2
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants
2022-04-08 9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
` (4 preceding siblings ...)
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
@ 2022-04-08 9:56 ` Dominik Csapak
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak
6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08 9:56 UTC (permalink / raw)
To: pbs-devel
namely 'StreamingSync' and 'StreamingAsync'
in rest-server by using the new formatter function,
and in the debug binary by using 'to_value'
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-rest-server/src/rest.rs | 13 +++++++++++++
src/bin/proxmox_backup_debug/api.rs | 8 ++++++++
2 files changed, 21 insertions(+)
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 2aadf1ed..1689ef6e 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -445,6 +445,19 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
(handler)(parts, req_body, params, info, Box::new(rpcenv)).await
}
+ ApiHandler::StreamingSync(handler) => {
+ let params =
+ get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+ (handler)(params, info, &mut rpcenv)
+ .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+ }
+ ApiHandler::StreamingAsync(handler) => {
+ let params =
+ get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+ (handler)(params, info, &mut rpcenv)
+ .await
+ .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+ }
ApiHandler::Sync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
diff --git a/src/bin/proxmox_backup_debug/api.rs b/src/bin/proxmox_backup_debug/api.rs
index 47124089..e5d9f508 100644
--- a/src/bin/proxmox_backup_debug/api.rs
+++ b/src/bin/proxmox_backup_debug/api.rs
@@ -229,6 +229,14 @@ async fn call_api_code(
nix::unistd::setuid(backup_user.uid)?;
}
match method.handler {
+ ApiHandler::StreamingSync(handler) => {
+ let res = (handler)(params, method, rpcenv)?.to_value()?;
+ Ok(res)
+ }
+ ApiHandler::StreamingAsync(handler) => {
+ let res = (handler)(params, method, rpcenv).await?.to_value()?;
+ Ok(res)
+ }
ApiHandler::AsyncHttp(_handler) => {
bail!("not implemented");
}
--
2.30.2
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls
2022-04-08 9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
` (5 preceding siblings ...)
2022-04-08 9:56 ` [pbs-devel] [PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants Dominik Csapak
@ 2022-04-08 9:56 ` Dominik Csapak
6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08 9:56 UTC (permalink / raw)
To: pbs-devel
namely /admin/datastore/{store}/snapshots
and /nodes/{node}/tasks
since those are api calls where the result can get quite large
with this change, the serialization is now streaming instead of making
a `Value` in memory.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/api2/admin/datastore.rs | 1 +
src/api2/node/tasks.rs | 1 +
2 files changed, 2 insertions(+)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index ef82b426..55fab62c 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -375,6 +375,7 @@ pub fn delete_snapshot(
}
#[api(
+ streaming: true,
input: {
properties: {
store: {
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 07353d0b..b8046f1b 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -380,6 +380,7 @@ fn stop_task(
}
#[api(
+ streaming: true,
input: {
properties: {
node: {
--
2.30.2
^ permalink raw reply [flat|nested] 9+ messages in thread