public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: "Max Carrara" <m.carrara@proxmox.com>
To: "Proxmox Backup Server development discussion"
	<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup 5/6] adapt to hyper/http 1.0
Date: Wed, 02 Apr 2025 15:36:50 +0200	[thread overview]
Message-ID: <D8W6Z2CTLGTA.9MRQMOW9SZK4@proxmox.com> (raw)
In-Reply-To: <20250326152327.332179-23-f.gruenbichler@proxmox.com>

On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> similar to the other changes:
> - Body to Incoming or proxmox-http's Body
> - use adapters between hyper<->tower and hyper<->tokio
> - adapt to new proxmox-rest-server interfaces
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
>  proxmox-backup-client/Cargo.toml      |  1 +
>  proxmox-backup-client/src/snapshot.rs |  2 +-
>  src/acme/client.rs                    |  6 ++-
>  src/acme/plugin.rs                    | 62 ++++++++++++++++--------
>  src/api2/admin/datastore.rs           | 20 +++-----
>  src/api2/backup/environment.rs        |  3 +-
>  src/api2/backup/mod.rs                | 10 ++--
>  src/api2/backup/upload_chunk.rs       | 47 ++++++++++--------
>  src/api2/helpers.rs                   |  3 +-
>  src/api2/node/mod.rs                  |  7 +--
>  src/api2/node/tasks.rs                |  7 +--
>  src/api2/reader/mod.rs                | 17 ++++---
>  src/bin/proxmox-backup-api.rs         | 40 ++++++++++-----
>  src/bin/proxmox-backup-proxy.rs       | 70 ++++++++++++++++++++++-----
>  14 files changed, 197 insertions(+), 98 deletions(-)
>
> diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
> index a91a4908b..5f0140e78 100644
> --- a/proxmox-backup-client/Cargo.toml
> +++ b/proxmox-backup-client/Cargo.toml
> @@ -24,6 +24,7 @@ pxar.workspace = true
>  
>  proxmox-async.workspace = true
>  proxmox-human-byte.workspace = true
> +proxmox-http = { workspace = true, features = [ "body" ] }
>  proxmox-log.workspace = true
>  proxmox-io.workspace = true
>  proxmox-router = { workspace = true, features = [ "cli" ] }
> diff --git a/proxmox-backup-client/src/snapshot.rs b/proxmox-backup-client/src/snapshot.rs
> index f195c23b7..f1569db2e 100644
> --- a/proxmox-backup-client/src/snapshot.rs
> +++ b/proxmox-backup-client/src/snapshot.rs
> @@ -271,7 +271,7 @@ async fn upload_log(param: Value) -> Result<Value, Error> {
>      );
>  
>      let args = snapshot_args(&backup_ns, &snapshot)?;
> -    let body = hyper::Body::from(raw_data);
> +    let body = proxmox_http::Body::from(raw_data);
>  
>      client
>          .upload("application/octet-stream", body, &path, Some(args))
> diff --git a/src/acme/client.rs b/src/acme/client.rs
> index 97f628e37..4e55393e4 100644
> --- a/src/acme/client.rs
> +++ b/src/acme/client.rs
> @@ -6,8 +6,10 @@ use std::os::unix::fs::OpenOptionsExt;
>  
>  use anyhow::{bail, format_err};
>  use bytes::Bytes;
> -use hyper::{body::HttpBody, Body, Request};
> +use http_body_util::BodyExt;
> +use hyper::Request;
>  use nix::sys::stat::Mode;
> +use proxmox_http::Body;
>  use serde::{Deserialize, Serialize};
>  
>  use proxmox_acme::account::AccountCreator;
> @@ -618,7 +620,7 @@ impl AcmeClient {
>              response.json()?,
>          ));
>  
> -        Ok((directory.as_ref().unwrap(), nonce.as_deref()))
> +        Ok((directory.as_mut().unwrap(), nonce.as_deref()))
>      }
>  
>      /// Like `get_directory`, but if the directory provides no nonce, also performs a `HEAD`
> diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
> index c33cfe405..9141670e7 100644
> --- a/src/acme/plugin.rs
> +++ b/src/acme/plugin.rs
> @@ -1,12 +1,21 @@
>  use std::future::Future;
> +use std::net::{IpAddr, SocketAddr};
>  use std::pin::Pin;
>  use std::process::Stdio;
>  use std::sync::Arc;
>  use std::time::Duration;
>  
>  use anyhow::{bail, format_err, Error};
> -use hyper::{Body, Request, Response};
> +use bytes::Bytes;
> +use futures::TryFutureExt;
> +use http_body_util::Full;
> +use hyper::body::Incoming;
> +use hyper::server::conn::http1;
> +use hyper::service::service_fn;
> +use hyper::{Request, Response};
> +use hyper_util::rt::TokioIo;
>  use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
> +use tokio::net::TcpListener;
>  use tokio::process::Command;
>  
>  use proxmox_acme::{Authorization, Challenge};
> @@ -235,10 +244,10 @@ impl StandaloneServer {
>  }
>  
>  async fn standalone_respond(
> -    req: Request<Body>,
> +    req: Request<Incoming>,
>      path: Arc<String>,
>      key_auth: Arc<String>,
> -) -> Result<Response<Body>, hyper::Error> {
> +) -> Result<Response<Full<Bytes>>, hyper::Error> {
>      if req.method() == hyper::Method::GET && req.uri().path() == path.as_str() {
>          Ok(Response::builder()
>              .status(hyper::http::StatusCode::OK)
> @@ -260,9 +269,6 @@ impl AcmePlugin for StandaloneServer {
>          _domain: &'d AcmeDomain,
>          _task: Arc<WorkerTask>,
>      ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
> -        use hyper::server::conn::AddrIncoming;
> -        use hyper::service::{make_service_fn, service_fn};
> -
>          Box::pin(async move {
>              self.stop();
>  
> @@ -273,22 +279,40 @@ impl AcmePlugin for StandaloneServer {
>              let key_auth = Arc::new(client.key_authorization(token)?);
>              let path = Arc::new(format!("/.well-known/acme-challenge/{}", token));
>  
> -            let service = make_service_fn(move |_| {
> -                let path = Arc::clone(&path);
> -                let key_auth = Arc::clone(&key_auth);
> -                async move {
> -                    Ok::<_, hyper::Error>(service_fn(move |request| {
> -                        standalone_respond(request, Arc::clone(&path), Arc::clone(&key_auth))
> -                    }))
> -                }
> -            });
> -
>              // `[::]:80` first, then `*:80`
> -            let incoming = AddrIncoming::bind(&(([0u16; 8], 80).into()))
> -                .or_else(|_| AddrIncoming::bind(&(([0u8; 4], 80).into())))?;
> +            let dual = SocketAddr::new(IpAddr::from([0u16; 8]), 80);
> +            let ipv4 = SocketAddr::new(IpAddr::from([0u8; 4]), 80);
> +            let incoming = TcpListener::bind(dual)
> +                .or_else(|_| TcpListener::bind(ipv4))
> +                .await?;
>  
> -            let server = hyper::Server::builder(incoming).serve(service);
> +            let server = async move {
> +                loop {
> +                    let key_auth = Arc::clone(&key_auth);
> +                    let path = Arc::clone(&path);
> +                    match incoming.accept().await {
> +                        Ok((tcp, _)) => {
> +                            let io = TokioIo::new(tcp);
> +                            let service = service_fn(move |request| {
> +                                standalone_respond(
> +                                    request,
> +                                    Arc::clone(&path),
> +                                    Arc::clone(&key_auth),
> +                                )
> +                            });
>  
> +                            tokio::task::spawn(async move {
> +                                if let Err(err) =
> +                                    http1::Builder::new().serve_connection(io, service).await
> +                                {
> +                                    println!("Error serving connection: {err:?}");
> +                                }
> +                            });
> +                        }
> +                        Err(err) => println!("Error accepting connection: {err:?}"),
> +                    }
> +                }
> +            };
>              let (future, abort) = futures::future::abortable(server);
>              self.abort_handle = Some(abort);
>              tokio::spawn(future);
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 483e595c1..7aba5d313 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -9,8 +9,10 @@ use std::sync::Arc;
>  
>  use anyhow::{bail, format_err, Error};
>  use futures::*;
> +use http_body_util::BodyExt;
>  use hyper::http::request::Parts;
> -use hyper::{header, Body, Response, StatusCode};
> +use hyper::{body::Incoming, header, Response, StatusCode};
> +use proxmox_http::Body;
>  use serde::Deserialize;
>  use serde_json::{json, Value};
>  use tokio_stream::wrappers::ReceiverStream;
> @@ -1387,7 +1389,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
>  
>  pub fn download_file(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -1472,7 +1474,7 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
>  
>  pub fn download_file_decoded(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -1598,7 +1600,7 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
>  
>  pub fn upload_backup_log(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -1636,13 +1638,7 @@ pub fn upload_backup_log(
>              file_name = file_name.deref(),
>          );
>  
> -        let data = req_body
> -            .map_err(Error::from)
> -            .try_fold(Vec::new(), |mut acc, chunk| {
> -                acc.extend_from_slice(&chunk);
> -                future::ok::<_, Error>(acc)
> -            })
> -            .await?;
> +        let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
>  
>          // always verify blob/CRC at server side
>          let blob = DataBlob::load_from_reader(&mut &data[..])?;
> @@ -1815,7 +1811,7 @@ fn get_local_pxar_reader(
>  
>  pub fn pxar_file_download(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 99d885e2e..8a2e9ddcb 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -7,6 +7,7 @@ use tracing::info;
>  use ::serde::Serialize;
>  use serde_json::{json, Value};
>  
> +use proxmox_http::Body;
>  use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
>  use proxmox_sys::fs::{lock_dir_noblock_shared, replace_file, CreateOptions};
>  
> @@ -19,7 +20,7 @@ use proxmox_rest_server::{formatter::*, WorkerTask};
>  
>  use crate::backup::verify_backup_dir_with_lock;
>  
> -use hyper::{Body, Response};
> +use hyper::Response;
>  
>  #[derive(Copy, Clone, Serialize)]
>  struct UploadStatistic {
> diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
> index efc97a1fb..f4378e185 100644
> --- a/src/api2/backup/mod.rs
> +++ b/src/api2/backup/mod.rs
> @@ -5,11 +5,13 @@ use futures::*;
>  use hex::FromHex;
>  use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
>  use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{body::Incoming, Request, Response, StatusCode};
> +use hyper_util::service::TowerToHyperService;
>  use serde::Deserialize;
>  use serde_json::{json, Value};
>  use tracing::warn;
>  
> +use proxmox_http::Body;
>  use proxmox_rest_server::{H2Service, WorkerTask};
>  use proxmox_router::{http_err, list_subdirs_api_method};
>  use proxmox_router::{
> @@ -70,7 +72,7 @@ pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error>
>  
>  fn upgrade_to_backup_protocol(
>      parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -247,7 +249,7 @@ fn upgrade_to_backup_protocol(
>                          http.max_frame_size(4 * 1024 * 1024);
>  
>                          let env3 = env2.clone();
> -                        http.serve_connection(conn, service).map(move |result| {
> +                        http.serve_connection(conn, TowerToHyperService::new(service)).map(move |result| {
>                              match result {
>                                  Err(err) => {
>                                      // Avoid  Transport endpoint is not connected (os error 107)
> @@ -824,7 +826,7 @@ pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
>  
>  fn download_previous(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
> index 20259660a..2c66c2855 100644
> --- a/src/api2/backup/upload_chunk.rs
> +++ b/src/api2/backup/upload_chunk.rs
> @@ -5,8 +5,9 @@ use std::task::{Context, Poll};
>  use anyhow::{bail, format_err, Error};
>  use futures::*;
>  use hex::FromHex;
> +use http_body_util::{BodyDataStream, BodyExt};
> +use hyper::body::Incoming;
>  use hyper::http::request::Parts;
> -use hyper::Body;
>  use serde_json::{json, Value};
>  
>  use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
> @@ -21,7 +22,7 @@ use pbs_tools::json::{required_integer_param, required_string_param};
>  use super::environment::*;
>  
>  pub struct UploadChunk {
> -    stream: Body,
> +    stream: BodyDataStream<Incoming>,
>      store: Arc<DataStore>,
>      digest: [u8; 32],
>      size: u32,
> @@ -31,7 +32,7 @@ pub struct UploadChunk {
>  
>  impl UploadChunk {
>      pub fn new(
> -        stream: Body,
> +        stream: BodyDataStream<Incoming>,
>          store: Arc<DataStore>,
>          digest: [u8; 32],
>          size: u32,
> @@ -146,7 +147,7 @@ pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
>  
>  fn upload_fixed_chunk(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -161,8 +162,14 @@ fn upload_fixed_chunk(
>  
>          let env: &BackupEnvironment = rpcenv.as_ref();
>  
> -        let (digest, size, compressed_size, is_duplicate) =
> -            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
> +        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> +            BodyDataStream::new(req_body),
> +            env.datastore.clone(),
> +            digest,
> +            size,
> +            encoded_size,
> +        )
> +        .await?;
>  
>          env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
>          let digest_str = hex::encode(digest);
> @@ -215,7 +222,7 @@ pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
>  
>  fn upload_dynamic_chunk(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -230,8 +237,14 @@ fn upload_dynamic_chunk(
>  
>          let env: &BackupEnvironment = rpcenv.as_ref();
>  
> -        let (digest, size, compressed_size, is_duplicate) =
> -            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
> +        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> +            BodyDataStream::new(req_body),
> +            env.datastore.clone(),
> +            digest,
> +            size,
> +            encoded_size,
> +        )
> +        .await?;
>  
>          env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
>          let digest_str = hex::encode(digest);
> @@ -250,13 +263,13 @@ pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
>  
>  fn upload_speedtest(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      _param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
>  ) -> ApiResponseFuture {
>      async move {
> -        let result = req_body
> +        let result = BodyDataStream::new(req_body)
>              .map_err(Error::from)
>              .try_fold(0, |size: usize, chunk| {
>                  let sum = size + chunk.len();
> @@ -303,7 +316,7 @@ pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
>  
>  fn upload_blob(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -318,13 +331,7 @@ fn upload_blob(
>              bail!("wrong blob file extension: '{}'", file_name);
>          }
>  
> -        let data = req_body
> -            .map_err(Error::from)
> -            .try_fold(Vec::new(), |mut acc, chunk| {
> -                acc.extend_from_slice(&chunk);
> -                future::ok::<_, Error>(acc)
> -            })
> -            .await?;
> +        let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
>  
>          if encoded_size != data.len() {
>              bail!(
> @@ -334,7 +341,7 @@ fn upload_blob(
>              );
>          }
>  
> -        env.add_blob(&file_name, data)?;
> +        env.add_blob(&file_name, data.to_vec())?;
>  
>          Ok(env.format_response(Ok(Value::Null)))
>      }
> diff --git a/src/api2/helpers.rs b/src/api2/helpers.rs
> index 3dc1befc1..f346b0cca 100644
> --- a/src/api2/helpers.rs
> +++ b/src/api2/helpers.rs
> @@ -2,8 +2,9 @@ use std::path::PathBuf;
>  
>  use anyhow::Error;
>  use futures::stream::TryStreamExt;
> -use hyper::{header, Body, Response, StatusCode};
> +use hyper::{header, Response, StatusCode};
>  
> +use proxmox_http::Body;
>  use proxmox_router::http_bail;
>  
>  pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
> diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
> index 62b447096..e7c6213c1 100644
> --- a/src/api2/node/mod.rs
> +++ b/src/api2/node/mod.rs
> @@ -5,10 +5,11 @@ use std::os::unix::io::AsRawFd;
>  
>  use anyhow::{bail, format_err, Error};
>  use futures::future::{FutureExt, TryFutureExt};
> -use hyper::body::Body;
> +use hyper::body::Incoming;
>  use hyper::http::request::Parts;
>  use hyper::upgrade::Upgraded;
>  use hyper::Request;
> +use hyper_util::rt::TokioIo;
>  use serde_json::{json, Value};
>  use tokio::io::{AsyncBufReadExt, BufReader};
>  
> @@ -267,7 +268,7 @@ pub const API_METHOD_WEBSOCKET: ApiMethod = ApiMethod::new(
>  
>  fn upgrade_to_websocket(
>      parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -306,7 +307,7 @@ fn upgrade_to_websocket(
>              };
>  
>              let local = tokio::net::TcpStream::connect(format!("localhost:{}", port)).await?;
> -            ws.serve_connection(conn, local).await
> +            ws.serve_connection(TokioIo::new(conn), local).await
>          });
>  
>          Ok(response)
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index cad740559..bd6763069 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -3,9 +3,10 @@ use std::io::{BufRead, BufReader};
>  
>  use anyhow::{bail, Error};
>  use futures::FutureExt;
> +use hyper::body::Incoming;
>  use hyper::http::request::Parts;
>  use hyper::http::{header, Response, StatusCode};
> -use hyper::Body;
> +use proxmox_http::Body;
>  use serde_json::{json, Value};
>  
>  use proxmox_async::stream::AsyncReaderStream;
> @@ -321,7 +322,7 @@ pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
>  );
>  fn read_task_log(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -404,7 +405,7 @@ fn read_task_log(
>          Ok(Response::builder()
>              .status(StatusCode::OK)
>              .header(header::CONTENT_TYPE, "application/json")
> -            .body(Body::from(json.to_string()))
> +            .body(json.to_string().into())
>              .unwrap())
>      }
>      .boxed()
> diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
> index 1713f182b..b69000087 100644
> --- a/src/api2/reader/mod.rs
> +++ b/src/api2/reader/mod.rs
> @@ -3,12 +3,15 @@
>  use anyhow::{bail, format_err, Error};
>  use futures::*;
>  use hex::FromHex;
> +use hyper::body::Incoming;
>  use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
>  use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{Request, Response, StatusCode};
> +use hyper_util::service::TowerToHyperService;
>  use serde::Deserialize;
>  use serde_json::Value;
>  
> +use proxmox_http::Body;
>  use proxmox_rest_server::{H2Service, WorkerTask};
>  use proxmox_router::{
>      http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
> @@ -68,7 +71,7 @@ pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
>  
>  fn upgrade_to_backup_reader_protocol(
>      parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -190,7 +193,7 @@ fn upgrade_to_backup_reader_protocol(
>                      http.initial_connection_window_size(window_size);
>                      http.max_frame_size(4 * 1024 * 1024);
>  
> -                    http.serve_connection(conn, service)
> +                    http.serve_connection(conn, TowerToHyperService::new(service))
>                          .map_err(Error::from)
>                          .await
>                  };
> @@ -244,7 +247,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
>  
>  fn download_file(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -300,7 +303,7 @@ pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
>  
>  fn download_chunk(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -348,7 +351,7 @@ fn download_chunk(
>  /* this is too slow
>  fn download_chunk_old(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -393,7 +396,7 @@ pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
>  
>  fn speedtest(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      _param: Value,
>      _info: &ApiMethod,
>      _rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 7b4187550..438fd9d7e 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -1,12 +1,15 @@
>  use std::future::Future;
>  use std::pin::{pin, Pin};
> +use std::sync::Arc;
>  
>  use anyhow::{bail, Error};
> -use futures::*;
>  use hyper::http::Response;
> -use hyper::{Body, StatusCode};
> +use hyper::StatusCode;
> +use hyper_util::server::graceful::GracefulShutdown;
> +use tokio::net::TcpListener;
>  use tracing::level_filters::LevelFilter;
>  
> +use proxmox_http::Body;
>  use proxmox_lang::try_block;
>  use proxmox_rest_server::{ApiConfig, RestServer};
>  use proxmox_router::RpcEnvironmentType;
> @@ -34,7 +37,7 @@ fn get_index() -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
>          Response::builder()
>              .status(StatusCode::OK)
>              .header(hyper::header::CONTENT_TYPE, "text/html")
> -            .body(index.into())
> +            .body(index.to_string().into())
>              .unwrap()
>      })
>  }
> @@ -108,17 +111,28 @@ async fn run() -> Result<(), Error> {
>      // http server future:
>      let server = proxmox_daemon::server::create_daemon(
>          ([127, 0, 0, 1], 82).into(),
> -        move |listener| {
> -            let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
> -
> -            Ok(async {
> +        move |listener: TcpListener| {
> +            Ok(async move {
>                  proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
> -
> -                hyper::Server::builder(incoming)
> -                    .serve(rest_server)
> -                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> -                    .map_err(Error::from)
> -                    .await
> +                let graceful = Arc::new(GracefulShutdown::new());
> +                loop {
> +                    let graceful2 = Arc::clone(&graceful);
> +                    tokio::select! {
> +                        incoming = listener.accept() => {
> +                            let (conn, _) = incoming?;
> +                            let api_service = rest_server.api_service(&conn)?;
> +                            tokio::spawn(async move { api_service.serve(conn, Some(graceful2)).await });
> +                        },
> +                        _shutdown = proxmox_daemon::shutdown_future() => {
> +                            break;
> +                        },
> +                    }
> +                }
> +                if let Some(shutdown) = Arc::into_inner(graceful) {
> +                    log::info!("shutting down..");
> +                    shutdown.shutdown().await
> +                }
> +                Ok(())
>              })
>          },
>          Some(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN),
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index c9a6032e6..8ee537207 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -7,7 +7,8 @@ use futures::*;
>  use hyper::header;
>  use hyper::http::request::Parts;
>  use hyper::http::Response;
> -use hyper::{Body, StatusCode};
> +use hyper::StatusCode;
> +use hyper_util::server::graceful::GracefulShutdown;
>  use tracing::level_filters::LevelFilter;
>  use tracing::{info, warn};
>  use url::form_urlencoded;
> @@ -15,6 +16,7 @@ use url::form_urlencoded;
>  use openssl::ssl::SslAcceptor;
>  use serde_json::{json, Value};
>  
> +use proxmox_http::Body;
>  use proxmox_lang::try_block;
>  use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
>  use proxmox_sys::fs::CreateOptions;
> @@ -289,27 +291,71 @@ async fn run() -> Result<(), Error> {
>      let server = proxmox_daemon::server::create_daemon(
>          ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
>          move |listener| {
> -            let (secure_connections, insecure_connections) =
> +            let (mut secure_connections, mut insecure_connections) =
>                  connections.accept_tls_optional(listener, acceptor);
>  
>              Ok(async {
>                  proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
>  
> -                let secure_server = hyper::Server::builder(secure_connections)
> -                    .serve(rest_server)
> -                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> -                    .map_err(Error::from);
> +                let secure_server = async move {
> +                    let graceful = Arc::new(GracefulShutdown::new());
> +                    loop {
> +                        let graceful2 = Arc::clone(&graceful);
> +                        tokio::select! {
> +                            Some(conn) = secure_connections.next() => {
> +                                match conn {
> +                                    Ok(conn) => {
> +                                        let api_service = rest_server.api_service(&conn)?;
> +                                        tokio::spawn(async move {
> +                                            api_service.serve(conn, Some(graceful2)).await
> +                                        });
> +                                    },
> +                                    Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
> +                                }
> +                            },
> +                            _shutdown = proxmox_daemon::shutdown_future() => {
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if let Some(shutdown) = Arc::into_inner(graceful) {
> +                        shutdown.shutdown().await
> +                    }
> +                    Ok::<(), Error>(())
> +                };
>  
> -                let insecure_server = hyper::Server::builder(insecure_connections)
> -                    .serve(redirector)
> -                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> -                    .map_err(Error::from);
> +                let insecure_server = async move {
> +                    let graceful = Arc::new(GracefulShutdown::new());
> +                    loop {
> +                        let graceful2 = Arc::clone(&graceful);
> +                        tokio::select! {
> +                            Some(conn) = insecure_connections.next() => {
> +                                match conn {
> +                                    Ok(conn) => {
> +                                        let redirect_service = redirector.redirect_service();
> +                                        tokio::spawn(async move {
> +                                            redirect_service.serve(conn, Some(graceful2)).await
> +                                        });
> +                                    },
> +                                    Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
> +                                }
> +                            },
> +                            _shutdown = proxmox_daemon::shutdown_future() => {
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if let Some(shutdown) = Arc::into_inner(graceful) {
> +                        shutdown.shutdown().await
> +                    }
> +                    Ok::<(), Error>(())
> +                };

The new `secure_server` and `insecure_server` above aren't really that
pretty; perhaps we can clean them up once we have a trait for the return
types of `accept_tls_optional()` and similar :s

In general I'd suggest breaking up the returned coroutine, since it's
gotten quite large now: either by explicitly defining separate coroutines
for each of `secure_server` and `insecure_server` instead of using `async`
blocks, or by explicitly defining the returned coroutine as a whole.
No hard feelings if you don't, though.

>  
>                  let (secure_res, insecure_res) =
>                      try_join!(tokio::spawn(secure_server), tokio::spawn(insecure_server))
>                          .context("failed to complete REST server task")?;
>  
> -                let results = [secure_res, insecure_res];
> +                let results: [Result<(), Error>; 2] = [secure_res, insecure_res];
>  
>                  if results.iter().any(Result::is_err) {
>                      let cat_errors = results
> @@ -321,7 +367,7 @@ async fn run() -> Result<(), Error> {
>                      bail!(cat_errors);
>                  }
>  
> -                Ok(())
> +                Ok::<(), Error>(())
>              })
>          },
>          Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

  reply	other threads:[~2025-04-02 13:37 UTC|newest]

Thread overview: 32+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 01/17] http: order feature values Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 02/17] http: rate-limited-stream: update to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 03/17] http: adapt MaybeTlsStream to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 04/17] http: adapt connector " Fabian Grünbichler
2025-04-02 13:31   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation Fabian Grünbichler
2025-04-02 13:31   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 07/17] http: websocket: update to http/hyper 1 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 08/17] openid: use http 0.2 to avoid openidconnect update Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 09/17] proxmox-login: switch to http 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 10/17] client: switch to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 11/17] metrics: update " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0 Fabian Grünbichler
2025-04-02 13:31   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 13/17] proxmox-router: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: " Fabian Grünbichler
2025-04-02 13:34   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 15/17] proxmox-rest-server: fix and extend example Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 16/17] proxmox-auth-api: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 17/17] proxmox-acme-api: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 1/6] Revert "h2: switch to legacy feature" Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-client: adapt http client to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 4/6] restore daemon: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 5/6] " Fabian Grünbichler
2025-04-02 13:36   ` Max Carrara [this message]
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 6/6] adapt examples " Fabian Grünbichler
2025-04-02 13:53 ` [pbs-devel] [RFC proxmox 00/23] upgrade " Max Carrara
2025-04-03 13:32   ` Max Carrara
2025-04-02 14:39 ` Thomas Lamprecht

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=D8W6Z2CTLGTA.9MRQMOW9SZK4@proxmox.com \
    --to=m.carrara@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal