From: "Max Carrara" <m.carrara@proxmox.com>
To: "Proxmox Backup Server development discussion"
<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup 5/6] adapt to hyper/http 1.0
Date: Wed, 02 Apr 2025 15:36:50 +0200 [thread overview]
Message-ID: <D8W6Z2CTLGTA.9MRQMOW9SZK4@proxmox.com> (raw)
In-Reply-To: <20250326152327.332179-23-f.gruenbichler@proxmox.com>
On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> similar to the other changes:
> - Body to Incoming or proxmox-http's Body
> - use adapters between hyper<->tower and hyper<->tokio
> - adapt to new proxmox-rest-server interfaces
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
> proxmox-backup-client/Cargo.toml | 1 +
> proxmox-backup-client/src/snapshot.rs | 2 +-
> src/acme/client.rs | 6 ++-
> src/acme/plugin.rs | 62 ++++++++++++++++--------
> src/api2/admin/datastore.rs | 20 +++-----
> src/api2/backup/environment.rs | 3 +-
> src/api2/backup/mod.rs | 10 ++--
> src/api2/backup/upload_chunk.rs | 47 ++++++++++--------
> src/api2/helpers.rs | 3 +-
> src/api2/node/mod.rs | 7 +--
> src/api2/node/tasks.rs | 7 +--
> src/api2/reader/mod.rs | 17 ++++---
> src/bin/proxmox-backup-api.rs | 40 ++++++++++-----
> src/bin/proxmox-backup-proxy.rs | 70 ++++++++++++++++++++++-----
> 14 files changed, 197 insertions(+), 98 deletions(-)
>
> diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
> index a91a4908b..5f0140e78 100644
> --- a/proxmox-backup-client/Cargo.toml
> +++ b/proxmox-backup-client/Cargo.toml
> @@ -24,6 +24,7 @@ pxar.workspace = true
>
> proxmox-async.workspace = true
> proxmox-human-byte.workspace = true
> +proxmox-http = { workspace = true, features = [ "body" ] }
> proxmox-log.workspace = true
> proxmox-io.workspace = true
> proxmox-router = { workspace = true, features = [ "cli" ] }
> diff --git a/proxmox-backup-client/src/snapshot.rs b/proxmox-backup-client/src/snapshot.rs
> index f195c23b7..f1569db2e 100644
> --- a/proxmox-backup-client/src/snapshot.rs
> +++ b/proxmox-backup-client/src/snapshot.rs
> @@ -271,7 +271,7 @@ async fn upload_log(param: Value) -> Result<Value, Error> {
> );
>
> let args = snapshot_args(&backup_ns, &snapshot)?;
> - let body = hyper::Body::from(raw_data);
> + let body = proxmox_http::Body::from(raw_data);
>
> client
> .upload("application/octet-stream", body, &path, Some(args))
> diff --git a/src/acme/client.rs b/src/acme/client.rs
> index 97f628e37..4e55393e4 100644
> --- a/src/acme/client.rs
> +++ b/src/acme/client.rs
> @@ -6,8 +6,10 @@ use std::os::unix::fs::OpenOptionsExt;
>
> use anyhow::{bail, format_err};
> use bytes::Bytes;
> -use hyper::{body::HttpBody, Body, Request};
> +use http_body_util::BodyExt;
> +use hyper::Request;
> use nix::sys::stat::Mode;
> +use proxmox_http::Body;
> use serde::{Deserialize, Serialize};
>
> use proxmox_acme::account::AccountCreator;
> @@ -618,7 +620,7 @@ impl AcmeClient {
> response.json()?,
> ));
>
> - Ok((directory.as_ref().unwrap(), nonce.as_deref()))
> + Ok((directory.as_mut().unwrap(), nonce.as_deref()))
> }
>
> /// Like `get_directory`, but if the directory provides no nonce, also performs a `HEAD`
> diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
> index c33cfe405..9141670e7 100644
> --- a/src/acme/plugin.rs
> +++ b/src/acme/plugin.rs
> @@ -1,12 +1,21 @@
> use std::future::Future;
> +use std::net::{IpAddr, SocketAddr};
> use std::pin::Pin;
> use std::process::Stdio;
> use std::sync::Arc;
> use std::time::Duration;
>
> use anyhow::{bail, format_err, Error};
> -use hyper::{Body, Request, Response};
> +use bytes::Bytes;
> +use futures::TryFutureExt;
> +use http_body_util::Full;
> +use hyper::body::Incoming;
> +use hyper::server::conn::http1;
> +use hyper::service::service_fn;
> +use hyper::{Request, Response};
> +use hyper_util::rt::TokioIo;
> use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
> +use tokio::net::TcpListener;
> use tokio::process::Command;
>
> use proxmox_acme::{Authorization, Challenge};
> @@ -235,10 +244,10 @@ impl StandaloneServer {
> }
>
> async fn standalone_respond(
> - req: Request<Body>,
> + req: Request<Incoming>,
> path: Arc<String>,
> key_auth: Arc<String>,
> -) -> Result<Response<Body>, hyper::Error> {
> +) -> Result<Response<Full<Bytes>>, hyper::Error> {
> if req.method() == hyper::Method::GET && req.uri().path() == path.as_str() {
> Ok(Response::builder()
> .status(hyper::http::StatusCode::OK)
> @@ -260,9 +269,6 @@ impl AcmePlugin for StandaloneServer {
> _domain: &'d AcmeDomain,
> _task: Arc<WorkerTask>,
> ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
> - use hyper::server::conn::AddrIncoming;
> - use hyper::service::{make_service_fn, service_fn};
> -
> Box::pin(async move {
> self.stop();
>
> @@ -273,22 +279,40 @@ impl AcmePlugin for StandaloneServer {
> let key_auth = Arc::new(client.key_authorization(token)?);
> let path = Arc::new(format!("/.well-known/acme-challenge/{}", token));
>
> - let service = make_service_fn(move |_| {
> - let path = Arc::clone(&path);
> - let key_auth = Arc::clone(&key_auth);
> - async move {
> - Ok::<_, hyper::Error>(service_fn(move |request| {
> - standalone_respond(request, Arc::clone(&path), Arc::clone(&key_auth))
> - }))
> - }
> - });
> -
> // `[::]:80` first, then `*:80`
> - let incoming = AddrIncoming::bind(&(([0u16; 8], 80).into()))
> - .or_else(|_| AddrIncoming::bind(&(([0u8; 4], 80).into())))?;
> + let dual = SocketAddr::new(IpAddr::from([0u16; 8]), 80);
> + let ipv4 = SocketAddr::new(IpAddr::from([0u8; 4]), 80);
> + let incoming = TcpListener::bind(dual)
> + .or_else(|_| TcpListener::bind(ipv4))
> + .await?;
>
> - let server = hyper::Server::builder(incoming).serve(service);
> + let server = async move {
> + loop {
> + let key_auth = Arc::clone(&key_auth);
> + let path = Arc::clone(&path);
> + match incoming.accept().await {
> + Ok((tcp, _)) => {
> + let io = TokioIo::new(tcp);
> + let service = service_fn(move |request| {
> + standalone_respond(
> + request,
> + Arc::clone(&path),
> + Arc::clone(&key_auth),
> + )
> + });
>
> + tokio::task::spawn(async move {
> + if let Err(err) =
> + http1::Builder::new().serve_connection(io, service).await
> + {
> + println!("Error serving connection: {err:?}");
> + }
> + });
> + }
> + Err(err) => println!("Error accepting connection: {err:?}"),
> + }
> + }
> + };
> let (future, abort) = futures::future::abortable(server);
> self.abort_handle = Some(abort);
> tokio::spawn(future);
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 483e595c1..7aba5d313 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -9,8 +9,10 @@ use std::sync::Arc;
>
> use anyhow::{bail, format_err, Error};
> use futures::*;
> +use http_body_util::BodyExt;
> use hyper::http::request::Parts;
> -use hyper::{header, Body, Response, StatusCode};
> +use hyper::{body::Incoming, header, Response, StatusCode};
> +use proxmox_http::Body;
> use serde::Deserialize;
> use serde_json::{json, Value};
> use tokio_stream::wrappers::ReceiverStream;
> @@ -1387,7 +1389,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
>
> pub fn download_file(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -1472,7 +1474,7 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
>
> pub fn download_file_decoded(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -1598,7 +1600,7 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
>
> pub fn upload_backup_log(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -1636,13 +1638,7 @@ pub fn upload_backup_log(
> file_name = file_name.deref(),
> );
>
> - let data = req_body
> - .map_err(Error::from)
> - .try_fold(Vec::new(), |mut acc, chunk| {
> - acc.extend_from_slice(&chunk);
> - future::ok::<_, Error>(acc)
> - })
> - .await?;
> + let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
>
> // always verify blob/CRC at server side
> let blob = DataBlob::load_from_reader(&mut &data[..])?;
> @@ -1815,7 +1811,7 @@ fn get_local_pxar_reader(
>
> pub fn pxar_file_download(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 99d885e2e..8a2e9ddcb 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -7,6 +7,7 @@ use tracing::info;
> use ::serde::Serialize;
> use serde_json::{json, Value};
>
> +use proxmox_http::Body;
> use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
> use proxmox_sys::fs::{lock_dir_noblock_shared, replace_file, CreateOptions};
>
> @@ -19,7 +20,7 @@ use proxmox_rest_server::{formatter::*, WorkerTask};
>
> use crate::backup::verify_backup_dir_with_lock;
>
> -use hyper::{Body, Response};
> +use hyper::Response;
>
> #[derive(Copy, Clone, Serialize)]
> struct UploadStatistic {
> diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
> index efc97a1fb..f4378e185 100644
> --- a/src/api2/backup/mod.rs
> +++ b/src/api2/backup/mod.rs
> @@ -5,11 +5,13 @@ use futures::*;
> use hex::FromHex;
> use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
> use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{body::Incoming, Request, Response, StatusCode};
> +use hyper_util::service::TowerToHyperService;
> use serde::Deserialize;
> use serde_json::{json, Value};
> use tracing::warn;
>
> +use proxmox_http::Body;
> use proxmox_rest_server::{H2Service, WorkerTask};
> use proxmox_router::{http_err, list_subdirs_api_method};
> use proxmox_router::{
> @@ -70,7 +72,7 @@ pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error>
>
> fn upgrade_to_backup_protocol(
> parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -247,7 +249,7 @@ fn upgrade_to_backup_protocol(
> http.max_frame_size(4 * 1024 * 1024);
>
> let env3 = env2.clone();
> - http.serve_connection(conn, service).map(move |result| {
> + http.serve_connection(conn, TowerToHyperService::new(service)).map(move |result| {
> match result {
> Err(err) => {
> // Avoid Transport endpoint is not connected (os error 107)
> @@ -824,7 +826,7 @@ pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
>
> fn download_previous(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
> index 20259660a..2c66c2855 100644
> --- a/src/api2/backup/upload_chunk.rs
> +++ b/src/api2/backup/upload_chunk.rs
> @@ -5,8 +5,9 @@ use std::task::{Context, Poll};
> use anyhow::{bail, format_err, Error};
> use futures::*;
> use hex::FromHex;
> +use http_body_util::{BodyDataStream, BodyExt};
> +use hyper::body::Incoming;
> use hyper::http::request::Parts;
> -use hyper::Body;
> use serde_json::{json, Value};
>
> use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
> @@ -21,7 +22,7 @@ use pbs_tools::json::{required_integer_param, required_string_param};
> use super::environment::*;
>
> pub struct UploadChunk {
> - stream: Body,
> + stream: BodyDataStream<Incoming>,
> store: Arc<DataStore>,
> digest: [u8; 32],
> size: u32,
> @@ -31,7 +32,7 @@ pub struct UploadChunk {
>
> impl UploadChunk {
> pub fn new(
> - stream: Body,
> + stream: BodyDataStream<Incoming>,
> store: Arc<DataStore>,
> digest: [u8; 32],
> size: u32,
> @@ -146,7 +147,7 @@ pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
>
> fn upload_fixed_chunk(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -161,8 +162,14 @@ fn upload_fixed_chunk(
>
> let env: &BackupEnvironment = rpcenv.as_ref();
>
> - let (digest, size, compressed_size, is_duplicate) =
> - UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
> + let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> + BodyDataStream::new(req_body),
> + env.datastore.clone(),
> + digest,
> + size,
> + encoded_size,
> + )
> + .await?;
>
> env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
> let digest_str = hex::encode(digest);
> @@ -215,7 +222,7 @@ pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
>
> fn upload_dynamic_chunk(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -230,8 +237,14 @@ fn upload_dynamic_chunk(
>
> let env: &BackupEnvironment = rpcenv.as_ref();
>
> - let (digest, size, compressed_size, is_duplicate) =
> - UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
> + let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> + BodyDataStream::new(req_body),
> + env.datastore.clone(),
> + digest,
> + size,
> + encoded_size,
> + )
> + .await?;
>
> env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
> let digest_str = hex::encode(digest);
> @@ -250,13 +263,13 @@ pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
>
> fn upload_speedtest(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> _param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> ) -> ApiResponseFuture {
> async move {
> - let result = req_body
> + let result = BodyDataStream::new(req_body)
> .map_err(Error::from)
> .try_fold(0, |size: usize, chunk| {
> let sum = size + chunk.len();
> @@ -303,7 +316,7 @@ pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
>
> fn upload_blob(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -318,13 +331,7 @@ fn upload_blob(
> bail!("wrong blob file extension: '{}'", file_name);
> }
>
> - let data = req_body
> - .map_err(Error::from)
> - .try_fold(Vec::new(), |mut acc, chunk| {
> - acc.extend_from_slice(&chunk);
> - future::ok::<_, Error>(acc)
> - })
> - .await?;
> + let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
>
> if encoded_size != data.len() {
> bail!(
> @@ -334,7 +341,7 @@ fn upload_blob(
> );
> }
>
> - env.add_blob(&file_name, data)?;
> + env.add_blob(&file_name, data.to_vec())?;
>
> Ok(env.format_response(Ok(Value::Null)))
> }
> diff --git a/src/api2/helpers.rs b/src/api2/helpers.rs
> index 3dc1befc1..f346b0cca 100644
> --- a/src/api2/helpers.rs
> +++ b/src/api2/helpers.rs
> @@ -2,8 +2,9 @@ use std::path::PathBuf;
>
> use anyhow::Error;
> use futures::stream::TryStreamExt;
> -use hyper::{header, Body, Response, StatusCode};
> +use hyper::{header, Response, StatusCode};
>
> +use proxmox_http::Body;
> use proxmox_router::http_bail;
>
> pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
> diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
> index 62b447096..e7c6213c1 100644
> --- a/src/api2/node/mod.rs
> +++ b/src/api2/node/mod.rs
> @@ -5,10 +5,11 @@ use std::os::unix::io::AsRawFd;
>
> use anyhow::{bail, format_err, Error};
> use futures::future::{FutureExt, TryFutureExt};
> -use hyper::body::Body;
> +use hyper::body::Incoming;
> use hyper::http::request::Parts;
> use hyper::upgrade::Upgraded;
> use hyper::Request;
> +use hyper_util::rt::TokioIo;
> use serde_json::{json, Value};
> use tokio::io::{AsyncBufReadExt, BufReader};
>
> @@ -267,7 +268,7 @@ pub const API_METHOD_WEBSOCKET: ApiMethod = ApiMethod::new(
>
> fn upgrade_to_websocket(
> parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -306,7 +307,7 @@ fn upgrade_to_websocket(
> };
>
> let local = tokio::net::TcpStream::connect(format!("localhost:{}", port)).await?;
> - ws.serve_connection(conn, local).await
> + ws.serve_connection(TokioIo::new(conn), local).await
> });
>
> Ok(response)
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index cad740559..bd6763069 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -3,9 +3,10 @@ use std::io::{BufRead, BufReader};
>
> use anyhow::{bail, Error};
> use futures::FutureExt;
> +use hyper::body::Incoming;
> use hyper::http::request::Parts;
> use hyper::http::{header, Response, StatusCode};
> -use hyper::Body;
> +use proxmox_http::Body;
> use serde_json::{json, Value};
>
> use proxmox_async::stream::AsyncReaderStream;
> @@ -321,7 +322,7 @@ pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
> );
> fn read_task_log(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -404,7 +405,7 @@ fn read_task_log(
> Ok(Response::builder()
> .status(StatusCode::OK)
> .header(header::CONTENT_TYPE, "application/json")
> - .body(Body::from(json.to_string()))
> + .body(json.to_string().into())
> .unwrap())
> }
> .boxed()
> diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
> index 1713f182b..b69000087 100644
> --- a/src/api2/reader/mod.rs
> +++ b/src/api2/reader/mod.rs
> @@ -3,12 +3,15 @@
> use anyhow::{bail, format_err, Error};
> use futures::*;
> use hex::FromHex;
> +use hyper::body::Incoming;
> use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
> use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{Request, Response, StatusCode};
> +use hyper_util::service::TowerToHyperService;
> use serde::Deserialize;
> use serde_json::Value;
>
> +use proxmox_http::Body;
> use proxmox_rest_server::{H2Service, WorkerTask};
> use proxmox_router::{
> http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
> @@ -68,7 +71,7 @@ pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
>
> fn upgrade_to_backup_reader_protocol(
> parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -190,7 +193,7 @@ fn upgrade_to_backup_reader_protocol(
> http.initial_connection_window_size(window_size);
> http.max_frame_size(4 * 1024 * 1024);
>
> - http.serve_connection(conn, service)
> + http.serve_connection(conn, TowerToHyperService::new(service))
> .map_err(Error::from)
> .await
> };
> @@ -244,7 +247,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
>
> fn download_file(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -300,7 +303,7 @@ pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
>
> fn download_chunk(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -348,7 +351,7 @@ fn download_chunk(
> /* this is too slow
> fn download_chunk_old(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -393,7 +396,7 @@ pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
>
> fn speedtest(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> _param: Value,
> _info: &ApiMethod,
> _rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 7b4187550..438fd9d7e 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -1,12 +1,15 @@
> use std::future::Future;
> use std::pin::{pin, Pin};
> +use std::sync::Arc;
>
> use anyhow::{bail, Error};
> -use futures::*;
> use hyper::http::Response;
> -use hyper::{Body, StatusCode};
> +use hyper::StatusCode;
> +use hyper_util::server::graceful::GracefulShutdown;
> +use tokio::net::TcpListener;
> use tracing::level_filters::LevelFilter;
>
> +use proxmox_http::Body;
> use proxmox_lang::try_block;
> use proxmox_rest_server::{ApiConfig, RestServer};
> use proxmox_router::RpcEnvironmentType;
> @@ -34,7 +37,7 @@ fn get_index() -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
> Response::builder()
> .status(StatusCode::OK)
> .header(hyper::header::CONTENT_TYPE, "text/html")
> - .body(index.into())
> + .body(index.to_string().into())
> .unwrap()
> })
> }
> @@ -108,17 +111,28 @@ async fn run() -> Result<(), Error> {
> // http server future:
> let server = proxmox_daemon::server::create_daemon(
> ([127, 0, 0, 1], 82).into(),
> - move |listener| {
> - let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
> -
> - Ok(async {
> + move |listener: TcpListener| {
> + Ok(async move {
> proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
> -
> - hyper::Server::builder(incoming)
> - .serve(rest_server)
> - .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> - .map_err(Error::from)
> - .await
> + let graceful = Arc::new(GracefulShutdown::new());
> + loop {
> + let graceful2 = Arc::clone(&graceful);
> + tokio::select! {
> + incoming = listener.accept() => {
> + let (conn, _) = incoming?;
> + let api_service = rest_server.api_service(&conn)?;
> + tokio::spawn(async move { api_service.serve(conn, Some(graceful2)).await });
> + },
> + _shutdown = proxmox_daemon::shutdown_future() => {
> + break;
> + },
> + }
> + }
> + if let Some(shutdown) = Arc::into_inner(graceful) {
> + log::info!("shutting down..");
> + shutdown.shutdown().await
> + }
> + Ok(())
> })
> },
> Some(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN),
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index c9a6032e6..8ee537207 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -7,7 +7,8 @@ use futures::*;
> use hyper::header;
> use hyper::http::request::Parts;
> use hyper::http::Response;
> -use hyper::{Body, StatusCode};
> +use hyper::StatusCode;
> +use hyper_util::server::graceful::GracefulShutdown;
> use tracing::level_filters::LevelFilter;
> use tracing::{info, warn};
> use url::form_urlencoded;
> @@ -15,6 +16,7 @@ use url::form_urlencoded;
> use openssl::ssl::SslAcceptor;
> use serde_json::{json, Value};
>
> +use proxmox_http::Body;
> use proxmox_lang::try_block;
> use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
> use proxmox_sys::fs::CreateOptions;
> @@ -289,27 +291,71 @@ async fn run() -> Result<(), Error> {
> let server = proxmox_daemon::server::create_daemon(
> ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
> move |listener| {
> - let (secure_connections, insecure_connections) =
> + let (mut secure_connections, mut insecure_connections) =
> connections.accept_tls_optional(listener, acceptor);
>
> Ok(async {
> proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
>
> - let secure_server = hyper::Server::builder(secure_connections)
> - .serve(rest_server)
> - .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> - .map_err(Error::from);
> + let secure_server = async move {
> + let graceful = Arc::new(GracefulShutdown::new());
> + loop {
> + let graceful2 = Arc::clone(&graceful);
> + tokio::select! {
> + Some(conn) = secure_connections.next() => {
> + match conn {
> + Ok(conn) => {
> + let api_service = rest_server.api_service(&conn)?;
> + tokio::spawn(async move {
> + api_service.serve(conn, Some(graceful2)).await
> + });
> + },
> + Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
> + }
> + },
> + _shutdown = proxmox_daemon::shutdown_future() => {
> + break;
> + }
> + }
> + }
> + if let Some(shutdown) = Arc::into_inner(graceful) {
> + shutdown.shutdown().await
> + }
> + Ok::<(), Error>(())
> + };
>
> - let insecure_server = hyper::Server::builder(insecure_connections)
> - .serve(redirector)
> - .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> - .map_err(Error::from);
> + let insecure_server = async move {
> + let graceful = Arc::new(GracefulShutdown::new());
> + loop {
> + let graceful2 = Arc::clone(&graceful);
> + tokio::select! {
> + Some(conn) = insecure_connections.next() => {
> + match conn {
> + Ok(conn) => {
> + let redirect_service = redirector.redirect_service();
> + tokio::spawn(async move {
> + redirect_service.serve(conn, Some(graceful2)).await
> + });
> + },
> + Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
> + }
> + },
> + _shutdown = proxmox_daemon::shutdown_future() => {
> + break;
> + }
> + }
> + }
> + if let Some(shutdown) = Arc::into_inner(graceful) {
> + shutdown.shutdown().await
> + }
> + Ok::<(), Error>(())
> + };
The new `secure_server` and `insecure_server` above aren't really that
pretty; perhaps we can clean them up once we got a trait for the return
types of `accept_tls_optional()` and similar :s
In general I'd suggest breaking up the returned coroutine since it's
gotten quite large now, perhaps explicitly defining separate coroutines
for `secure_server` and `insecure_server` each, instead of using `async`
blocks, or explicitly defining the returned coroutine as a whole.
No hard feelings if you don't, though.
>
> let (secure_res, insecure_res) =
> try_join!(tokio::spawn(secure_server), tokio::spawn(insecure_server))
> .context("failed to complete REST server task")?;
>
> - let results = [secure_res, insecure_res];
> + let results: [Result<(), Error>; 2] = [secure_res, insecure_res];
>
> if results.iter().any(Result::is_err) {
> let cat_errors = results
> @@ -321,7 +367,7 @@ async fn run() -> Result<(), Error> {
> bail!(cat_errors);
> }
>
> - Ok(())
> + Ok::<(), Error>(())
> })
> },
> Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-04-02 13:37 UTC|newest]
Thread overview: 32+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 01/17] http: order feature values Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 02/17] http: rate-limited-stream: update to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 03/17] http: adapt MaybeTlsStream to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 04/17] http: adapt connector " Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 07/17] http: websocket: update to http/hyper 1 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 08/17] openid: use http 0.2 to avoid openidconnect update Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 09/17] proxmox-login: switch to http 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 10/17] client: switch to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 11/17] metrics: update " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0 Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 13/17] proxmox-router: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: " Fabian Grünbichler
2025-04-02 13:34 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 15/17] proxmox-rest-server: fix and extend example Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 16/17] proxmox-auth-api: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 17/17] proxmox-acme-api: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 1/6] Revert "h2: switch to legacy feature" Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-client: adapt http client to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 4/6] restore daemon: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 5/6] " Fabian Grünbichler
2025-04-02 13:36 ` Max Carrara [this message]
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 6/6] adapt examples " Fabian Grünbichler
2025-04-02 13:53 ` [pbs-devel] [RFC proxmox 00/23] upgrade " Max Carrara
2025-04-03 13:32 ` Max Carrara
2025-04-02 14:39 ` Thomas Lamprecht
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=D8W6Z2CTLGTA.9MRQMOW9SZK4@proxmox.com \
--to=m.carrara@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal