From: "Max Carrara" <m.carrara@proxmox.com>
To: "Proxmox Backup Server development discussion"
<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: update to hyper 1.0
Date: Wed, 02 Apr 2025 15:34:08 +0200
Message-ID: <D8W6WZYUNVSL.24VBRUOPALH4M@proxmox.com>
In-Reply-To: <20250326152327.332179-15-f.gruenbichler@proxmox.com>
On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> and switch to proxmox-http's Body implementation.
>
> hyper now has a special (opaque) Body implementation called Incoming
> which is used for incoming request bodies on the server side, and
> incoming response bodies on the client side, so our API handlers now
> consume an instance of this type.
>
> the Accept trait previously offered by hyper is gone in 1.0, and needs
> to be replaced with an accept loop. our corresponding Acceptor
> implementations have been dropped as well.
>
> hyper now has its own Service and async Read/Write traits, but helpfully
> provides wrappers for tower's Service and tokio's AsyncRead/AsyncWrite
> variants.
>
> graceful shutdown handling is now exposed differently on the hyper side,
> and to allow usage with upgradable connections the variant from
> hyper-util needs to be used, as the one straight from hyper doesn't
> support it (yet).
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
> proxmox-rest-server/Cargo.toml | 7 +-
> .../examples/minimal-rest-server.rs | 5 +-
> proxmox-rest-server/src/api_config.rs | 44 ++---
> proxmox-rest-server/src/connection.rs | 14 +-
> proxmox-rest-server/src/formatter.rs | 8 +-
> proxmox-rest-server/src/h2service.rs | 15 +-
> proxmox-rest-server/src/lib.rs | 2 +-
> proxmox-rest-server/src/rest.rs | 164 +++++++++++-------
> 8 files changed, 142 insertions(+), 117 deletions(-)
>
> diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
> index ffbd925a..ee253b4f 100644
> --- a/proxmox-rest-server/Cargo.toml
> +++ b/proxmox-rest-server/Cargo.toml
> @@ -20,7 +20,9 @@ anyhow.workspace = true
> futures.workspace = true
> handlebars = { workspace = true, optional = true }
> http.workspace = true
> +http-body-util.workspace = true
> hyper = { workspace = true, features = [ "full" ] }
> +hyper-util = { workspace = true, features = [ "client", "client-legacy", "http1", "server", "server-auto", "server-graceful", "service", "tokio" ]}
> libc.workspace = true
> log.workspace = true
> nix.workspace = true
> @@ -39,7 +41,7 @@ url.workspace = true
> proxmox-async.workspace = true
> proxmox-compression.workspace = true
> proxmox-daemon.workspace = true
> -proxmox-http = { workspace = true, optional = true }
> +proxmox-http = { workspace = true, features = ["body"] }
> proxmox-lang.workspace = true
> proxmox-log.workspace = true
> proxmox-router.workspace = true
> @@ -52,6 +54,5 @@ proxmox-worker-task.workspace = true
> default = []
> templates = ["dep:handlebars"]
> rate-limited-stream = [
> - "dep:proxmox-http",
> - "proxmox-http?/rate-limited-stream",
> + "proxmox-http/rate-limited-stream",
> ]
> diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
> index 23be586c..454430fb 100644
> --- a/proxmox-rest-server/examples/minimal-rest-server.rs
> +++ b/proxmox-rest-server/examples/minimal-rest-server.rs
> @@ -6,8 +6,9 @@ use std::sync::{LazyLock, Mutex};
> use anyhow::{bail, format_err, Error};
> use http::request::Parts;
> use http::HeaderMap;
> -use hyper::{Body, Method, Response};
> +use hyper::{Method, Response};
>
> +use proxmox_http::Body;
> use proxmox_router::{
> list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
> };
> @@ -57,7 +58,7 @@ fn get_index(
> Box::pin(async move {
> // build an index page
> http::Response::builder()
> - .body("hello world".into())
> + .body("hello world".to_owned().into_bytes().into())
> .unwrap()
> })
> }
> diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
> index b20b2da0..0b847a0c 100644
> --- a/proxmox-rest-server/src/api_config.rs
> +++ b/proxmox-rest-server/src/api_config.rs
> @@ -9,10 +9,12 @@ use std::task::{Context, Poll};
> use anyhow::{format_err, Error};
> use http::{HeaderMap, Method, Uri};
> use hyper::http::request::Parts;
> -use hyper::{Body, Response};
> +use hyper::Response;
> +use hyper_util::rt::TokioIo;
> use tower_service::Service;
>
> use proxmox_daemon::command_socket::CommandSocket;
> +use proxmox_http::Body;
> use proxmox_log::{FileLogOptions, FileLogger};
> use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
> use proxmox_sys::fs::{create_path, CreateOptions};
> @@ -107,7 +109,7 @@ impl ApiConfig {
> ) -> Response<Body> {
> match self.index_handler.as_ref() {
> Some(handler) => (handler.func)(rest_env, parts).await,
> - None => Response::builder().status(404).body("".into()).unwrap(),
> + None => Response::builder().status(404).body(Body::empty()).unwrap(),
> }
> }
>
> @@ -511,7 +513,7 @@ impl From<std::os::unix::net::SocketAddr> for PrivilegedAddr {
> }
>
> impl Service<Uri> for PrivilegedAddr {
> - type Response = PrivilegedSocket;
> + type Response = TokioIo<PrivilegedSocket>;
> type Error = io::Error;
> type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
>
> @@ -527,6 +529,7 @@ impl Service<Uri> for PrivilegedAddr {
> tokio::net::TcpStream::connect(addr)
> .await
> .map(PrivilegedSocket::Tcp)
> + .map(TokioIo::new)
> })
> }
> PrivilegedAddr::Unix(addr) => {
> @@ -537,6 +540,7 @@ impl Service<Uri> for PrivilegedAddr {
> })?)
> .await
> .map(PrivilegedSocket::Unix)
> + .map(TokioIo::new)
> })
> }
> }
> @@ -607,39 +611,11 @@ impl tokio::io::AsyncWrite for PrivilegedSocket {
> }
> }
>
> -impl hyper::client::connect::Connection for PrivilegedSocket {
> - fn connected(&self) -> hyper::client::connect::Connected {
> +impl hyper_util::client::legacy::connect::Connection for PrivilegedSocket {
> + fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
> match self {
> Self::Tcp(s) => s.connected(),
> - Self::Unix(_) => hyper::client::connect::Connected::new(),
> + Self::Unix(_) => hyper_util::client::legacy::connect::Connected::new(),
> }
> }
> }
> -
> -/// Implements hyper's `Accept` for `UnixListener`s.
> -pub struct UnixAcceptor {
> - listener: tokio::net::UnixListener,
> -}
> -
> -impl From<tokio::net::UnixListener> for UnixAcceptor {
> - fn from(listener: tokio::net::UnixListener) -> Self {
> - Self { listener }
> - }
> -}
> -
> -impl hyper::server::accept::Accept for UnixAcceptor {
> - type Conn = tokio::net::UnixStream;
> - type Error = io::Error;
> -
> - fn poll_accept(
> - self: Pin<&mut Self>,
> - cx: &mut Context<'_>,
> - ) -> Poll<Option<io::Result<Self::Conn>>> {
> - Pin::new(&mut self.get_mut().listener)
> - .poll_accept(cx)
> - .map(|res| match res {
> - Ok((stream, _addr)) => Some(Ok(stream)),
> - Err(err) => Some(Err(err)),
> - })
> - }
> -}
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 526555ae..a65ef398 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -14,7 +14,6 @@ use std::time::Duration;
>
> use anyhow::{format_err, Context, Error};
> use futures::FutureExt;
> -use hyper::server::accept;
> use openssl::ec::{EcGroup, EcKey};
> use openssl::nid::Nid;
> use openssl::pkey::{PKey, Private};
> @@ -226,12 +225,13 @@ impl AcceptBuilder {
> self,
> listener: TcpListener,
> acceptor: Arc<Mutex<SslAcceptor>>,
> - ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
> + // FIXME: replace return value with own trait? see now removed UnixAcceptor

Yeah, we should probably provide our own trait for this and have it
implement `futures_core::Stream` (or `std::async_iter::AsyncIterator`
once that's stabilized). For now the below is fine IMO, as we can cook
up a trait later as well. Switching over to hyper 1.0 isn't too pretty,
but it is what it is :s

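As a rough illustration of the idea above, such a trait could keep the
shape of the removed `Accept` trait, whose `poll_accept` has the same
signature pattern as `Stream::poll_next`, so a `Stream` impl could be
layered on top later. This is only a std-only sketch: `Acceptor`,
`QueueAcceptor`, `NoopWake` and `drain` are hypothetical names that do
not exist in the crate, and the "connections" are plain integers.

```rust
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the `Accept` trait that hyper 1.0 dropped.
pub trait Acceptor {
    type Conn;
    type Error;

    /// Same shape as `Stream::poll_next`: `None` means no more connections.
    fn poll_accept(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Conn, Self::Error>>>;
}

/// Toy acceptor handing out queued items; a real one would wrap a
/// listener or the receiving end of a channel.
pub struct QueueAcceptor {
    pub pending: VecDeque<u32>,
}

impl Acceptor for QueueAcceptor {
    type Conn = u32;
    type Error = io::Error;

    fn poll_accept(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<io::Result<u32>>> {
        // Always ready in this toy: next queued item, or `None` once drained.
        Poll::Ready(self.get_mut().pending.pop_front().map(Ok))
    }
}

/// Waker that does nothing, enough to poll by hand without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll an acceptor until it stops yielding successful connections.
pub fn drain<A: Acceptor + Unpin>(mut acceptor: A) -> Vec<A::Conn> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut accepted = Vec::new();
    while let Poll::Ready(Some(Ok(conn))) = Pin::new(&mut acceptor).poll_accept(&mut cx) {
        accepted.push(conn);
    }
    accepted
}
```

In the real code the accept loop would await each connection and spawn
a task per connection instead of collecting into a `Vec`.
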
> + ) -> ReceiverStream<Result<ClientStreamResult, Error>> {
> let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
>
> tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
>
> - accept::from_stream(ReceiverStream::new(secure_receiver))
> + ReceiverStream::new(secure_receiver)
> }
>
> pub fn accept_tls_optional(
> @@ -239,8 +239,8 @@ impl AcceptBuilder {
> listener: TcpListener,
> acceptor: Arc<Mutex<SslAcceptor>>,
> ) -> (
> - impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
> - impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
> + ReceiverStream<Result<ClientStreamResult, Error>>,
> + ReceiverStream<Result<InsecureClientStreamResult, Error>>,
> ) {
> let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
> let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
> @@ -252,8 +252,8 @@ impl AcceptBuilder {
> ));
>
> (
> - accept::from_stream(ReceiverStream::new(secure_receiver)),
> - accept::from_stream(ReceiverStream::new(insecure_receiver)),
> + ReceiverStream::new(secure_receiver),
> + ReceiverStream::new(insecure_receiver),
> )
> }
> }
> diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
> index 32ca9936..9ce87205 100644
> --- a/proxmox-rest-server/src/formatter.rs
> +++ b/proxmox-rest-server/src/formatter.rs
> @@ -5,12 +5,14 @@ use anyhow::Error;
> use serde_json::{json, Value};
>
> use hyper::header;
> -use hyper::{Body, Response, StatusCode};
> +use hyper::{Response, StatusCode};
>
> +use proxmox_http::Body;
> use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
> use proxmox_schema::ParameterError;
>
> /// Extension to set error message for server side logging
> +#[derive(Clone)]
> pub(crate) struct ErrorMessageExtension(pub String);
>
> /// Methods to format data and errors
> @@ -168,11 +170,11 @@ impl OutputFormatter for JsonFormatter {
>
> pub(crate) fn error_to_response(err: Error) -> Response<Body> {
> let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
> - let mut resp = Response::new(Body::from(apierr.message.clone()));
> + let mut resp = Response::new(apierr.message.clone().into());
> *resp.status_mut() = apierr.code;
> resp
> } else {
> - let mut resp = Response::new(Body::from(err.to_string()));
> + let mut resp = Response::new(err.to_string().into());
> *resp.status_mut() = StatusCode::BAD_REQUEST;
> resp
> };
> diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
> index db6e3b0a..18258e14 100644
> --- a/proxmox-rest-server/src/h2service.rs
> +++ b/proxmox-rest-server/src/h2service.rs
> @@ -6,8 +6,10 @@ use std::sync::Arc;
> use std::task::{Context, Poll};
>
> use futures::*;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::body::Incoming;
> +use hyper::{Request, Response, StatusCode};
>
> +use proxmox_http::Body;
> use proxmox_router::http_err;
> use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
>
> @@ -19,6 +21,7 @@ use crate::{normalize_path_with_components, WorkerTask};
> /// We use this kind of service to handle backup protocol
> /// connections. State is stored inside the generic ``rpcenv``. Logs
> /// goes into the ``WorkerTask`` log.
> +#[derive(Clone)]
> pub struct H2Service<E> {
> router: &'static Router,
> rpcenv: E,
> @@ -42,7 +45,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
> }
> }
>
> - fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
> + fn handle_request(&self, req: Request<Incoming>) -> ApiResponseFuture {
> let (parts, body) = req.into_parts();
>
> let method = parts.method.clone();
> @@ -103,7 +106,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
> }
> }
>
> -impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
> +impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Incoming>> for H2Service<E> {
> type Response = Response<Body>;
> type Error = Error;
> #[allow(clippy::type_complexity)]
> @@ -113,7 +116,7 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
> Poll::Ready(Ok(()))
> }
>
> - fn call(&mut self, req: Request<Body>) -> Self::Future {
> + fn call(&mut self, req: Request<Incoming>) -> Self::Future {
> let path = req.uri().path().to_owned();
> let method = req.method().clone();
> let worker = self.worker.clone();
> @@ -126,14 +129,14 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
> }
> Err(err) => {
> if let Some(apierr) = err.downcast_ref::<HttpError>() {
> - let mut resp = Response::new(Body::from(apierr.message.clone()));
> + let mut resp = Response::new(apierr.message.clone().into());
> resp.extensions_mut()
> .insert(ErrorMessageExtension(apierr.message.clone()));
> *resp.status_mut() = apierr.code;
> Self::log_response(worker, method, &path, &resp);
> Ok(resp)
> } else {
> - let mut resp = Response::new(Body::from(err.to_string()));
> + let mut resp = Response::new(err.to_string().into());
> resp.extensions_mut()
> .insert(ErrorMessageExtension(err.to_string()));
> *resp.status_mut() = StatusCode::BAD_REQUEST;
> diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
> index 43dafa91..5ddd3667 100644
> --- a/proxmox-rest-server/src/lib.rs
> +++ b/proxmox-rest-server/src/lib.rs
> @@ -34,7 +34,7 @@ mod environment;
> pub use environment::*;
>
> mod api_config;
> -pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
> +pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
>
> mod rest;
> pub use rest::{Redirector, RestServer};
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index f5a72052..f902592d 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -10,17 +10,25 @@ use std::task::{Context, Poll};
> use anyhow::{bail, format_err, Error};
> use futures::future::FutureExt;
> use futures::stream::TryStreamExt;
> -use hyper::body::HttpBody;
> +use http_body_util::{BodyDataStream, BodyStream};
> +use hyper::body::{Body as HyperBody, Incoming};
> use hyper::header::{self, HeaderMap};
> use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{Request, Response, StatusCode};
> +use hyper_util::rt::{TokioExecutor, TokioIo};
> +use hyper_util::server::conn;
> +use hyper_util::server::graceful::GracefulShutdown;
> +use hyper_util::service::TowerToHyperService;
> use regex::Regex;
> use serde_json::Value;
> use tokio::fs::File;
> +use tokio::io::{AsyncRead, AsyncWrite};
> use tokio::time::Instant;
> +use tokio_stream::wrappers::ReceiverStream;
> use tower_service::Service;
> use url::form_urlencoded;
>
> +use proxmox_http::Body;
> use proxmox_router::{
> check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
> RpcEnvironmentType, UserInformation,
> @@ -40,6 +48,7 @@ unsafe extern "C" {
> fn tzset();
> }
>
> +#[derive(Clone)]
> struct AuthStringExtension(String);
>
> pub(crate) struct EmptyUserInformation {}
> @@ -74,24 +83,11 @@ impl RestServer {
> api_config: Arc::new(api_config),
> }
> }
> -}
>
> -impl<T: PeerAddress> Service<&T> for RestServer {
> - type Response = ApiService;
> - type Error = Error;
> - type Future = std::future::Ready<Result<ApiService, Error>>;
> -
> - fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
> - Poll::Ready(Ok(()))
> - }
> -
> - fn call(&mut self, ctx: &T) -> Self::Future {
> - std::future::ready(match ctx.peer_addr() {
> - Err(err) => Err(format_err!("unable to get peer address - {}", err)),
> - Ok(peer) => Ok(ApiService {
> - peer,
> - api_config: Arc::clone(&self.api_config),
> - }),
> + pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> + Ok(ApiService {
> + peer: peer.peer_addr()?,
> + api_config: Arc::clone(&self.api_config),
> })
> }
> }
> @@ -108,25 +104,40 @@ impl Redirector {
> pub fn new() -> Self {
> Self {}
> }
> -}
>
> -impl<T> Service<&T> for Redirector {
> - type Response = RedirectService;
> - type Error = Error;
> - type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
> -
> - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
> - Poll::Ready(Ok(()))
> - }
> -
> - fn call(&mut self, _ctx: &T) -> Self::Future {
> - std::future::ready(Ok(RedirectService {}))
> + pub fn redirect_service(&self) -> RedirectService {
> + RedirectService {}
> }
> }
>
> +#[derive(Clone)]
> pub struct RedirectService;
>
> -impl Service<Request<Body>> for RedirectService {
> +impl RedirectService {
> + pub async fn serve<S>(
> + self,
> + conn: S,
> + mut graceful: Option<Arc<GracefulShutdown>>,
> + ) -> Result<(), Error>
> + where
> + S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> + {
> + let api_service = TowerToHyperService::new(self);
> + let io = TokioIo::new(conn);
> + let api_conn = conn::auto::Builder::new(TokioExecutor::new());
> + let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
> + if let Some(graceful) = graceful.take() {
> + let api_conn = graceful.watch(api_conn);
> + drop(graceful);
> + api_conn.await
> + } else {
> + api_conn.await
> + }
> + .map_err(|err| format_err!("error serving redirect connection: {err}"))
> + }
> +}
> +
> +impl Service<Request<Incoming>> for RedirectService {
> type Response = Response<Body>;
> type Error = anyhow::Error;
> type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
> @@ -135,7 +146,7 @@ impl Service<Request<Body>> for RedirectService {
> Poll::Ready(Ok(()))
> }
>
> - fn call(&mut self, req: Request<Body>) -> Self::Future {
> + fn call(&mut self, req: Request<Incoming>) -> Self::Future {
> let future = async move {
> let header_host_value = req
> .headers()
> @@ -194,12 +205,6 @@ impl PeerAddress for tokio::net::TcpStream {
> }
> }
>
> -impl PeerAddress for hyper::server::conn::AddrStream {
> - fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
> - Ok(self.remote_addr())
> - }
> -}
> -
> impl PeerAddress for tokio::net::UnixStream {
> fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
> // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
> @@ -223,11 +228,36 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
> // Rust wants this type 'pub' here (else we get 'private type `ApiService`
> // in public interface'). The type is still private because the crate does
> // not export it.
> +#[derive(Clone)]
> pub struct ApiService {
> pub peer: std::net::SocketAddr,
> pub api_config: Arc<ApiConfig>,
> }
>
> +impl ApiService {
> + pub async fn serve<S>(
> + self,
> + conn: S,
> + mut graceful: Option<Arc<GracefulShutdown>>,
> + ) -> Result<(), Error>
> + where
> + S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> + {
> + let api_service = TowerToHyperService::new(self);
> + let io = TokioIo::new(conn);
> + let api_conn = conn::auto::Builder::new(TokioExecutor::new());
> + let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
> + if let Some(graceful) = graceful.take() {
> + let api_conn = graceful.watch(api_conn);
> + drop(graceful);
> + api_conn.await
> + } else {
> + api_conn.await
> + }
> + .map_err(|err| format_err!("error serving connection: {err}"))
> + }
> +}
> +
> fn log_response(
> logfile: Option<&Arc<Mutex<FileLogger>>>,
> peer: &std::net::SocketAddr,
> @@ -307,7 +337,7 @@ fn get_user_agent(headers: &HeaderMap) -> Option<String> {
> .ok()
> }
>
> -impl Service<Request<Body>> for ApiService {
> +impl Service<Request<Incoming>> for ApiService {
> type Response = Response<Body>;
> type Error = Error;
> #[allow(clippy::type_complexity)]
> @@ -317,7 +347,7 @@ impl Service<Request<Body>> for ApiService {
> Poll::Ready(Ok(()))
> }
>
> - fn call(&mut self, req: Request<Body>) -> Self::Future {
> + fn call(&mut self, req: Request<Incoming>) -> Self::Future {
> let path = req.uri().path_and_query().unwrap().as_str().to_owned();
> let method = req.method().clone();
> let user_agent = get_user_agent(req.headers());
> @@ -384,7 +414,7 @@ fn parse_query_parameters<S: 'static + BuildHasher + Send>(
> async fn get_request_parameters<S: 'static + BuildHasher + Send>(
> param_schema: ParameterSchema,
> parts: &Parts,
> - req_body: Body,
> + req_body: Incoming,
> uri_param: HashMap<String, String, S>,
> ) -> Result<Value, Error> {
> let mut is_json = false;
> @@ -401,13 +431,17 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
> }
> }
>
> - let body = TryStreamExt::map_err(req_body, |err| {
> + let stream_body = BodyStream::new(req_body);
> + let body = TryStreamExt::map_err(stream_body, |err| {
> http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
> })
> - .try_fold(Vec::new(), |mut acc, chunk| async move {
> + .try_fold(Vec::new(), |mut acc, frame| async move {
> // FIXME: max request body size?
> - if acc.len() + chunk.len() < 64 * 1024 {
> - acc.extend_from_slice(&chunk);
> + let frame = frame
> + .into_data()
> + .map_err(|err| format_err!("Failed to read request body frame - {err:?}"))?;
> + if acc.len() + frame.len() < 64 * 1024 {
> + acc.extend_from_slice(&frame);
> Ok(acc)
> } else {
> Err(http_err!(BAD_REQUEST, "Request body too large"))
> @@ -437,13 +471,14 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
> }
> }
>
> +#[derive(Clone)]
> struct NoLogExtension();
>
> async fn proxy_protected_request(
> config: &ApiConfig,
> info: &ApiMethod,
> mut parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> peer: &std::net::SocketAddr,
> ) -> Result<Response<Body>, Error> {
> let mut uri_parts = parts.uri.clone().into_parts();
> @@ -463,9 +498,14 @@ async fn proxy_protected_request(
> let reload_timezone = info.reload_timezone;
>
> let mut resp = match config.privileged_addr.clone() {
> - None => hyper::client::Client::new().request(request).await?,
> + None => {
> + hyper_util::client::legacy::Client::builder(TokioExecutor::new())
> + .build_http()
> + .request(request)
> + .await?
> + }
> Some(addr) => {
> - hyper::client::Client::builder()
> + hyper_util::client::legacy::Client::builder(TokioExecutor::new())
> .build(addr)
> .request(request)
> .await?
> @@ -479,7 +519,7 @@ async fn proxy_protected_request(
> }
> }
>
> - Ok(resp)
> + Ok(resp.map(|b| Body::wrap_stream(BodyDataStream::new(b))))
> }
>
> fn delay_unauth_time() -> std::time::Instant {
> @@ -491,22 +531,23 @@ fn access_forbidden_time() -> std::time::Instant {
> }
>
> fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
> - let (mut send, body) = hyper::Body::channel();
> + let (send, body) = tokio::sync::mpsc::channel::<Result<Vec<u8>, Error>>(1);
> tokio::spawn(async move {
> use futures::StreamExt;
>
> let mut stream = stream.into_inner();
> while let Some(record) = stream.next().await {
> - if send.send_data(record.to_bytes().into()).await.is_err() {
> + if send.send(Ok(record.to_bytes())).await.is_err() {
> break;
> }
> }
> });
>
> - Ok(Response::builder()
> + Response::builder()
> .status(http::StatusCode::OK)
> .header(http::header::CONTENT_TYPE, "application/json-seq")
> - .body(body)?)
> + .body(Body::wrap_stream(ReceiverStream::new(body)))
> + .map_err(Error::from)
> }
>
> fn handle_sync_stream_as_json_seq(
> @@ -527,7 +568,7 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
> info: &'static ApiMethod,
> formatter: Option<&'static dyn OutputFormatter>,
> parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> uri_param: HashMap<String, String, S>,
> ) -> Result<Response<Body>, Error> {
> let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
> @@ -630,9 +671,10 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
> );
> resp.map(|body| {
> Body::wrap_stream(
> - DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
> - proxmox_lang::io_format_err!("error during compression: {}", err)
> - }))
> + DeflateEncoder::builder(TryStreamExt::map_err(
> + BodyDataStream::new(body),
> + |err| proxmox_lang::io_format_err!("error during compression: {}", err),

Could inline `err` into the format string of `io_format_err!` (i.e.
`{err}`) since you're touching that part anyway (unless our macro
doesn't support that, but I assume it does).

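To illustrate the inlining, here is a small std-only sketch. It
assumes the macro simply forwards its arguments to `format!`;
`io_format_err_sketch!` is a local, hypothetical stand-in and not the
actual `proxmox_lang` macro, so this is worth double-checking against
the real definition.

```rust
// ASSUMPTION: stand-in that forwards to `format!`, as the real macro
// is presumed to do; not the actual `proxmox_lang::io_format_err!`.
macro_rules! io_format_err_sketch {
    ($($msg:tt)+) => {
        ::std::io::Error::new(::std::io::ErrorKind::Other, format!($($msg)+))
    };
}

/// Current style: positional argument after the format string.
pub fn positional(err: &str) -> std::io::Error {
    io_format_err_sketch!("error during compression: {}", err)
}

/// Suggested style: `err` captured directly inside the format string.
pub fn inlined(err: &str) -> std::io::Error {
    io_format_err_sketch!("error during compression: {err}")
}
```

Both variants produce the same message, so the change would be purely
cosmetic.
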
> + ))
> .zlib(true)
> .flush_window(is_streaming.then_some(64 * 1024))
> .build(),
> @@ -796,7 +838,7 @@ fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMe
> impl ApiConfig {
> pub async fn handle_request(
> self: Arc<ApiConfig>,
> - req: Request<Body>,
> + req: Request<Incoming>,
> peer: &std::net::SocketAddr,
> ) -> Result<Response<Body>, Error> {
> let (parts, body) = req.into_parts();
> @@ -808,7 +850,7 @@ impl ApiConfig {
> if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
> return Ok(Response::builder()
> .status(StatusCode::URI_TOO_LONG)
> - .body("".into())
> + .body(Body::empty())
> .unwrap());
> }
>
> @@ -907,7 +949,7 @@ impl Action {
>
> pub struct ApiRequestData<'a> {
> parts: Parts,
> - body: Body,
> + body: Incoming,
> peer: &'a std::net::SocketAddr,
> config: &'a ApiConfig,
> full_path: &'a str,
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel