all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method
Date: Fri,  8 Apr 2022 11:56:04 +0200	[thread overview]
Message-ID: <20220408095606.2767234-6-d.csapak@proxmox.com> (raw)
In-Reply-To: <20220408095606.2767234-1-d.csapak@proxmox.com>

The new method takes the data in the form of a
`Box<dyn SerializableReturn + Send>` instead of a `Value`.

Implement it in the json and extjs formatters by starting a thread that
streams the serialized data via a `BufWriter<SenderWriter>`, and use
the Receiver side as the stream for the response body.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-rest-server/Cargo.toml       |  1 +
 proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 56aa91e8..a957f8df 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
 serde_json = "1.0"
 tokio = { version = "1.6", features = ["signal", "process"] }
 tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
 tower-service = "0.3.0"
 url = "2.1"
 
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index e31df5b8..fd7b6c69 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
 use hyper::header;
 use hyper::{Body, Response, StatusCode};
 
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
 use proxmox_schema::ParameterError;
 
 /// Extension to set error message for server side logging
@@ -18,6 +18,13 @@ pub trait OutputFormatter: Send + Sync {
     /// Transform json data into a http response
     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
 
+    /// Transform serializable data into a streaming http response
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error>;
+
     /// Transform errors into a http response
     fn format_error(&self, err: Error) -> Response<Body>;
 
@@ -50,6 +57,16 @@ fn json_data_response(data: Value) -> Response<Body> {
     response
 }
 
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+    let response = Response::builder()
+        .header(
+            header::CONTENT_TYPE,
+            header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+        )
+        .body(body)?;
+    Ok(response)
+}
+
 fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     let attributes = match rpcenv.result_attrib().as_object() {
         Some(attr) => attr,
@@ -61,6 +78,22 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     }
 }
 
+fn start_data_streaming(
+    value: Value,
+    data: Box<dyn SerializableReturn + Send>,
+) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+    let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+    tokio::task::spawn_blocking(move || {
+        let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+        let mut output = std::io::BufWriter::new(output);
+        let mut serializer = serde_json::Serializer::new(&mut output);
+        let _ = data.sender_serialize(&mut serializer, value);
+    });
+
+    reader
+}
+
 struct JsonFormatter();
 
 /// Format data as ``application/json``
@@ -84,6 +117,21 @@ impl OutputFormatter for JsonFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({});
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
         let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
             let mut resp = Response::new(Body::from(apierr.message.clone()));
@@ -140,6 +188,23 @@ impl OutputFormatter for ExtJsFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({
+            "success": true,
+        });
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
         let message: String;
         let mut errors = HashMap::new();
-- 
2.30.2





  parent reply	other threads:[~2022-04-08  9:56 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-04-08  9:55 [pbs-devel] [PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
2022-04-12 12:29   ` [pbs-devel] applied-series: " Wolfgang Bumiller
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option Dominik Csapak
2022-04-08  9:56 ` Dominik Csapak [this message]
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants Dominik Csapak
2022-04-08  9:56 ` [pbs-devel] [PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220408095606.2767234-6-d.csapak@proxmox.com \
    --to=d.csapak@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal