public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method
Date: Thu, 17 Feb 2022 10:40:39 +0100	[thread overview]
Message-ID: <20220217094041.1632033-6-d.csapak@proxmox.com> (raw)
In-Reply-To: <20220217094041.1632033-1-d.csapak@proxmox.com>

that takes the data in the form of a `Box<dyn SerializableReturn + Send>`
instead of a `Value`.

Implement it in the json and extjs formatters by starting a thread that
streams the serialized data via a `BufWriter<SenderWriter>`, and use
the Receiver side as a stream for the response body.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-rest-server/Cargo.toml       |  1 +
 proxmox-rest-server/src/formatter.rs | 52 +++++++++++++++++++++++++++-
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 8fbbe8c0..a5855c88 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
 serde_json = "1.0"
 tokio = { version = "1.6", features = ["signal", "process"] }
 tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
 tower-service = "0.3.0"
 url = "2.1"
 
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index e3958826..7b8d16bc 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
 use hyper::{Body, Response, StatusCode};
 use hyper::header;
 
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
 use proxmox_schema::ParameterError;
 
 /// Extension to set error message for server side logging
@@ -18,6 +18,9 @@ pub trait OutputFormatter: Send + Sync {
     /// Transform json data into a http response
     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
 
+    /// Transform serializable data into a streaming http response
+    fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error>;
+
     /// Transform errors into a http response
     fn format_error(&self, err: Error) -> Response<Body>;
 
@@ -46,6 +49,16 @@ fn json_data_response(data: Value) -> Response<Body> {
     response
 }
 
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+    let response = Response::builder()
+        .header(
+            header::CONTENT_TYPE,
+            header::HeaderValue::from_static(JSON_CONTENT_TYPE)
+        )
+        .body(body)?;
+    Ok(response)
+}
+
 fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
 {
     let attributes = match rpcenv.result_attrib().as_object() {
@@ -58,6 +71,19 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
     }
 }
 
+fn start_data_streaming(value: Value, data: Box<dyn SerializableReturn + Send>) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+    let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+    std::thread::spawn(move || {
+        let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+        let mut output = std::io::BufWriter::new(output);
+        let mut serializer = serde_json::Serializer::new(&mut output);
+        let _ = data.sender_serialize(&mut serializer, value);
+    });
+
+    reader
+}
+
 
 struct JsonFormatter();
 
@@ -86,6 +112,17 @@ impl  OutputFormatter for JsonFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error> {
+        let mut value = json!({});
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
 
         let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
@@ -142,6 +179,19 @@ impl  OutputFormatter for ExtJsFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error> {
+        let mut value = json!({
+            "success": true,
+        });
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
 
         let message: String;
-- 
2.30.2





  parent reply	other threads:[~2022-02-17  9:41 UTC|newest]

Thread overview: 10+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-02-17  9:40 [pbs-devel] [RFC PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
2022-02-17  9:40 ` [pbs-devel] [RFC PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
2022-02-17  9:40 ` [pbs-devel] [RFC PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait Dominik Csapak
2022-02-18  8:21   ` Dietmar Maurer
2022-02-17  9:40 ` [pbs-devel] [RFC PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization Dominik Csapak
2022-02-17  9:40 ` [pbs-devel] [RFC PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option Dominik Csapak
2022-02-17  9:40 ` Dominik Csapak [this message]
2022-02-23  9:20   ` [pbs-devel] [RFC PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Thomas Lamprecht
2022-02-17  9:40 ` [pbs-devel] [RFC PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants Dominik Csapak
2022-02-17  9:40 ` [pbs-devel] [RFC PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220217094041.1632033-6-d.csapak@proxmox.com \
    --to=d.csapak@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal