From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method
Date: Thu, 17 Feb 2022 10:40:39 +0100 [thread overview]
Message-ID: <20220217094041.1632033-6-d.csapak@proxmox.com> (raw)
In-Reply-To: <20220217094041.1632033-1-d.csapak@proxmox.com>
that takes the data in form of a `Box<dyn SerializableReturn + Send>`
instead of a Value.
Implement it in json and extjs formatter, by starting a thread and
stream the serialized data via a `BufWriter<SenderWriter>` and use
the Receiver side as a stream for the response body.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/formatter.rs | 52 +++++++++++++++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 8fbbe8c0..a5855c88 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
tokio = { version = "1.6", features = ["signal", "process"] }
tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
tower-service = "0.3.0"
url = "2.1"
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index e3958826..7b8d16bc 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
use hyper::{Body, Response, StatusCode};
use hyper::header;
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
use proxmox_schema::ParameterError;
/// Extension to set error message for server side logging
@@ -18,6 +18,9 @@ pub trait OutputFormatter: Send + Sync {
/// Transform json data into a http response
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
+ /// Transform serializable data into a streaming http response
+ fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error>;
+
/// Transform errors into a http response
fn format_error(&self, err: Error) -> Response<Body>;
@@ -46,6 +49,16 @@ fn json_data_response(data: Value) -> Response<Body> {
response
}
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+ let response = Response::builder()
+ .header(
+ header::CONTENT_TYPE,
+ header::HeaderValue::from_static(JSON_CONTENT_TYPE)
+ )
+ .body(body)?;
+ Ok(response)
+}
+
fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
{
let attributes = match rpcenv.result_attrib().as_object() {
@@ -58,6 +71,19 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
}
}
+fn start_data_streaming(value: Value, data: Box<dyn SerializableReturn + Send>) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+ let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+ std::thread::spawn(move || {
+ let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+ let mut output = std::io::BufWriter::new(output);
+ let mut serializer = serde_json::Serializer::new(&mut output);
+ let _ = data.sender_serialize(&mut serializer, value);
+ });
+
+ reader
+}
+
struct JsonFormatter();
@@ -86,6 +112,17 @@ impl OutputFormatter for JsonFormatter {
json_data_response(result)
}
+ fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error> {
+ let mut value = json!({});
+
+ add_result_attributes(&mut value, rpcenv);
+
+ let reader = start_data_streaming(value, data);
+ let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+ json_data_response_streaming(Body::wrap_stream(stream))
+ }
+
fn format_error(&self, err: Error) -> Response<Body> {
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
@@ -142,6 +179,19 @@ impl OutputFormatter for ExtJsFormatter {
json_data_response(result)
}
+ fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error> {
+ let mut value = json!({
+ "success": true,
+ });
+
+ add_result_attributes(&mut value, rpcenv);
+
+ let reader = start_data_streaming(value, data);
+ let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+ json_data_response_streaming(Body::wrap_stream(stream))
+ }
+
fn format_error(&self, err: Error) -> Response<Body> {
let message: String;
--
2.30.2
next prev parent reply other threads:[~2022-02-17 9:41 UTC|newest]
Thread overview: 10+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-02-17 9:40 [pbs-devel] [RFC PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
2022-02-17 9:40 ` [pbs-devel] [RFC PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
2022-02-17 9:40 ` [pbs-devel] [RFC PATCH proxmox 2/4] promxox-router: add SerializableReturn Trait Dominik Csapak
2022-02-18 8:21 ` Dietmar Maurer
2022-02-17 9:40 ` [pbs-devel] [RFC PATCH proxmox 3/4] proxmox-router: add new ApiHandler variants for streaming serialization Dominik Csapak
2022-02-17 9:40 ` [pbs-devel] [RFC PATCH proxmox 4/4] proxmox-api-macro: add 'streaming' option Dominik Csapak
2022-02-17 9:40 ` Dominik Csapak [this message]
2022-02-23 9:20 ` [pbs-devel] [RFC PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Thomas Lamprecht
2022-02-17 9:40 ` [pbs-devel] [RFC PATCH proxmox-backup 2/3] adapt to the new ApiHandler variants Dominik Csapak
2022-02-17 9:40 ` [pbs-devel] [RFC PATCH proxmox-backup 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20220217094041.1632033-6-d.csapak@proxmox.com \
--to=d.csapak@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal