From: "Max Carrara" <m.carrara@proxmox.com>
To: "Proxmox Backup Server development discussion"
<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: update to hyper 1.0
Date: Wed, 02 Apr 2025 15:34:08 +0200 [thread overview]
Message-ID: <D8W6WZYUNVSL.24VBRUOPALH4M@proxmox.com> (raw)
In-Reply-To: <20250326152327.332179-15-f.gruenbichler@proxmox.com>
On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> and switch to proxmox-http's Body implementation.
>
> hyper now has a special (opaque) Body implementation called Incoming
> which is used for incoming request bodies on the server side, and
> incoming response bodies on the client side, so our API handler's now
> consume an instance of this type.
>
> the Accept trait previously offered by hyper is gone in 1.0, and needs
> to be replaced with an accept loop. our corresponding Acceptor
> implementations have been dropped as well.
>
> hyper now has its own Service and async Read/Write traits, but helpfully
> provides wrappers for tower's Service and tokio's AsyncRead/AsyncWrite
> variants.
>
> graceful shutdown handling is now exposed differently on the hyper side,
> and to allow usage with upgradable connections the variant form
> hyper-util needs to be used, as the one straight from hyper doesn't
> support it (yet).
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
> proxmox-rest-server/Cargo.toml | 7 +-
> .../examples/minimal-rest-server.rs | 5 +-
> proxmox-rest-server/src/api_config.rs | 44 ++---
> proxmox-rest-server/src/connection.rs | 14 +-
> proxmox-rest-server/src/formatter.rs | 8 +-
> proxmox-rest-server/src/h2service.rs | 15 +-
> proxmox-rest-server/src/lib.rs | 2 +-
> proxmox-rest-server/src/rest.rs | 164 +++++++++++-------
> 8 files changed, 142 insertions(+), 117 deletions(-)
>
> diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
> index ffbd925a..ee253b4f 100644
> --- a/proxmox-rest-server/Cargo.toml
> +++ b/proxmox-rest-server/Cargo.toml
> @@ -20,7 +20,9 @@ anyhow.workspace = true
> futures.workspace = true
> handlebars = { workspace = true, optional = true }
> http.workspace = true
> +http-body-util.workspace = true
> hyper = { workspace = true, features = [ "full" ] }
> +hyper-util = { workspace = true, features = [ "client", "client-legacy", "http1", "server", "server-auto", "server-graceful", "service", "tokio" ]}
> libc.workspace = true
> log.workspace = true
> nix.workspace = true
> @@ -39,7 +41,7 @@ url.workspace = true
> proxmox-async.workspace = true
> proxmox-compression.workspace = true
> proxmox-daemon.workspace = true
> -proxmox-http = { workspace = true, optional = true }
> +proxmox-http = { workspace = true, features = ["body"] }
> proxmox-lang.workspace = true
> proxmox-log.workspace = true
> proxmox-router.workspace = true
> @@ -52,6 +54,5 @@ proxmox-worker-task.workspace = true
> default = []
> templates = ["dep:handlebars"]
> rate-limited-stream = [
> - "dep:proxmox-http",
> - "proxmox-http?/rate-limited-stream",
> + "proxmox-http/rate-limited-stream",
> ]
> diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
> index 23be586c..454430fb 100644
> --- a/proxmox-rest-server/examples/minimal-rest-server.rs
> +++ b/proxmox-rest-server/examples/minimal-rest-server.rs
> @@ -6,8 +6,9 @@ use std::sync::{LazyLock, Mutex};
> use anyhow::{bail, format_err, Error};
> use http::request::Parts;
> use http::HeaderMap;
> -use hyper::{Body, Method, Response};
> +use hyper::{Method, Response};
>
> +use proxmox_http::Body;
> use proxmox_router::{
> list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
> };
> @@ -57,7 +58,7 @@ fn get_index(
> Box::pin(async move {
> // build an index page
> http::Response::builder()
> - .body("hello world".into())
> + .body("hello world".to_owned().into_bytes().into())
> .unwrap()
> })
> }
> diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
> index b20b2da0..0b847a0c 100644
> --- a/proxmox-rest-server/src/api_config.rs
> +++ b/proxmox-rest-server/src/api_config.rs
> @@ -9,10 +9,12 @@ use std::task::{Context, Poll};
> use anyhow::{format_err, Error};
> use http::{HeaderMap, Method, Uri};
> use hyper::http::request::Parts;
> -use hyper::{Body, Response};
> +use hyper::Response;
> +use hyper_util::rt::TokioIo;
> use tower_service::Service;
>
> use proxmox_daemon::command_socket::CommandSocket;
> +use proxmox_http::Body;
> use proxmox_log::{FileLogOptions, FileLogger};
> use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
> use proxmox_sys::fs::{create_path, CreateOptions};
> @@ -107,7 +109,7 @@ impl ApiConfig {
> ) -> Response<Body> {
> match self.index_handler.as_ref() {
> Some(handler) => (handler.func)(rest_env, parts).await,
> - None => Response::builder().status(404).body("".into()).unwrap(),
> + None => Response::builder().status(404).body(Body::empty()).unwrap(),
> }
> }
>
> @@ -511,7 +513,7 @@ impl From<std::os::unix::net::SocketAddr> for PrivilegedAddr {
> }
>
> impl Service<Uri> for PrivilegedAddr {
> - type Response = PrivilegedSocket;
> + type Response = TokioIo<PrivilegedSocket>;
> type Error = io::Error;
> type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
>
> @@ -527,6 +529,7 @@ impl Service<Uri> for PrivilegedAddr {
> tokio::net::TcpStream::connect(addr)
> .await
> .map(PrivilegedSocket::Tcp)
> + .map(TokioIo::new)
> })
> }
> PrivilegedAddr::Unix(addr) => {
> @@ -537,6 +540,7 @@ impl Service<Uri> for PrivilegedAddr {
> })?)
> .await
> .map(PrivilegedSocket::Unix)
> + .map(TokioIo::new)
> })
> }
> }
> @@ -607,39 +611,11 @@ impl tokio::io::AsyncWrite for PrivilegedSocket {
> }
> }
>
> -impl hyper::client::connect::Connection for PrivilegedSocket {
> - fn connected(&self) -> hyper::client::connect::Connected {
> +impl hyper_util::client::legacy::connect::Connection for PrivilegedSocket {
> + fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
> match self {
> Self::Tcp(s) => s.connected(),
> - Self::Unix(_) => hyper::client::connect::Connected::new(),
> + Self::Unix(_) => hyper_util::client::legacy::connect::Connected::new(),
> }
> }
> }
> -
> -/// Implements hyper's `Accept` for `UnixListener`s.
> -pub struct UnixAcceptor {
> - listener: tokio::net::UnixListener,
> -}
> -
> -impl From<tokio::net::UnixListener> for UnixAcceptor {
> - fn from(listener: tokio::net::UnixListener) -> Self {
> - Self { listener }
> - }
> -}
> -
> -impl hyper::server::accept::Accept for UnixAcceptor {
> - type Conn = tokio::net::UnixStream;
> - type Error = io::Error;
> -
> - fn poll_accept(
> - self: Pin<&mut Self>,
> - cx: &mut Context<'_>,
> - ) -> Poll<Option<io::Result<Self::Conn>>> {
> - Pin::new(&mut self.get_mut().listener)
> - .poll_accept(cx)
> - .map(|res| match res {
> - Ok((stream, _addr)) => Some(Ok(stream)),
> - Err(err) => Some(Err(err)),
> - })
> - }
> -}
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 526555ae..a65ef398 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -14,7 +14,6 @@ use std::time::Duration;
>
> use anyhow::{format_err, Context, Error};
> use futures::FutureExt;
> -use hyper::server::accept;
> use openssl::ec::{EcGroup, EcKey};
> use openssl::nid::Nid;
> use openssl::pkey::{PKey, Private};
> @@ -226,12 +225,13 @@ impl AcceptBuilder {
> self,
> listener: TcpListener,
> acceptor: Arc<Mutex<SslAcceptor>>,
> - ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
> + // FIXME: replace return value with own trait? see now removed UnixAcceptor
Yeah, we probably should provide our own trait for this and have it
implement `futures_core::Stream` (or `std::async_iter::AsyncIterator`
once that's stabilized). For now the below is fine IMO, as we can cook
up a trait later as well. Switching over to hyper/1.0 isn't too pretty,
but it is what it is :s
> + ) -> ReceiverStream<Result<ClientStreamResult, Error>> {
> let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
>
> tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
>
> - accept::from_stream(ReceiverStream::new(secure_receiver))
> + ReceiverStream::new(secure_receiver)
> }
>
> pub fn accept_tls_optional(
> @@ -239,8 +239,8 @@ impl AcceptBuilder {
> listener: TcpListener,
> acceptor: Arc<Mutex<SslAcceptor>>,
> ) -> (
> - impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
> - impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
> + ReceiverStream<Result<ClientStreamResult, Error>>,
> + ReceiverStream<Result<InsecureClientStreamResult, Error>>,
> ) {
> let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
> let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
> @@ -252,8 +252,8 @@ impl AcceptBuilder {
> ));
>
> (
> - accept::from_stream(ReceiverStream::new(secure_receiver)),
> - accept::from_stream(ReceiverStream::new(insecure_receiver)),
> + ReceiverStream::new(secure_receiver),
> + ReceiverStream::new(insecure_receiver),
> )
> }
> }
> diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
> index 32ca9936..9ce87205 100644
> --- a/proxmox-rest-server/src/formatter.rs
> +++ b/proxmox-rest-server/src/formatter.rs
> @@ -5,12 +5,14 @@ use anyhow::Error;
> use serde_json::{json, Value};
>
> use hyper::header;
> -use hyper::{Body, Response, StatusCode};
> +use hyper::{Response, StatusCode};
>
> +use proxmox_http::Body;
> use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
> use proxmox_schema::ParameterError;
>
> /// Extension to set error message for server side logging
> +#[derive(Clone)]
> pub(crate) struct ErrorMessageExtension(pub String);
>
> /// Methods to format data and errors
> @@ -168,11 +170,11 @@ impl OutputFormatter for JsonFormatter {
>
> pub(crate) fn error_to_response(err: Error) -> Response<Body> {
> let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
> - let mut resp = Response::new(Body::from(apierr.message.clone()));
> + let mut resp = Response::new(apierr.message.clone().into());
> *resp.status_mut() = apierr.code;
> resp
> } else {
> - let mut resp = Response::new(Body::from(err.to_string()));
> + let mut resp = Response::new(err.to_string().into());
> *resp.status_mut() = StatusCode::BAD_REQUEST;
> resp
> };
> diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
> index db6e3b0a..18258e14 100644
> --- a/proxmox-rest-server/src/h2service.rs
> +++ b/proxmox-rest-server/src/h2service.rs
> @@ -6,8 +6,10 @@ use std::sync::Arc;
> use std::task::{Context, Poll};
>
> use futures::*;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::body::Incoming;
> +use hyper::{Request, Response, StatusCode};
>
> +use proxmox_http::Body;
> use proxmox_router::http_err;
> use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
>
> @@ -19,6 +21,7 @@ use crate::{normalize_path_with_components, WorkerTask};
> /// We use this kind of service to handle backup protocol
> /// connections. State is stored inside the generic ``rpcenv``. Logs
> /// goes into the ``WorkerTask`` log.
> +#[derive(Clone)]
> pub struct H2Service<E> {
> router: &'static Router,
> rpcenv: E,
> @@ -42,7 +45,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
> }
> }
>
> - fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
> + fn handle_request(&self, req: Request<Incoming>) -> ApiResponseFuture {
> let (parts, body) = req.into_parts();
>
> let method = parts.method.clone();
> @@ -103,7 +106,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
> }
> }
>
> -impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
> +impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Incoming>> for H2Service<E> {
> type Response = Response<Body>;
> type Error = Error;
> #[allow(clippy::type_complexity)]
> @@ -113,7 +116,7 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
> Poll::Ready(Ok(()))
> }
>
> - fn call(&mut self, req: Request<Body>) -> Self::Future {
> + fn call(&mut self, req: Request<Incoming>) -> Self::Future {
> let path = req.uri().path().to_owned();
> let method = req.method().clone();
> let worker = self.worker.clone();
> @@ -126,14 +129,14 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
> }
> Err(err) => {
> if let Some(apierr) = err.downcast_ref::<HttpError>() {
> - let mut resp = Response::new(Body::from(apierr.message.clone()));
> + let mut resp = Response::new(apierr.message.clone().into());
> resp.extensions_mut()
> .insert(ErrorMessageExtension(apierr.message.clone()));
> *resp.status_mut() = apierr.code;
> Self::log_response(worker, method, &path, &resp);
> Ok(resp)
> } else {
> - let mut resp = Response::new(Body::from(err.to_string()));
> + let mut resp = Response::new(err.to_string().into());
> resp.extensions_mut()
> .insert(ErrorMessageExtension(err.to_string()));
> *resp.status_mut() = StatusCode::BAD_REQUEST;
> diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
> index 43dafa91..5ddd3667 100644
> --- a/proxmox-rest-server/src/lib.rs
> +++ b/proxmox-rest-server/src/lib.rs
> @@ -34,7 +34,7 @@ mod environment;
> pub use environment::*;
>
> mod api_config;
> -pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
> +pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
>
> mod rest;
> pub use rest::{Redirector, RestServer};
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index f5a72052..f902592d 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -10,17 +10,25 @@ use std::task::{Context, Poll};
> use anyhow::{bail, format_err, Error};
> use futures::future::FutureExt;
> use futures::stream::TryStreamExt;
> -use hyper::body::HttpBody;
> +use http_body_util::{BodyDataStream, BodyStream};
> +use hyper::body::{Body as HyperBody, Incoming};
> use hyper::header::{self, HeaderMap};
> use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{Request, Response, StatusCode};
> +use hyper_util::rt::{TokioExecutor, TokioIo};
> +use hyper_util::server::conn;
> +use hyper_util::server::graceful::GracefulShutdown;
> +use hyper_util::service::TowerToHyperService;
> use regex::Regex;
> use serde_json::Value;
> use tokio::fs::File;
> +use tokio::io::{AsyncRead, AsyncWrite};
> use tokio::time::Instant;
> +use tokio_stream::wrappers::ReceiverStream;
> use tower_service::Service;
> use url::form_urlencoded;
>
> +use proxmox_http::Body;
> use proxmox_router::{
> check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
> RpcEnvironmentType, UserInformation,
> @@ -40,6 +48,7 @@ unsafe extern "C" {
> fn tzset();
> }
>
> +#[derive(Clone)]
> struct AuthStringExtension(String);
>
> pub(crate) struct EmptyUserInformation {}
> @@ -74,24 +83,11 @@ impl RestServer {
> api_config: Arc::new(api_config),
> }
> }
> -}
>
> -impl<T: PeerAddress> Service<&T> for RestServer {
> - type Response = ApiService;
> - type Error = Error;
> - type Future = std::future::Ready<Result<ApiService, Error>>;
> -
> - fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
> - Poll::Ready(Ok(()))
> - }
> -
> - fn call(&mut self, ctx: &T) -> Self::Future {
> - std::future::ready(match ctx.peer_addr() {
> - Err(err) => Err(format_err!("unable to get peer address - {}", err)),
> - Ok(peer) => Ok(ApiService {
> - peer,
> - api_config: Arc::clone(&self.api_config),
> - }),
> + pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> + Ok(ApiService {
> + peer: peer.peer_addr()?,
> + api_config: Arc::clone(&self.api_config),
> })
> }
> }
> @@ -108,25 +104,40 @@ impl Redirector {
> pub fn new() -> Self {
> Self {}
> }
> -}
>
> -impl<T> Service<&T> for Redirector {
> - type Response = RedirectService;
> - type Error = Error;
> - type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
> -
> - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
> - Poll::Ready(Ok(()))
> - }
> -
> - fn call(&mut self, _ctx: &T) -> Self::Future {
> - std::future::ready(Ok(RedirectService {}))
> + pub fn redirect_service(&self) -> RedirectService {
> + RedirectService {}
> }
> }
>
> +#[derive(Clone)]
> pub struct RedirectService;
>
> -impl Service<Request<Body>> for RedirectService {
> +impl RedirectService {
> + pub async fn serve<S>(
> + self,
> + conn: S,
> + mut graceful: Option<Arc<GracefulShutdown>>,
> + ) -> Result<(), Error>
> + where
> + S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> + {
> + let api_service = TowerToHyperService::new(self);
> + let io = TokioIo::new(conn);
> + let api_conn = conn::auto::Builder::new(TokioExecutor::new());
> + let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
> + if let Some(graceful) = graceful.take() {
> + let api_conn = graceful.watch(api_conn);
> + drop(graceful);
> + api_conn.await
> + } else {
> + api_conn.await
> + }
> + .map_err(|err| format_err!("error serving redirect connection: {err}"))
> + }
> +}
> +
> +impl Service<Request<Incoming>> for RedirectService {
> type Response = Response<Body>;
> type Error = anyhow::Error;
> type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
> @@ -135,7 +146,7 @@ impl Service<Request<Body>> for RedirectService {
> Poll::Ready(Ok(()))
> }
>
> - fn call(&mut self, req: Request<Body>) -> Self::Future {
> + fn call(&mut self, req: Request<Incoming>) -> Self::Future {
> let future = async move {
> let header_host_value = req
> .headers()
> @@ -194,12 +205,6 @@ impl PeerAddress for tokio::net::TcpStream {
> }
> }
>
> -impl PeerAddress for hyper::server::conn::AddrStream {
> - fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
> - Ok(self.remote_addr())
> - }
> -}
> -
> impl PeerAddress for tokio::net::UnixStream {
> fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
> // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
> @@ -223,11 +228,36 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
> // Rust wants this type 'pub' here (else we get 'private type `ApiService`
> // in public interface'). The type is still private because the crate does
> // not export it.
> +#[derive(Clone)]
> pub struct ApiService {
> pub peer: std::net::SocketAddr,
> pub api_config: Arc<ApiConfig>,
> }
>
> +impl ApiService {
> + pub async fn serve<S>(
> + self,
> + conn: S,
> + mut graceful: Option<Arc<GracefulShutdown>>,
> + ) -> Result<(), Error>
> + where
> + S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> + {
> + let api_service = TowerToHyperService::new(self);
> + let io = TokioIo::new(conn);
> + let api_conn = conn::auto::Builder::new(TokioExecutor::new());
> + let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
> + if let Some(graceful) = graceful.take() {
> + let api_conn = graceful.watch(api_conn);
> + drop(graceful);
> + api_conn.await
> + } else {
> + api_conn.await
> + }
> + .map_err(|err| format_err!("error serving connection: {err}"))
> + }
> +}
> +
> fn log_response(
> logfile: Option<&Arc<Mutex<FileLogger>>>,
> peer: &std::net::SocketAddr,
> @@ -307,7 +337,7 @@ fn get_user_agent(headers: &HeaderMap) -> Option<String> {
> .ok()
> }
>
> -impl Service<Request<Body>> for ApiService {
> +impl Service<Request<Incoming>> for ApiService {
> type Response = Response<Body>;
> type Error = Error;
> #[allow(clippy::type_complexity)]
> @@ -317,7 +347,7 @@ impl Service<Request<Body>> for ApiService {
> Poll::Ready(Ok(()))
> }
>
> - fn call(&mut self, req: Request<Body>) -> Self::Future {
> + fn call(&mut self, req: Request<Incoming>) -> Self::Future {
> let path = req.uri().path_and_query().unwrap().as_str().to_owned();
> let method = req.method().clone();
> let user_agent = get_user_agent(req.headers());
> @@ -384,7 +414,7 @@ fn parse_query_parameters<S: 'static + BuildHasher + Send>(
> async fn get_request_parameters<S: 'static + BuildHasher + Send>(
> param_schema: ParameterSchema,
> parts: &Parts,
> - req_body: Body,
> + req_body: Incoming,
> uri_param: HashMap<String, String, S>,
> ) -> Result<Value, Error> {
> let mut is_json = false;
> @@ -401,13 +431,17 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
> }
> }
>
> - let body = TryStreamExt::map_err(req_body, |err| {
> + let stream_body = BodyStream::new(req_body);
> + let body = TryStreamExt::map_err(stream_body, |err| {
> http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
> })
> - .try_fold(Vec::new(), |mut acc, chunk| async move {
> + .try_fold(Vec::new(), |mut acc, frame| async move {
> // FIXME: max request body size?
> - if acc.len() + chunk.len() < 64 * 1024 {
> - acc.extend_from_slice(&chunk);
> + let frame = frame
> + .into_data()
> + .map_err(|err| format_err!("Failed to read request body frame - {err:?}"))?;
> + if acc.len() + frame.len() < 64 * 1024 {
> + acc.extend_from_slice(&frame);
> Ok(acc)
> } else {
> Err(http_err!(BAD_REQUEST, "Request body too large"))
> @@ -437,13 +471,14 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
> }
> }
>
> +#[derive(Clone)]
> struct NoLogExtension();
>
> async fn proxy_protected_request(
> config: &ApiConfig,
> info: &ApiMethod,
> mut parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> peer: &std::net::SocketAddr,
> ) -> Result<Response<Body>, Error> {
> let mut uri_parts = parts.uri.clone().into_parts();
> @@ -463,9 +498,14 @@ async fn proxy_protected_request(
> let reload_timezone = info.reload_timezone;
>
> let mut resp = match config.privileged_addr.clone() {
> - None => hyper::client::Client::new().request(request).await?,
> + None => {
> + hyper_util::client::legacy::Client::builder(TokioExecutor::new())
> + .build_http()
> + .request(request)
> + .await?
> + }
> Some(addr) => {
> - hyper::client::Client::builder()
> + hyper_util::client::legacy::Client::builder(TokioExecutor::new())
> .build(addr)
> .request(request)
> .await?
> @@ -479,7 +519,7 @@ async fn proxy_protected_request(
> }
> }
>
> - Ok(resp)
> + Ok(resp.map(|b| Body::wrap_stream(BodyDataStream::new(b))))
> }
>
> fn delay_unauth_time() -> std::time::Instant {
> @@ -491,22 +531,23 @@ fn access_forbidden_time() -> std::time::Instant {
> }
>
> fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
> - let (mut send, body) = hyper::Body::channel();
> + let (send, body) = tokio::sync::mpsc::channel::<Result<Vec<u8>, Error>>(1);
> tokio::spawn(async move {
> use futures::StreamExt;
>
> let mut stream = stream.into_inner();
> while let Some(record) = stream.next().await {
> - if send.send_data(record.to_bytes().into()).await.is_err() {
> + if send.send(Ok(record.to_bytes())).await.is_err() {
> break;
> }
> }
> });
>
> - Ok(Response::builder()
> + Response::builder()
> .status(http::StatusCode::OK)
> .header(http::header::CONTENT_TYPE, "application/json-seq")
> - .body(body)?)
> + .body(Body::wrap_stream(ReceiverStream::new(body)))
> + .map_err(Error::from)
> }
>
> fn handle_sync_stream_as_json_seq(
> @@ -527,7 +568,7 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
> info: &'static ApiMethod,
> formatter: Option<&'static dyn OutputFormatter>,
> parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> uri_param: HashMap<String, String, S>,
> ) -> Result<Response<Body>, Error> {
> let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
> @@ -630,9 +671,10 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
> );
> resp.map(|body| {
> Body::wrap_stream(
> - DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
> - proxmox_lang::io_format_err!("error during compression: {}", err)
> - }))
> + DeflateEncoder::builder(TryStreamExt::map_err(
> + BodyDataStream::new(body),
> + |err| proxmox_lang::io_format_err!("error during compression: {}", err),
Could inline the `err` above into the args of `io_format_err` since
you're touching that part (unless our macro doesn't support that, but I
assume it does).
> + ))
> .zlib(true)
> .flush_window(is_streaming.then_some(64 * 1024))
> .build(),
> @@ -796,7 +838,7 @@ fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMe
> impl ApiConfig {
> pub async fn handle_request(
> self: Arc<ApiConfig>,
> - req: Request<Body>,
> + req: Request<Incoming>,
> peer: &std::net::SocketAddr,
> ) -> Result<Response<Body>, Error> {
> let (parts, body) = req.into_parts();
> @@ -808,7 +850,7 @@ impl ApiConfig {
> if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
> return Ok(Response::builder()
> .status(StatusCode::URI_TOO_LONG)
> - .body("".into())
> + .body(Body::empty())
> .unwrap());
> }
>
> @@ -907,7 +949,7 @@ impl Action {
>
> pub struct ApiRequestData<'a> {
> parts: Parts,
> - body: Body,
> + body: Incoming,
> peer: &'a std::net::SocketAddr,
> config: &'a ApiConfig,
> full_path: &'a str,
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-04-02 13:34 UTC|newest]
Thread overview: 32+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 01/17] http: order feature values Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 02/17] http: rate-limited-stream: update to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 03/17] http: adapt MaybeTlsStream to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 04/17] http: adapt connector " Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 07/17] http: websocket: update to http/hyper 1 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 08/17] openid: use http 0.2 to avoid openidconnect update Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 09/17] proxmox-login: switch to http 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 10/17] client: switch to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 11/17] metrics: update " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0 Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 13/17] proxmox-router: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: " Fabian Grünbichler
2025-04-02 13:34 ` Max Carrara [this message]
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 15/17] proxmox-rest-server: fix and extend example Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 16/17] proxmox-auth-api: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 17/17] proxmox-acme-api: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 1/6] Revert "h2: switch to legacy feature" Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-client: adapt http client to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 4/6] restore daemon: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 5/6] " Fabian Grünbichler
2025-04-02 13:36 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 6/6] adapt examples " Fabian Grünbichler
2025-04-02 13:53 ` [pbs-devel] [RFC proxmox 00/23] upgrade " Max Carrara
2025-04-03 13:32 ` Max Carrara
2025-04-02 14:39 ` Thomas Lamprecht
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=D8W6WZYUNVSL.24VBRUOPALH4M@proxmox.com \
--to=m.carrara@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal