* [pbs-devel] [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls
@ 2022-04-12 14:15 Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
` (3 more replies)
0 siblings, 4 replies; 5+ messages in thread
From: Dominik Csapak @ 2022-04-12 14:15 UTC (permalink / raw)
To: pbs-devel
proxmox-backup remaining part of the series
changes from v1:
* rebase on master (because of rustfmt changes)
dependencies (if it's not clear):
proxmox-rest-server/proxmox-backup needs to depend on newer
proxmox-router/api-macro
Dominik Csapak (3):
proxmox-rest-server: OutputFormatter: add new format_data_streaming
method
adapt to the new ApiHandler variants
api: admin/datastore: enable streaming for some api calls
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
proxmox-rest-server/src/rest.rs | 13 ++++++
src/api2/admin/datastore.rs | 1 +
src/api2/node/tasks.rs | 1 +
src/bin/proxmox_backup_debug/api.rs | 8 ++++
6 files changed, 90 insertions(+), 1 deletion(-)
--
2.30.2
^ permalink raw reply [flat|nested] 5+ messages in thread
* [pbs-devel] [PATCH proxmox-backup v2 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method
2022-04-12 14:15 [pbs-devel] [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls Dominik Csapak
@ 2022-04-12 14:15 ` Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 2/3] adapt to the new ApiHandler variants Dominik Csapak
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2022-04-12 14:15 UTC (permalink / raw)
To: pbs-devel
that takes the data in form of a `Box<dyn SerializableReturn + Send>`
instead of a Value.
Implement it in json and extjs formatter, by starting a thread and
stream the serialized data via a `BufWriter<SenderWriter>` and use
the Receiver side as a stream for the response body.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/formatter.rs | 67 +++++++++++++++++++++++++++-
2 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 56aa91e8..a957f8df 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
tokio = { version = "1.6", features = ["signal", "process"] }
tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
tower-service = "0.3.0"
url = "2.1"
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index 709a6b1e..2e9a01fa 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
use hyper::header;
use hyper::{Body, Response, StatusCode};
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
use proxmox_schema::ParameterError;
/// Extension to set error message for server side logging
@@ -18,6 +18,13 @@ pub trait OutputFormatter: Send + Sync {
/// Transform json data into a http response
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
+ /// Transform serializable data into a streaming http response
+ fn format_data_streaming(
+ &self,
+ data: Box<dyn SerializableReturn + Send>,
+ rpcenv: &dyn RpcEnvironment,
+ ) -> Result<Response<Body>, Error>;
+
/// Transform errors into a http response
fn format_error(&self, err: Error) -> Response<Body>;
@@ -50,6 +57,16 @@ fn json_data_response(data: Value) -> Response<Body> {
response
}
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+ let response = Response::builder()
+ .header(
+ header::CONTENT_TYPE,
+ header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+ )
+ .body(body)?;
+ Ok(response)
+}
+
fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
let attributes = match rpcenv.result_attrib().as_object() {
Some(attr) => attr,
@@ -61,6 +78,22 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
}
}
+fn start_data_streaming(
+ value: Value,
+ data: Box<dyn SerializableReturn + Send>,
+) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+ let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+ tokio::task::spawn_blocking(move || {
+ let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+ let mut output = std::io::BufWriter::new(output);
+ let mut serializer = serde_json::Serializer::new(&mut output);
+ let _ = data.sender_serialize(&mut serializer, value);
+ });
+
+ reader
+}
+
struct JsonFormatter();
/// Format data as ``application/json``
@@ -84,6 +117,21 @@ impl OutputFormatter for JsonFormatter {
json_data_response(result)
}
+ fn format_data_streaming(
+ &self,
+ data: Box<dyn SerializableReturn + Send>,
+ rpcenv: &dyn RpcEnvironment,
+ ) -> Result<Response<Body>, Error> {
+ let mut value = json!({});
+
+ add_result_attributes(&mut value, rpcenv);
+
+ let reader = start_data_streaming(value, data);
+ let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+ json_data_response_streaming(Body::wrap_stream(stream))
+ }
+
fn format_error(&self, err: Error) -> Response<Body> {
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone()));
@@ -140,6 +188,23 @@ impl OutputFormatter for ExtJsFormatter {
json_data_response(result)
}
+ fn format_data_streaming(
+ &self,
+ data: Box<dyn SerializableReturn + Send>,
+ rpcenv: &dyn RpcEnvironment,
+ ) -> Result<Response<Body>, Error> {
+ let mut value = json!({
+ "success": true,
+ });
+
+ add_result_attributes(&mut value, rpcenv);
+
+ let reader = start_data_streaming(value, data);
+ let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+ json_data_response_streaming(Body::wrap_stream(stream))
+ }
+
fn format_error(&self, err: Error) -> Response<Body> {
let mut errors = HashMap::new();
--
2.30.2
^ permalink raw reply [flat|nested] 5+ messages in thread
* [pbs-devel] [PATCH proxmox-backup v2 2/3] adapt to the new ApiHandler variants
2022-04-12 14:15 [pbs-devel] [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
@ 2022-04-12 14:15 ` Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak
2022-04-13 6:26 ` [pbs-devel] applied-series: [PATCH proxmox-backup v2 0/3] implement streaming serialization for " Wolfgang Bumiller
3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2022-04-12 14:15 UTC (permalink / raw)
To: pbs-devel
namely 'StreamingSync' and 'StreamingAsync'
in rest-server by using the new formatter function,
and in the debug binary by using 'to_value'
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-rest-server/src/rest.rs | 13 +++++++++++++
src/bin/proxmox_backup_debug/api.rs | 8 ++++++++
2 files changed, 21 insertions(+)
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 2aadf1ed..1689ef6e 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -445,6 +445,19 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
(handler)(parts, req_body, params, info, Box::new(rpcenv)).await
}
+ ApiHandler::StreamingSync(handler) => {
+ let params =
+ get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+ (handler)(params, info, &mut rpcenv)
+ .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+ }
+ ApiHandler::StreamingAsync(handler) => {
+ let params =
+ get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+ (handler)(params, info, &mut rpcenv)
+ .await
+ .and_then(|data| formatter.format_data_streaming(data, &rpcenv))
+ }
ApiHandler::Sync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
diff --git a/src/bin/proxmox_backup_debug/api.rs b/src/bin/proxmox_backup_debug/api.rs
index 599425e8..75c8818a 100644
--- a/src/bin/proxmox_backup_debug/api.rs
+++ b/src/bin/proxmox_backup_debug/api.rs
@@ -229,6 +229,14 @@ async fn call_api_code(
nix::unistd::setuid(backup_user.uid)?;
}
match method.handler {
+ ApiHandler::StreamingSync(handler) => {
+ let res = (handler)(params, method, rpcenv)?.to_value()?;
+ Ok(res)
+ }
+ ApiHandler::StreamingAsync(handler) => {
+ let res = (handler)(params, method, rpcenv).await?.to_value()?;
+ Ok(res)
+ }
ApiHandler::AsyncHttp(_handler) => {
bail!("not implemented");
}
--
2.30.2
^ permalink raw reply [flat|nested] 5+ messages in thread
* [pbs-devel] [PATCH proxmox-backup v2 3/3] api: admin/datastore: enable streaming for some api calls
2022-04-12 14:15 [pbs-devel] [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 2/3] adapt to the new ApiHandler variants Dominik Csapak
@ 2022-04-12 14:15 ` Dominik Csapak
2022-04-13 6:26 ` [pbs-devel] applied-series: [PATCH proxmox-backup v2 0/3] implement streaming serialization for " Wolfgang Bumiller
3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2022-04-12 14:15 UTC (permalink / raw)
To: pbs-devel
namely /admin/datastore/{store}/snapshots
and /nodes/{node}/tasks
since those are api calls where the result can get quite large
with this change, the serialization is now streaming instead of making
a `Value` in memory.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/api2/admin/datastore.rs | 1 +
src/api2/node/tasks.rs | 1 +
2 files changed, 2 insertions(+)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index ef82b426..55fab62c 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -375,6 +375,7 @@ pub fn delete_snapshot(
}
#[api(
+ streaming: true,
input: {
properties: {
store: {
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 07353d0b..b8046f1b 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -380,6 +380,7 @@ fn stop_task(
}
#[api(
+ streaming: true,
input: {
properties: {
node: {
--
2.30.2
^ permalink raw reply [flat|nested] 5+ messages in thread
* [pbs-devel] applied-series: [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls
2022-04-12 14:15 [pbs-devel] [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls Dominik Csapak
` (2 preceding siblings ...)
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak
@ 2022-04-13 6:26 ` Wolfgang Bumiller
3 siblings, 0 replies; 5+ messages in thread
From: Wolfgang Bumiller @ 2022-04-13 6:26 UTC (permalink / raw)
To: Dominik Csapak; +Cc: pbs-devel
applied series, thanks
^ permalink raw reply [flat|nested] 5+ messages in thread
end of thread, other threads:[~2022-04-13 6:26 UTC | newest]
Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-04-12 14:15 [pbs-devel] [PATCH proxmox-backup v2 0/3] implement streaming serialization for api calls Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 2/3] adapt to the new ApiHandler variants Dominik Csapak
2022-04-12 14:15 ` [pbs-devel] [PATCH proxmox-backup v2 3/3] api: admin/datastore: enable streaming for some api calls Dominik Csapak
2022-04-13 6:26 ` [pbs-devel] applied-series: [PATCH proxmox-backup v2 0/3] implement streaming serialization for " Wolfgang Bumiller
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal