public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls
@ 2022-04-12 14:15 Dominik Csapak
  2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Dominik Csapak @ 2022-04-12 14:15 UTC (permalink / raw)
  To: pbs-devel

This is the remaining part of the series, for proxmox-backup.

changes from v1:
* rebase on master (because of rustfmt changes)

dependencies (in case it's not clear):
proxmox-rest-server/proxmox-backup need to depend on the newer
proxmox-router/api-macro versions

Dominik Csapak (3):
  proxmox-rest-server: OutputFormatter: add new format_data_streaming
    method
  adapt to the new ApiHandler variants
  api: admin/datastore: enable streaming for some api calls

 proxmox-rest-server/Cargo.toml       |  1 +
 proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
 proxmox-rest-server/src/rest.rs      | 13 ++++++
 src/api2/admin/datastore.rs          |  1 +
 src/api2/node/tasks.rs               |  1 +
 src/bin/proxmox_backup_debug/api.rs  |  8 ++++
 6 files changed, 90 insertions(+), 1 deletion(-)

-- 
2.30.2

* [pbs-devel] [PATCH proxmox-backup v2 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method
From: Dominik Csapak @ 2022-04-12 14:15 UTC (permalink / raw)
  To: pbs-devel

This method takes the data in the form of a `Box<dyn SerializableReturn + Send>`
instead of a `Value`.

Implement it in the json and extjs formatters by spawning a blocking task
that streams the serialized data through a `BufWriter<SenderWriter>`, and
use the Receiver side as a stream for the response body.
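
For readers without the proxmox crates at hand, the writer/receiver idea can be sketched with the standard library only. This is not the code from the patch: `ChannelWriter`, `start_streaming`, `demo_producer` and `collect_body` are hypothetical names, a plain thread and `std::sync::mpsc::sync_channel` stand in for `spawn_blocking` and the tokio channel, and a closure stands in for the serializer call.

```rust
use std::io::{self, BufWriter, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical stand-in for `SenderWriter`: each `write` call forwards
/// the bytes as one chunk over a bounded channel.
struct ChannelWriter {
    sender: SyncSender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `send` blocks while the channel is full, so a slow consumer
        // slows the producer down instead of growing a queue.
        self.sender
            .send(buf.to_vec())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Run `produce` on a worker thread and return the receiving end, which
/// yields the produced bytes chunk by chunk.
fn start_streaming<F>(produce: F) -> Receiver<Vec<u8>>
where
    F: FnOnce(&mut dyn Write) -> io::Result<()> + Send + 'static,
{
    let (sender, receiver) = sync_channel(1);

    thread::spawn(move || {
        // Deliberately tiny buffer so the example yields several chunks.
        let mut out = BufWriter::with_capacity(16, ChannelWriter { sender });
        // An error here means the consumer hung up; nobody is left to tell.
        let _ = produce(&mut out).and_then(|()| out.flush());
    });

    receiver
}

/// Example producer: writes twenty small records.
fn demo_producer(out: &mut dyn Write) -> io::Result<()> {
    for i in 0..20 {
        write!(out, "item-{};", i)?;
    }
    Ok(())
}

/// Drain the channel, returning the concatenated body and the chunk count.
fn collect_body(receiver: Receiver<Vec<u8>>) -> (String, usize) {
    let mut body = Vec::new();
    let mut chunks = 0;
    for chunk in receiver {
        chunks += 1;
        body.extend_from_slice(&chunk);
    }
    (String::from_utf8(body).expect("producer wrote UTF-8"), chunks)
}
```

Calling `collect_body(start_streaming(demo_producer))` plays the role of the response body here: the consumer sees the output in pieces while the producer is still writing, and the channel capacity of 1 bounds how far the producer can run ahead.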

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-rest-server/Cargo.toml       |  1 +
 proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 56aa91e8..a957f8df 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
 serde_json = "1.0"
 tokio = { version = "1.6", features = ["signal", "process"] }
 tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
 tower-service = "0.3.0"
 url = "2.1"
 
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index 709a6b1e..2e9a01fa 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
 use hyper::header;
 use hyper::{Body, Response, StatusCode};
 
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
 use proxmox_schema::ParameterError;
 
 /// Extension to set error message for server side logging
@@ -18,6 +18,13 @@ pub trait OutputFormatter: Send + Sync {
     /// Transform json data into a http response
     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
 
+    /// Transform serializable data into a streaming http response
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error>;
+
     /// Transform errors into a http response
     fn format_error(&self, err: Error) -> Response<Body>;
 
@@ -50,6 +57,16 @@ fn json_data_response(data: Value) -> Response<Body> {
     response
 }
 
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+    let response = Response::builder()
+        .header(
+            header::CONTENT_TYPE,
+            header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+        )
+        .body(body)?;
+    Ok(response)
+}
+
 fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     let attributes = match rpcenv.result_attrib().as_object() {
         Some(attr) => attr,
@@ -61,6 +78,22 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     }
 }
 
+fn start_data_streaming(
+    value: Value,
+    data: Box<dyn SerializableReturn + Send>,
+) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+    let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+    tokio::task::spawn_blocking(move || {
+        let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+        let mut output = std::io::BufWriter::new(output);
+        let mut serializer = serde_json::Serializer::new(&mut output);
+        let _ = data.sender_serialize(&mut serializer, value);
+    });
+
+    reader
+}
+
 struct JsonFormatter();
 
 /// Format data as ``application/json``
@@ -84,6 +117,21 @@ impl OutputFormatter for JsonFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({});
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
         let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
             let mut resp = Response::new(Body::from(apierr.message.clone()));
@@ -140,6 +188,23 @@ impl OutputFormatter for ExtJsFormatter {
         json_data_response(result)
     }
 
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let mut value = json!({
+            "success": true,
+        });
+
+        add_result_attributes(&mut value, rpcenv);
+
+        let reader = start_data_streaming(value, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
     fn format_error(&self, err: Error) -> Response<Body> {
         let mut errors = HashMap::new();
 
-- 
2.30.2

* [pbs-devel] [PATCH proxmox-backup v2 2/3] adapt to the new ApiHandler variants
From: Dominik Csapak @ 2022-04-12 14:15 UTC (permalink / raw)
  To: pbs-devel

Handle the new variants, namely 'StreamingSync' and 'StreamingAsync':
in rest-server by using the new formatter function, and in the debug
binary by using 'to_value'.
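
As a rough illustration of the two call sites, the following std-only sketch dispatches on a handler enum once for a streaming consumer and once for a caller that wants the whole value. All names (`StreamingReturn`, `Handler`, `serve`, `call_for_debug`, `Lines`) are made up for this example and the types are much simpler than the real `ApiHandler` and `SerializableReturn`.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for `SerializableReturn`: a result that can write
/// itself to a sink, or be collected into one in-memory value.
trait StreamingReturn {
    fn write_to(&self, out: &mut dyn Write) -> io::Result<()>;

    /// Fallback for callers that need the complete result at once.
    fn to_value(&self) -> io::Result<String> {
        let mut buf = Vec::new();
        self.write_to(&mut buf)?;
        String::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }
}

struct Lines(Vec<String>);

impl StreamingReturn for Lines {
    fn write_to(&self, out: &mut dyn Write) -> io::Result<()> {
        for line in &self.0 {
            writeln!(out, "{}", line)?;
        }
        Ok(())
    }
}

/// Simplified handler enum with one classic and one streaming variant.
enum Handler {
    Sync(fn(&str) -> io::Result<String>),
    StreamingSync(fn(&str) -> io::Result<Box<dyn StreamingReturn>>),
}

/// Server-like path: streaming handlers write straight to the sink.
fn serve(handler: &Handler, param: &str, out: &mut dyn Write) -> io::Result<()> {
    match handler {
        Handler::Sync(h) => out.write_all(h(param)?.as_bytes()),
        Handler::StreamingSync(h) => h(param)?.write_to(out),
    }
}

/// Debug-like path: streaming handlers are collected via `to_value`.
fn call_for_debug(handler: &Handler, param: &str) -> io::Result<String> {
    match handler {
        Handler::Sync(h) => h(param),
        Handler::StreamingSync(h) => h(param)?.to_value(),
    }
}

fn list_plain(prefix: &str) -> io::Result<String> {
    Ok(format!("{}-a\n{}-b\n", prefix, prefix))
}

fn list_streaming(prefix: &str) -> io::Result<Box<dyn StreamingReturn>> {
    Ok(Box::new(Lines(vec![
        format!("{}-a", prefix),
        format!("{}-b", prefix),
    ])))
}
```

Both paths produce the same bytes for the same handler; only the streaming path in `serve` avoids holding the complete result in memory.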

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-rest-server/src/rest.rs     | 13 +++++++++++++
 src/bin/proxmox_backup_debug/api.rs |  8 ++++++++
 2 files changed, 21 insertions(+)

diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 2aadf1ed..1689ef6e 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -445,6 +445,19 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
             let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
             (handler)(parts, req_body, params, info, Box::new(rpcenv)).await
         }
+        ApiHandler::StreamingSync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv)
+                .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+        }
+        ApiHandler::StreamingAsync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv)
+                .await
+                .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+        }
         ApiHandler::Sync(handler) => {
             let params =
                 get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
diff --git a/src/bin/proxmox_backup_debug/api.rs b/src/bin/proxmox_backup_debug/api.rs
index 599425e8..75c8818a 100644
--- a/src/bin/proxmox_backup_debug/api.rs
+++ b/src/bin/proxmox_backup_debug/api.rs
@@ -229,6 +229,14 @@ async fn call_api_code(
         nix::unistd::setuid(backup_user.uid)?;
     }
     match method.handler {
+        ApiHandler::StreamingSync(handler) => {
+            let res = (handler)(params, method, rpcenv)?.to_value()?;
+            Ok(res)
+        }
+        ApiHandler::StreamingAsync(handler) => {
+            let res = (handler)(params, method, rpcenv).await?.to_value()?;
+            Ok(res)
+        }
         ApiHandler::AsyncHttp(_handler) => {
             bail!("not implemented");
         }
-- 
2.30.2

* [pbs-devel] [PATCH proxmox-backup v2 3/3] api: admin/datastore: enable streaming for some api calls
From: Dominik Csapak @ 2022-04-12 14:15 UTC (permalink / raw)
  To: pbs-devel

namely /admin/datastore/{store}/snapshots
and /nodes/{node}/tasks,
since those are api calls where the result can get quite large.

With this change, the serialization is streamed instead of first building
a `Value` in memory.
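
To illustrate why this matters for large lists, here is a minimal std-only sketch that writes a JSON array one element at a time from an iterator, so no complete list or `Value` has to exist. It only illustrates the idea; `Snapshot`, `write_snapshots` and `CountingSink` are hypothetical, the hand-written JSON is a stand-in for serde, and the field names are assumptions.

```rust
use std::io::{self, Write};

/// Hypothetical list entry; the real API types carry many more fields.
struct Snapshot {
    backup_time: i64,
    size: u64,
}

/// Write the entries as a JSON array, one element at a time. Only the
/// element currently being written needs to be in memory.
fn write_snapshots<W, I>(mut out: W, entries: I) -> io::Result<()>
where
    W: Write,
    I: IntoIterator<Item = Snapshot>,
{
    out.write_all(b"[")?;
    for (i, s) in entries.into_iter().enumerate() {
        if i > 0 {
            out.write_all(b",")?;
        }
        write!(
            out,
            "{{\"backup-time\":{},\"size\":{}}}",
            s.backup_time, s.size
        )?;
    }
    out.write_all(b"]")
}

/// Sink that discards the bytes and only counts them, standing in for a
/// network connection.
struct CountingSink(u64);

impl Write for CountingSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0 += buf.len() as u64;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

Feeding `write_snapshots` a lazy iterator such as `(0..100_000).map(...)` together with a `CountingSink` emits every element without ever collecting them into a vector.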

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/admin/datastore.rs | 1 +
 src/api2/node/tasks.rs      | 1 +
 2 files changed, 2 insertions(+)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index ef82b426..55fab62c 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -375,6 +375,7 @@ pub fn delete_snapshot(
 }
 
 #[api(
+    streaming: true,
     input: {
         properties: {
             store: {
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 07353d0b..b8046f1b 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -380,6 +380,7 @@ fn stop_task(
 }
 
 #[api(
+    streaming: true,
     input: {
         properties: {
             node: {
-- 
2.30.2

* [pbs-devel] applied-series: [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls
From: Wolfgang Bumiller @ 2022-04-13  6:26 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

applied series, thanks