From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <d.csapak@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id 5D682BEA8
 for <pbs-devel@lists.proxmox.com>; Fri,  8 Apr 2022 11:56:12 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id 5A81DD2A8
 for <pbs-devel@lists.proxmox.com>; Fri,  8 Apr 2022 11:56:12 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [94.136.29.106])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS id 9D517D236
 for <pbs-devel@lists.proxmox.com>; Fri,  8 Apr 2022 11:56:08 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 74C3545A0A
 for <pbs-devel@lists.proxmox.com>; Fri,  8 Apr 2022 11:56:08 +0200 (CEST)
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Fri,  8 Apr 2022 11:56:04 +0200
Message-Id: <20220408095606.2767234-6-d.csapak@proxmox.com>
X-Mailer: git-send-email 2.30.2
In-Reply-To: <20220408095606.2767234-1-d.csapak@proxmox.com>
References: <20220408095606.2767234-1-d.csapak@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.140 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 T_SCC_BODY_TEXT_LINE    -0.01 -
Subject: [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server:
 OutputFormatter: add new format_data_streaming method
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Fri, 08 Apr 2022 09:56:12 -0000

that takes the data in form of a `Box<dyn SerializableReturn + Send>`
instead of a Value.

Implement it in json and extjs formatter, by starting a thread and
stream the serialized data via a `BufWriter<SenderWriter>` and use
the Receiver side as a stream for the response body.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-rest-server/Cargo.toml       |  1 +
 proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 56aa91e8..a957f8df 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
 serde_json = "1.0"
 tokio = { version = "1.6", features = ["signal", "process"] }
 tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
 tower-service = "0.3.0"
 url = "2.1"
 
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index e31df5b8..fd7b6c69 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
 use hyper::header;
 use hyper::{Body, Response, StatusCode};
 
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
 use proxmox_schema::ParameterError;
 
 /// Extension to set error message for server side logging
@@ -18,6 +18,13 @@ pub trait OutputFormatter: Send + Sync {
     /// Transform json data into a http response
     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
 
+    /// Transform serializable data into a streaming http response
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error>;
+
     /// Transform errors into a http response
     fn format_error(&self, err: Error) -> Response<Body>;
 
@@ -50,6 +57,16 @@ fn json_data_response(data: Value) -> Response<Body> {
     response
 }
 
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+    let response = Response::builder()
+        .header(
+            header::CONTENT_TYPE,
+            header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+        )
+        .body(body)?;
+    Ok(response)
+}
+
 fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     let attributes = match rpcenv.result_attrib().as_object() {
         Some(attr) => attr,
@@ -61,6 +78,22 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     }
 }
 
+fn start_data_streaming(
+    value: Value,
+    data: Box<dyn SerializableReturn + Send>,
+) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+    let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+    tokio::task::spawn_blocking(move || {
+        let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+        let mut output = std::io::BufWriter::new(output);
+        let mut serializer = serde_json::Serializer::new(&mut output);
+        let _ = data.sender_serialize(&mut serializer, value);
+    });
+
+    reader
+}
+
 struct JsonFormatter();
 
 /// Format data as ``application/json``
@@ -84,6 +117,21 @@ impl OutputFormatter for JsonFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({});
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
         let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
             let mut resp = Response::new(Body::from(apierr.message.clone()));
@@ -140,6 +188,23 @@ impl OutputFormatter for ExtJsFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({
+            "success": true,
+        });
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
         let message: String;
         let mut errors = HashMap::new();
-- 
2.30.2