From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: update to hyper 1.0
Date: Wed, 26 Mar 2025 16:23:18 +0100
Message-ID: <20250326152327.332179-15-f.gruenbichler@proxmox.com>
In-Reply-To: <20250326152327.332179-1-f.gruenbichler@proxmox.com>
and switch to proxmox-http's Body implementation.
hyper now has a special (opaque) Body implementation called Incoming
which is used for incoming request bodies on the server side, and
incoming response bodies on the client side, so our API handlers now
consume an instance of this type.
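An Incoming body yields frames, which can carry either data or trailers, so code that buffers a request body has to unwrap each frame before appending it. A minimal, std-only sketch of that pattern with a size cap, similar in spirit to the buffering in get_request_parameters; `Frame` and `collect_limited` here are hypothetical stand-ins for illustration, not hyper's or http-body's types:

```rust
/// Hypothetical stand-in for a body frame: either payload data or trailers.
#[derive(Debug)]
pub enum Frame {
    Data(Vec<u8>),
    Trailers(Vec<(String, String)>),
}

impl Frame {
    /// Returns the payload, or hands the frame back if it carries no data.
    pub fn into_data(self) -> Result<Vec<u8>, Frame> {
        match self {
            Frame::Data(data) => Ok(data),
            other => Err(other),
        }
    }
}

/// Collects all data frames into one buffer and rejects bodies that would
/// reach `limit` bytes or more.
pub fn collect_limited<I>(frames: I, limit: usize) -> Result<Vec<u8>, String>
where
    I: IntoIterator<Item = Frame>,
{
    frames.into_iter().try_fold(Vec::new(), |mut acc, frame| {
        // A non-data frame is treated as an error in this sketch.
        let data = frame
            .into_data()
            .map_err(|frame| format!("unexpected non-data frame: {frame:?}"))?;
        if acc.len() + data.len() < limit {
            acc.extend_from_slice(&data);
            Ok(acc)
        } else {
            Err("request body too large".to_string())
        }
    })
}
```

The sketch is synchronous; the real code folds over an async stream, but the per-frame unwrap and the running size check have the same shape.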
the Accept trait previously offered by hyper is gone in 1.0, and needs
to be replaced with an accept loop. our corresponding Acceptor
implementations have been dropped as well.
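With the Accept trait gone, the caller has to drain the source of accepted connections itself and hand each one to a per-connection task. A rough, std-only analogue of such an accept loop, using a blocking channel and threads in place of an async stream and tokio tasks; `accept_loop` and its signature are assumptions made for illustration only:

```rust
use std::sync::mpsc::Receiver;
use std::thread;

/// Drains accepted connections from `incoming` and serves each one on its
/// own thread. Returns the number of connections handed to `serve`.
pub fn accept_loop<C, F>(incoming: Receiver<Result<C, String>>, serve: F) -> usize
where
    C: Send + 'static,
    F: Fn(C) + Send + Clone + 'static,
{
    let mut workers = Vec::new();
    // The loop ends once every sender has been dropped.
    for accepted in incoming {
        match accepted {
            Ok(conn) => {
                let serve = serve.clone();
                workers.push(thread::spawn(move || serve(conn)));
            }
            // A failed accept is logged and must not stop the loop.
            Err(err) => eprintln!("error accepting connection: {err}"),
        }
    }
    let served = workers.len();
    for worker in workers {
        // Wait for in-flight connections before returning.
        let _ = worker.join();
    }
    served
}
```

An async version would have the same structure: pull the next item from the stream, skip over errors, and spawn a task that serves the connection.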
hyper now has its own Service and async Read/Write traits, but helpfully
provides wrappers for tower's Service and tokio's AsyncRead/AsyncWrite
variants.
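One visible difference between the two Service traits is that hyper 1.0's `call` takes `&self`, while tower's takes `&mut self`. An adapter can bridge the two by cloning the inner service for each request, which would also explain why wrapped services need to be Clone. A simplified, synchronous sketch of the idea; `MutService`, `SharedService`, `CloneAdapter` and `Greeter` are hypothetical names, and this is not hyper-util's actual implementation:

```rust
/// Hypothetical stand-in for a tower-style service: `call` needs `&mut self`.
pub trait MutService<Req> {
    type Response;
    fn call(&mut self, req: Req) -> Self::Response;
}

/// Hypothetical stand-in for a hyper-1.0-style service: `call` takes `&self`.
pub trait SharedService<Req> {
    type Response;
    fn call(&self, req: Req) -> Self::Response;
}

/// Adapter that satisfies the `&self` contract by cloning the inner service
/// for every call.
pub struct CloneAdapter<S>(pub S);

impl<S, Req> SharedService<Req> for CloneAdapter<S>
where
    S: MutService<Req> + Clone,
{
    type Response = S::Response;

    fn call(&self, req: Req) -> Self::Response {
        // Each request gets its own copy, so no `&mut` access to `self` is needed.
        self.0.clone().call(req)
    }
}

/// Example service whose handler mutates its own state.
#[derive(Clone)]
pub struct Greeter {
    pub calls: u32,
}

impl MutService<&str> for Greeter {
    type Response = String;

    fn call(&mut self, name: &str) -> String {
        self.calls += 1;
        format!("hello {name} (call {})", self.calls)
    }
}
```

A consequence of this design is that state mutated inside `call` lives only in the per-request clone; anything that must be shared across requests has to sit behind an `Arc` or similar.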
graceful shutdown handling is now exposed differently on the hyper side,
and to allow usage with upgradable connections the variant from
hyper-util needs to be used, as the one straight from hyper doesn't
support it (yet).
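The bookkeeping half of graceful shutdown can be pictured as a tracker that hands out a guard per watched connection and lets shutdown block until all guards are gone. A std-only sketch under that assumption; `Graceful` and `Watch` are hypothetical types, and unlike hyper-util's helper this sketch does not tell connections to stop taking new requests, it only waits for them:

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical tracker: counts in-flight connections and lets a caller
/// wait until all of them have finished.
#[derive(Default)]
pub struct Graceful {
    active: Mutex<usize>,
    idle: Condvar,
}

/// Guard held while a connection is served; dropping it marks the
/// connection as done.
pub struct Watch(Arc<Graceful>);

impl Graceful {
    /// Registers a connection; keep the returned guard alive while serving it.
    pub fn watch(self: &Arc<Self>) -> Watch {
        *self.active.lock().unwrap() += 1;
        Watch(Arc::clone(self))
    }

    /// Blocks until every guard handed out by `watch` has been dropped.
    pub fn shutdown(&self) {
        let mut active = self.active.lock().unwrap();
        while *active > 0 {
            active = self.idle.wait(active).unwrap();
        }
    }

    /// Number of connections currently being watched.
    pub fn active(&self) -> usize {
        *self.active.lock().unwrap()
    }
}

impl Drop for Watch {
    fn drop(&mut self) {
        *self.0.active.lock().unwrap() -= 1;
        // Wake up a pending `shutdown` so it can re-check the counter.
        self.0.idle.notify_all();
    }
}
```

In the async setting the guard's role is played by the wrapped connection future itself, so a connection stops being counted as soon as its future completes or is dropped.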
Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
proxmox-rest-server/Cargo.toml | 7 +-
.../examples/minimal-rest-server.rs | 5 +-
proxmox-rest-server/src/api_config.rs | 44 ++---
proxmox-rest-server/src/connection.rs | 14 +-
proxmox-rest-server/src/formatter.rs | 8 +-
proxmox-rest-server/src/h2service.rs | 15 +-
proxmox-rest-server/src/lib.rs | 2 +-
proxmox-rest-server/src/rest.rs | 164 +++++++++++-------
8 files changed, 142 insertions(+), 117 deletions(-)
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index ffbd925a..ee253b4f 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -20,7 +20,9 @@ anyhow.workspace = true
futures.workspace = true
handlebars = { workspace = true, optional = true }
http.workspace = true
+http-body-util.workspace = true
hyper = { workspace = true, features = [ "full" ] }
+hyper-util = { workspace = true, features = [ "client", "client-legacy", "http1", "server", "server-auto", "server-graceful", "service", "tokio" ]}
libc.workspace = true
log.workspace = true
nix.workspace = true
@@ -39,7 +41,7 @@ url.workspace = true
proxmox-async.workspace = true
proxmox-compression.workspace = true
proxmox-daemon.workspace = true
-proxmox-http = { workspace = true, optional = true }
+proxmox-http = { workspace = true, features = ["body"] }
proxmox-lang.workspace = true
proxmox-log.workspace = true
proxmox-router.workspace = true
@@ -52,6 +54,5 @@ proxmox-worker-task.workspace = true
default = []
templates = ["dep:handlebars"]
rate-limited-stream = [
- "dep:proxmox-http",
- "proxmox-http?/rate-limited-stream",
+ "proxmox-http/rate-limited-stream",
]
diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
index 23be586c..454430fb 100644
--- a/proxmox-rest-server/examples/minimal-rest-server.rs
+++ b/proxmox-rest-server/examples/minimal-rest-server.rs
@@ -6,8 +6,9 @@ use std::sync::{LazyLock, Mutex};
use anyhow::{bail, format_err, Error};
use http::request::Parts;
use http::HeaderMap;
-use hyper::{Body, Method, Response};
+use hyper::{Method, Response};
+use proxmox_http::Body;
use proxmox_router::{
list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
};
@@ -57,7 +58,7 @@ fn get_index(
Box::pin(async move {
// build an index page
http::Response::builder()
- .body("hello world".into())
+ .body("hello world".to_owned().into_bytes().into())
.unwrap()
})
}
diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
index b20b2da0..0b847a0c 100644
--- a/proxmox-rest-server/src/api_config.rs
+++ b/proxmox-rest-server/src/api_config.rs
@@ -9,10 +9,12 @@ use std::task::{Context, Poll};
use anyhow::{format_err, Error};
use http::{HeaderMap, Method, Uri};
use hyper::http::request::Parts;
-use hyper::{Body, Response};
+use hyper::Response;
+use hyper_util::rt::TokioIo;
use tower_service::Service;
use proxmox_daemon::command_socket::CommandSocket;
+use proxmox_http::Body;
use proxmox_log::{FileLogOptions, FileLogger};
use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
use proxmox_sys::fs::{create_path, CreateOptions};
@@ -107,7 +109,7 @@ impl ApiConfig {
) -> Response<Body> {
match self.index_handler.as_ref() {
Some(handler) => (handler.func)(rest_env, parts).await,
- None => Response::builder().status(404).body("".into()).unwrap(),
+ None => Response::builder().status(404).body(Body::empty()).unwrap(),
}
}
@@ -511,7 +513,7 @@ impl From<std::os::unix::net::SocketAddr> for PrivilegedAddr {
}
impl Service<Uri> for PrivilegedAddr {
- type Response = PrivilegedSocket;
+ type Response = TokioIo<PrivilegedSocket>;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
@@ -527,6 +529,7 @@ impl Service<Uri> for PrivilegedAddr {
tokio::net::TcpStream::connect(addr)
.await
.map(PrivilegedSocket::Tcp)
+ .map(TokioIo::new)
})
}
PrivilegedAddr::Unix(addr) => {
@@ -537,6 +540,7 @@ impl Service<Uri> for PrivilegedAddr {
})?)
.await
.map(PrivilegedSocket::Unix)
+ .map(TokioIo::new)
})
}
}
@@ -607,39 +611,11 @@ impl tokio::io::AsyncWrite for PrivilegedSocket {
}
}
-impl hyper::client::connect::Connection for PrivilegedSocket {
- fn connected(&self) -> hyper::client::connect::Connected {
+impl hyper_util::client::legacy::connect::Connection for PrivilegedSocket {
+ fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
match self {
Self::Tcp(s) => s.connected(),
- Self::Unix(_) => hyper::client::connect::Connected::new(),
+ Self::Unix(_) => hyper_util::client::legacy::connect::Connected::new(),
}
}
}
-
-/// Implements hyper's `Accept` for `UnixListener`s.
-pub struct UnixAcceptor {
- listener: tokio::net::UnixListener,
-}
-
-impl From<tokio::net::UnixListener> for UnixAcceptor {
- fn from(listener: tokio::net::UnixListener) -> Self {
- Self { listener }
- }
-}
-
-impl hyper::server::accept::Accept for UnixAcceptor {
- type Conn = tokio::net::UnixStream;
- type Error = io::Error;
-
- fn poll_accept(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<io::Result<Self::Conn>>> {
- Pin::new(&mut self.get_mut().listener)
- .poll_accept(cx)
- .map(|res| match res {
- Ok((stream, _addr)) => Some(Ok(stream)),
- Err(err) => Some(Err(err)),
- })
- }
-}
diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 526555ae..a65ef398 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -14,7 +14,6 @@ use std::time::Duration;
use anyhow::{format_err, Context, Error};
use futures::FutureExt;
-use hyper::server::accept;
use openssl::ec::{EcGroup, EcKey};
use openssl::nid::Nid;
use openssl::pkey::{PKey, Private};
@@ -226,12 +225,13 @@ impl AcceptBuilder {
self,
listener: TcpListener,
acceptor: Arc<Mutex<SslAcceptor>>,
- ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
+ // FIXME: replace return value with own trait? see now removed UnixAcceptor
+ ) -> ReceiverStream<Result<ClientStreamResult, Error>> {
let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
- accept::from_stream(ReceiverStream::new(secure_receiver))
+ ReceiverStream::new(secure_receiver)
}
pub fn accept_tls_optional(
@@ -239,8 +239,8 @@ impl AcceptBuilder {
listener: TcpListener,
acceptor: Arc<Mutex<SslAcceptor>>,
) -> (
- impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
- impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
+ ReceiverStream<Result<ClientStreamResult, Error>>,
+ ReceiverStream<Result<InsecureClientStreamResult, Error>>,
) {
let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
@@ -252,8 +252,8 @@ impl AcceptBuilder {
));
(
- accept::from_stream(ReceiverStream::new(secure_receiver)),
- accept::from_stream(ReceiverStream::new(insecure_receiver)),
+ ReceiverStream::new(secure_receiver),
+ ReceiverStream::new(insecure_receiver),
)
}
}
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index 32ca9936..9ce87205 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -5,12 +5,14 @@ use anyhow::Error;
use serde_json::{json, Value};
use hyper::header;
-use hyper::{Body, Response, StatusCode};
+use hyper::{Response, StatusCode};
+use proxmox_http::Body;
use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
use proxmox_schema::ParameterError;
/// Extension to set error message for server side logging
+#[derive(Clone)]
pub(crate) struct ErrorMessageExtension(pub String);
/// Methods to format data and errors
@@ -168,11 +170,11 @@ impl OutputFormatter for JsonFormatter {
pub(crate) fn error_to_response(err: Error) -> Response<Body> {
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
- let mut resp = Response::new(Body::from(apierr.message.clone()));
+ let mut resp = Response::new(apierr.message.clone().into());
*resp.status_mut() = apierr.code;
resp
} else {
- let mut resp = Response::new(Body::from(err.to_string()));
+ let mut resp = Response::new(err.to_string().into());
*resp.status_mut() = StatusCode::BAD_REQUEST;
resp
};
diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
index db6e3b0a..18258e14 100644
--- a/proxmox-rest-server/src/h2service.rs
+++ b/proxmox-rest-server/src/h2service.rs
@@ -6,8 +6,10 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use futures::*;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::body::Incoming;
+use hyper::{Request, Response, StatusCode};
+use proxmox_http::Body;
use proxmox_router::http_err;
use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
@@ -19,6 +21,7 @@ use crate::{normalize_path_with_components, WorkerTask};
/// We use this kind of service to handle backup protocol
/// connections. State is stored inside the generic ``rpcenv``. Logs
/// goes into the ``WorkerTask`` log.
+#[derive(Clone)]
pub struct H2Service<E> {
router: &'static Router,
rpcenv: E,
@@ -42,7 +45,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
}
}
- fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
+ fn handle_request(&self, req: Request<Incoming>) -> ApiResponseFuture {
let (parts, body) = req.into_parts();
let method = parts.method.clone();
@@ -103,7 +106,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
}
}
-impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
+impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Incoming>> for H2Service<E> {
type Response = Response<Body>;
type Error = Error;
#[allow(clippy::type_complexity)]
@@ -113,7 +116,7 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
Poll::Ready(Ok(()))
}
- fn call(&mut self, req: Request<Body>) -> Self::Future {
+ fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let path = req.uri().path().to_owned();
let method = req.method().clone();
let worker = self.worker.clone();
@@ -126,14 +129,14 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
}
Err(err) => {
if let Some(apierr) = err.downcast_ref::<HttpError>() {
- let mut resp = Response::new(Body::from(apierr.message.clone()));
+ let mut resp = Response::new(apierr.message.clone().into());
resp.extensions_mut()
.insert(ErrorMessageExtension(apierr.message.clone()));
*resp.status_mut() = apierr.code;
Self::log_response(worker, method, &path, &resp);
Ok(resp)
} else {
- let mut resp = Response::new(Body::from(err.to_string()));
+ let mut resp = Response::new(err.to_string().into());
resp.extensions_mut()
.insert(ErrorMessageExtension(err.to_string()));
*resp.status_mut() = StatusCode::BAD_REQUEST;
diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
index 43dafa91..5ddd3667 100644
--- a/proxmox-rest-server/src/lib.rs
+++ b/proxmox-rest-server/src/lib.rs
@@ -34,7 +34,7 @@ mod environment;
pub use environment::*;
mod api_config;
-pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
+pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
mod rest;
pub use rest::{Redirector, RestServer};
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index f5a72052..f902592d 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -10,17 +10,25 @@ use std::task::{Context, Poll};
use anyhow::{bail, format_err, Error};
use futures::future::FutureExt;
use futures::stream::TryStreamExt;
-use hyper::body::HttpBody;
+use http_body_util::{BodyDataStream, BodyStream};
+use hyper::body::{Body as HyperBody, Incoming};
use hyper::header::{self, HeaderMap};
use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
+use hyper_util::rt::{TokioExecutor, TokioIo};
+use hyper_util::server::conn;
+use hyper_util::server::graceful::GracefulShutdown;
+use hyper_util::service::TowerToHyperService;
use regex::Regex;
use serde_json::Value;
use tokio::fs::File;
+use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::Instant;
+use tokio_stream::wrappers::ReceiverStream;
use tower_service::Service;
use url::form_urlencoded;
+use proxmox_http::Body;
use proxmox_router::{
check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
RpcEnvironmentType, UserInformation,
@@ -40,6 +48,7 @@ unsafe extern "C" {
fn tzset();
}
+#[derive(Clone)]
struct AuthStringExtension(String);
pub(crate) struct EmptyUserInformation {}
@@ -74,24 +83,11 @@ impl RestServer {
api_config: Arc::new(api_config),
}
}
-}
-impl<T: PeerAddress> Service<&T> for RestServer {
- type Response = ApiService;
- type Error = Error;
- type Future = std::future::Ready<Result<ApiService, Error>>;
-
- fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn call(&mut self, ctx: &T) -> Self::Future {
- std::future::ready(match ctx.peer_addr() {
- Err(err) => Err(format_err!("unable to get peer address - {}", err)),
- Ok(peer) => Ok(ApiService {
- peer,
- api_config: Arc::clone(&self.api_config),
- }),
+ pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
+ Ok(ApiService {
+ peer: peer.peer_addr()?,
+ api_config: Arc::clone(&self.api_config),
})
}
}
@@ -108,25 +104,40 @@ impl Redirector {
pub fn new() -> Self {
Self {}
}
-}
-impl<T> Service<&T> for Redirector {
- type Response = RedirectService;
- type Error = Error;
- type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
-
- fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn call(&mut self, _ctx: &T) -> Self::Future {
- std::future::ready(Ok(RedirectService {}))
+ pub fn redirect_service(&self) -> RedirectService {
+ RedirectService {}
}
}
+#[derive(Clone)]
pub struct RedirectService;
-impl Service<Request<Body>> for RedirectService {
+impl RedirectService {
+ pub async fn serve<S>(
+ self,
+ conn: S,
+ mut graceful: Option<Arc<GracefulShutdown>>,
+ ) -> Result<(), Error>
+ where
+ S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ {
+ let api_service = TowerToHyperService::new(self);
+ let io = TokioIo::new(conn);
+ let api_conn = conn::auto::Builder::new(TokioExecutor::new());
+ let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
+ if let Some(graceful) = graceful.take() {
+ let api_conn = graceful.watch(api_conn);
+ drop(graceful);
+ api_conn.await
+ } else {
+ api_conn.await
+ }
+ .map_err(|err| format_err!("error serving redirect connection: {err}"))
+ }
+}
+
+impl Service<Request<Incoming>> for RedirectService {
type Response = Response<Body>;
type Error = anyhow::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
@@ -135,7 +146,7 @@ impl Service<Request<Body>> for RedirectService {
Poll::Ready(Ok(()))
}
- fn call(&mut self, req: Request<Body>) -> Self::Future {
+ fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let future = async move {
let header_host_value = req
.headers()
@@ -194,12 +205,6 @@ impl PeerAddress for tokio::net::TcpStream {
}
}
-impl PeerAddress for hyper::server::conn::AddrStream {
- fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
- Ok(self.remote_addr())
- }
-}
-
impl PeerAddress for tokio::net::UnixStream {
fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
// TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
@@ -223,11 +228,36 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
// Rust wants this type 'pub' here (else we get 'private type `ApiService`
// in public interface'). The type is still private because the crate does
// not export it.
+#[derive(Clone)]
pub struct ApiService {
pub peer: std::net::SocketAddr,
pub api_config: Arc<ApiConfig>,
}
+impl ApiService {
+ pub async fn serve<S>(
+ self,
+ conn: S,
+ mut graceful: Option<Arc<GracefulShutdown>>,
+ ) -> Result<(), Error>
+ where
+ S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ {
+ let api_service = TowerToHyperService::new(self);
+ let io = TokioIo::new(conn);
+ let api_conn = conn::auto::Builder::new(TokioExecutor::new());
+ let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
+ if let Some(graceful) = graceful.take() {
+ let api_conn = graceful.watch(api_conn);
+ drop(graceful);
+ api_conn.await
+ } else {
+ api_conn.await
+ }
+ .map_err(|err| format_err!("error serving connection: {err}"))
+ }
+}
+
fn log_response(
logfile: Option<&Arc<Mutex<FileLogger>>>,
peer: &std::net::SocketAddr,
@@ -307,7 +337,7 @@ fn get_user_agent(headers: &HeaderMap) -> Option<String> {
.ok()
}
-impl Service<Request<Body>> for ApiService {
+impl Service<Request<Incoming>> for ApiService {
type Response = Response<Body>;
type Error = Error;
#[allow(clippy::type_complexity)]
@@ -317,7 +347,7 @@ impl Service<Request<Body>> for ApiService {
Poll::Ready(Ok(()))
}
- fn call(&mut self, req: Request<Body>) -> Self::Future {
+ fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let path = req.uri().path_and_query().unwrap().as_str().to_owned();
let method = req.method().clone();
let user_agent = get_user_agent(req.headers());
@@ -384,7 +414,7 @@ fn parse_query_parameters<S: 'static + BuildHasher + Send>(
async fn get_request_parameters<S: 'static + BuildHasher + Send>(
param_schema: ParameterSchema,
parts: &Parts,
- req_body: Body,
+ req_body: Incoming,
uri_param: HashMap<String, String, S>,
) -> Result<Value, Error> {
let mut is_json = false;
@@ -401,13 +431,17 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
}
}
- let body = TryStreamExt::map_err(req_body, |err| {
+ let stream_body = BodyStream::new(req_body);
+ let body = TryStreamExt::map_err(stream_body, |err| {
http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
})
- .try_fold(Vec::new(), |mut acc, chunk| async move {
+ .try_fold(Vec::new(), |mut acc, frame| async move {
// FIXME: max request body size?
- if acc.len() + chunk.len() < 64 * 1024 {
- acc.extend_from_slice(&chunk);
+ let frame = frame
+ .into_data()
+ .map_err(|err| format_err!("Failed to read request body frame - {err:?}"))?;
+ if acc.len() + frame.len() < 64 * 1024 {
+ acc.extend_from_slice(&frame);
Ok(acc)
} else {
Err(http_err!(BAD_REQUEST, "Request body too large"))
@@ -437,13 +471,14 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
}
}
+#[derive(Clone)]
struct NoLogExtension();
async fn proxy_protected_request(
config: &ApiConfig,
info: &ApiMethod,
mut parts: Parts,
- req_body: Body,
+ req_body: Incoming,
peer: &std::net::SocketAddr,
) -> Result<Response<Body>, Error> {
let mut uri_parts = parts.uri.clone().into_parts();
@@ -463,9 +498,14 @@ async fn proxy_protected_request(
let reload_timezone = info.reload_timezone;
let mut resp = match config.privileged_addr.clone() {
- None => hyper::client::Client::new().request(request).await?,
+ None => {
+ hyper_util::client::legacy::Client::builder(TokioExecutor::new())
+ .build_http()
+ .request(request)
+ .await?
+ }
Some(addr) => {
- hyper::client::Client::builder()
+ hyper_util::client::legacy::Client::builder(TokioExecutor::new())
.build(addr)
.request(request)
.await?
@@ -479,7 +519,7 @@ async fn proxy_protected_request(
}
}
- Ok(resp)
+ Ok(resp.map(|b| Body::wrap_stream(BodyDataStream::new(b))))
}
fn delay_unauth_time() -> std::time::Instant {
@@ -491,22 +531,23 @@ fn access_forbidden_time() -> std::time::Instant {
}
fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
- let (mut send, body) = hyper::Body::channel();
+ let (send, body) = tokio::sync::mpsc::channel::<Result<Vec<u8>, Error>>(1);
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = stream.into_inner();
while let Some(record) = stream.next().await {
- if send.send_data(record.to_bytes().into()).await.is_err() {
+ if send.send(Ok(record.to_bytes())).await.is_err() {
break;
}
}
});
- Ok(Response::builder()
+ Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json-seq")
- .body(body)?)
+ .body(Body::wrap_stream(ReceiverStream::new(body)))
+ .map_err(Error::from)
}
fn handle_sync_stream_as_json_seq(
@@ -527,7 +568,7 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
info: &'static ApiMethod,
formatter: Option<&'static dyn OutputFormatter>,
parts: Parts,
- req_body: Body,
+ req_body: Incoming,
uri_param: HashMap<String, String, S>,
) -> Result<Response<Body>, Error> {
let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
@@ -630,9 +671,10 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
);
resp.map(|body| {
Body::wrap_stream(
- DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
- proxmox_lang::io_format_err!("error during compression: {}", err)
- }))
+ DeflateEncoder::builder(TryStreamExt::map_err(
+ BodyDataStream::new(body),
+ |err| proxmox_lang::io_format_err!("error during compression: {}", err),
+ ))
.zlib(true)
.flush_window(is_streaming.then_some(64 * 1024))
.build(),
@@ -796,7 +838,7 @@ fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMe
impl ApiConfig {
pub async fn handle_request(
self: Arc<ApiConfig>,
- req: Request<Body>,
+ req: Request<Incoming>,
peer: &std::net::SocketAddr,
) -> Result<Response<Body>, Error> {
let (parts, body) = req.into_parts();
@@ -808,7 +850,7 @@ impl ApiConfig {
if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
return Ok(Response::builder()
.status(StatusCode::URI_TOO_LONG)
- .body("".into())
+ .body(Body::empty())
.unwrap());
}
@@ -907,7 +949,7 @@ impl Action {
pub struct ApiRequestData<'a> {
parts: Parts,
- body: Body,
+ body: Incoming,
peer: &'a std::net::SocketAddr,
config: &'a ApiConfig,
full_path: &'a str,
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel