From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 22BB1BE70 for ; Fri, 8 Apr 2022 11:56:42 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1820ED2A7 for ; Fri, 8 Apr 2022 11:56:12 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 7DF3BD22C for ; Fri, 8 Apr 2022 11:56:08 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 47F9D463F1 for ; Fri, 8 Apr 2022 11:56:08 +0200 (CEST) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Fri, 8 Apr 2022 11:56:02 +0200 Message-Id: <20220408095606.2767234-4-d.csapak@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20220408095606.2767234-1-d.csapak@proxmox.com> References: <20220408095606.2767234-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.140 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 08 Apr 2022 09:56:42 -0000 they should behave like their normal variants, but return a `Box` instead of a value. This is useful since we do not have to generate the `Value` in-memory, but can stream the serialization to the client. We cannot simply use a `Box`, because that trait is not object-safe and thus cannot be used as a trait-object. Signed-off-by: Dominik Csapak --- proxmox-router/src/cli/command.rs | 45 ++++++++++++++++++ proxmox-router/src/router.rs | 78 +++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+) diff --git a/proxmox-router/src/cli/command.rs b/proxmox-router/src/cli/command.rs index 1b05078..76ecddc 100644 --- a/proxmox-router/src/cli/command.rs +++ b/proxmox-router/src/cli/command.rs @@ -72,6 +72,18 @@ async fn handle_simple_command_future( return Err(err); } }, + ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) { + Ok(value) => { + let value = value.to_value()?; + if value != Value::Null { + println!("Result: {}", serde_json::to_string_pretty(&value).unwrap()); + } + } + Err(err) => { + eprintln!("Error: {}", err); + return Err(err); + } + }, ApiHandler::Async(handler) => { let future = (handler)(params, cli_cmd.info, &mut rpcenv); @@ -87,6 +99,22 @@ async fn handle_simple_command_future( } } } + ApiHandler::StreamingAsync(handler) => { + let future = (handler)(params, cli_cmd.info, &mut rpcenv); + + match future.await { + Ok(value) => { + let value = value.to_value()?; + if value != Value::Null { + println!("Result: {}", serde_json::to_string_pretty(&value).unwrap()); + } + } + Err(err) => { + eprintln!("Error: {}", err); + return Err(err); + } + } + } ApiHandler::AsyncHttp(_) => { let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error"; print_simple_usage_error(prefix, cli_cmd, err_msg); @@ -118,6 +146,18 @@ fn handle_simple_command( return Err(err); } }, + ApiHandler::StreamingSync(handler) => match (handler)(params, cli_cmd.info, &mut rpcenv) { + Ok(value) => { + let value = value.to_value()?; + if value != Value::Null { + println!("Result: {}", serde_json::to_string_pretty(&value).unwrap()); + } + } + Err(err) => { + eprintln!("Error: {}", err); + return Err(err); + } + }, ApiHandler::Async(handler) => { let future = (handler)(params, cli_cmd.info, &mut rpcenv); if let Some(run) = run { @@ -138,6 +178,11 @@ fn handle_simple_command( return Err(format_err!("{}", err_msg)); } } + ApiHandler::StreamingAsync(_handler) => { + let err_msg = "CliHandler does not support ApiHandler::StreamingAsync - internal error"; + print_simple_usage_error(prefix, cli_cmd, err_msg); + return Err(format_err!("{}", err_msg)); + } ApiHandler::AsyncHttp(_) => { let err_msg = "CliHandler does not support ApiHandler::AsyncHttp - internal error"; print_simple_usage_error(prefix, cli_cmd, err_msg); diff --git a/proxmox-router/src/router.rs b/proxmox-router/src/router.rs index a469891..3fa0033 100644 --- a/proxmox-router/src/router.rs +++ b/proxmox-router/src/router.rs @@ -14,6 +14,7 @@ use proxmox_schema::{ObjectSchema, ParameterSchema, ReturnType, Schema}; use super::Permission; use crate::RpcEnvironment; +use crate::SerializableReturn; /// A synchronous API handler gets a json Value as input and returns a json Value as output. /// @@ -42,6 +43,37 @@ pub type ApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironm + Sync + 'static); +/// A synchronous API handler gets a json Value as input and returns a serializable return value as output. +/// +/// ``` +/// # use anyhow::Error; +/// # use serde_json::{json, Value}; +/// use proxmox_router::{ApiHandler, ApiMethod, RpcEnvironment, SerializableReturn}; +/// use proxmox_schema::ObjectSchema; +/// +/// fn hello( +/// param: Value, +/// info: &ApiMethod, +/// rpcenv: &mut dyn RpcEnvironment, +/// ) -> Result, Error> { +/// let res: Box = Box::new(format!("Hello World!")); +/// Ok(res) +/// } +/// +/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new( +/// &ApiHandler::StreamingSync(&hello), +/// &ObjectSchema::new("Hello World Example", &[]) +/// ); +/// ``` +pub type StreamingApiHandlerFn = &'static (dyn Fn( + Value, + &ApiMethod, + &mut dyn RpcEnvironment, +) -> Result, Error> + + Send + + Sync + + 'static); + /// Asynchronous API handlers /// /// Returns a future Value. @@ -74,6 +106,44 @@ pub type ApiAsyncHandlerFn = &'static (dyn for<'a> Fn(Value, &'static ApiMethod, pub type ApiFuture<'a> = Pin> + Send + 'a>>; +/// Streaming asynchronous API handlers +/// +/// Returns a future Value. +/// ``` +/// # use serde_json::{json, Value}; +/// # +/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment, StreamingApiFuture, SerializableReturn}; +/// use proxmox_schema::ObjectSchema; +/// +/// +/// fn hello_future<'a>( +/// param: Value, +/// info: &ApiMethod, +/// rpcenv: &'a mut dyn RpcEnvironment, +/// ) -> StreamingApiFuture<'a> { +/// Box::pin(async move { +/// let res: Box = Box::new(format!("Hello World!")); +/// Ok(res) +/// }) +/// } +/// +/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new( +/// &ApiHandler::StreamingAsync(&hello_future), +/// &ObjectSchema::new("Hello World Example (async)", &[]) +/// ); +/// ``` +pub type StreamingApiAsyncHandlerFn = &'static (dyn for<'a> Fn( + Value, + &'static ApiMethod, + &'a mut dyn RpcEnvironment, +) -> StreamingApiFuture<'a> + + Send + + Sync); + +pub type StreamingApiFuture<'a> = Pin< + Box, anyhow::Error>> + Send + 'a>, +>; + /// Asynchronous HTTP API handlers /// /// They get low level access to request and response data. Use this @@ -124,7 +194,9 @@ pub type ApiResponseFuture = /// Enum for different types of API handler functions. pub enum ApiHandler { Sync(ApiHandlerFn), + StreamingSync(StreamingApiHandlerFn), Async(ApiAsyncHandlerFn), + StreamingAsync(StreamingApiAsyncHandlerFn), AsyncHttp(ApiAsyncHttpHandlerFn), } @@ -139,9 +211,15 @@ impl PartialEq for ApiHandler { (ApiHandler::Sync(l), ApiHandler::Sync(r)) => { core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) } + (ApiHandler::StreamingSync(l), ApiHandler::StreamingSync(r)) => { + core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) + } (ApiHandler::Async(l), ApiHandler::Async(r)) => { core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) } + (ApiHandler::StreamingAsync(l), ApiHandler::StreamingAsync(r)) => { + core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) + } (ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => { core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) } -- 2.30.2