From: "Max Carrara" <m.carrara@proxmox.com>
To: "Proxmox Backup Server development discussion"
<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup 5/6] adapt to hyper/http 1.0
Date: Wed, 02 Apr 2025 15:36:50 +0200 [thread overview]
Message-ID: <D8W6Z2CTLGTA.9MRQMOW9SZK4@proxmox.com> (raw)
In-Reply-To: <20250326152327.332179-23-f.gruenbichler@proxmox.com>
On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> similar to the other changes:
> - Body to Incoming or proxmox-http's Body
> - use adapters between hyper<->tower and hyper<->tokio
> - adapt to new proxmox-rest-server interfaces
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
> proxmox-backup-client/Cargo.toml | 1 +
> proxmox-backup-client/src/snapshot.rs | 2 +-
> src/acme/client.rs | 6 ++-
> src/acme/plugin.rs | 62 ++++++++++++++++--------
> src/api2/admin/datastore.rs | 20 +++-----
> src/api2/backup/environment.rs | 3 +-
> src/api2/backup/mod.rs | 10 ++--
> src/api2/backup/upload_chunk.rs | 47 ++++++++++--------
> src/api2/helpers.rs | 3 +-
> src/api2/node/mod.rs | 7 +--
> src/api2/node/tasks.rs | 7 +--
> src/api2/reader/mod.rs | 17 ++++---
> src/bin/proxmox-backup-api.rs | 40 ++++++++++-----
> src/bin/proxmox-backup-proxy.rs | 70 ++++++++++++++++++++++-----
> 14 files changed, 197 insertions(+), 98 deletions(-)
>
> diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
> index a91a4908b..5f0140e78 100644
> --- a/proxmox-backup-client/Cargo.toml
> +++ b/proxmox-backup-client/Cargo.toml
> @@ -24,6 +24,7 @@ pxar.workspace = true
>
> proxmox-async.workspace = true
> proxmox-human-byte.workspace = true
> +proxmox-http = { workspace = true, features = [ "body" ] }
> proxmox-log.workspace = true
> proxmox-io.workspace = true
> proxmox-router = { workspace = true, features = [ "cli" ] }
> diff --git a/proxmox-backup-client/src/snapshot.rs b/proxmox-backup-client/src/snapshot.rs
> index f195c23b7..f1569db2e 100644
> --- a/proxmox-backup-client/src/snapshot.rs
> +++ b/proxmox-backup-client/src/snapshot.rs
> @@ -271,7 +271,7 @@ async fn upload_log(param: Value) -> Result<Value, Error> {
> );
>
> let args = snapshot_args(&backup_ns, &snapshot)?;
> - let body = hyper::Body::from(raw_data);
> + let body = proxmox_http::Body::from(raw_data);
>
> client
> .upload("application/octet-stream", body, &path, Some(args))
> diff --git a/src/acme/client.rs b/src/acme/client.rs
> index 97f628e37..4e55393e4 100644
> --- a/src/acme/client.rs
> +++ b/src/acme/client.rs
> @@ -6,8 +6,10 @@ use std::os::unix::fs::OpenOptionsExt;
>
> use anyhow::{bail, format_err};
> use bytes::Bytes;
> -use hyper::{body::HttpBody, Body, Request};
> +use http_body_util::BodyExt;
> +use hyper::Request;
> use nix::sys::stat::Mode;
> +use proxmox_http::Body;
> use serde::{Deserialize, Serialize};
>
> use proxmox_acme::account::AccountCreator;
> @@ -618,7 +620,7 @@ impl AcmeClient {
> response.json()?,
> ));
>
> - Ok((directory.as_ref().unwrap(), nonce.as_deref()))
> + Ok((directory.as_mut().unwrap(), nonce.as_deref()))
> }
>
> /// Like `get_directory`, but if the directory provides no nonce, also performs a `HEAD`
> diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
> index c33cfe405..9141670e7 100644
> --- a/src/acme/plugin.rs
> +++ b/src/acme/plugin.rs
> @@ -1,12 +1,21 @@
> use std::future::Future;
> +use std::net::{IpAddr, SocketAddr};
> use std::pin::Pin;
> use std::process::Stdio;
> use std::sync::Arc;
> use std::time::Duration;
>
> use anyhow::{bail, format_err, Error};
> -use hyper::{Body, Request, Response};
> +use bytes::Bytes;
> +use futures::TryFutureExt;
> +use http_body_util::Full;
> +use hyper::body::Incoming;
> +use hyper::server::conn::http1;
> +use hyper::service::service_fn;
> +use hyper::{Request, Response};
> +use hyper_util::rt::TokioIo;
> use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
> +use tokio::net::TcpListener;
> use tokio::process::Command;
>
> use proxmox_acme::{Authorization, Challenge};
> @@ -235,10 +244,10 @@ impl StandaloneServer {
> }
>
> async fn standalone_respond(
> - req: Request<Body>,
> + req: Request<Incoming>,
> path: Arc<String>,
> key_auth: Arc<String>,
> -) -> Result<Response<Body>, hyper::Error> {
> +) -> Result<Response<Full<Bytes>>, hyper::Error> {
> if req.method() == hyper::Method::GET && req.uri().path() == path.as_str() {
> Ok(Response::builder()
> .status(hyper::http::StatusCode::OK)
> @@ -260,9 +269,6 @@ impl AcmePlugin for StandaloneServer {
> _domain: &'d AcmeDomain,
> _task: Arc<WorkerTask>,
> ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
> - use hyper::server::conn::AddrIncoming;
> - use hyper::service::{make_service_fn, service_fn};
> -
> Box::pin(async move {
> self.stop();
>
> @@ -273,22 +279,40 @@ impl AcmePlugin for StandaloneServer {
> let key_auth = Arc::new(client.key_authorization(token)?);
> let path = Arc::new(format!("/.well-known/acme-challenge/{}", token));
>
> - let service = make_service_fn(move |_| {
> - let path = Arc::clone(&path);
> - let key_auth = Arc::clone(&key_auth);
> - async move {
> - Ok::<_, hyper::Error>(service_fn(move |request| {
> - standalone_respond(request, Arc::clone(&path), Arc::clone(&key_auth))
> - }))
> - }
> - });
> -
> // `[::]:80` first, then `*:80`
> - let incoming = AddrIncoming::bind(&(([0u16; 8], 80).into()))
> - .or_else(|_| AddrIncoming::bind(&(([0u8; 4], 80).into())))?;
> + let dual = SocketAddr::new(IpAddr::from([0u16; 8]), 80);
> + let ipv4 = SocketAddr::new(IpAddr::from([0u8; 4]), 80);
> + let incoming = TcpListener::bind(dual)
> + .or_else(|_| TcpListener::bind(ipv4))
> + .await?;
>
> - let server = hyper::Server::builder(incoming).serve(service);
> + let server = async move {
> + loop {
> + let key_auth = Arc::clone(&key_auth);
> + let path = Arc::clone(&path);
> + match incoming.accept().await {
> + Ok((tcp, _)) => {
> + let io = TokioIo::new(tcp);
> + let service = service_fn(move |request| {
> + standalone_respond(
> + request,
> + Arc::clone(&path),
> + Arc::clone(&key_auth),
> + )
> + });
>
> + tokio::task::spawn(async move {
> + if let Err(err) =
> + http1::Builder::new().serve_connection(io, service).await
> + {
> + println!("Error serving connection: {err:?}");
> + }
> + });
> + }
> + Err(err) => println!("Error accepting connection: {err:?}"),
> + }
> + }
> + };
> let (future, abort) = futures::future::abortable(server);
> self.abort_handle = Some(abort);
> tokio::spawn(future);
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 483e595c1..7aba5d313 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -9,8 +9,10 @@ use std::sync::Arc;
>
> use anyhow::{bail, format_err, Error};
> use futures::*;
> +use http_body_util::BodyExt;
> use hyper::http::request::Parts;
> -use hyper::{header, Body, Response, StatusCode};
> +use hyper::{body::Incoming, header, Response, StatusCode};
> +use proxmox_http::Body;
> use serde::Deserialize;
> use serde_json::{json, Value};
> use tokio_stream::wrappers::ReceiverStream;
> @@ -1387,7 +1389,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
>
> pub fn download_file(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -1472,7 +1474,7 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
>
> pub fn download_file_decoded(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -1598,7 +1600,7 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
>
> pub fn upload_backup_log(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -1636,13 +1638,7 @@ pub fn upload_backup_log(
> file_name = file_name.deref(),
> );
>
> - let data = req_body
> - .map_err(Error::from)
> - .try_fold(Vec::new(), |mut acc, chunk| {
> - acc.extend_from_slice(&chunk);
> - future::ok::<_, Error>(acc)
> - })
> - .await?;
> + let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
>
> // always verify blob/CRC at server side
> let blob = DataBlob::load_from_reader(&mut &data[..])?;
> @@ -1815,7 +1811,7 @@ fn get_local_pxar_reader(
>
> pub fn pxar_file_download(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 99d885e2e..8a2e9ddcb 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -7,6 +7,7 @@ use tracing::info;
> use ::serde::Serialize;
> use serde_json::{json, Value};
>
> +use proxmox_http::Body;
> use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
> use proxmox_sys::fs::{lock_dir_noblock_shared, replace_file, CreateOptions};
>
> @@ -19,7 +20,7 @@ use proxmox_rest_server::{formatter::*, WorkerTask};
>
> use crate::backup::verify_backup_dir_with_lock;
>
> -use hyper::{Body, Response};
> +use hyper::Response;
>
> #[derive(Copy, Clone, Serialize)]
> struct UploadStatistic {
> diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
> index efc97a1fb..f4378e185 100644
> --- a/src/api2/backup/mod.rs
> +++ b/src/api2/backup/mod.rs
> @@ -5,11 +5,13 @@ use futures::*;
> use hex::FromHex;
> use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
> use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{body::Incoming, Request, Response, StatusCode};
> +use hyper_util::service::TowerToHyperService;
> use serde::Deserialize;
> use serde_json::{json, Value};
> use tracing::warn;
>
> +use proxmox_http::Body;
> use proxmox_rest_server::{H2Service, WorkerTask};
> use proxmox_router::{http_err, list_subdirs_api_method};
> use proxmox_router::{
> @@ -70,7 +72,7 @@ pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error>
>
> fn upgrade_to_backup_protocol(
> parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -247,7 +249,7 @@ fn upgrade_to_backup_protocol(
> http.max_frame_size(4 * 1024 * 1024);
>
> let env3 = env2.clone();
> - http.serve_connection(conn, service).map(move |result| {
> + http.serve_connection(conn, TowerToHyperService::new(service)).map(move |result| {
> match result {
> Err(err) => {
> // Avoid Transport endpoint is not connected (os error 107)
> @@ -824,7 +826,7 @@ pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
>
> fn download_previous(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
> index 20259660a..2c66c2855 100644
> --- a/src/api2/backup/upload_chunk.rs
> +++ b/src/api2/backup/upload_chunk.rs
> @@ -5,8 +5,9 @@ use std::task::{Context, Poll};
> use anyhow::{bail, format_err, Error};
> use futures::*;
> use hex::FromHex;
> +use http_body_util::{BodyDataStream, BodyExt};
> +use hyper::body::Incoming;
> use hyper::http::request::Parts;
> -use hyper::Body;
> use serde_json::{json, Value};
>
> use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
> @@ -21,7 +22,7 @@ use pbs_tools::json::{required_integer_param, required_string_param};
> use super::environment::*;
>
> pub struct UploadChunk {
> - stream: Body,
> + stream: BodyDataStream<Incoming>,
> store: Arc<DataStore>,
> digest: [u8; 32],
> size: u32,
> @@ -31,7 +32,7 @@ pub struct UploadChunk {
>
> impl UploadChunk {
> pub fn new(
> - stream: Body,
> + stream: BodyDataStream<Incoming>,
> store: Arc<DataStore>,
> digest: [u8; 32],
> size: u32,
> @@ -146,7 +147,7 @@ pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
>
> fn upload_fixed_chunk(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -161,8 +162,14 @@ fn upload_fixed_chunk(
>
> let env: &BackupEnvironment = rpcenv.as_ref();
>
> - let (digest, size, compressed_size, is_duplicate) =
> - UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
> + let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> + BodyDataStream::new(req_body),
> + env.datastore.clone(),
> + digest,
> + size,
> + encoded_size,
> + )
> + .await?;
>
> env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
> let digest_str = hex::encode(digest);
> @@ -215,7 +222,7 @@ pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
>
> fn upload_dynamic_chunk(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -230,8 +237,14 @@ fn upload_dynamic_chunk(
>
> let env: &BackupEnvironment = rpcenv.as_ref();
>
> - let (digest, size, compressed_size, is_duplicate) =
> - UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
> + let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> + BodyDataStream::new(req_body),
> + env.datastore.clone(),
> + digest,
> + size,
> + encoded_size,
> + )
> + .await?;
>
> env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
> let digest_str = hex::encode(digest);
> @@ -250,13 +263,13 @@ pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
>
> fn upload_speedtest(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> _param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> ) -> ApiResponseFuture {
> async move {
> - let result = req_body
> + let result = BodyDataStream::new(req_body)
> .map_err(Error::from)
> .try_fold(0, |size: usize, chunk| {
> let sum = size + chunk.len();
> @@ -303,7 +316,7 @@ pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
>
> fn upload_blob(
> _parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -318,13 +331,7 @@ fn upload_blob(
> bail!("wrong blob file extension: '{}'", file_name);
> }
>
> - let data = req_body
> - .map_err(Error::from)
> - .try_fold(Vec::new(), |mut acc, chunk| {
> - acc.extend_from_slice(&chunk);
> - future::ok::<_, Error>(acc)
> - })
> - .await?;
> + let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
>
> if encoded_size != data.len() {
> bail!(
> @@ -334,7 +341,7 @@ fn upload_blob(
> );
> }
>
> - env.add_blob(&file_name, data)?;
> + env.add_blob(&file_name, data.to_vec())?;
>
> Ok(env.format_response(Ok(Value::Null)))
> }
> diff --git a/src/api2/helpers.rs b/src/api2/helpers.rs
> index 3dc1befc1..f346b0cca 100644
> --- a/src/api2/helpers.rs
> +++ b/src/api2/helpers.rs
> @@ -2,8 +2,9 @@ use std::path::PathBuf;
>
> use anyhow::Error;
> use futures::stream::TryStreamExt;
> -use hyper::{header, Body, Response, StatusCode};
> +use hyper::{header, Response, StatusCode};
>
> +use proxmox_http::Body;
> use proxmox_router::http_bail;
>
> pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
> diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
> index 62b447096..e7c6213c1 100644
> --- a/src/api2/node/mod.rs
> +++ b/src/api2/node/mod.rs
> @@ -5,10 +5,11 @@ use std::os::unix::io::AsRawFd;
>
> use anyhow::{bail, format_err, Error};
> use futures::future::{FutureExt, TryFutureExt};
> -use hyper::body::Body;
> +use hyper::body::Incoming;
> use hyper::http::request::Parts;
> use hyper::upgrade::Upgraded;
> use hyper::Request;
> +use hyper_util::rt::TokioIo;
> use serde_json::{json, Value};
> use tokio::io::{AsyncBufReadExt, BufReader};
>
> @@ -267,7 +268,7 @@ pub const API_METHOD_WEBSOCKET: ApiMethod = ApiMethod::new(
>
> fn upgrade_to_websocket(
> parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -306,7 +307,7 @@ fn upgrade_to_websocket(
> };
>
> let local = tokio::net::TcpStream::connect(format!("localhost:{}", port)).await?;
> - ws.serve_connection(conn, local).await
> + ws.serve_connection(TokioIo::new(conn), local).await
> });
>
> Ok(response)
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index cad740559..bd6763069 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -3,9 +3,10 @@ use std::io::{BufRead, BufReader};
>
> use anyhow::{bail, Error};
> use futures::FutureExt;
> +use hyper::body::Incoming;
> use hyper::http::request::Parts;
> use hyper::http::{header, Response, StatusCode};
> -use hyper::Body;
> +use proxmox_http::Body;
> use serde_json::{json, Value};
>
> use proxmox_async::stream::AsyncReaderStream;
> @@ -321,7 +322,7 @@ pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
> );
> fn read_task_log(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -404,7 +405,7 @@ fn read_task_log(
> Ok(Response::builder()
> .status(StatusCode::OK)
> .header(header::CONTENT_TYPE, "application/json")
> - .body(Body::from(json.to_string()))
> + .body(json.to_string().into())
> .unwrap())
> }
> .boxed()
> diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
> index 1713f182b..b69000087 100644
> --- a/src/api2/reader/mod.rs
> +++ b/src/api2/reader/mod.rs
> @@ -3,12 +3,15 @@
> use anyhow::{bail, format_err, Error};
> use futures::*;
> use hex::FromHex;
> +use hyper::body::Incoming;
> use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
> use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{Request, Response, StatusCode};
> +use hyper_util::service::TowerToHyperService;
> use serde::Deserialize;
> use serde_json::Value;
>
> +use proxmox_http::Body;
> use proxmox_rest_server::{H2Service, WorkerTask};
> use proxmox_router::{
> http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
> @@ -68,7 +71,7 @@ pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
>
> fn upgrade_to_backup_reader_protocol(
> parts: Parts,
> - req_body: Body,
> + req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -190,7 +193,7 @@ fn upgrade_to_backup_reader_protocol(
> http.initial_connection_window_size(window_size);
> http.max_frame_size(4 * 1024 * 1024);
>
> - http.serve_connection(conn, service)
> + http.serve_connection(conn, TowerToHyperService::new(service))
> .map_err(Error::from)
> .await
> };
> @@ -244,7 +247,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
>
> fn download_file(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -300,7 +303,7 @@ pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
>
> fn download_chunk(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -348,7 +351,7 @@ fn download_chunk(
> /* this is too slow
> fn download_chunk_old(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> param: Value,
> _info: &ApiMethod,
> rpcenv: Box<dyn RpcEnvironment>,
> @@ -393,7 +396,7 @@ pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
>
> fn speedtest(
> _parts: Parts,
> - _req_body: Body,
> + _req_body: Incoming,
> _param: Value,
> _info: &ApiMethod,
> _rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 7b4187550..438fd9d7e 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -1,12 +1,15 @@
> use std::future::Future;
> use std::pin::{pin, Pin};
> +use std::sync::Arc;
>
> use anyhow::{bail, Error};
> -use futures::*;
> use hyper::http::Response;
> -use hyper::{Body, StatusCode};
> +use hyper::StatusCode;
> +use hyper_util::server::graceful::GracefulShutdown;
> +use tokio::net::TcpListener;
> use tracing::level_filters::LevelFilter;
>
> +use proxmox_http::Body;
> use proxmox_lang::try_block;
> use proxmox_rest_server::{ApiConfig, RestServer};
> use proxmox_router::RpcEnvironmentType;
> @@ -34,7 +37,7 @@ fn get_index() -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
> Response::builder()
> .status(StatusCode::OK)
> .header(hyper::header::CONTENT_TYPE, "text/html")
> - .body(index.into())
> + .body(index.to_string().into())
> .unwrap()
> })
> }
> @@ -108,17 +111,28 @@ async fn run() -> Result<(), Error> {
> // http server future:
> let server = proxmox_daemon::server::create_daemon(
> ([127, 0, 0, 1], 82).into(),
> - move |listener| {
> - let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
> -
> - Ok(async {
> + move |listener: TcpListener| {
> + Ok(async move {
> proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
> -
> - hyper::Server::builder(incoming)
> - .serve(rest_server)
> - .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> - .map_err(Error::from)
> - .await
> + let graceful = Arc::new(GracefulShutdown::new());
> + loop {
> + let graceful2 = Arc::clone(&graceful);
> + tokio::select! {
> + incoming = listener.accept() => {
> + let (conn, _) = incoming?;
> + let api_service = rest_server.api_service(&conn)?;
> + tokio::spawn(async move { api_service.serve(conn, Some(graceful2)).await });
> + },
> + _shutdown = proxmox_daemon::shutdown_future() => {
> + break;
> + },
> + }
> + }
> + if let Some(shutdown) = Arc::into_inner(graceful) {
> + log::info!("shutting down..");
> + shutdown.shutdown().await
> + }
> + Ok(())
> })
> },
> Some(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN),
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index c9a6032e6..8ee537207 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -7,7 +7,8 @@ use futures::*;
> use hyper::header;
> use hyper::http::request::Parts;
> use hyper::http::Response;
> -use hyper::{Body, StatusCode};
> +use hyper::StatusCode;
> +use hyper_util::server::graceful::GracefulShutdown;
> use tracing::level_filters::LevelFilter;
> use tracing::{info, warn};
> use url::form_urlencoded;
> @@ -15,6 +16,7 @@ use url::form_urlencoded;
> use openssl::ssl::SslAcceptor;
> use serde_json::{json, Value};
>
> +use proxmox_http::Body;
> use proxmox_lang::try_block;
> use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
> use proxmox_sys::fs::CreateOptions;
> @@ -289,27 +291,71 @@ async fn run() -> Result<(), Error> {
> let server = proxmox_daemon::server::create_daemon(
> ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
> move |listener| {
> - let (secure_connections, insecure_connections) =
> + let (mut secure_connections, mut insecure_connections) =
> connections.accept_tls_optional(listener, acceptor);
>
> Ok(async {
> proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
>
> - let secure_server = hyper::Server::builder(secure_connections)
> - .serve(rest_server)
> - .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> - .map_err(Error::from);
> + let secure_server = async move {
> + let graceful = Arc::new(GracefulShutdown::new());
> + loop {
> + let graceful2 = Arc::clone(&graceful);
> + tokio::select! {
> + Some(conn) = secure_connections.next() => {
> + match conn {
> + Ok(conn) => {
> + let api_service = rest_server.api_service(&conn)?;
> + tokio::spawn(async move {
> + api_service.serve(conn, Some(graceful2)).await
> + });
> + },
> + Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
> + }
> + },
> + _shutdown = proxmox_daemon::shutdown_future() => {
> + break;
> + }
> + }
> + }
> + if let Some(shutdown) = Arc::into_inner(graceful) {
> + shutdown.shutdown().await
> + }
> + Ok::<(), Error>(())
> + };
>
> - let insecure_server = hyper::Server::builder(insecure_connections)
> - .serve(redirector)
> - .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> - .map_err(Error::from);
> + let insecure_server = async move {
> + let graceful = Arc::new(GracefulShutdown::new());
> + loop {
> + let graceful2 = Arc::clone(&graceful);
> + tokio::select! {
> + Some(conn) = insecure_connections.next() => {
> + match conn {
> + Ok(conn) => {
> + let redirect_service = redirector.redirect_service();
> + tokio::spawn(async move {
> + redirect_service.serve(conn, Some(graceful2)).await
> + });
> + },
> + Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
> + }
> + },
> + _shutdown = proxmox_daemon::shutdown_future() => {
> + break;
> + }
> + }
> + }
> + if let Some(shutdown) = Arc::into_inner(graceful) {
> + shutdown.shutdown().await
> + }
> + Ok::<(), Error>(())
> + };
The new `secure_server` and `insecure_server` above aren't really that
pretty; perhaps we can clean them up once we got a trait for the return
types of `accept_tls_optional()` and similar :s
In general I'd suggest breaking up the returned coroutine since it's
gotten quite large now, perhaps explicitly defining separate coroutines
for `secure_server` and `insecure_server` each, instead of using `async`
blocks, or explicitly defining the returned coroutine as a whole.
No hard feelings if you don't, though.
>
> let (secure_res, insecure_res) =
> try_join!(tokio::spawn(secure_server), tokio::spawn(insecure_server))
> .context("failed to complete REST server task")?;
>
> - let results = [secure_res, insecure_res];
> + let results: [Result<(), Error>; 2] = [secure_res, insecure_res];
>
> if results.iter().any(Result::is_err) {
> let cat_errors = results
> @@ -321,7 +367,7 @@ async fn run() -> Result<(), Error> {
> bail!(cat_errors);
> }
>
> - Ok(())
> + Ok::<(), Error>(())
> })
> },
> Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-04-02 13:37 UTC|newest]
Thread overview: 32+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 01/17] http: order feature values Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 02/17] http: rate-limited-stream: update to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 03/17] http: adapt MaybeTlsStream to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 04/17] http: adapt connector " Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 07/17] http: websocket: update to http/hyper 1 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 08/17] openid: use http 0.2 to avoid openidconnect update Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 09/17] proxmox-login: switch to http 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 10/17] client: switch to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 11/17] metrics: update " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0 Fabian Grünbichler
2025-04-02 13:31 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 13/17] proxmox-router: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: " Fabian Grünbichler
2025-04-02 13:34 ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 15/17] proxmox-rest-server: fix and extend example Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 16/17] proxmox-auth-api: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 17/17] proxmox-acme-api: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 1/6] Revert "h2: switch to legacy feature" Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-client: adapt http client to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 4/6] restore daemon: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 5/6] " Fabian Grünbichler
2025-04-02 13:36 ` Max Carrara [this message]
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 6/6] adapt examples " Fabian Grünbichler
2025-04-02 13:53 ` [pbs-devel] [RFC proxmox 00/23] upgrade " Max Carrara
2025-04-03 13:32 ` Max Carrara
2025-04-02 14:39 ` Thomas Lamprecht
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=D8W6Z2CTLGTA.9MRQMOW9SZK4@proxmox.com \
--to=m.carrara@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal