From: "Max Carrara" <m.carrara@proxmox.com>
To: "Proxmox Backup Server development discussion"
	<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: update to hyper 1.0
Date: Wed, 02 Apr 2025 15:34:08 +0200
Message-ID: <D8W6WZYUNVSL.24VBRUOPALH4M@proxmox.com>
In-Reply-To: <20250326152327.332179-15-f.gruenbichler@proxmox.com>

On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> and switch to proxmox-http's Body implementation.
>
> hyper now has a special (opaque) Body implementation called Incoming
> which is used for incoming request bodies on the server side, and
> incoming response bodies on the client side, so our API handlers now
> consume an instance of this type.
>
> the Accept trait previously offered by hyper is gone in 1.0, and needs
> to be replaced with an accept loop. our corresponding Acceptor
> implementations have been dropped as well.
>
> hyper now has its own Service and async Read/Write traits, but helpfully
> provides wrappers for tower's Service and tokio's AsyncRead/AsyncWrite
> variants.
>
> graceful shutdown handling is now exposed differently on the hyper side,
> and to allow usage with upgradable connections the variant from
> hyper-util needs to be used, as the one straight from hyper doesn't
> support it (yet).
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
>  proxmox-rest-server/Cargo.toml                |   7 +-
>  .../examples/minimal-rest-server.rs           |   5 +-
>  proxmox-rest-server/src/api_config.rs         |  44 ++---
>  proxmox-rest-server/src/connection.rs         |  14 +-
>  proxmox-rest-server/src/formatter.rs          |   8 +-
>  proxmox-rest-server/src/h2service.rs          |  15 +-
>  proxmox-rest-server/src/lib.rs                |   2 +-
>  proxmox-rest-server/src/rest.rs               | 164 +++++++++++-------
>  8 files changed, 142 insertions(+), 117 deletions(-)
>
> diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
> index ffbd925a..ee253b4f 100644
> --- a/proxmox-rest-server/Cargo.toml
> +++ b/proxmox-rest-server/Cargo.toml
> @@ -20,7 +20,9 @@ anyhow.workspace = true
>  futures.workspace = true
>  handlebars = { workspace = true, optional = true }
>  http.workspace = true
> +http-body-util.workspace = true
>  hyper = { workspace = true, features = [ "full" ] }
> +hyper-util = { workspace = true, features = [ "client", "client-legacy", "http1", "server", "server-auto", "server-graceful", "service", "tokio" ]}
>  libc.workspace = true
>  log.workspace = true
>  nix.workspace = true
> @@ -39,7 +41,7 @@ url.workspace = true
>  proxmox-async.workspace = true
>  proxmox-compression.workspace = true
>  proxmox-daemon.workspace = true
> -proxmox-http = { workspace = true, optional = true }
> +proxmox-http = { workspace = true, features = ["body"] }
>  proxmox-lang.workspace = true
>  proxmox-log.workspace = true
>  proxmox-router.workspace = true
> @@ -52,6 +54,5 @@ proxmox-worker-task.workspace = true
>  default = []
>  templates = ["dep:handlebars"]
>  rate-limited-stream = [
> -    "dep:proxmox-http",
> -    "proxmox-http?/rate-limited-stream",
> +    "proxmox-http/rate-limited-stream",
>  ]
> diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
> index 23be586c..454430fb 100644
> --- a/proxmox-rest-server/examples/minimal-rest-server.rs
> +++ b/proxmox-rest-server/examples/minimal-rest-server.rs
> @@ -6,8 +6,9 @@ use std::sync::{LazyLock, Mutex};
>  use anyhow::{bail, format_err, Error};
>  use http::request::Parts;
>  use http::HeaderMap;
> -use hyper::{Body, Method, Response};
> +use hyper::{Method, Response};
>  
> +use proxmox_http::Body;
>  use proxmox_router::{
>      list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
>  };
> @@ -57,7 +58,7 @@ fn get_index(
>      Box::pin(async move {
>          // build an index page
>          http::Response::builder()
> -            .body("hello world".into())
> +            .body("hello world".to_owned().into_bytes().into())
>              .unwrap()
>      })
>  }
> diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
> index b20b2da0..0b847a0c 100644
> --- a/proxmox-rest-server/src/api_config.rs
> +++ b/proxmox-rest-server/src/api_config.rs
> @@ -9,10 +9,12 @@ use std::task::{Context, Poll};
>  use anyhow::{format_err, Error};
>  use http::{HeaderMap, Method, Uri};
>  use hyper::http::request::Parts;
> -use hyper::{Body, Response};
> +use hyper::Response;
> +use hyper_util::rt::TokioIo;
>  use tower_service::Service;
>  
>  use proxmox_daemon::command_socket::CommandSocket;
> +use proxmox_http::Body;
>  use proxmox_log::{FileLogOptions, FileLogger};
>  use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
>  use proxmox_sys::fs::{create_path, CreateOptions};
> @@ -107,7 +109,7 @@ impl ApiConfig {
>      ) -> Response<Body> {
>          match self.index_handler.as_ref() {
>              Some(handler) => (handler.func)(rest_env, parts).await,
> -            None => Response::builder().status(404).body("".into()).unwrap(),
> +            None => Response::builder().status(404).body(Body::empty()).unwrap(),
>          }
>      }
>  
> @@ -511,7 +513,7 @@ impl From<std::os::unix::net::SocketAddr> for PrivilegedAddr {
>  }
>  
>  impl Service<Uri> for PrivilegedAddr {
> -    type Response = PrivilegedSocket;
> +    type Response = TokioIo<PrivilegedSocket>;
>      type Error = io::Error;
>      type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
>  
> @@ -527,6 +529,7 @@ impl Service<Uri> for PrivilegedAddr {
>                      tokio::net::TcpStream::connect(addr)
>                          .await
>                          .map(PrivilegedSocket::Tcp)
> +                        .map(TokioIo::new)
>                  })
>              }
>              PrivilegedAddr::Unix(addr) => {
> @@ -537,6 +540,7 @@ impl Service<Uri> for PrivilegedAddr {
>                      })?)
>                      .await
>                      .map(PrivilegedSocket::Unix)
> +                    .map(TokioIo::new)
>                  })
>              }
>          }
> @@ -607,39 +611,11 @@ impl tokio::io::AsyncWrite for PrivilegedSocket {
>      }
>  }
>  
> -impl hyper::client::connect::Connection for PrivilegedSocket {
> -    fn connected(&self) -> hyper::client::connect::Connected {
> +impl hyper_util::client::legacy::connect::Connection for PrivilegedSocket {
> +    fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
>          match self {
>              Self::Tcp(s) => s.connected(),
> -            Self::Unix(_) => hyper::client::connect::Connected::new(),
> +            Self::Unix(_) => hyper_util::client::legacy::connect::Connected::new(),
>          }
>      }
>  }
> -
> -/// Implements hyper's `Accept` for `UnixListener`s.
> -pub struct UnixAcceptor {
> -    listener: tokio::net::UnixListener,
> -}
> -
> -impl From<tokio::net::UnixListener> for UnixAcceptor {
> -    fn from(listener: tokio::net::UnixListener) -> Self {
> -        Self { listener }
> -    }
> -}
> -
> -impl hyper::server::accept::Accept for UnixAcceptor {
> -    type Conn = tokio::net::UnixStream;
> -    type Error = io::Error;
> -
> -    fn poll_accept(
> -        self: Pin<&mut Self>,
> -        cx: &mut Context<'_>,
> -    ) -> Poll<Option<io::Result<Self::Conn>>> {
> -        Pin::new(&mut self.get_mut().listener)
> -            .poll_accept(cx)
> -            .map(|res| match res {
> -                Ok((stream, _addr)) => Some(Ok(stream)),
> -                Err(err) => Some(Err(err)),
> -            })
> -    }
> -}
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 526555ae..a65ef398 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -14,7 +14,6 @@ use std::time::Duration;
>  
>  use anyhow::{format_err, Context, Error};
>  use futures::FutureExt;
> -use hyper::server::accept;
>  use openssl::ec::{EcGroup, EcKey};
>  use openssl::nid::Nid;
>  use openssl::pkey::{PKey, Private};
> @@ -226,12 +225,13 @@ impl AcceptBuilder {
>          self,
>          listener: TcpListener,
>          acceptor: Arc<Mutex<SslAcceptor>>,
> -    ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
> +        // FIXME: replace return value with own trait? see now removed UnixAcceptor

Yeah, we probably should provide our own trait for this and have it
implement `futures_core::Stream` (or `std::async_iter::AsyncIterator`
once that's stabilized). For now the below is fine IMO, as we can cook
up a trait later as well. Switching over to hyper/1.0 isn't too pretty,
but it is what it is :s
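
For illustration, a minimal sketch of what such a trait might look like.
Everything here is an assumption, not part of the patch: the names
`Acceptor`, `QueueAcceptor` and `drain_ready` are hypothetical, and the
signature merely mirrors the `poll_accept` of the removed `UnixAcceptor`.
It sticks to `std` so it stands alone; a real version would more likely
be expressed via `futures_core::Stream`, as suggested above.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the `Accept` trait that hyper 1.0 dropped.
/// `Poll::Ready(None)` signals that no further connections will arrive.
pub trait Acceptor {
    type Conn;
    type Error;

    fn poll_accept(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Conn, Self::Error>>>;
}

/// Toy acceptor that hands out queued results; the `u32` is a
/// placeholder for a real connection type.
pub struct QueueAcceptor {
    pending: VecDeque<Result<u32, String>>,
}

impl QueueAcceptor {
    pub fn new(items: Vec<Result<u32, String>>) -> Self {
        Self { pending: items.into() }
    }
}

impl Acceptor for QueueAcceptor {
    type Conn = u32;
    type Error = String;

    fn poll_accept(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
        // Always ready: yields the next queued item, or `None` once empty.
        Poll::Ready(self.get_mut().pending.pop_front())
    }
}

/// Waker that does nothing; enough to poll an always-ready acceptor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Synchronous "accept loop": polls until the acceptor is pending or done
/// and collects everything that was immediately available.
pub fn drain_ready<A: Acceptor + Unpin>(acceptor: &mut A) -> Vec<Result<A::Conn, A::Error>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut *acceptor).poll_accept(&mut cx) {
        out.push(item);
    }
    out
}
```

With something along these lines, callers could write their accept loop
against the trait rather than against the concrete `ReceiverStream`
return type.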

> +    ) -> ReceiverStream<Result<ClientStreamResult, Error>> {
>          let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
>  
>          tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
>  
> -        accept::from_stream(ReceiverStream::new(secure_receiver))
> +        ReceiverStream::new(secure_receiver)
>      }
>  
>      pub fn accept_tls_optional(
> @@ -239,8 +239,8 @@ impl AcceptBuilder {
>          listener: TcpListener,
>          acceptor: Arc<Mutex<SslAcceptor>>,
>      ) -> (
> -        impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
> -        impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
> +        ReceiverStream<Result<ClientStreamResult, Error>>,
> +        ReceiverStream<Result<InsecureClientStreamResult, Error>>,
>      ) {
>          let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
>          let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
> @@ -252,8 +252,8 @@ impl AcceptBuilder {
>          ));
>  
>          (
> -            accept::from_stream(ReceiverStream::new(secure_receiver)),
> -            accept::from_stream(ReceiverStream::new(insecure_receiver)),
> +            ReceiverStream::new(secure_receiver),
> +            ReceiverStream::new(insecure_receiver),
>          )
>      }
>  }
> diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
> index 32ca9936..9ce87205 100644
> --- a/proxmox-rest-server/src/formatter.rs
> +++ b/proxmox-rest-server/src/formatter.rs
> @@ -5,12 +5,14 @@ use anyhow::Error;
>  use serde_json::{json, Value};
>  
>  use hyper::header;
> -use hyper::{Body, Response, StatusCode};
> +use hyper::{Response, StatusCode};
>  
> +use proxmox_http::Body;
>  use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
>  use proxmox_schema::ParameterError;
>  
>  /// Extension to set error message for server side logging
> +#[derive(Clone)]
>  pub(crate) struct ErrorMessageExtension(pub String);
>  
>  /// Methods to format data and errors
> @@ -168,11 +170,11 @@ impl OutputFormatter for JsonFormatter {
>  
>  pub(crate) fn error_to_response(err: Error) -> Response<Body> {
>      let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
> -        let mut resp = Response::new(Body::from(apierr.message.clone()));
> +        let mut resp = Response::new(apierr.message.clone().into());
>          *resp.status_mut() = apierr.code;
>          resp
>      } else {
> -        let mut resp = Response::new(Body::from(err.to_string()));
> +        let mut resp = Response::new(err.to_string().into());
>          *resp.status_mut() = StatusCode::BAD_REQUEST;
>          resp
>      };
> diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
> index db6e3b0a..18258e14 100644
> --- a/proxmox-rest-server/src/h2service.rs
> +++ b/proxmox-rest-server/src/h2service.rs
> @@ -6,8 +6,10 @@ use std::sync::Arc;
>  use std::task::{Context, Poll};
>  
>  use futures::*;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::body::Incoming;
> +use hyper::{Request, Response, StatusCode};
>  
> +use proxmox_http::Body;
>  use proxmox_router::http_err;
>  use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
>  
> @@ -19,6 +21,7 @@ use crate::{normalize_path_with_components, WorkerTask};
>  /// We use this kind of service to handle backup protocol
>  /// connections. State is stored inside the generic ``rpcenv``. Logs
>  /// goes into the ``WorkerTask`` log.
> +#[derive(Clone)]
>  pub struct H2Service<E> {
>      router: &'static Router,
>      rpcenv: E,
> @@ -42,7 +45,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
>          }
>      }
>  
> -    fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
> +    fn handle_request(&self, req: Request<Incoming>) -> ApiResponseFuture {
>          let (parts, body) = req.into_parts();
>  
>          let method = parts.method.clone();
> @@ -103,7 +106,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
>      }
>  }
>  
> -impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
> +impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Incoming>> for H2Service<E> {
>      type Response = Response<Body>;
>      type Error = Error;
>      #[allow(clippy::type_complexity)]
> @@ -113,7 +116,7 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
>          Poll::Ready(Ok(()))
>      }
>  
> -    fn call(&mut self, req: Request<Body>) -> Self::Future {
> +    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
>          let path = req.uri().path().to_owned();
>          let method = req.method().clone();
>          let worker = self.worker.clone();
> @@ -126,14 +129,14 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
>                  }
>                  Err(err) => {
>                      if let Some(apierr) = err.downcast_ref::<HttpError>() {
> -                        let mut resp = Response::new(Body::from(apierr.message.clone()));
> +                        let mut resp = Response::new(apierr.message.clone().into());
>                          resp.extensions_mut()
>                              .insert(ErrorMessageExtension(apierr.message.clone()));
>                          *resp.status_mut() = apierr.code;
>                          Self::log_response(worker, method, &path, &resp);
>                          Ok(resp)
>                      } else {
> -                        let mut resp = Response::new(Body::from(err.to_string()));
> +                        let mut resp = Response::new(err.to_string().into());
>                          resp.extensions_mut()
>                              .insert(ErrorMessageExtension(err.to_string()));
>                          *resp.status_mut() = StatusCode::BAD_REQUEST;
> diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
> index 43dafa91..5ddd3667 100644
> --- a/proxmox-rest-server/src/lib.rs
> +++ b/proxmox-rest-server/src/lib.rs
> @@ -34,7 +34,7 @@ mod environment;
>  pub use environment::*;
>  
>  mod api_config;
> -pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
> +pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
>  
>  mod rest;
>  pub use rest::{Redirector, RestServer};
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index f5a72052..f902592d 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -10,17 +10,25 @@ use std::task::{Context, Poll};
>  use anyhow::{bail, format_err, Error};
>  use futures::future::FutureExt;
>  use futures::stream::TryStreamExt;
> -use hyper::body::HttpBody;
> +use http_body_util::{BodyDataStream, BodyStream};
> +use hyper::body::{Body as HyperBody, Incoming};
>  use hyper::header::{self, HeaderMap};
>  use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{Request, Response, StatusCode};
> +use hyper_util::rt::{TokioExecutor, TokioIo};
> +use hyper_util::server::conn;
> +use hyper_util::server::graceful::GracefulShutdown;
> +use hyper_util::service::TowerToHyperService;
>  use regex::Regex;
>  use serde_json::Value;
>  use tokio::fs::File;
> +use tokio::io::{AsyncRead, AsyncWrite};
>  use tokio::time::Instant;
> +use tokio_stream::wrappers::ReceiverStream;
>  use tower_service::Service;
>  use url::form_urlencoded;
>  
> +use proxmox_http::Body;
>  use proxmox_router::{
>      check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
>      RpcEnvironmentType, UserInformation,
> @@ -40,6 +48,7 @@ unsafe extern "C" {
>      fn tzset();
>  }
>  
> +#[derive(Clone)]
>  struct AuthStringExtension(String);
>  
>  pub(crate) struct EmptyUserInformation {}
> @@ -74,24 +83,11 @@ impl RestServer {
>              api_config: Arc::new(api_config),
>          }
>      }
> -}
>  
> -impl<T: PeerAddress> Service<&T> for RestServer {
> -    type Response = ApiService;
> -    type Error = Error;
> -    type Future = std::future::Ready<Result<ApiService, Error>>;
> -
> -    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
> -        Poll::Ready(Ok(()))
> -    }
> -
> -    fn call(&mut self, ctx: &T) -> Self::Future {
> -        std::future::ready(match ctx.peer_addr() {
> -            Err(err) => Err(format_err!("unable to get peer address - {}", err)),
> -            Ok(peer) => Ok(ApiService {
> -                peer,
> -                api_config: Arc::clone(&self.api_config),
> -            }),
> +    pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> +        Ok(ApiService {
> +            peer: peer.peer_addr()?,
> +            api_config: Arc::clone(&self.api_config),
>          })
>      }
>  }
> @@ -108,25 +104,40 @@ impl Redirector {
>      pub fn new() -> Self {
>          Self {}
>      }
> -}
>  
> -impl<T> Service<&T> for Redirector {
> -    type Response = RedirectService;
> -    type Error = Error;
> -    type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
> -
> -    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
> -        Poll::Ready(Ok(()))
> -    }
> -
> -    fn call(&mut self, _ctx: &T) -> Self::Future {
> -        std::future::ready(Ok(RedirectService {}))
> +    pub fn redirect_service(&self) -> RedirectService {
> +        RedirectService {}
>      }
>  }
>  
> +#[derive(Clone)]
>  pub struct RedirectService;
>  
> -impl Service<Request<Body>> for RedirectService {
> +impl RedirectService {
> +    pub async fn serve<S>(
> +        self,
> +        conn: S,
> +        mut graceful: Option<Arc<GracefulShutdown>>,
> +    ) -> Result<(), Error>
> +    where
> +        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> +    {
> +        let api_service = TowerToHyperService::new(self);
> +        let io = TokioIo::new(conn);
> +        let api_conn = conn::auto::Builder::new(TokioExecutor::new());
> +        let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
> +        if let Some(graceful) = graceful.take() {
> +            let api_conn = graceful.watch(api_conn);
> +            drop(graceful);
> +            api_conn.await
> +        } else {
> +            api_conn.await
> +        }
> +        .map_err(|err| format_err!("error serving redirect connection: {err}"))
> +    }
> +}
> +
> +impl Service<Request<Incoming>> for RedirectService {
>      type Response = Response<Body>;
>      type Error = anyhow::Error;
>      type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
> @@ -135,7 +146,7 @@ impl Service<Request<Body>> for RedirectService {
>          Poll::Ready(Ok(()))
>      }
>  
> -    fn call(&mut self, req: Request<Body>) -> Self::Future {
> +    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
>          let future = async move {
>              let header_host_value = req
>                  .headers()
> @@ -194,12 +205,6 @@ impl PeerAddress for tokio::net::TcpStream {
>      }
>  }
>  
> -impl PeerAddress for hyper::server::conn::AddrStream {
> -    fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
> -        Ok(self.remote_addr())
> -    }
> -}
> -
>  impl PeerAddress for tokio::net::UnixStream {
>      fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
>          // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
> @@ -223,11 +228,36 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>  // Rust wants this type 'pub' here (else we get 'private type `ApiService`
>  // in public interface'). The type is still private because the crate does
>  // not export it.
> +#[derive(Clone)]
>  pub struct ApiService {
>      pub peer: std::net::SocketAddr,
>      pub api_config: Arc<ApiConfig>,
>  }
>  
> +impl ApiService {
> +    pub async fn serve<S>(
> +        self,
> +        conn: S,
> +        mut graceful: Option<Arc<GracefulShutdown>>,
> +    ) -> Result<(), Error>
> +    where
> +        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> +    {
> +        let api_service = TowerToHyperService::new(self);
> +        let io = TokioIo::new(conn);
> +        let api_conn = conn::auto::Builder::new(TokioExecutor::new());
> +        let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
> +        if let Some(graceful) = graceful.take() {
> +            let api_conn = graceful.watch(api_conn);
> +            drop(graceful);
> +            api_conn.await
> +        } else {
> +            api_conn.await
> +        }
> +        .map_err(|err| format_err!("error serving connection: {err}"))
> +    }
> +}
> +
>  fn log_response(
>      logfile: Option<&Arc<Mutex<FileLogger>>>,
>      peer: &std::net::SocketAddr,
> @@ -307,7 +337,7 @@ fn get_user_agent(headers: &HeaderMap) -> Option<String> {
>          .ok()
>  }
>  
> -impl Service<Request<Body>> for ApiService {
> +impl Service<Request<Incoming>> for ApiService {
>      type Response = Response<Body>;
>      type Error = Error;
>      #[allow(clippy::type_complexity)]
> @@ -317,7 +347,7 @@ impl Service<Request<Body>> for ApiService {
>          Poll::Ready(Ok(()))
>      }
>  
> -    fn call(&mut self, req: Request<Body>) -> Self::Future {
> +    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
>          let path = req.uri().path_and_query().unwrap().as_str().to_owned();
>          let method = req.method().clone();
>          let user_agent = get_user_agent(req.headers());
> @@ -384,7 +414,7 @@ fn parse_query_parameters<S: 'static + BuildHasher + Send>(
>  async fn get_request_parameters<S: 'static + BuildHasher + Send>(
>      param_schema: ParameterSchema,
>      parts: &Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      uri_param: HashMap<String, String, S>,
>  ) -> Result<Value, Error> {
>      let mut is_json = false;
> @@ -401,13 +431,17 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
>          }
>      }
>  
> -    let body = TryStreamExt::map_err(req_body, |err| {
> +    let stream_body = BodyStream::new(req_body);
> +    let body = TryStreamExt::map_err(stream_body, |err| {
>          http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
>      })
> -    .try_fold(Vec::new(), |mut acc, chunk| async move {
> +    .try_fold(Vec::new(), |mut acc, frame| async move {
>          // FIXME: max request body size?
> -        if acc.len() + chunk.len() < 64 * 1024 {
> -            acc.extend_from_slice(&chunk);
> +        let frame = frame
> +            .into_data()
> +            .map_err(|err| format_err!("Failed to read request body frame - {err:?}"))?;
> +        if acc.len() + frame.len() < 64 * 1024 {
> +            acc.extend_from_slice(&frame);
>              Ok(acc)
>          } else {
>              Err(http_err!(BAD_REQUEST, "Request body too large"))
> @@ -437,13 +471,14 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
>      }
>  }
>  
> +#[derive(Clone)]
>  struct NoLogExtension();
>  
>  async fn proxy_protected_request(
>      config: &ApiConfig,
>      info: &ApiMethod,
>      mut parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      peer: &std::net::SocketAddr,
>  ) -> Result<Response<Body>, Error> {
>      let mut uri_parts = parts.uri.clone().into_parts();
> @@ -463,9 +498,14 @@ async fn proxy_protected_request(
>      let reload_timezone = info.reload_timezone;
>  
>      let mut resp = match config.privileged_addr.clone() {
> -        None => hyper::client::Client::new().request(request).await?,
> +        None => {
> +            hyper_util::client::legacy::Client::builder(TokioExecutor::new())
> +                .build_http()
> +                .request(request)
> +                .await?
> +        }
>          Some(addr) => {
> -            hyper::client::Client::builder()
> +            hyper_util::client::legacy::Client::builder(TokioExecutor::new())
>                  .build(addr)
>                  .request(request)
>                  .await?
> @@ -479,7 +519,7 @@ async fn proxy_protected_request(
>          }
>      }
>  
> -    Ok(resp)
> +    Ok(resp.map(|b| Body::wrap_stream(BodyDataStream::new(b))))
>  }
>  
>  fn delay_unauth_time() -> std::time::Instant {
> @@ -491,22 +531,23 @@ fn access_forbidden_time() -> std::time::Instant {
>  }
>  
>  fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
> -    let (mut send, body) = hyper::Body::channel();
> +    let (send, body) = tokio::sync::mpsc::channel::<Result<Vec<u8>, Error>>(1);
>      tokio::spawn(async move {
>          use futures::StreamExt;
>  
>          let mut stream = stream.into_inner();
>          while let Some(record) = stream.next().await {
> -            if send.send_data(record.to_bytes().into()).await.is_err() {
> +            if send.send(Ok(record.to_bytes())).await.is_err() {
>                  break;
>              }
>          }
>      });
>  
> -    Ok(Response::builder()
> +    Response::builder()
>          .status(http::StatusCode::OK)
>          .header(http::header::CONTENT_TYPE, "application/json-seq")
> -        .body(body)?)
> +        .body(Body::wrap_stream(ReceiverStream::new(body)))
> +        .map_err(Error::from)
>  }
>  
>  fn handle_sync_stream_as_json_seq(
> @@ -527,7 +568,7 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
>      info: &'static ApiMethod,
>      formatter: Option<&'static dyn OutputFormatter>,
>      parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      uri_param: HashMap<String, String, S>,
>  ) -> Result<Response<Body>, Error> {
>      let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
> @@ -630,9 +671,10 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
>              );
>              resp.map(|body| {
>                  Body::wrap_stream(
> -                    DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
> -                        proxmox_lang::io_format_err!("error during compression: {}", err)
> -                    }))
> +                    DeflateEncoder::builder(TryStreamExt::map_err(
> +                        BodyDataStream::new(body),
> +                        |err| proxmox_lang::io_format_err!("error during compression: {}", err),

Could inline `err` into the format string passed to `io_format_err!`
(i.e. `{err}`), since you're touching that part anyway (unless our macro
doesn't support that, but I assume it does).
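
To make the suggestion concrete, a small sketch. The macro below is a
hypothetical stand-in that simply forwards its arguments to `format!`;
whether `proxmox_lang::io_format_err!` behaves the same way is exactly
the assumption made above and is not verified here.

```rust
// Hypothetical stand-in for `proxmox_lang::io_format_err!`: forwards all
// arguments to `format!` and wraps the resulting message in an `io::Error`.
macro_rules! io_format_err_sketch {
    ($($arg:tt)*) => {
        std::io::Error::new(std::io::ErrorKind::Other, format!($($arg)*))
    };
}

/// Current style in the patch: positional `{}` with a separate argument.
pub fn positional(err: &str) -> std::io::Error {
    io_format_err_sketch!("error during compression: {}", err)
}

/// Suggested style: `err` is captured directly inside the format string.
pub fn inlined(err: &str) -> std::io::Error {
    io_format_err_sketch!("error during compression: {err}")
}
```

Both variants produce the same message; the inlined form only saves the
trailing argument.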

> +                    ))
>                      .zlib(true)
>                      .flush_window(is_streaming.then_some(64 * 1024))
>                      .build(),
> @@ -796,7 +838,7 @@ fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMe
>  impl ApiConfig {
>      pub async fn handle_request(
>          self: Arc<ApiConfig>,
> -        req: Request<Body>,
> +        req: Request<Incoming>,
>          peer: &std::net::SocketAddr,
>      ) -> Result<Response<Body>, Error> {
>          let (parts, body) = req.into_parts();
> @@ -808,7 +850,7 @@ impl ApiConfig {
>          if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
>              return Ok(Response::builder()
>                  .status(StatusCode::URI_TOO_LONG)
> -                .body("".into())
> +                .body(Body::empty())
>                  .unwrap());
>          }
>  
> @@ -907,7 +949,7 @@ impl Action {
>  
>  pub struct ApiRequestData<'a> {
>      parts: Parts,
> -    body: Body,
> +    body: Incoming,
>      peer: &'a std::net::SocketAddr,
>      config: &'a ApiConfig,
>      full_path: &'a str,



