public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls
@ 2022-04-08  9:55 Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
                   ` (6 more replies)
  0 siblings, 7 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08  9:55 UTC (permalink / raw)
  To: pbs-devel

this series aims to improve memory behaviour for api calls where the
result is large. We currently convert that result into a serde `Value`
which, depending on the actual structure of the data, can use much more
memory than the original rust structure (e.g. i saw factor 5 to 10 for
the backup snapshot list) which in addition seems to trigger bad
behaviour in the memory allocator (this again....).

by streaming the serialization, we don't need any in memory copy of the
result, and we could probably even return an Arc<> or Rc<> so we maybe
don't have to copy the data at all (we would have to implement Serialize
on that ourselfs, or enable the 'rc' feature for serde to use that)

to overcome that serde serialization is not async, use
tokio::spawn_blocking for each api call. this will spawn a thread if
none is idle, and only up to a max 'blocking thread pool' maximum.

dependencies (if it's not clear already):
proxmox-async needs a bump, router and api-macro need to depend on that
proxmox-router/api macro would then also need a bump on which
proxmox-rest-server/proxmox-backup needs to depend

changes from rfc:
* use tokio::spawn_blocking instead of a new thread
* dependencies from proxmox-async are greatly reduced since we split
  it up recently

proxmox:

Dominik Csapak (4):
  proxmox-async: add SenderWriter helper
  promxox-router: add SerializableReturn Trait
  proxmox-router: add new ApiHandler variants for streaming
    serialization
  proxmox-api-macro: add 'streaming' option

 proxmox-api-macro/src/api/method.rs         | 127 ++++++++++++++------
 proxmox-api-macro/tests/api1.rs             |  16 +++
 proxmox-async/src/blocking/mod.rs           |   3 +
 proxmox-async/src/blocking/sender_writer.rs |  47 ++++++++
 proxmox-router/Cargo.toml                   |   2 +
 proxmox-router/src/cli/command.rs           |  45 +++++++
 proxmox-router/src/lib.rs                   |   2 +
 proxmox-router/src/router.rs                |  78 ++++++++++++
 proxmox-router/src/serializable_return.rs   |  62 ++++++++++
 9 files changed, 343 insertions(+), 39 deletions(-)
 create mode 100644 proxmox-async/src/blocking/sender_writer.rs
 create mode 100644 proxmox-router/src/serializable_return.rs

proxmox-backup:

Dominik Csapak (3):
  proxmox-rest-server: OutputFormatter: add new format_data_streaming
    method
  adapt to the new ApiHandler variants
  api: admin/datastore: enable streaming for some api calls

 proxmox-rest-server/Cargo.toml       |  1 +
 proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
 proxmox-rest-server/src/rest.rs      | 13 ++++++
 src/api2/admin/datastore.rs          |  1 +
 src/api2/node/tasks.rs               |  1 +
 src/bin/proxmox_backup_debug/api.rs  |  8 ++++
 6 files changed, 90 insertions(+), 1 deletion(-)

-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper
  2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
@ 2022-04-08  9:56 ` Dominik Csapak
  2022-04-12 12:29   ` [pbs-devel] applied-series: " Wolfgang Bumiller
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait Dominik Csapak
                   ` (5 subsequent siblings)
  6 siblings, 1 reply; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08  9:56 UTC (permalink / raw)
  To: pbs-devel

this wraps around a tokio Sender for Vec<u8>, but implements a blocking
write. We can use thas as an adapter for something that only takes a
writer, and can read from it asynchonously

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-async/src/blocking/mod.rs           |  3 ++
 proxmox-async/src/blocking/sender_writer.rs | 47 +++++++++++++++++++++
 2 files changed, 50 insertions(+)
 create mode 100644 proxmox-async/src/blocking/sender_writer.rs

diff --git a/proxmox-async/src/blocking/mod.rs b/proxmox-async/src/blocking/mod.rs
index 28247b3..06f821a 100644
--- a/proxmox-async/src/blocking/mod.rs
+++ b/proxmox-async/src/blocking/mod.rs
@@ -9,3 +9,6 @@ pub use tokio_writer_adapter::TokioWriterAdapter;
 
 mod wrapped_reader_stream;
 pub use wrapped_reader_stream::WrappedReaderStream;
+
+mod sender_writer;
+pub use sender_writer::SenderWriter;
diff --git a/proxmox-async/src/blocking/sender_writer.rs b/proxmox-async/src/blocking/sender_writer.rs
new file mode 100644
index 0000000..62682e5
--- /dev/null
+++ b/proxmox-async/src/blocking/sender_writer.rs
@@ -0,0 +1,47 @@
+use std::io;
+
+use anyhow::Error;
+use tokio::sync::mpsc::Sender;
+
+/// Wrapper struct around [`tokio::sync::mpsc::Sender`] for `Result<Vec<u8>, Error>` that implements [`std::io::Write`]
+pub struct SenderWriter {
+    sender: Sender<Result<Vec<u8>, Error>>,
+}
+
+impl SenderWriter {
+    pub fn from_sender(sender: tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>) -> Self {
+        Self { sender }
+    }
+
+    fn write_impl(&mut self, buf: &[u8]) -> io::Result<usize> {
+        if let Err(err) = self.sender.blocking_send(Ok(buf.to_vec())) {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                format!("could not send: {}", err),
+            ));
+        }
+
+        Ok(buf.len())
+    }
+
+    fn flush_impl(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
+impl io::Write for SenderWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.write_impl(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.flush_impl()
+    }
+}
+
+impl Drop for SenderWriter {
+    fn drop(&mut self) {
+        // ignore errors
+        let _ = self.flush_impl();
+    }
+}
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait
  2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
@ 2022-04-08  9:56 ` Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization Dominik Csapak
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08  9:56 UTC (permalink / raw)
  To: pbs-devel

this will be useful as a generic return type for api calls which
must implement Serialize.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-router/Cargo.toml                 |  2 +
 proxmox-router/src/lib.rs                 |  2 +
 proxmox-router/src/serializable_return.rs | 62 +++++++++++++++++++++++
 3 files changed, 66 insertions(+)
 create mode 100644 proxmox-router/src/serializable_return.rs

diff --git a/proxmox-router/Cargo.toml b/proxmox-router/Cargo.toml
index d9f47ea..f042f31 100644
--- a/proxmox-router/Cargo.toml
+++ b/proxmox-router/Cargo.toml
@@ -15,6 +15,7 @@ hyper = { version = "0.14", features = [ "full" ] }
 nix = "0.19.1"
 percent-encoding = "2.1"
 serde_json = "1.0"
+serde = "1.0"
 unicode-width ="0.1.8"
 
 # cli:
@@ -24,6 +25,7 @@ libc = { version = "0.2", optional = true }
 
 proxmox-lang = { path = "../proxmox-lang", version = "1.1" }
 proxmox-schema = { path = "../proxmox-schema", version = "1.1" }
+proxmox-async = { path = "../proxmox-async", version = "0.4" }
 
 [features]
 default = [ "cli" ]
diff --git a/proxmox-router/src/lib.rs b/proxmox-router/src/lib.rs
index dadb917..84f39f8 100644
--- a/proxmox-router/src/lib.rs
+++ b/proxmox-router/src/lib.rs
@@ -12,6 +12,7 @@ pub mod error;
 mod permission;
 mod router;
 mod rpc_environment;
+mod serializable_return;
 
 #[doc(inline)]
 pub use error::HttpError;
@@ -19,6 +20,7 @@ pub use error::HttpError;
 pub use permission::*;
 pub use router::*;
 pub use rpc_environment::{RpcEnvironment, RpcEnvironmentType};
+pub use serializable_return::SerializableReturn;
 
 // make list_subdirs_api_method! work without an explicit proxmox-schema dependency:
 #[doc(hidden)]
diff --git a/proxmox-router/src/serializable_return.rs b/proxmox-router/src/serializable_return.rs
new file mode 100644
index 0000000..51f6c3c
--- /dev/null
+++ b/proxmox-router/src/serializable_return.rs
@@ -0,0 +1,62 @@
+use serde::Serializer;
+use serde_json::Value;
+
+/// This defines a *fixed* serializer (iow. also where/how to write out the data).
+///
+/// (Note that `serde::Serializer` is implemented for `__&mut__ serde_json::Serializer`.
+type SenderSerializer<'a> = &'a mut serde_json::Serializer<
+    &'a mut std::io::BufWriter<proxmox_async::blocking::SenderWriter>,
+>;
+
+/// This is an object-safe trait which requires the ability to serialize into particular
+/// Serializer instances.
+pub trait SerializableReturn {
+    /// Serializes self into a [`proxmox_async::blocking::SenderWriter`] wrapped
+    /// into a [`std::io::BufWriter`]
+    ///
+    /// If `value` is an Object/Map, serializes that first and puts the value of
+    /// `self` into the `data` property.
+    fn sender_serialize(
+        &self,
+        serializer: SenderSerializer,
+        value: Value,
+    ) -> Result<
+        <SenderSerializer as serde::Serializer>::Ok,
+        <SenderSerializer as serde::Serializer>::Error,
+    >;
+
+    /// Returns a value again from self
+    fn to_value(&self) -> Result<Value, serde_json::error::Error>;
+}
+
+impl<T> SerializableReturn for T
+where
+    T: serde::Serialize,
+{
+    fn sender_serialize(
+        &self,
+        serializer: SenderSerializer,
+        value: Value,
+    ) -> Result<
+        <SenderSerializer as serde::Serializer>::Ok,
+        <SenderSerializer as serde::Serializer>::Error,
+    > {
+        use serde::ser::SerializeMap;
+        if let Some(original) = value.as_object() {
+            let mut map = serializer.serialize_map(None)?;
+            for (k, v) in original {
+                map.serialize_entry(k, v)?;
+            }
+
+            map.serialize_key("data")?;
+            map.serialize_value(&self)?;
+            map.end()
+        } else {
+            self.serialize(serializer)
+        }
+    }
+
+    fn to_value(&self) -> Result<Value, serde_json::error::Error> {
+        serde_json::to_value(self)
+    }
+}
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization
  2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait Dominik Csapak
@ 2022-04-08  9:56 ` Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option Dominik Csapak
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08  9:56 UTC (permalink / raw)
  To: pbs-devel

they should behave like their normal variants, but return a
`Box<dyn SerializableReturn + Send>` instead of a value. This is useful
since we do not have to generate the `Value` in-memory, but can
stream the serialization to the client.

We cannot simply use a `Box<dyn serde::Serialize>`, because that trait
is not object-safe and thus cannot be used as a trait-object.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-router/src/cli/command.rs | 45 ++++++++++++++++++
 proxmox-router/src/router.rs      | 78 +++++++++++++++++++++++++++++++
 2 files changed, 123 insertions(+)

diff --git a/proxmox-router/src/cli/command.rs b/proxmox-router/src/cli/command.rs
index 1b05078..76ecddc 100644
--- a/proxmox-router/src/cli/command.rs
+++ b/proxmox-router/src/cli/command.rs
@@ -72,6 +72,18 @@ async fn handle_simple_command_future(
                 return Err(err);
             }
         },
+        ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) {
+            Ok(value) => {
+                let value = value.to_value()?;
+                if value != Value::Null {
+                    println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+                }
+            }
+            Err(err) => {
+                eprintln!("Error: {}", err);
+                return Err(err);
+            }
+        },
         ApiHandler::Async(handler) => {
             let future = (handler)(params, cli_cmd.info, &mut rpcenv);
 
@@ -87,6 +99,22 @@ async fn handle_simple_command_future(
                 }
             }
         }
+        ApiHandler::StreamingAsync(handler) => {
+            let future = (handler)(params, cli_cmd.info, &mut rpcenv);
+
+            match future.await {
+                Ok(value) => {
+                    let value = value.to_value()?;
+                    if value != Value::Null {
+                        println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+                    }
+                }
+                Err(err) => {
+                    eprintln!("Error: {}", err);
+                    return Err(err);
+                }
+            }
+        }
         ApiHandler::AsyncHttp(_) => {
             let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
             print_simple_usage_error(prefix, cli_cmd, err_msg);
@@ -118,6 +146,18 @@ fn handle_simple_command(
                 return Err(err);
             }
         },
+        ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) {
+            Ok(value) => {
+                let value = value.to_value()?;
+                if value != Value::Null {
+                    println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+                }
+            }
+            Err(err) => {
+                eprintln!("Error: {}", err);
+                return Err(err);
+            }
+        },
         ApiHandler::Async(handler) => {
             let future = (handler)(params, cli_cmd.info, &mut rpcenv);
             if let Some(run) = run {
@@ -138,6 +178,11 @@ fn handle_simple_command(
                 return Err(format_err!("{}", err_msg));
             }
         }
+        ApiHandler::StreamingAsync(_handler) => {
+            let err_msg = "CliHandler does not support ApiHandler::StreamingAsync - internal error";
+            print_simple_usage_error(prefix, cli_cmd, err_msg);
+            return Err(format_err!("{}", err_msg));
+        }
         ApiHandler::AsyncHttp(_) => {
             let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
             print_simple_usage_error(prefix, cli_cmd, err_msg);
diff --git a/proxmox-router/src/router.rs b/proxmox-router/src/router.rs
index a469891..3fa0033 100644
--- a/proxmox-router/src/router.rs
+++ b/proxmox-router/src/router.rs
@@ -14,6 +14,7 @@ use proxmox_schema::{ObjectSchema, ParameterSchema, ReturnType, Schema};
 
 use super::Permission;
 use crate::RpcEnvironment;
+use crate::SerializableReturn;
 
 /// A synchronous API handler gets a json Value as input and returns a json Value as output.
 ///
@@ -42,6 +43,37 @@ pub type ApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironm
               + Sync
               + 'static);
 
+/// A synchronous API handler gets a json Value as input and returns a serializable return value as output.
+///
+/// ```
+/// # use anyhow::Error;
+/// # use serde_json::{json, Value};
+/// use proxmox_router::{ApiHandler, ApiMethod, RpcEnvironment, SerializableReturn};
+/// use proxmox_schema::ObjectSchema;
+///
+/// fn hello(
+///    param: Value,
+///    info: &ApiMethod,
+///    rpcenv: &mut dyn RpcEnvironment,
+/// ) -> Result<Box<dyn SerializableReturn + Send>, Error> {
+///    let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
+///    Ok(res)
+/// }
+///
+/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new(
+///    &ApiHandler::StreamingSync(&hello),
+///    &ObjectSchema::new("Hello World Example", &[])
+/// );
+/// ```
+pub type StreamingApiHandlerFn = &'static (dyn Fn(
+    Value,
+    &ApiMethod,
+    &mut dyn RpcEnvironment,
+) -> Result<Box<dyn SerializableReturn + Send>, Error>
+              + Send
+              + Sync
+              + 'static);
+
 /// Asynchronous API handlers
 ///
 /// Returns a future Value.
@@ -74,6 +106,44 @@ pub type ApiAsyncHandlerFn = &'static (dyn for<'a> Fn(Value, &'static ApiMethod,
 
 pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error>> + Send + 'a>>;
 
+/// Streaming asynchronous API handlers
+///
+/// Returns a future Value.
+/// ```
+/// # use serde_json::{json, Value};
+/// #
+/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment, StreamingApiFuture, SerializableReturn};
+/// use proxmox_schema::ObjectSchema;
+///
+///
+/// fn hello_future<'a>(
+///    param: Value,
+///    info: &ApiMethod,
+///    rpcenv: &'a mut dyn RpcEnvironment,
+/// ) -> StreamingApiFuture<'a> {
+///    Box::pin(async move {
+///        let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
+///        Ok(res)
+///    })
+/// }
+///
+/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new(
+///    &ApiHandler::StreamingAsync(&hello_future),
+///    &ObjectSchema::new("Hello World Example (async)", &[])
+/// );
+/// ```
+pub type StreamingApiAsyncHandlerFn = &'static (dyn for<'a> Fn(
+    Value,
+    &'static ApiMethod,
+    &'a mut dyn RpcEnvironment,
+) -> StreamingApiFuture<'a>
+              + Send
+              + Sync);
+
+pub type StreamingApiFuture<'a> = Pin<
+    Box<dyn Future<Output = Result<Box<dyn SerializableReturn + Send>, anyhow::Error>> + Send + 'a>,
+>;
+
 /// Asynchronous HTTP API handlers
 ///
 /// They get low level access to request and response data. Use this
@@ -124,7 +194,9 @@ pub type ApiResponseFuture =
 /// Enum for different types of API handler functions.
 pub enum ApiHandler {
     Sync(ApiHandlerFn),
+    StreamingSync(StreamingApiHandlerFn),
     Async(ApiAsyncHandlerFn),
+    StreamingAsync(StreamingApiAsyncHandlerFn),
     AsyncHttp(ApiAsyncHttpHandlerFn),
 }
 
@@ -139,9 +211,15 @@ impl PartialEq for ApiHandler {
                 (ApiHandler::Sync(l), ApiHandler::Sync(r)) => {
                     core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
                 }
+                (ApiHandler::StreamingSync(l), ApiHandler::StreamingSync(r)) => {
+                    core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
+                }
                 (ApiHandler::Async(l), ApiHandler::Async(r)) => {
                     core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
                 }
+                (ApiHandler::StreamingAsync(l), ApiHandler::StreamingAsync(r)) => {
+                    core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
+                }
                 (ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => {
                     core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
                 }
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option
  2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
                   ` (2 preceding siblings ...)
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization Dominik Csapak
@ 2022-04-08  9:56 ` Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08  9:56 UTC (permalink / raw)
  To: pbs-devel

to generate the `Streaming` variants of the ApiHandler

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-api-macro/src/api/method.rs | 127 +++++++++++++++++++---------
 proxmox-api-macro/tests/api1.rs     |  16 ++++
 2 files changed, 104 insertions(+), 39 deletions(-)

diff --git a/proxmox-api-macro/src/api/method.rs b/proxmox-api-macro/src/api/method.rs
index f82cc53..1db6124 100644
--- a/proxmox-api-macro/src/api/method.rs
+++ b/proxmox-api-macro/src/api/method.rs
@@ -169,6 +169,12 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
         .transpose()?
         .unwrap_or(false);
 
+    let streaming: bool = attribs
+        .remove("streaming")
+        .map(TryFrom::try_from)
+        .transpose()?
+        .unwrap_or(false);
+
     if !attribs.is_empty() {
         error!(
             attribs.span(),
@@ -195,6 +201,7 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
         &mut func,
         &mut wrapper_ts,
         &mut default_consts,
+        streaming,
     )?;
 
     // input schema is done, let's give the method body a chance to extract default parameters:
@@ -217,10 +224,11 @@ pub fn handle_method(mut attribs: JSONObject, mut func: syn::ItemFn) -> Result<T
         returns_schema_setter = quote! { .returns(#inner) };
     }
 
-    let api_handler = if is_async {
-        quote! { ::proxmox_router::ApiHandler::Async(&#api_func_name) }
-    } else {
-        quote! { ::proxmox_router::ApiHandler::Sync(&#api_func_name) }
+    let api_handler = match (streaming, is_async) {
+        (true, true) => quote! { ::proxmox_router::ApiHandler::StreamingAsync(&#api_func_name) },
+        (true, false) => quote! { ::proxmox_router::ApiHandler::StreamingSync(&#api_func_name) },
+        (false, true) => quote! { ::proxmox_router::ApiHandler::Async(&#api_func_name) },
+        (false, false) => quote! { ::proxmox_router::ApiHandler::Sync(&#api_func_name) },
     };
 
     Ok(quote_spanned! { func.sig.span() =>
@@ -279,6 +287,7 @@ fn handle_function_signature(
     func: &mut syn::ItemFn,
     wrapper_ts: &mut TokenStream,
     default_consts: &mut TokenStream,
+    streaming: bool,
 ) -> Result<Ident, Error> {
     let sig = &func.sig;
     let is_async = sig.asyncness.is_some();
@@ -414,6 +423,7 @@ fn handle_function_signature(
         wrapper_ts,
         default_consts,
         is_async,
+        streaming,
     )
 }
 
@@ -471,6 +481,7 @@ fn create_wrapper_function(
     wrapper_ts: &mut TokenStream,
     default_consts: &mut TokenStream,
     is_async: bool,
+    streaming: bool,
 ) -> Result<Ident, Error> {
     let api_func_name = Ident::new(
         &format!("api_function_{}", &func.sig.ident),
@@ -512,45 +523,83 @@ fn create_wrapper_function(
         _ => Some(quote!(?)),
     };
 
-    let body = quote! {
-        if let ::serde_json::Value::Object(ref mut input_map) = &mut input_params {
-            #body
-            Ok(::serde_json::to_value(#func_name(#args) #await_keyword #question_mark)?)
-        } else {
-            ::anyhow::bail!("api function wrapper called with a non-object json value");
-        }
-    };
-
-    if is_async {
-        wrapper_ts.extend(quote! {
-            fn #api_func_name<'a>(
-                mut input_params: ::serde_json::Value,
-                api_method_param: &'static ::proxmox_router::ApiMethod,
-                rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
-            ) -> ::proxmox_router::ApiFuture<'a> {
-                //async fn func<'a>(
-                //    mut input_params: ::serde_json::Value,
-                //    api_method_param: &'static ::proxmox_router::ApiMethod,
-                //    rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
-                //) -> ::std::result::Result<::serde_json::Value, ::anyhow::Error> {
-                //    #body
-                //}
-                //::std::boxed::Box::pin(async move {
-                //    func(input_params, api_method_param, rpc_env_param).await
-                //})
-                ::std::boxed::Box::pin(async move { #body })
+    let body = if streaming {
+        quote! {
+            if let ::serde_json::Value::Object(ref mut input_map) = &mut input_params {
+                #body
+                let res = #func_name(#args) #await_keyword #question_mark;
+                let res: ::std::boxed::Box<dyn ::proxmox_router::SerializableReturn + Send> = ::std::boxed::Box::new(res);
+                Ok(res)
+            } else {
+                ::anyhow::bail!("api function wrapper called with a non-object json value");
             }
-        });
+        }
     } else {
-        wrapper_ts.extend(quote! {
-            fn #api_func_name(
-                mut input_params: ::serde_json::Value,
-                api_method_param: &::proxmox_router::ApiMethod,
-                rpc_env_param: &mut dyn ::proxmox_router::RpcEnvironment,
-            ) -> ::std::result::Result<::serde_json::Value, ::anyhow::Error> {
+        quote! {
+            if let ::serde_json::Value::Object(ref mut input_map) = &mut input_params {
                 #body
+                Ok(::serde_json::to_value(#func_name(#args) #await_keyword #question_mark)?)
+            } else {
+                ::anyhow::bail!("api function wrapper called with a non-object json value");
             }
-        });
+        }
+    };
+
+    match (streaming, is_async) {
+        (true, true) => {
+            wrapper_ts.extend(quote! {
+                fn #api_func_name<'a>(
+                    mut input_params: ::serde_json::Value,
+                    api_method_param: &'static ::proxmox_router::ApiMethod,
+                    rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
+                ) -> ::proxmox_router::StreamingApiFuture<'a> {
+                    ::std::boxed::Box::pin(async move { #body })
+                }
+            });
+        }
+        (true, false) => {
+            wrapper_ts.extend(quote! {
+                fn #api_func_name(
+                    mut input_params: ::serde_json::Value,
+                    api_method_param: &::proxmox_router::ApiMethod,
+                    rpc_env_param: &mut dyn ::proxmox_router::RpcEnvironment,
+                ) -> ::std::result::Result<::std::boxed::Box<dyn ::proxmox_router::SerializableReturn + Send>, ::anyhow::Error> {
+                    #body
+                }
+            });
+        }
+        (false, true) => {
+            wrapper_ts.extend(quote! {
+                fn #api_func_name<'a>(
+                    mut input_params: ::serde_json::Value,
+                    api_method_param: &'static ::proxmox_router::ApiMethod,
+                    rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
+                ) -> ::proxmox_router::ApiFuture<'a> {
+                    //async fn func<'a>(
+                    //    mut input_params: ::serde_json::Value,
+                    //    api_method_param: &'static ::proxmox_router::ApiMethod,
+                    //    rpc_env_param: &'a mut dyn ::proxmox_router::RpcEnvironment,
+                    //) -> ::std::result::Result<::serde_json::Value, ::anyhow::Error> {
+                    //    #body
+                    //}
+                    //::std::boxed::Box::pin(async move {
+                    //    func(input_params, api_method_param, rpc_env_param).await
+                    //})
+                    ::std::boxed::Box::pin(async move { #body })
+                }
+            });
+        }
+        (false, false) => {
+            wrapper_ts.extend(quote! {
+                fn #api_func_name(
+                    mut input_params: ::serde_json::Value,
+                    api_method_param: &::proxmox_router::ApiMethod,
+                    rpc_env_param: &mut dyn ::proxmox_router::RpcEnvironment,
+                ) -> ::std::result::Result<::serde_json::Value, ::anyhow::Error> {
+                    #body
+                }
+            });
+        }
     }
 
     Ok(api_func_name)
diff --git a/proxmox-api-macro/tests/api1.rs b/proxmox-api-macro/tests/api1.rs
index fd9a338..ef60370 100644
--- a/proxmox-api-macro/tests/api1.rs
+++ b/proxmox-api-macro/tests/api1.rs
@@ -235,6 +235,22 @@ pub fn basic_function() -> Result<(), Error> {
     Ok(())
 }
 
+#[api(
+    streaming: true,
+)]
+/// streaming async call
+pub async fn streaming_async_call() -> Result<(), Error> {
+    Ok(())
+}
+
+#[api(
+    streaming: true,
+)]
+/// streaming sync call
+pub fn streaming_sync_call() -> Result<(), Error> {
+    Ok(())
+}
+
 #[api(
     input: {
         properties: {
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method
  2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
                   ` (3 preceding siblings ...)
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option Dominik Csapak
@ 2022-04-08  9:56 ` Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak
  6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08  9:56 UTC (permalink / raw)
  To: pbs-devel

that takes the data in form of a `Box<dyn SerializableReturn + Send>`
instead of a Value.

Implement it in json and extjs formatter, by starting a thread and
stream the serialized data via a `BufWriter<SenderWriter>` and use
the Receiver side as a stream for the response body.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-rest-server/Cargo.toml       |  1 +
 proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 56aa91e8..a957f8df 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
 serde_json = "1.0"
 tokio = { version = "1.6", features = ["signal", "process"] }
 tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
 tower-service = "0.3.0"
 url = "2.1"
 
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index e31df5b8..fd7b6c69 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
 use hyper::header;
 use hyper::{Body, Response, StatusCode};
 
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
 use proxmox_schema::ParameterError;
 
 /// Extension to set error message for server side logging
@@ -18,6 +18,13 @@ pub trait OutputFormatter: Send + Sync {
     /// Transform json data into a http response
     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
 
+    /// Transform serializable data into a streaming http response
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error>;
+
     /// Transform errors into a http response
     fn format_error(&self, err: Error) -> Response<Body>;
 
@@ -50,6 +57,16 @@ fn json_data_response(data: Value) -> Response<Body> {
     response
 }
 
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+    let response = Response::builder()
+        .header(
+            header::CONTENT_TYPE,
+            header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+        )
+        .body(body)?;
+    Ok(response)
+}
+
 fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     let attributes = match rpcenv.result_attrib().as_object() {
         Some(attr) => attr,
@@ -61,6 +78,22 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     }
 }
 
+fn start_data_streaming(
+    value: Value,
+    data: Box<dyn SerializableReturn + Send>,
+) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+    let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+    tokio::task::spawn_blocking(move || {
+        let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+        let mut output = std::io::BufWriter::new(output);
+        let mut serializer = serde_json::Serializer::new(&mut output);
+        let _ = data.sender_serialize(&mut serializer, value);
+    });
+
+    reader
+}
+
 struct JsonFormatter();
 
 /// Format data as ``application/json``
@@ -84,6 +117,21 @@ impl OutputFormatter for JsonFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({});
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
         let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
             let mut resp = Response::new(Body::from(apierr.message.clone()));
@@ -140,6 +188,23 @@ impl OutputFormatter for ExtJsFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({
+            "success": true,
+        });
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
         let message: String;
         let mut errors = HashMap::new();
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants
  2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
                   ` (4 preceding siblings ...)
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
@ 2022-04-08  9:56 ` Dominik Csapak
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak
  6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08  9:56 UTC (permalink / raw)
  To: pbs-devel

namely 'StreamingSync' and 'StreamingAsync'
in rest-server by using the new formatter function,
and in the debug binary by using 'to_value'

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-rest-server/src/rest.rs     | 13 +++++++++++++
 src/bin/proxmox_backup_debug/api.rs |  8 ++++++++
 2 files changed, 21 insertions(+)

diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 2aadf1ed..1689ef6e 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -445,6 +445,19 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
             let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
             (handler)(parts, req_body, params, info, Box::new(rpcenv)).await
         }
+        ApiHandler::StreamingSync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv)
+                .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+        }
+        ApiHandler::StreamingAsync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv)
+                .await
+                .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+        }
         ApiHandler::Sync(handler) => {
             let params =
                 get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
diff --git a/src/bin/proxmox_backup_debug/api.rs b/src/bin/proxmox_backup_debug/api.rs
index 47124089..e5d9f508 100644
--- a/src/bin/proxmox_backup_debug/api.rs
+++ b/src/bin/proxmox_backup_debug/api.rs
@@ -229,6 +229,14 @@ async fn call_api_code(
         nix::unistd::setuid(backup_user.uid)?;
     }
     match method.handler {
+        ApiHandler::StreamingSync(handler) => {
+            let res = (handler)(params, method, rpcenv)?.to_value()?;
+            Ok(res)
+        }
+        ApiHandler::StreamingAsync(handler) => {
+            let res = (handler)(params, method, rpcenv).await?.to_value()?;
+            Ok(res)
+        }
         ApiHandler::AsyncHttp(_handler) => {
             bail!("not implemented");
         }
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls
  2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
                   ` (5 preceding siblings ...)
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants Dominik Csapak
@ 2022-04-08  9:56 ` Dominik Csapak
  6 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2022-04-08  9:56 UTC (permalink / raw)
  To: pbs-devel

namely /admin/datastore/{store}/snapshots
and /nodes/{node}/tasks

since those are api calls where the result can get quite large
with this change, the serialization is now streaming instead of making
a `Value` in memory.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/admin/datastore.rs | 1 +
 src/api2/node/tasks.rs      | 1 +
 2 files changed, 2 insertions(+)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index ef82b426..55fab62c 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -375,6 +375,7 @@ pub fn delete_snapshot(
 }
 
 #[api(
+    streaming: true,
     input: {
         properties: {
             store: {
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 07353d0b..b8046f1b 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -380,6 +380,7 @@ fn stop_task(
 }
 
 #[api(
+    streaming: true,
     input: {
         properties: {
             node: {
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] applied-series: [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper
  2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
@ 2022-04-12 12:29   ` Wolfgang Bumiller
  0 siblings, 0 replies; 9+ messages in thread
From: Wolfgang Bumiller @ 2022-04-12 12:29 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

applied the proxmox package series




^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2022-04-12 12:29 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
2022-04-12 12:29   ` [pbs-devel] applied-series: " Wolfgang Bumiller
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal