From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <d.csapak@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id 22BB1BE70
 for <pbs-devel@lists.proxmox.com>; Fri,  8 Apr 2022 11:56:42 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id 1820ED2A7
 for <pbs-devel@lists.proxmox.com>; Fri,  8 Apr 2022 11:56:12 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [94.136.29.106])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS id 7DF3BD22C
 for <pbs-devel@lists.proxmox.com>; Fri,  8 Apr 2022 11:56:08 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 47F9D463F1
 for <pbs-devel@lists.proxmox.com>; Fri,  8 Apr 2022 11:56:08 +0200 (CEST)
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Fri,  8 Apr 2022 11:56:02 +0200
Message-Id: <20220408095606.2767234-4-d.csapak@proxmox.com>
X-Mailer: git-send-email 2.30.2
In-Reply-To: <20220408095606.2767234-1-d.csapak@proxmox.com>
References: <20220408095606.2767234-1-d.csapak@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.140 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 T_SCC_BODY_TEXT_LINE    -0.01 -
Subject: [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler
 variants for streaming serialization
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Fri, 08 Apr 2022 09:56:42 -0000

These should behave like their non-streaming counterparts, but return a
`Box<dyn SerializableReturn + Send>` instead of a `Value`. This is useful
since we do not have to build the whole `Value` in memory first, but can
stream the serialization to the client.

We cannot simply use a `Box<dyn serde::Serialize>`, because that trait
is not object-safe (its `serialize` method is generic over the
serializer) and thus cannot be used as a trait object.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-router/src/cli/command.rs | 45 ++++++++++++++++++
 proxmox-router/src/router.rs      | 78 +++++++++++++++++++++++++++++++
 2 files changed, 123 insertions(+)

diff --git a/proxmox-router/src/cli/command.rs b/proxmox-router/src/cli/command.rs
index 1b05078..76ecddc 100644
--- a/proxmox-router/src/cli/command.rs
+++ b/proxmox-router/src/cli/command.rs
@@ -72,6 +72,18 @@ async fn handle_simple_command_future(
                 return Err(err);
             }
         },
+        ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) {
+            Ok(value) => {
+                let value = value.to_value()?;
+                if value != Value::Null {
+                    println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+                }
+            }
+            Err(err) => {
+                eprintln!("Error: {}", err);
+                return Err(err);
+            }
+        },
         ApiHandler::Async(handler) => {
             let future = (handler)(params, cli_cmd.info, &mut rpcenv);
 
@@ -87,6 +99,22 @@ async fn handle_simple_command_future(
                 }
             }
         }
+        ApiHandler::StreamingAsync(handler) => {
+            let future = (handler)(params, cli_cmd.info, &mut rpcenv);
+
+            match future.await {
+                Ok(value) => {
+                    let value = value.to_value()?;
+                    if value != Value::Null {
+                        println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+                    }
+                }
+                Err(err) => {
+                    eprintln!("Error: {}", err);
+                    return Err(err);
+                }
+            }
+        }
         ApiHandler::AsyncHttp(_) => {
             let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
             print_simple_usage_error(prefix, cli_cmd, err_msg);
@@ -118,6 +146,18 @@ fn handle_simple_command(
                 return Err(err);
             }
         },
+        ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) {
+            Ok(value) => {
+                let value = value.to_value()?;
+                if value != Value::Null {
+                    println!("Result: {}", serde_json::to_string_pretty(&value).unwrap());
+                }
+            }
+            Err(err) => {
+                eprintln!("Error: {}", err);
+                return Err(err);
+            }
+        },
         ApiHandler::Async(handler) => {
             let future = (handler)(params, cli_cmd.info, &mut rpcenv);
             if let Some(run) = run {
@@ -138,6 +178,11 @@ fn handle_simple_command(
                 return Err(format_err!("{}", err_msg));
             }
         }
+        ApiHandler::StreamingAsync(_handler) => {
+            let err_msg = "CliHandler does not support ApiHandler::StreamingAsync - internal error";
+            print_simple_usage_error(prefix, cli_cmd, err_msg);
+            return Err(format_err!("{}", err_msg));
+        }
         ApiHandler::AsyncHttp(_) => {
             let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error";
             print_simple_usage_error(prefix, cli_cmd, err_msg);
diff --git a/proxmox-router/src/router.rs b/proxmox-router/src/router.rs
index a469891..3fa0033 100644
--- a/proxmox-router/src/router.rs
+++ b/proxmox-router/src/router.rs
@@ -14,6 +14,7 @@ use proxmox_schema::{ObjectSchema, ParameterSchema, ReturnType, Schema};
 
 use super::Permission;
 use crate::RpcEnvironment;
+use crate::SerializableReturn;
 
 /// A synchronous API handler gets a json Value as input and returns a json Value as output.
 ///
@@ -42,6 +43,37 @@ pub type ApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironm
               + Sync
               + 'static);
 
+/// A synchronous API handler gets a json Value as input and returns a serializable return value as output.
+///
+/// ```
+/// # use anyhow::Error;
+/// # use serde_json::{json, Value};
+/// use proxmox_router::{ApiHandler, ApiMethod, RpcEnvironment, SerializableReturn};
+/// use proxmox_schema::ObjectSchema;
+///
+/// fn hello(
+///    param: Value,
+///    info: &ApiMethod,
+///    rpcenv: &mut dyn RpcEnvironment,
+/// ) -> Result<Box<dyn SerializableReturn + Send>, Error> {
+///    let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
+///    Ok(res)
+/// }
+///
+/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new(
+///    &ApiHandler::StreamingSync(&hello),
+///    &ObjectSchema::new("Hello World Example", &[])
+/// );
+/// ```
+pub type StreamingApiHandlerFn = &'static (dyn Fn(
+    Value,
+    &ApiMethod,
+    &mut dyn RpcEnvironment,
+) -> Result<Box<dyn SerializableReturn + Send>, Error>
+              + Send
+              + Sync
+              + 'static);
+
 /// Asynchronous API handlers
 ///
 /// Returns a future Value.
@@ -74,6 +106,44 @@ pub type ApiAsyncHandlerFn = &'static (dyn for<'a> Fn(Value, &'static ApiMethod,
 
 pub type ApiFuture<'a> = Pin<Box<dyn Future<Output = Result<Value, anyhow::Error>> + Send + 'a>>;
 
+/// Streaming asynchronous API handlers
+///
+/// Returns a future that resolves to a `Box<dyn SerializableReturn + Send>`.
+/// ```
+/// # use serde_json::{json, Value};
+/// #
+/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment, StreamingApiFuture, SerializableReturn};
+/// use proxmox_schema::ObjectSchema;
+///
+///
+/// fn hello_future<'a>(
+///    param: Value,
+///    info: &ApiMethod,
+///    rpcenv: &'a mut dyn RpcEnvironment,
+/// ) -> StreamingApiFuture<'a> {
+///    Box::pin(async move {
+///        let res: Box<dyn SerializableReturn + Send> = Box::new(format!("Hello World!"));
+///        Ok(res)
+///    })
+/// }
+///
+/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new(
+///    &ApiHandler::StreamingAsync(&hello_future),
+///    &ObjectSchema::new("Hello World Example (async)", &[])
+/// );
+/// ```
+pub type StreamingApiAsyncHandlerFn = &'static (dyn for<'a> Fn(
+    Value,
+    &'static ApiMethod,
+    &'a mut dyn RpcEnvironment,
+) -> StreamingApiFuture<'a>
+              + Send
+              + Sync);
+
+pub type StreamingApiFuture<'a> = Pin<
+    Box<dyn Future<Output = Result<Box<dyn SerializableReturn + Send>, anyhow::Error>> + Send + 'a>,
+>;
+
 /// Asynchronous HTTP API handlers
 ///
 /// They get low level access to request and response data. Use this
@@ -124,7 +194,9 @@ pub type ApiResponseFuture =
 /// Enum for different types of API handler functions.
 pub enum ApiHandler {
     Sync(ApiHandlerFn),
+    StreamingSync(StreamingApiHandlerFn),
     Async(ApiAsyncHandlerFn),
+    StreamingAsync(StreamingApiAsyncHandlerFn),
     AsyncHttp(ApiAsyncHttpHandlerFn),
 }
 
@@ -139,9 +211,15 @@ impl PartialEq for ApiHandler {
                 (ApiHandler::Sync(l), ApiHandler::Sync(r)) => {
                     core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
                 }
+                (ApiHandler::StreamingSync(l), ApiHandler::StreamingSync(r)) => {
+                    core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
+                }
                 (ApiHandler::Async(l), ApiHandler::Async(r)) => {
                     core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
                 }
+                (ApiHandler::StreamingAsync(l), ApiHandler::StreamingAsync(r)) => {
+                    core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
+                }
                 (ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => {
                     core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
                 }
-- 
2.30.2