From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 6337060BE8 for ; Thu, 17 Feb 2022 10:41:17 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 4A45E1D638 for ; Thu, 17 Feb 2022 10:40:47 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 874B71D58D for ; Thu, 17 Feb 2022 10:40:43 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 600C441B06 for ; Thu, 17 Feb 2022 10:40:43 +0100 (CET) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Thu, 17 Feb 2022 10:40:39 +0100 Message-Id: <20220217094041.1632033-6-d.csapak@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20220217094041.1632033-1-d.csapak@proxmox.com> References: <20220217094041.1632033-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.157 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [formatter.rs] Subject: [pbs-devel] [RFC PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 17 Feb 2022 09:41:17 -0000 that takes the data in form of a `Box` instead of a Value. Implement it in json and extjs formatter, by starting a thread and stream the serialized data via a `BufWriter` and use the Receiver side as a stream for the response body. Signed-off-by: Dominik Csapak --- proxmox-rest-server/Cargo.toml | 1 + proxmox-rest-server/src/formatter.rs | 52 +++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml index 8fbbe8c0..a5855c88 100644 --- a/proxmox-rest-server/Cargo.toml +++ b/proxmox-rest-server/Cargo.toml @@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] } serde_json = "1.0" tokio = { version = "1.6", features = ["signal", "process"] } tokio-openssl = "0.6.1" +tokio-stream = "0.1.0" tower-service = "0.3.0" url = "2.1" diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs index e3958826..7b8d16bc 100644 --- a/proxmox-rest-server/src/formatter.rs +++ b/proxmox-rest-server/src/formatter.rs @@ -7,7 +7,7 @@ use serde_json::{json, Value}; use hyper::{Body, Response, StatusCode}; use hyper::header; -use proxmox_router::{HttpError, RpcEnvironment}; +use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn}; use proxmox_schema::ParameterError; /// Extension to set error message for server side logging @@ -18,6 +18,9 @@ pub trait OutputFormatter: Send + Sync { /// Transform json data into a http response fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response; + /// Transform serializable data into a streaming http response + fn format_data_streaming(&self, data: Box, rpcenv: &dyn RpcEnvironment) -> Result, Error>; + /// Transform errors into a http response fn format_error(&self, err: Error) -> Response; @@ -46,6 +49,16 @@ fn json_data_response(data: Value) -> Response { response } +fn json_data_response_streaming(body: Body) -> Result, Error> { + let response = Response::builder() + .header( + header::CONTENT_TYPE, + header::HeaderValue::from_static(JSON_CONTENT_TYPE) + ) + .body(body)?; + Ok(response) +} + fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) { let attributes = match rpcenv.result_attrib().as_object() { @@ -58,6 +71,19 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) } } +fn start_data_streaming(value: Value, data: Box) -> tokio::sync::mpsc::Receiver, Error>> { + let (writer, reader) = tokio::sync::mpsc::channel(1); + + std::thread::spawn(move || { + let output = proxmox_async::blocking::SenderWriter::from_sender(writer); + let mut output = std::io::BufWriter::new(output); + let mut serializer = serde_json::Serializer::new(&mut output); + let _ = data.sender_serialize(&mut serializer, value); + }); + + reader +} + struct JsonFormatter(); @@ -86,6 +112,17 @@ impl OutputFormatter for JsonFormatter { json_data_response(result) } + fn format_data_streaming(&self, data: Box, rpcenv: &dyn RpcEnvironment) -> Result, Error> { + let mut value = json!({}); + + add_result_attributes(&mut value, rpcenv); + + let reader = start_data_streaming(value, data); + let stream = tokio_stream::wrappers::ReceiverStream::new(reader); + + json_data_response_streaming(Body::wrap_stream(stream)) + } + fn format_error(&self, err: Error) -> Response { let mut response = if let Some(apierr) = err.downcast_ref::() { @@ -142,6 +179,19 @@ impl OutputFormatter for ExtJsFormatter { json_data_response(result) } + fn format_data_streaming(&self, data: Box, rpcenv: &dyn RpcEnvironment) -> Result, Error> { + let mut value = json!({ + "success": true, + }); + + add_result_attributes(&mut value, rpcenv); + + let reader = start_data_streaming(value, data); + let stream = tokio_stream::wrappers::ReceiverStream::new(reader); + + json_data_response_streaming(Body::wrap_stream(stream)) + } + fn format_error(&self, err: Error) -> Response { let message: String; -- 2.30.2