public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0
@ 2025-03-26 15:23 Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 01/17] http: order feature values Fabian Grünbichler
                   ` (24 more replies)
  0 siblings, 25 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

this RFC series adapts proxmox and proxmox-backup to hyper/http 1.0. I
also have similar patches for PDM, but those require an update of gloo
and proxmox-yew-comp and the basic approach is the same as with the
patches here, and since I expect some feedback to incorporate anyway I
saved those for the first "proper" version.

hyper 1.0 came with a lot of changes, the most notable ones:

Body is now a trait, not a struct
- there's a new Incoming impl for incoming requests on the server side,
and incoming responses on the client side
- http-body-util has some more impls
- proxmox-http has a new impl covering our two common use cases, see
the patch there for details

hyper now doesn't expose tower's Service or tokio's
AsyncRead/AsyncWrite, but has its own variants for both with
corresponding wrappers/adapters.

the previous Accept trait for translation from a listening socket to
connections is gone, an accept loop should be used instead.

the pooling client is moved from hyper to hyper-util. despite its
"legacy" label we still use it, as we'd need to either implement a ton
of code ourself or switch to reqwest otherwise.

graceful shutdown of connections is handled differently, so are
connection ugprades.

I did some rough testing of the usual things without noticing any
breakage, but I am sure I missed some parts. there's also room for
improvement for sure, in particular surrounding the rest-server and
connection accepting part - suggestions welcome!

proxmox workspace:

Fabian Grünbichler (17):
  http: order feature values
  http: rate-limited-stream: update to hyper/http 1.0
  http: adapt MaybeTlsStream to hyper 1.x
  http: adapt connector to hyper 1.x
  http: add Body implementation
  http: adapt simple client to hyper 1.x
  http: websocket: update to http/hyper 1
  openid: use http 0.2 to avoid openidconnect update
  proxmox-login: switch to http 1.x
  client: switch to hyper/http 1.0
  metrics: update to hyper/http 1.0
  acme: switch to http/hyper 1.0
  proxmox-router: update to hyper 1.0
  proxmox-rest-server: update to hyper 1.0
  proxmox-rest-server: fix and extend example
  proxmox-auth-api: update to hyper 1.0
  proxmox-acme-api: update to hyper 1.0

 Cargo.toml                                    |   8 +-
 proxmox-acme-api/Cargo.toml                   |   4 +
 proxmox-acme-api/src/acme_plugin.rs           |  63 +++++--
 proxmox-acme/Cargo.toml                       |   3 +-
 proxmox-acme/src/async_client.rs              |  11 +-
 proxmox-auth-api/Cargo.toml                   |   2 +
 proxmox-auth-api/src/api/access.rs            |   4 +-
 proxmox-client/Cargo.toml                     |   1 +
 proxmox-client/src/client.rs                  |  22 +--
 proxmox-http/Cargo.toml                       |  45 +++--
 proxmox-http/src/body.rs                      | 133 ++++++++++++++
 proxmox-http/src/client/connector.rs          |  44 +++--
 proxmox-http/src/client/simple.rs             |  93 +++++++---
 proxmox-http/src/client/tls.rs                |   2 +-
 proxmox-http/src/lib.rs                       |   5 +
 proxmox-http/src/rate_limited_stream.rs       |   2 +-
 proxmox-http/src/websocket/mod.rs             |   6 +-
 proxmox-login/Cargo.toml                      |   2 +-
 proxmox-metrics/src/influxdb/http.rs          |   5 +-
 proxmox-openid/Cargo.toml                     |   3 +-
 proxmox-rest-server/Cargo.toml                |   9 +-
 .../examples/minimal-rest-server.rs           |  48 ++++-
 proxmox-rest-server/src/api_config.rs         |  44 ++---
 proxmox-rest-server/src/connection.rs         |  14 +-
 proxmox-rest-server/src/formatter.rs          |   8 +-
 proxmox-rest-server/src/h2service.rs          |  15 +-
 proxmox-rest-server/src/lib.rs                |   2 +-
 proxmox-rest-server/src/rest.rs               | 164 +++++++++++-------
 proxmox-router/Cargo.toml                     |   6 +-
 proxmox-router/src/router.rs                  |  19 +-
 proxmox-router/src/stream/parsing.rs          |  16 +-
 31 files changed, 567 insertions(+), 236 deletions(-)
 create mode 100644 proxmox-http/src/body.rs

proxmox-backup:

Fabian Grünbichler (6):
  Revert "h2: switch to legacy feature"
  pbs-client: adapt http client to hyper/http 1.0
  pbs-client: vsock: adapt to hyper/http 1.0
  restore daemon: adapt to hyper/http 1.0
  adapt to hyper/http 1.0
  adapt examples to hyper/http 1.0

 Cargo.toml                                    | 10 ++-
 examples/h2client.rs                          |  6 +-
 examples/h2s-client.rs                        |  6 +-
 examples/h2s-server.rs                        | 28 +++-----
 examples/h2server.rs                          | 28 +++-----
 pbs-client/Cargo.toml                         |  4 +-
 pbs-client/src/backup_writer.rs               |  8 +--
 pbs-client/src/http_client.rs                 | 38 +++++-----
 pbs-client/src/pipe_to_stream.rs              |  2 +-
 pbs-client/src/vsock_client.rs                | 27 +++----
 proxmox-backup-client/Cargo.toml              |  1 +
 proxmox-backup-client/src/snapshot.rs         |  2 +-
 proxmox-restore-daemon/Cargo.toml             |  2 +
 proxmox-restore-daemon/src/main.rs            | 24 +++++--
 .../src/proxmox_restore_daemon/api.rs         |  6 +-
 .../src/proxmox_restore_daemon/auth.rs        |  5 +-
 src/acme/client.rs                            |  6 +-
 src/acme/plugin.rs                            | 62 +++++++++++-----
 src/api2/admin/datastore.rs                   | 20 +++---
 src/api2/backup/environment.rs                |  3 +-
 src/api2/backup/mod.rs                        | 10 +--
 src/api2/backup/upload_chunk.rs               | 47 +++++++------
 src/api2/helpers.rs                           |  3 +-
 src/api2/node/mod.rs                          |  7 +-
 src/api2/node/tasks.rs                        |  7 +-
 src/api2/reader/mod.rs                        | 17 +++--
 src/bin/proxmox-backup-api.rs                 | 40 +++++++----
 src/bin/proxmox-backup-proxy.rs               | 70 +++++++++++++++----
 28 files changed, 297 insertions(+), 192 deletions(-)

-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 01/17] http: order feature values
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 02/17] http: rate-limited-stream: update to hyper/http 1.0 Fabian Grünbichler
                   ` (23 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

put optional dependencies first, followed by features on external
crates, followed by internal features.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-http/Cargo.toml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index c8c963f7..5edf304d 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -43,8 +43,8 @@ rate-limited-stream = [
     "dep:hyper",
     "dep:tokio",
     "hyper?/client",
-    "rate-limiter",
     "tokio?/time",
+    "rate-limiter",
 ]
 client = [
     "dep:futures",
@@ -53,14 +53,14 @@ client = [
     "dep:proxmox-compression",
     "dep:tokio",
     "dep:tokio-openssl",
-    "http-helpers",
     "hyper?/client",
     "hyper?/http1",
     "hyper?/http2",
     "hyper?/stream",
     "hyper?/tcp",
-    "rate-limited-stream",
     "tokio?/io-util",
+    "http-helpers",
+    "rate-limited-stream",
 ]
 client-sync = [ "client-trait", "http-helpers", "dep:ureq", "dep:native-tls" ]
 client-trait = [ "dep:http" ]
@@ -72,9 +72,9 @@ websocket = [
     "dep:openssl",
     "dep:proxmox-sys",
     "dep:proxmox-io",
-    "proxmox-io?/tokio",
     "dep:proxmox-lang",
     "dep:tokio",
+    "proxmox-io?/tokio",
     "tokio?/io-util",
     "tokio?/sync",
 ]
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 02/17] http: rate-limited-stream: update to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 01/17] http: order feature values Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 03/17] http: adapt MaybeTlsStream to hyper 1.x Fabian Grünbichler
                   ` (22 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

using the legacy client from hyper_util, which is the replacement for
the pre 1.0 Client from hyper itself

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-http/Cargo.toml                 | 7 +++++--
 proxmox-http/src/rate_limited_stream.rs | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 5edf304d..c5137e2a 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -17,6 +17,7 @@ base64 = { workspace = true, optional = true }
 futures = { workspace = true, optional = true }
 http = { workspace = true, optional = true }
 hyper = { workspace = true, optional = true }
+hyper-util = { workspace = true, optional = true }
 native-tls = { workspace = true, optional = true }
 openssl =  { version = "0.10", optional = true }
 serde_json = { workspace = true, optional = true }
@@ -40,9 +41,11 @@ default = []
 
 rate-limiter = ["dep:hyper"]
 rate-limited-stream = [
-    "dep:hyper",
     "dep:tokio",
-    "hyper?/client",
+    "dep:hyper-util",
+    "hyper-util?/client",
+    "hyper-util?/client-legacy",
+    "hyper-util?/http1",
     "tokio?/time",
     "rate-limiter",
 ]
diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
index d43c09c1..e9308a47 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -5,7 +5,7 @@ use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
-use hyper::client::connect::{Connected, Connection};
+use hyper_util::client::legacy::connect::{Connected, Connection};
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tokio::time::Sleep;
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 03/17] http: adapt MaybeTlsStream to hyper 1.x
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 01/17] http: order feature values Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 02/17] http: rate-limited-stream: update to hyper/http 1.0 Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 04/17] http: adapt connector " Fabian Grünbichler
                   ` (21 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

using the legacy client from hyper_util, which is the replacement for
the pre 1.0 Client from hyper itself

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-http/src/client/tls.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/proxmox-http/src/client/tls.rs b/proxmox-http/src/client/tls.rs
index 81aff783..9eba154a 100644
--- a/proxmox-http/src/client/tls.rs
+++ b/proxmox-http/src/client/tls.rs
@@ -4,7 +4,7 @@ use std::io;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-use hyper::client::connect::{Connected, Connection};
+use hyper_util::client::legacy::connect::{Connected, Connection};
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tokio_openssl::SslStream;
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 04/17] http: adapt connector to hyper 1.x
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (2 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 03/17] http: adapt MaybeTlsStream to hyper 1.x Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-04-02 13:31   ` Max Carrara
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation Fabian Grünbichler
                   ` (20 subsequent siblings)
  24 siblings, 1 reply; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

by switching to tower's Service and wrapping in TokioIo as needed. hyper
now uses their own Service type to not expose tower in their public API,
and their own Async IO traits, but they provide wrappers to not require
too many changes for crates like ours here that already used hyper 0.14.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-http/Cargo.toml              |  9 ++++--
 proxmox-http/src/client/connector.rs | 44 ++++++++++++++++++----------
 2 files changed, 36 insertions(+), 17 deletions(-)

diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index c5137e2a..4ec142c9 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -25,6 +25,7 @@ tokio = { workspace = true, features = [], optional = true }
 tokio-openssl = { workspace = true, optional = true }
 ureq = { version = "2.4", features = ["native-certs", "native-tls"], optional = true, default-features = false }
 url = { workspace = true, optional = true }
+tower-service = { workspace = true, optional = true }
 
 proxmox-async = { workspace = true, optional = true }
 proxmox-sys = { workspace = true, optional = true }
@@ -52,15 +53,19 @@ rate-limited-stream = [
 client = [
     "dep:futures",
     "dep:hyper",
+    "dep:hyper-util",
     "dep:openssl",
     "dep:proxmox-compression",
     "dep:tokio",
     "dep:tokio-openssl",
+    "dep:tower-service",
     "hyper?/client",
     "hyper?/http1",
     "hyper?/http2",
-    "hyper?/stream",
-    "hyper?/tcp",
+    "hyper-util?/client",
+    "hyper-util?/client-legacy",
+    "hyper-util?/http1",
+    "hyper-util?/tokio",
     "tokio?/io-util",
     "http-helpers",
     "rate-limited-stream",
diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index 63b9d10c..70421793 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -6,7 +6,8 @@ use std::task::{Context, Poll};
 
 use futures::*;
 use http::Uri;
-use hyper::client::HttpConnector;
+use hyper_util::client::legacy::connect::HttpConnector;
+use hyper_util::rt::TokioIo;
 use openssl::ssl::SslConnector;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::net::TcpStream;
@@ -122,8 +123,8 @@ impl HttpsConnector {
     }
 }
 
-impl hyper::service::Service<Uri> for HttpsConnector {
-    type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
+impl tower_service::Service<Uri> for HttpsConnector {
+    type Response = TokioIo<MaybeTlsStream<RateLimitedStream<TcpStream>>>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
     type Future =
@@ -171,9 +172,13 @@ impl hyper::service::Service<Uri> for HttpsConnector {
             if use_connect {
                 async move {
                     use std::fmt::Write as _;
-                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
-                        format_err!("error connecting to {} - {}", proxy_authority, err)
-                    })?;
+                    let tcp_stream = connector
+                        .call(proxy_uri)
+                        .await
+                        .map_err(|err| {
+                            format_err!("error connecting to {} - {}", proxy_authority, err)
+                        })?
+                        .into_inner();
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
@@ -196,24 +201,30 @@ impl hyper::service::Service<Uri> for HttpsConnector {
                     Self::parse_connect_response(&mut tcp_stream).await?;
 
                     if is_https {
-                        Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                        Self::secure_stream(tcp_stream, &ssl_connector, &host)
+                            .await
+                            .map(TokioIo::new)
                     } else {
-                        Ok(MaybeTlsStream::Normal(tcp_stream))
+                        Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
                     }
                 }
                 .boxed()
             } else {
                 async move {
-                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
-                        format_err!("error connecting to {} - {}", proxy_authority, err)
-                    })?;
+                    let tcp_stream = connector
+                        .call(proxy_uri)
+                        .await
+                        .map_err(|err| {
+                            format_err!("error connecting to {} - {}", proxy_authority, err)
+                        })?
+                        .into_inner();
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
                     let tcp_stream =
                         RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
 
-                    Ok(MaybeTlsStream::Proxied(tcp_stream))
+                    Ok(TokioIo::new(MaybeTlsStream::Proxied(tcp_stream)))
                 }
                 .boxed()
             }
@@ -223,7 +234,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
                 let tcp_stream = connector
                     .call(dst)
                     .await
-                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
+                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?
+                    .into_inner();
 
                 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
@@ -231,9 +243,11 @@ impl hyper::service::Service<Uri> for HttpsConnector {
                     RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
 
                 if is_https {
-                    Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                    Self::secure_stream(tcp_stream, &ssl_connector, &host)
+                        .await
+                        .map(TokioIo::new)
                 } else {
-                    Ok(MaybeTlsStream::Normal(tcp_stream))
+                    Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
                 }
             }
             .boxed()
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (3 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 04/17] http: adapt connector " Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-04-02 13:31   ` Max Carrara
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x Fabian Grünbichler
                   ` (19 subsequent siblings)
  24 siblings, 1 reply; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

hyper/http 1.0 now only have a Body trait and some implementations for
specific use cases. following reqwest's lead (and copying some parts of
its implementation), implement our own Body struct for the two common
use cases:
- a body instance containing the full body data as Bytes
- a streaming body instance

together with the most common helper methods (empty body, convert, wrap
existing stream as body) this should make the rest of the upgrade fairly
straight-forward.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-http/Cargo.toml  |  10 +++
 proxmox-http/src/body.rs | 133 +++++++++++++++++++++++++++++++++++++++
 proxmox-http/src/lib.rs  |   5 ++
 3 files changed, 148 insertions(+)
 create mode 100644 proxmox-http/src/body.rs

diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 4ec142c9..1fbc70a8 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -40,6 +40,15 @@ flate2 = { workspace = true }
 [features]
 default = []
 
+body = [
+    "dep:bytes",
+    "dep:futures",
+    "dep:http-body",
+    "dep:http-body-util",
+    "dep:hyper",
+    "dep:sync_wrapper",
+    "sync_wrapper?/futures",
+]
 rate-limiter = ["dep:hyper"]
 rate-limited-stream = [
     "dep:tokio",
@@ -67,6 +76,7 @@ client = [
     "hyper-util?/http1",
     "hyper-util?/tokio",
     "tokio?/io-util",
+    "body",
     "http-helpers",
     "rate-limited-stream",
 ]
diff --git a/proxmox-http/src/body.rs b/proxmox-http/src/body.rs
new file mode 100644
index 00000000..3eb17355
--- /dev/null
+++ b/proxmox-http/src/body.rs
@@ -0,0 +1,133 @@
+use std::{pin::Pin, task::Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+
+use futures::ready;
+use http_body_util::combinators::BoxBody;
+use hyper::body::{Body as HyperBody, Frame, SizeHint};
+
+// Partially copied and heavily based on reqwest 0.12 Body implementation from src/async_impl/body.rs
+// Copyright (c) 2016-2025 Sean McArthur
+
+/// Custom implementation of hyper::body::Body supporting either a "full" body that can return its
+/// contents as byte sequence in one go, or "streaming" body that can be polled.
+pub struct Body {
+    inner: InnerBody,
+}
+
+enum InnerBody {
+    Full(Bytes),
+    Streaming(BoxBody<Bytes, Error>),
+}
+
+impl Body {
+    /// Shortcut for creating an empty body instance with no data.
+    pub fn empty() -> Self {
+        Bytes::new().into()
+    }
+
+    /// Returns the body contents if it is a "full" body, None otherwise.
+    pub fn as_bytes(&self) -> Option<&[u8]> {
+        match self.inner {
+            InnerBody::Full(ref bytes) => Some(bytes),
+            InnerBody::Streaming(_) => None,
+        }
+    }
+
+    pub fn wrap_stream<S>(stream: S) -> Body
+    where
+        S: futures::stream::TryStream + Send + 'static,
+
+        S::Error: Into<Error>,
+
+        Bytes: From<S::Ok>,
+    {
+        Body::stream(stream)
+    }
+
+    pub(crate) fn stream<S>(stream: S) -> Body
+    where
+        S: futures::stream::TryStream + Send + 'static,
+
+        S::Error: Into<Error>,
+
+        Bytes: From<S::Ok>,
+    {
+        use futures::TryStreamExt;
+
+        use http_body::Frame;
+
+        use http_body_util::StreamBody;
+
+        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
+            stream
+                .map_ok(|d| Frame::data(Bytes::from(d)))
+                .map_err(Into::into),
+        )));
+
+        Body {
+            inner: InnerBody::Streaming(body),
+        }
+    }
+}
+
+impl HyperBody for Body {
+    type Data = Bytes;
+
+    type Error = Error;
+
+    fn poll_frame(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
+        match self.inner {
+            InnerBody::Full(ref mut bytes) => {
+                let res = bytes.split_off(0);
+                if res.is_empty() {
+                    return Poll::Ready(None);
+                } else {
+                    return Poll::Ready(Some(Ok(Frame::data(res))));
+                }
+            }
+            InnerBody::Streaming(ref mut body) => Poll::Ready(
+                ready!(Pin::new(body).poll_frame(cx))
+                    .map(|opt_chunk| opt_chunk.map_err(Error::from)),
+            ),
+        }
+    }
+
+    fn is_end_stream(&self) -> bool {
+        match self.inner {
+            InnerBody::Full(ref bytes) => bytes.is_empty(),
+            InnerBody::Streaming(ref box_body) => box_body.is_end_stream(),
+        }
+    }
+
+    fn size_hint(&self) -> hyper::body::SizeHint {
+        match self.inner {
+            InnerBody::Full(ref bytes) => SizeHint::with_exact(bytes.len() as u64),
+            InnerBody::Streaming(ref box_body) => box_body.size_hint(),
+        }
+    }
+}
+
+impl From<Bytes> for Body {
+    fn from(value: Bytes) -> Self {
+        Self {
+            inner: InnerBody::Full(value),
+        }
+    }
+}
+
+impl From<Vec<u8>> for Body {
+    fn from(value: Vec<u8>) -> Self {
+        Bytes::from(value).into()
+    }
+}
+
+impl From<String> for Body {
+    fn from(value: String) -> Self {
+        Bytes::copy_from_slice(value.as_bytes()).into()
+    }
+}
diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 4770aaf4..8b6953b0 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -35,3 +35,8 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
 mod rate_limited_stream;
 #[cfg(feature = "rate-limited-stream")]
 pub use rate_limited_stream::RateLimitedStream;
+
+#[cfg(feature = "body")]
+mod body;
+#[cfg(feature = "body")]
+pub use body::Body;
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (4 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 07/17] http: websocket: update to http/hyper 1 Fabian Grünbichler
                   ` (18 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

the main change requiring adaptations here is that hyper no longer
provides a Body struct, but switched to a trait, so we use our own Body
type introduced by the previous commit.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 Cargo.toml                        |  8 ++-
 proxmox-http/Cargo.toml           |  8 ++-
 proxmox-http/src/client/simple.rs | 93 ++++++++++++++++++++++---------
 3 files changed, 81 insertions(+), 28 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 268b39eb..202cfd1e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -84,8 +84,11 @@ form_urlencoded = "1.1"
 futures = "0.3"
 handlebars = "3.0"
 hex = "0.4"
-http = "0.2"
-hyper = "0.14.5"
+http = "1"
+http-body = "1"
+http-body-util = "0.1"
+hyper-util = "0.1"
+hyper = "1"
 ldap3 = { version = "0.11", default-features = false }
 lettre = "0.11.1"
 libc = "0.2.107"
@@ -105,6 +108,7 @@ serde_cbor = "0.11.1"
 serde_json = "1.0"
 serde_plain = "1.0"
 syn = { version = "2", features = [ "full", "visit-mut" ] }
+sync_wrapper = "1.0.2"
 tar = "0.4"
 tokio = "1.6"
 tokio-openssl = "0.6.1"
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 1fbc70a8..7668c6ee 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -14,13 +14,17 @@ rust-version.workspace = true
 [dependencies]
 anyhow.workspace = true
 base64 = { workspace = true, optional = true }
+bytes = { workspace = true, optional = true }
 futures = { workspace = true, optional = true }
 http = { workspace = true, optional = true }
+http-body = { workspace = true, optional = true }
+http-body-util = { workspace = true, optional = true }
 hyper = { workspace = true, optional = true }
 hyper-util = { workspace = true, optional = true }
 native-tls = { workspace = true, optional = true }
 openssl =  { version = "0.10", optional = true }
 serde_json = { workspace = true, optional = true }
+sync_wrapper = { workspace = true, optional = true }
 tokio = { workspace = true, features = [], optional = true }
 tokio-openssl = { workspace = true, optional = true }
 ureq = { version = "2.4", features = ["native-certs", "native-tls"], optional = true, default-features = false }
@@ -60,8 +64,10 @@ rate-limited-stream = [
     "rate-limiter",
 ]
 client = [
+    "dep:bytes",
     "dep:futures",
-    "dep:hyper",
+    "dep:http-body",
+    "dep:http-body-util",
     "dep:hyper-util",
     "dep:openssl",
     "dep:proxmox-compression",
diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
index 062889ac..b5318f19 100644
--- a/proxmox-http/src/client/simple.rs
+++ b/proxmox-http/src/client/simple.rs
@@ -1,19 +1,24 @@
 use anyhow::{bail, format_err, Error};
 use std::collections::HashMap;
 
+use std::fmt::Display;
+
+#[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
+use http::header::HeaderName;
 #[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
 use std::str::FromStr;
 
 use futures::*;
-#[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
-use http::header::HeaderName;
+
 use http::{HeaderValue, Request, Response};
-use hyper::client::Client as HyperClient;
-use hyper::client::HttpConnector;
-use hyper::Body;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper_util::client::legacy::connect::HttpConnector;
+use hyper_util::client::legacy::Client as HyperClient;
+use hyper_util::rt::TokioExecutor;
 use openssl::ssl::{SslConnector, SslMethod};
 
 use crate::client::HttpsConnector;
+use crate::Body;
 use crate::HttpOptions;
 
 /// Asynchronous HTTP client implementation
@@ -44,7 +49,9 @@ impl Client {
         if let Some(ref proxy_config) = options.proxy_config {
             https.set_proxy(proxy_config.clone());
         }
-        let client = HyperClient::builder().build(https);
+
+        let client =
+            HyperClient::builder(TokioExecutor::new()).build::<HttpsConnector, Body>(https);
         Self { client, options }
     }
 
@@ -74,7 +81,7 @@ impl Client {
 
         request
             .headers_mut()
-            .insert(hyper::header::USER_AGENT, user_agent);
+            .insert(http::header::USER_AGENT, user_agent);
 
         self.add_proxy_headers(&mut request)?;
 
@@ -94,7 +101,7 @@ impl Client {
         let mut request = Request::builder()
             .method("POST")
             .uri(uri)
-            .header(hyper::header::CONTENT_TYPE, content_type);
+            .header(http::header::CONTENT_TYPE, content_type);
 
         if let Some(extra_headers) = extra_headers {
             for (header, value) in extra_headers {
@@ -102,9 +109,8 @@ impl Client {
             }
         }
 
-        let request = request.body(body.unwrap_or_default())?;
-
-        self.request(request).await
+        let body = body.unwrap_or(Body::empty());
+        self.request(request.body(body)?).await
     }
 
     pub async fn get_string(
@@ -145,7 +151,7 @@ impl Client {
             Ok(res) => {
                 let (parts, body) = res.into_parts();
 
-                let buf = hyper::body::to_bytes(body).await?;
+                let buf = body.collect().await?.to_bytes();
                 let new_body = String::from_utf8(buf.to_vec())
                     .map_err(|err| format_err!("Error converting HTTP result data: {}", err))?;
 
@@ -154,6 +160,25 @@ impl Client {
             Err(err) => Err(err),
         }
     }
+
+    pub async fn response_body_bytes(res: Response<Body>) -> Result<Body, Error> {
+        Self::convert_body_to_bytes(Ok(res))
+            .await
+            .map(|res| res.into_body())
+    }
+
+    async fn convert_body_to_bytes(
+        response: Result<Response<Body>, Error>,
+    ) -> Result<Response<Body>, Error> {
+        match response {
+            Ok(res) => {
+                let (parts, body) = res.into_parts();
+                let buf = body.collect().await?.to_bytes();
+                Ok(Response::from_parts(parts, buf.into()))
+            }
+            Err(err) => Err(err),
+        }
+    }
 }
 
 impl Default for Client {
@@ -181,7 +206,9 @@ impl crate::HttpClient<Body, Body> for Client {
             }
         }
 
-        proxmox_async::runtime::block_on(self.request(req))
+        proxmox_async::runtime::block_on(async move {
+            Self::convert_body_to_bytes(self.request(req).await).await
+        })
     }
 
     fn post(
@@ -191,11 +218,16 @@ impl crate::HttpClient<Body, Body> for Client {
         content_type: Option<&str>,
         extra_headers: Option<&HashMap<String, String>>,
     ) -> Result<Response<Body>, Error> {
-        proxmox_async::runtime::block_on(self.post(uri, body, content_type, extra_headers))
+        proxmox_async::runtime::block_on(async move {
+            Self::convert_body_to_bytes(self.post(uri, body, content_type, extra_headers).await)
+                .await
+        })
     }
 
     fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
-        proxmox_async::runtime::block_on(async move { self.request(request).await })
+        proxmox_async::runtime::block_on(async move {
+            Self::convert_body_to_bytes(self.request(request).await).await
+        })
     }
 }
 
@@ -231,7 +263,7 @@ impl crate::HttpClient<String, String> for Client {
         extra_headers: Option<&HashMap<String, String>>,
     ) -> Result<Response<String>, Error> {
         proxmox_async::runtime::block_on(async move {
-            let body = body.map(|s| Body::from(s.into_bytes()));
+            let body = body.map(|s| s.into());
             Self::convert_body_to_string(self.post(uri, body, content_type, extra_headers).await)
                 .await
         })
@@ -240,25 +272,34 @@ impl crate::HttpClient<String, String> for Client {
     fn request(&self, request: Request<String>) -> Result<Response<String>, Error> {
         proxmox_async::runtime::block_on(async move {
             let (parts, body) = request.into_parts();
-            let body = Body::from(body);
+            let body = body.into();
             let request = Request::from_parts(parts, body);
             Self::convert_body_to_string(self.request(request).await).await
         })
     }
 }
 
-/// Wraps the `Body` stream in a DeflateDecoder stream if the `Content-Encoding`
+/// Wraps the `Response` contents in a DeflateDecoder stream if the `Content-Encoding`
 /// header of the response is `deflate`, otherwise returns the original
 /// response.
-async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
-    let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
-        return Ok(res);
+async fn decode_response<B>(mut res: Response<B>) -> Result<Response<Body>, Error>
+where
+    B: http_body::Body<Data = bytes::Bytes> + Send + Unpin + 'static,
+    <B as http_body::Body>::Error: Into<Error> + Display,
+{
+    let Some(content_encoding) = res.headers_mut().remove(&http::header::CONTENT_ENCODING) else {
+        let (parts, body) = res.into_parts();
+        let stream = BodyDataStream::new(body);
+        let body = Body::wrap_stream(stream);
+        return Ok(Response::from_parts(parts, body));
     };
 
     let encodings = content_encoding.to_str()?;
     if encodings == "deflate" {
         let (parts, body) = res.into_parts();
-        let decoder = proxmox_compression::DeflateDecoder::builder(body)
+
+        let stream = BodyDataStream::new(body);
+        let decoder = proxmox_compression::DeflateDecoder::builder(stream)
             .zlib(true)
             .build();
         let decoded_body = Body::wrap_stream(decoder);
@@ -270,6 +311,8 @@ async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Erro
 
 #[cfg(test)]
 mod test {
+    use bytes::Bytes;
+
     use super::*;
 
     use std::io::Write;
@@ -282,10 +325,10 @@ si aliquod aeternum et infinitum impendere."#;
     #[tokio::test]
     async fn test_parse_response_deflate() {
         let encoded = encode_deflate(BODY.as_bytes()).unwrap();
-        let encoded_body = Body::from(encoded);
+        let encoded_body = Body::from(Bytes::from(encoded));
         let encoded_response = Response::builder()
-            .header(hyper::header::CONTENT_ENCODING, "deflate")
-            .body(encoded_body)
+            .header(http::header::CONTENT_ENCODING, "deflate")
+            .body::<Body>(encoded_body)
             .unwrap();
 
         let decoded_response = decode_response(encoded_response).await.unwrap();
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 07/17] http: websocket: update to http/hyper 1
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (5 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 08/17] openid: use http 0.2 to avoid openidconnect update Fabian Grünbichler
                   ` (17 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

use the new Empty body type, since Body is now a trait and no longer a
struct containing different body types.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-http/Cargo.toml           | 3 ++-
 proxmox-http/src/websocket/mod.rs | 6 ++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 7668c6ee..6eed6e23 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -92,7 +92,7 @@ http-helpers = [ "dep:base64", "dep:http", "dep:proxmox-sys", "dep:serde_json",
 websocket = [
     "dep:base64",
     "dep:futures",
-    "dep:hyper",
+    "dep:http",
     "dep:openssl",
     "dep:proxmox-sys",
     "dep:proxmox-io",
@@ -101,4 +101,5 @@ websocket = [
     "proxmox-io?/tokio",
     "tokio?/io-util",
     "tokio?/sync",
+    "body",
 ]
diff --git a/proxmox-http/src/websocket/mod.rs b/proxmox-http/src/websocket/mod.rs
index eef5fa8e..576fc18d 100644
--- a/proxmox-http/src/websocket/mod.rs
+++ b/proxmox-http/src/websocket/mod.rs
@@ -11,11 +11,11 @@ use std::task::{Context, Poll};
 
 use anyhow::{bail, format_err, Error};
 use futures::select;
-use hyper::header::{
+use http::header::{
     HeaderMap, HeaderValue, CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY,
     SEC_WEBSOCKET_PROTOCOL, SEC_WEBSOCKET_VERSION, UPGRADE,
 };
-use hyper::{Body, Response, StatusCode};
+use http::{Response, StatusCode};
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
 use tokio::sync::mpsc;
 
@@ -25,6 +25,8 @@ use futures::ready;
 use proxmox_io::ByteBuffer;
 use proxmox_lang::io_format_err;
 
+use crate::Body;
+
 // see RFC6455 section 7.4.1
 #[derive(Debug, Clone, Copy)]
 #[repr(u16)]
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 08/17] openid: use http 0.2 to avoid openidconnect update
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (6 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 07/17] http: websocket: update to http/hyper 1 Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 09/17] proxmox-login: switch to http 1.x Fabian Grünbichler
                   ` (16 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

to avoid too much churn right now

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-openid/Cargo.toml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/proxmox-openid/Cargo.toml b/proxmox-openid/Cargo.toml
index fde67b61..65a8587d 100644
--- a/proxmox-openid/Cargo.toml
+++ b/proxmox-openid/Cargo.toml
@@ -14,7 +14,8 @@ rust-version.workspace = true
 
 [dependencies]
 anyhow.workspace = true
-http.workspace = true
+# until oauth2 and openidconnect are updated
+http.version = "0.2"
 nix.workspace = true
 serde = { workspace = true, features = ["derive"] }
 serde_json.workspace = true
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 09/17] proxmox-login: switch to http 1.x
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (7 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 08/17] openid: use http 0.2 to avoid openidconnect update Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 10/17] client: switch to hyper/http 1.0 Fabian Grünbichler
                   ` (15 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-login/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/proxmox-login/Cargo.toml b/proxmox-login/Cargo.toml
index 50dfe2c8..a78fa317 100644
--- a/proxmox-login/Cargo.toml
+++ b/proxmox-login/Cargo.toml
@@ -21,7 +21,7 @@ serde_json.workspace = true
 webauthn-rs = { workspace = true, optional = true }
 
 # For `Authentication::set_auth_headers`
-http = { version = "0.2.4", optional = true }
+http = { workspace = true, optional = true }
 
 [target.'cfg(target_arch="wasm32")'.dependencies]
 js-sys = "0.3.55"
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 10/17] client: switch to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (8 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 09/17] proxmox-login: switch to http 1.x Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 11/17] metrics: update " Fabian Grünbichler
                   ` (14 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

and adapt to the corresponding proxmox-http changes.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-client/Cargo.toml    |  1 +
 proxmox-client/src/client.rs | 22 ++++++++++++----------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/proxmox-client/Cargo.toml b/proxmox-client/Cargo.toml
index f890501e..25d41b2b 100644
--- a/proxmox-client/Cargo.toml
+++ b/proxmox-client/Cargo.toml
@@ -14,6 +14,7 @@ repository.workspace = true
 anyhow.workspace = true
 hex.workspace = true
 http.workspace = true
+http-body-util.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 
diff --git a/proxmox-client/src/client.rs b/proxmox-client/src/client.rs
index 6f1c9ef1..da2c5c59 100644
--- a/proxmox-client/src/client.rs
+++ b/proxmox-client/src/client.rs
@@ -8,13 +8,14 @@ use http::request::Request;
 use http::uri::PathAndQuery;
 use http::Method;
 use http::{StatusCode, Uri};
-use hyper::body::{Body, HttpBody};
+use http_body_util::BodyExt;
 use openssl::hash::MessageDigest;
 use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
 use openssl::x509::{self, X509};
 use proxmox_login::Ticket;
 use serde::Serialize;
 
+use proxmox_http::Body;
 use proxmox_login::ticket::Validity;
 use proxmox_login::{Login, SecondFactorChallenge, TicketResult};
 
@@ -223,7 +224,7 @@ impl Client {
         // send an `Accept: application/json-seq` header.
         streaming: bool,
         cookie_name: &Option<String>,
-    ) -> Result<(http::response::Parts, hyper::Body), Error> {
+    ) -> Result<(http::response::Parts, Body), Error> {
         let mut request = auth.set_auth_headers_with_cookie_name(
             Request::builder().method(method).uri(uri),
             cookie_name,
@@ -237,7 +238,7 @@ impl Client {
                 .header(http::header::CONTENT_TYPE, "application/json")
                 .body(body.into())
         } else {
-            request.body(Default::default())
+            request.body(Body::empty())
         }
         .map_err(|err| Error::internal("failed to build request", err))?;
 
@@ -449,12 +450,13 @@ impl Client {
     }
 }
 
-async fn read_body(mut body: Body) -> Result<Vec<u8>, Error> {
-    let mut data = Vec::<u8>::new();
-    while let Some(more) = body.data().await {
-        let more = more.map_err(|err| Error::internal("error reading response body", err))?;
-        data.extend(&more[..]);
-    }
+async fn read_body(body: Body) -> Result<Vec<u8>, Error> {
+    let data = body
+        .collect()
+        .await
+        .map_err(Error::Anyhow)?
+        .to_bytes()
+        .to_vec();
     Ok(data)
 }
 
@@ -465,7 +467,7 @@ impl HttpApiClient for Client {
     type ResponseStreamFuture<'a> =
         Pin<Box<dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, Error>> + Send + 'a>>;
 
-    type Body = hyper::Body;
+    type Body = Body;
 
     fn request<'a, T>(
         &'a self,
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 11/17] metrics: update to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (9 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 10/17] client: switch to hyper/http 1.0 Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0 Fabian Grünbichler
                   ` (13 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

and use new Body from proxmox-http

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-metrics/src/influxdb/http.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
index d773c16f..074d2ada 100644
--- a/proxmox-metrics/src/influxdb/http.rs
+++ b/proxmox-metrics/src/influxdb/http.rs
@@ -1,12 +1,11 @@
 use std::sync::Arc;
 
 use anyhow::{bail, Error};
-use hyper::Body;
 use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
 use tokio::sync::mpsc;
 
 use proxmox_http::client::Client;
-use proxmox_http::HttpOptions;
+use proxmox_http::{Body, HttpOptions};
 
 use crate::influxdb::utils;
 use crate::{Metrics, MetricsData};
@@ -174,7 +173,7 @@ impl InfluxDbHttp {
             request = request.header("Authorization", format!("Token {}", token));
         }
 
-        let request = request.body(Body::from(self.data.split_off(0)))?;
+        let request = request.body(self.data.split_off(0).into())?;
 
         let res = self.client.request(request).await?;
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (10 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 11/17] metrics: update " Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-04-02 13:31   ` Max Carrara
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 13/17] proxmox-router: update to hyper 1.0 Fabian Grünbichler
                   ` (12 subsequent siblings)
  24 siblings, 1 reply; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-acme/Cargo.toml          |  3 ++-
 proxmox-acme/src/async_client.rs | 11 +++++++----
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/proxmox-acme/Cargo.toml b/proxmox-acme/Cargo.toml
index f6dbe481..26b92f98 100644
--- a/proxmox-acme/Cargo.toml
+++ b/proxmox-acme/Cargo.toml
@@ -26,6 +26,7 @@ proxmox-schema = { workspace = true, optional = true, features = [ "api-macro" ]
 proxmox-http = { workspace = true, optional = true, features = [ "client" ] }
 anyhow = { workspace = true, optional = true }
 bytes = { workspace = true, optional = true }
+http-body-util = { workspace = true, optional = true }
 hyper = { workspace = true, optional = true }
 
 [dependencies.ureq]
@@ -39,7 +40,7 @@ default = [ "impl" ]
 api-types = [ "dep:proxmox-schema" ]
 impl = [ "api-types", "dep:openssl" ]
 client = [ "impl", "dep:ureq", "dep:native-tls"]
-async-client = [ "impl", "dep:hyper", "dep:proxmox-http", "dep:anyhow", "dep:bytes" ]
+async-client = [ "impl", "dep:http-body-util", "dep:hyper", "dep:proxmox-http", "dep:anyhow", "dep:bytes" ]
 
 [dev-dependencies]
 anyhow.workspace = true
diff --git a/proxmox-acme/src/async_client.rs b/proxmox-acme/src/async_client.rs
index 6e38570f..a29b6f91 100644
--- a/proxmox-acme/src/async_client.rs
+++ b/proxmox-acme/src/async_client.rs
@@ -2,10 +2,11 @@
 
 use anyhow::format_err;
 use bytes::Bytes;
-use hyper::{Body, Request};
+use http_body_util::BodyExt;
+use hyper::Request;
 use serde::{Deserialize, Serialize};
 
-use proxmox_http::client::Client;
+use proxmox_http::{client::Client, Body};
 
 use crate::account::AccountCreator;
 use crate::order::{Order, OrderData};
@@ -400,9 +401,11 @@ impl AcmeClient {
         let (parts, body) = response.into_parts();
 
         let status = parts.status.as_u16();
-        let body = hyper::body::to_bytes(body)
+        let body = body
+            .collect()
             .await
-            .map_err(|err| Error::Custom(format!("failed to retrieve response body: {}", err)))?;
+            .map_err(|err| Error::Custom(format!("failed to retrieve response body: {}", err)))?
+            .to_bytes();
 
         let got_nonce = if let Some(new_nonce) = parts.headers.get(crate::REPLAY_NONCE) {
             let new_nonce = new_nonce.to_str().map_err(|err| {
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 13/17] proxmox-router: update to hyper 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (11 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0 Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: " Fabian Grünbichler
                   ` (11 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

and switch to proxmox_http's body. hyper now has a special (opaque) Body
implementation called Incoming which is used for incoming request bodies
on the server side, and incoming response bodies on the client side, so
our API handler's now consume an instance of this type.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-router/Cargo.toml            |  6 ++++--
 proxmox-router/src/router.rs         | 19 +++++++++++--------
 proxmox-router/src/stream/parsing.rs | 16 +++++++++++-----
 3 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/proxmox-router/Cargo.toml b/proxmox-router/Cargo.toml
index 9f1af3e9..de3f09e3 100644
--- a/proxmox-router/Cargo.toml
+++ b/proxmox-router/Cargo.toml
@@ -19,6 +19,7 @@ required-features = [ "cli" ]
 
 [dependencies]
 anyhow.workspace = true
+bytes = { workspace = true, optional = true }
 env_logger = { workspace = true, optional = true }
 futures.workspace = true
 http = { workspace = true, optional = true }
@@ -34,6 +35,7 @@ unicode-width ="0.1.8"
 rustyline = { version = "9", optional = true }
 libc = { workspace = true, optional = true }
 
+proxmox-http = { workspace = true, optional = true }
 proxmox-http-error.workspace = true
 proxmox-schema.workspace = true
 proxmox-async.workspace = true
@@ -45,6 +47,6 @@ tokio-stream.workspace = true
 [features]
 default = [ "cli", "server" ]
 cli = [ "stream", "dep:env_logger", "dep:libc", "dep:rustyline" ]
-server = [ "dep:http", "dep:hyper" ]
+server = [ "dep:http", "dep:hyper", "dep:proxmox-http", "proxmox-http?/body" ]
 test-harness = [ "proxmox-schema/test-harness" ]
-stream = [ "dep:hyper" ]
+stream = [ "dep:bytes", "dep:hyper", "dep:proxmox-http", "proxmox-http?/body" ]
diff --git a/proxmox-router/src/router.rs b/proxmox-router/src/router.rs
index 49593508..04552a91 100644
--- a/proxmox-router/src/router.rs
+++ b/proxmox-router/src/router.rs
@@ -8,9 +8,10 @@ use anyhow::Error;
 use http::request::Parts;
 #[cfg(feature = "server")]
 use http::{Method, Response};
-#[cfg(feature = "server")]
-use hyper::Body;
+use hyper::body::Incoming;
 use percent_encoding::percent_decode_str;
+#[cfg(feature = "server")]
+use proxmox_http::Body;
 use serde::Serialize;
 use serde_json::Value;
 
@@ -393,14 +394,15 @@ impl IntoRecord for Result<Record, Error> {
 /// ```
 /// # use serde_json::{json, Value};
 /// #
-/// use hyper::{Body, Response, http::request::Parts};
+/// use hyper::{Response, body::Incoming, http::request::Parts};
 ///
+/// use proxmox_http::Body;
 /// use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
 /// use proxmox_schema::ObjectSchema;
 ///
 /// fn low_level_hello(
 ///    parts: Parts,
-///    req_body: Body,
+///    req_body: Incoming,
 ///    param: Value,
 ///    info: &ApiMethod,
 ///    rpcenv: Box<dyn RpcEnvironment>,
@@ -408,7 +410,7 @@ impl IntoRecord for Result<Record, Error> {
 ///    Box::pin(async move {
 ///        let response = http::Response::builder()
 ///            .status(200)
-///            .body(Body::from("Hello world!"))?;
+///            .body(Body::from("Hello world!".to_string()))?;
 ///        Ok(response)
 ///    })
 /// }
@@ -421,7 +423,7 @@ impl IntoRecord for Result<Record, Error> {
 #[cfg(feature = "server")]
 pub type ApiAsyncHttpHandlerFn = &'static (dyn Fn(
     Parts,
-    Body,
+    Incoming,
     Value,
     &'static ApiMethod,
     Box<dyn RpcEnvironment>,
@@ -443,8 +445,9 @@ pub type ApiResponseFuture =
 /// ```
 /// use serde_json::Value;
 ///
-/// use hyper::{Body, Response, http::request::Parts};
+/// use hyper::{Response, http::request::Parts};
 ///
+/// use proxmox_http::Body;
 /// use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
 /// use proxmox_schema::ObjectSchema;
 ///
@@ -457,7 +460,7 @@ pub type ApiResponseFuture =
 ///    Box::pin(async move {
 ///        let response = http::Response::builder()
 ///            .status(200)
-///            .body(Body::from("Hello world!"))?;
+///            .body(Body::from("Hello world!".to_string()))?;
 ///        Ok(response)
 ///    })
 /// }
diff --git a/proxmox-router/src/stream/parsing.rs b/proxmox-router/src/stream/parsing.rs
index 1726d2f8..d9d00e59 100644
--- a/proxmox-router/src/stream/parsing.rs
+++ b/proxmox-router/src/stream/parsing.rs
@@ -4,10 +4,12 @@ use std::pin::Pin;
 use std::task::{ready, Poll};
 
 use anyhow::{format_err, Context as _, Error};
+use bytes::Bytes;
 use futures::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, BufReader};
-use hyper::body::{Body, Bytes};
 use serde::Deserialize;
 
+use proxmox_http::Body;
+
 use super::Record;
 
 pub struct Records<R = BodyBufReader>
@@ -269,8 +271,7 @@ impl AsyncBufRead for BodyBufReader {
         self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> Poll<io::Result<&[u8]>> {
-        use hyper::body::HttpBody;
-
+        use hyper::body::Body as HyperBody;
         let Self {
             ref mut reader,
             ref mut buf_at,
@@ -283,12 +284,17 @@ impl AsyncBufRead for BodyBufReader {
 
             let result = match reader {
                 None => return Poll::Ready(Ok(&[])),
-                Some(reader) => ready!(Pin::new(reader).poll_data(cx)),
+                Some(reader) => ready!(Pin::new(reader).poll_frame(cx)),
             };
 
             match result {
                 Some(Ok(bytes)) => {
-                    *buf_at = Some((bytes, 0));
+                    *buf_at = Some((
+                        bytes
+                            .into_data()
+                            .map_err(|_frame| io::Error::other("Failed to read frame from body"))?,
+                        0,
+                    ));
                 }
                 Some(Err(err)) => {
                     *reader = None;
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: update to hyper 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (12 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 13/17] proxmox-router: update to hyper 1.0 Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-04-02 13:34   ` Max Carrara
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 15/17] proxmox-rest-server: fix and extend example Fabian Grünbichler
                   ` (10 subsequent siblings)
  24 siblings, 1 reply; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

and switch to proxmox-http's Body implementation.

hyper now has a special (opaque) Body implementation called Incoming
which is used for incoming request bodies on the server side, and
incoming response bodies on the client side, so our API handler's now
consume an instance of this type.

the Accept trait previously offered by hyper is gone in 1.0, and needs
to be replaced with an accept loop. our corresponding Acceptor
implementations have been dropped as well.

hyper now has its own Service and async Read/Write traits, but helpfully
provides wrappers for tower's Service and tokio's AsyncRead/AsyncWrite
variants.

graceful shutdown handling is now exposed differently on the hyper side,
and to allow usage with upgradable connections the variant form
hyper-util needs to be used, as the one straight from hyper doesn't
support it (yet).

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-rest-server/Cargo.toml                |   7 +-
 .../examples/minimal-rest-server.rs           |   5 +-
 proxmox-rest-server/src/api_config.rs         |  44 ++---
 proxmox-rest-server/src/connection.rs         |  14 +-
 proxmox-rest-server/src/formatter.rs          |   8 +-
 proxmox-rest-server/src/h2service.rs          |  15 +-
 proxmox-rest-server/src/lib.rs                |   2 +-
 proxmox-rest-server/src/rest.rs               | 164 +++++++++++-------
 8 files changed, 142 insertions(+), 117 deletions(-)

diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index ffbd925a..ee253b4f 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -20,7 +20,9 @@ anyhow.workspace = true
 futures.workspace = true
 handlebars = { workspace = true, optional = true }
 http.workspace = true
+http-body-util.workspace = true
 hyper = { workspace = true, features = [ "full" ] }
+hyper-util = { workspace = true, features = [ "client", "client-legacy", "http1", "server", "server-auto", "server-graceful", "service", "tokio" ]}
 libc.workspace = true
 log.workspace = true
 nix.workspace = true
@@ -39,7 +41,7 @@ url.workspace = true
 proxmox-async.workspace = true
 proxmox-compression.workspace = true
 proxmox-daemon.workspace = true
-proxmox-http = { workspace = true, optional = true }
+proxmox-http = { workspace = true, features = ["body"] }
 proxmox-lang.workspace = true
 proxmox-log.workspace = true
 proxmox-router.workspace = true
@@ -52,6 +54,5 @@ proxmox-worker-task.workspace = true
 default = []
 templates = ["dep:handlebars"]
 rate-limited-stream = [
-    "dep:proxmox-http",
-    "proxmox-http?/rate-limited-stream",
+    "proxmox-http/rate-limited-stream",
 ]
diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
index 23be586c..454430fb 100644
--- a/proxmox-rest-server/examples/minimal-rest-server.rs
+++ b/proxmox-rest-server/examples/minimal-rest-server.rs
@@ -6,8 +6,9 @@ use std::sync::{LazyLock, Mutex};
 use anyhow::{bail, format_err, Error};
 use http::request::Parts;
 use http::HeaderMap;
-use hyper::{Body, Method, Response};
+use hyper::{Method, Response};
 
+use proxmox_http::Body;
 use proxmox_router::{
     list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
 };
@@ -57,7 +58,7 @@ fn get_index(
     Box::pin(async move {
         // build an index page
         http::Response::builder()
-            .body("hello world".into())
+            .body("hello world".to_owned().into_bytes().into())
             .unwrap()
     })
 }
diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
index b20b2da0..0b847a0c 100644
--- a/proxmox-rest-server/src/api_config.rs
+++ b/proxmox-rest-server/src/api_config.rs
@@ -9,10 +9,12 @@ use std::task::{Context, Poll};
 use anyhow::{format_err, Error};
 use http::{HeaderMap, Method, Uri};
 use hyper::http::request::Parts;
-use hyper::{Body, Response};
+use hyper::Response;
+use hyper_util::rt::TokioIo;
 use tower_service::Service;
 
 use proxmox_daemon::command_socket::CommandSocket;
+use proxmox_http::Body;
 use proxmox_log::{FileLogOptions, FileLogger};
 use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
 use proxmox_sys::fs::{create_path, CreateOptions};
@@ -107,7 +109,7 @@ impl ApiConfig {
     ) -> Response<Body> {
         match self.index_handler.as_ref() {
             Some(handler) => (handler.func)(rest_env, parts).await,
-            None => Response::builder().status(404).body("".into()).unwrap(),
+            None => Response::builder().status(404).body(Body::empty()).unwrap(),
         }
     }
 
@@ -511,7 +513,7 @@ impl From<std::os::unix::net::SocketAddr> for PrivilegedAddr {
 }
 
 impl Service<Uri> for PrivilegedAddr {
-    type Response = PrivilegedSocket;
+    type Response = TokioIo<PrivilegedSocket>;
     type Error = io::Error;
     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
 
@@ -527,6 +529,7 @@ impl Service<Uri> for PrivilegedAddr {
                     tokio::net::TcpStream::connect(addr)
                         .await
                         .map(PrivilegedSocket::Tcp)
+                        .map(TokioIo::new)
                 })
             }
             PrivilegedAddr::Unix(addr) => {
@@ -537,6 +540,7 @@ impl Service<Uri> for PrivilegedAddr {
                     })?)
                     .await
                     .map(PrivilegedSocket::Unix)
+                    .map(TokioIo::new)
                 })
             }
         }
@@ -607,39 +611,11 @@ impl tokio::io::AsyncWrite for PrivilegedSocket {
     }
 }
 
-impl hyper::client::connect::Connection for PrivilegedSocket {
-    fn connected(&self) -> hyper::client::connect::Connected {
+impl hyper_util::client::legacy::connect::Connection for PrivilegedSocket {
+    fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
         match self {
             Self::Tcp(s) => s.connected(),
-            Self::Unix(_) => hyper::client::connect::Connected::new(),
+            Self::Unix(_) => hyper_util::client::legacy::connect::Connected::new(),
         }
     }
 }
-
-/// Implements hyper's `Accept` for `UnixListener`s.
-pub struct UnixAcceptor {
-    listener: tokio::net::UnixListener,
-}
-
-impl From<tokio::net::UnixListener> for UnixAcceptor {
-    fn from(listener: tokio::net::UnixListener) -> Self {
-        Self { listener }
-    }
-}
-
-impl hyper::server::accept::Accept for UnixAcceptor {
-    type Conn = tokio::net::UnixStream;
-    type Error = io::Error;
-
-    fn poll_accept(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<io::Result<Self::Conn>>> {
-        Pin::new(&mut self.get_mut().listener)
-            .poll_accept(cx)
-            .map(|res| match res {
-                Ok((stream, _addr)) => Some(Ok(stream)),
-                Err(err) => Some(Err(err)),
-            })
-    }
-}
diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 526555ae..a65ef398 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -14,7 +14,6 @@ use std::time::Duration;
 
 use anyhow::{format_err, Context, Error};
 use futures::FutureExt;
-use hyper::server::accept;
 use openssl::ec::{EcGroup, EcKey};
 use openssl::nid::Nid;
 use openssl::pkey::{PKey, Private};
@@ -226,12 +225,13 @@ impl AcceptBuilder {
         self,
         listener: TcpListener,
         acceptor: Arc<Mutex<SslAcceptor>>,
-    ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
+        // FIXME: replace return value with own trait? see now removed UnixAcceptor
+    ) -> ReceiverStream<Result<ClientStreamResult, Error>> {
         let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
 
         tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
 
-        accept::from_stream(ReceiverStream::new(secure_receiver))
+        ReceiverStream::new(secure_receiver)
     }
 
     pub fn accept_tls_optional(
@@ -239,8 +239,8 @@ impl AcceptBuilder {
         listener: TcpListener,
         acceptor: Arc<Mutex<SslAcceptor>>,
     ) -> (
-        impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
-        impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
+        ReceiverStream<Result<ClientStreamResult, Error>>,
+        ReceiverStream<Result<InsecureClientStreamResult, Error>>,
     ) {
         let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
         let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
@@ -252,8 +252,8 @@ impl AcceptBuilder {
         ));
 
         (
-            accept::from_stream(ReceiverStream::new(secure_receiver)),
-            accept::from_stream(ReceiverStream::new(insecure_receiver)),
+            ReceiverStream::new(secure_receiver),
+            ReceiverStream::new(insecure_receiver),
         )
     }
 }
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index 32ca9936..9ce87205 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -5,12 +5,14 @@ use anyhow::Error;
 use serde_json::{json, Value};
 
 use hyper::header;
-use hyper::{Body, Response, StatusCode};
+use hyper::{Response, StatusCode};
 
+use proxmox_http::Body;
 use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
 use proxmox_schema::ParameterError;
 
 /// Extension to set error message for server side logging
+#[derive(Clone)]
 pub(crate) struct ErrorMessageExtension(pub String);
 
 /// Methods to format data and errors
@@ -168,11 +170,11 @@ impl OutputFormatter for JsonFormatter {
 
 pub(crate) fn error_to_response(err: Error) -> Response<Body> {
     let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
-        let mut resp = Response::new(Body::from(apierr.message.clone()));
+        let mut resp = Response::new(apierr.message.clone().into());
         *resp.status_mut() = apierr.code;
         resp
     } else {
-        let mut resp = Response::new(Body::from(err.to_string()));
+        let mut resp = Response::new(err.to_string().into());
         *resp.status_mut() = StatusCode::BAD_REQUEST;
         resp
     };
diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
index db6e3b0a..18258e14 100644
--- a/proxmox-rest-server/src/h2service.rs
+++ b/proxmox-rest-server/src/h2service.rs
@@ -6,8 +6,10 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use futures::*;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::body::Incoming;
+use hyper::{Request, Response, StatusCode};
 
+use proxmox_http::Body;
 use proxmox_router::http_err;
 use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
 
@@ -19,6 +21,7 @@ use crate::{normalize_path_with_components, WorkerTask};
 /// We use this kind of service to handle backup protocol
 /// connections. State is stored inside the generic ``rpcenv``. Logs
 /// goes into the ``WorkerTask`` log.
+#[derive(Clone)]
 pub struct H2Service<E> {
     router: &'static Router,
     rpcenv: E,
@@ -42,7 +45,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
         }
     }
 
-    fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
+    fn handle_request(&self, req: Request<Incoming>) -> ApiResponseFuture {
         let (parts, body) = req.into_parts();
 
         let method = parts.method.clone();
@@ -103,7 +106,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
     }
 }
 
-impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
+impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Incoming>> for H2Service<E> {
     type Response = Response<Body>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
@@ -113,7 +116,7 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
         Poll::Ready(Ok(()))
     }
 
-    fn call(&mut self, req: Request<Body>) -> Self::Future {
+    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
         let path = req.uri().path().to_owned();
         let method = req.method().clone();
         let worker = self.worker.clone();
@@ -126,14 +129,14 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
                 }
                 Err(err) => {
                     if let Some(apierr) = err.downcast_ref::<HttpError>() {
-                        let mut resp = Response::new(Body::from(apierr.message.clone()));
+                        let mut resp = Response::new(apierr.message.clone().into());
                         resp.extensions_mut()
                             .insert(ErrorMessageExtension(apierr.message.clone()));
                         *resp.status_mut() = apierr.code;
                         Self::log_response(worker, method, &path, &resp);
                         Ok(resp)
                     } else {
-                        let mut resp = Response::new(Body::from(err.to_string()));
+                        let mut resp = Response::new(err.to_string().into());
                         resp.extensions_mut()
                             .insert(ErrorMessageExtension(err.to_string()));
                         *resp.status_mut() = StatusCode::BAD_REQUEST;
diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
index 43dafa91..5ddd3667 100644
--- a/proxmox-rest-server/src/lib.rs
+++ b/proxmox-rest-server/src/lib.rs
@@ -34,7 +34,7 @@ mod environment;
 pub use environment::*;
 
 mod api_config;
-pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
+pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
 
 mod rest;
 pub use rest::{Redirector, RestServer};
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index f5a72052..f902592d 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -10,17 +10,25 @@ use std::task::{Context, Poll};
 use anyhow::{bail, format_err, Error};
 use futures::future::FutureExt;
 use futures::stream::TryStreamExt;
-use hyper::body::HttpBody;
+use http_body_util::{BodyDataStream, BodyStream};
+use hyper::body::{Body as HyperBody, Incoming};
 use hyper::header::{self, HeaderMap};
 use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
+use hyper_util::rt::{TokioExecutor, TokioIo};
+use hyper_util::server::conn;
+use hyper_util::server::graceful::GracefulShutdown;
+use hyper_util::service::TowerToHyperService;
 use regex::Regex;
 use serde_json::Value;
 use tokio::fs::File;
+use tokio::io::{AsyncRead, AsyncWrite};
 use tokio::time::Instant;
+use tokio_stream::wrappers::ReceiverStream;
 use tower_service::Service;
 use url::form_urlencoded;
 
+use proxmox_http::Body;
 use proxmox_router::{
     check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
     RpcEnvironmentType, UserInformation,
@@ -40,6 +48,7 @@ unsafe extern "C" {
     fn tzset();
 }
 
+#[derive(Clone)]
 struct AuthStringExtension(String);
 
 pub(crate) struct EmptyUserInformation {}
@@ -74,24 +83,11 @@ impl RestServer {
             api_config: Arc::new(api_config),
         }
     }
-}
 
-impl<T: PeerAddress> Service<&T> for RestServer {
-    type Response = ApiService;
-    type Error = Error;
-    type Future = std::future::Ready<Result<ApiService, Error>>;
-
-    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(&mut self, ctx: &T) -> Self::Future {
-        std::future::ready(match ctx.peer_addr() {
-            Err(err) => Err(format_err!("unable to get peer address - {}", err)),
-            Ok(peer) => Ok(ApiService {
-                peer,
-                api_config: Arc::clone(&self.api_config),
-            }),
+    pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
+        Ok(ApiService {
+            peer: peer.peer_addr()?,
+            api_config: Arc::clone(&self.api_config),
         })
     }
 }
@@ -108,25 +104,40 @@ impl Redirector {
     pub fn new() -> Self {
         Self {}
     }
-}
 
-impl<T> Service<&T> for Redirector {
-    type Response = RedirectService;
-    type Error = Error;
-    type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
-
-    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(&mut self, _ctx: &T) -> Self::Future {
-        std::future::ready(Ok(RedirectService {}))
+    pub fn redirect_service(&self) -> RedirectService {
+        RedirectService {}
     }
 }
 
+#[derive(Clone)]
 pub struct RedirectService;
 
-impl Service<Request<Body>> for RedirectService {
+impl RedirectService {
+    pub async fn serve<S>(
+        self,
+        conn: S,
+        mut graceful: Option<Arc<GracefulShutdown>>,
+    ) -> Result<(), Error>
+    where
+        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+    {
+        let api_service = TowerToHyperService::new(self);
+        let io = TokioIo::new(conn);
+        let api_conn = conn::auto::Builder::new(TokioExecutor::new());
+        let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
+        if let Some(graceful) = graceful.take() {
+            let api_conn = graceful.watch(api_conn);
+            drop(graceful);
+            api_conn.await
+        } else {
+            api_conn.await
+        }
+        .map_err(|err| format_err!("error serving redirect connection: {err}"))
+    }
+}
+
+impl Service<Request<Incoming>> for RedirectService {
     type Response = Response<Body>;
     type Error = anyhow::Error;
     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
@@ -135,7 +146,7 @@ impl Service<Request<Body>> for RedirectService {
         Poll::Ready(Ok(()))
     }
 
-    fn call(&mut self, req: Request<Body>) -> Self::Future {
+    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
         let future = async move {
             let header_host_value = req
                 .headers()
@@ -194,12 +205,6 @@ impl PeerAddress for tokio::net::TcpStream {
     }
 }
 
-impl PeerAddress for hyper::server::conn::AddrStream {
-    fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
-        Ok(self.remote_addr())
-    }
-}
-
 impl PeerAddress for tokio::net::UnixStream {
     fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
         // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
@@ -223,11 +228,36 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
 // Rust wants this type 'pub' here (else we get 'private type `ApiService`
 // in public interface'). The type is still private because the crate does
 // not export it.
+#[derive(Clone)]
 pub struct ApiService {
     pub peer: std::net::SocketAddr,
     pub api_config: Arc<ApiConfig>,
 }
 
+impl ApiService {
+    pub async fn serve<S>(
+        self,
+        conn: S,
+        mut graceful: Option<Arc<GracefulShutdown>>,
+    ) -> Result<(), Error>
+    where
+        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+    {
+        let api_service = TowerToHyperService::new(self);
+        let io = TokioIo::new(conn);
+        let api_conn = conn::auto::Builder::new(TokioExecutor::new());
+        let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
+        if let Some(graceful) = graceful.take() {
+            let api_conn = graceful.watch(api_conn);
+            drop(graceful);
+            api_conn.await
+        } else {
+            api_conn.await
+        }
+        .map_err(|err| format_err!("error serving connection: {err}"))
+    }
+}
+
 fn log_response(
     logfile: Option<&Arc<Mutex<FileLogger>>>,
     peer: &std::net::SocketAddr,
@@ -307,7 +337,7 @@ fn get_user_agent(headers: &HeaderMap) -> Option<String> {
         .ok()
 }
 
-impl Service<Request<Body>> for ApiService {
+impl Service<Request<Incoming>> for ApiService {
     type Response = Response<Body>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
@@ -317,7 +347,7 @@ impl Service<Request<Body>> for ApiService {
         Poll::Ready(Ok(()))
     }
 
-    fn call(&mut self, req: Request<Body>) -> Self::Future {
+    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
         let path = req.uri().path_and_query().unwrap().as_str().to_owned();
         let method = req.method().clone();
         let user_agent = get_user_agent(req.headers());
@@ -384,7 +414,7 @@ fn parse_query_parameters<S: 'static + BuildHasher + Send>(
 async fn get_request_parameters<S: 'static + BuildHasher + Send>(
     param_schema: ParameterSchema,
     parts: &Parts,
-    req_body: Body,
+    req_body: Incoming,
     uri_param: HashMap<String, String, S>,
 ) -> Result<Value, Error> {
     let mut is_json = false;
@@ -401,13 +431,17 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
         }
     }
 
-    let body = TryStreamExt::map_err(req_body, |err| {
+    let stream_body = BodyStream::new(req_body);
+    let body = TryStreamExt::map_err(stream_body, |err| {
         http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
     })
-    .try_fold(Vec::new(), |mut acc, chunk| async move {
+    .try_fold(Vec::new(), |mut acc, frame| async move {
         // FIXME: max request body size?
-        if acc.len() + chunk.len() < 64 * 1024 {
-            acc.extend_from_slice(&chunk);
+        let frame = frame
+            .into_data()
+            .map_err(|err| format_err!("Failed to read request body frame - {err:?}"))?;
+        if acc.len() + frame.len() < 64 * 1024 {
+            acc.extend_from_slice(&frame);
             Ok(acc)
         } else {
             Err(http_err!(BAD_REQUEST, "Request body too large"))
@@ -437,13 +471,14 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
     }
 }
 
+#[derive(Clone)]
 struct NoLogExtension();
 
 async fn proxy_protected_request(
     config: &ApiConfig,
     info: &ApiMethod,
     mut parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     peer: &std::net::SocketAddr,
 ) -> Result<Response<Body>, Error> {
     let mut uri_parts = parts.uri.clone().into_parts();
@@ -463,9 +498,14 @@ async fn proxy_protected_request(
     let reload_timezone = info.reload_timezone;
 
     let mut resp = match config.privileged_addr.clone() {
-        None => hyper::client::Client::new().request(request).await?,
+        None => {
+            hyper_util::client::legacy::Client::builder(TokioExecutor::new())
+                .build_http()
+                .request(request)
+                .await?
+        }
         Some(addr) => {
-            hyper::client::Client::builder()
+            hyper_util::client::legacy::Client::builder(TokioExecutor::new())
                 .build(addr)
                 .request(request)
                 .await?
@@ -479,7 +519,7 @@ async fn proxy_protected_request(
         }
     }
 
-    Ok(resp)
+    Ok(resp.map(|b| Body::wrap_stream(BodyDataStream::new(b))))
 }
 
 fn delay_unauth_time() -> std::time::Instant {
@@ -491,22 +531,23 @@ fn access_forbidden_time() -> std::time::Instant {
 }
 
 fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
-    let (mut send, body) = hyper::Body::channel();
+    let (send, body) = tokio::sync::mpsc::channel::<Result<Vec<u8>, Error>>(1);
     tokio::spawn(async move {
         use futures::StreamExt;
 
         let mut stream = stream.into_inner();
         while let Some(record) = stream.next().await {
-            if send.send_data(record.to_bytes().into()).await.is_err() {
+            if send.send(Ok(record.to_bytes())).await.is_err() {
                 break;
             }
         }
     });
 
-    Ok(Response::builder()
+    Response::builder()
         .status(http::StatusCode::OK)
         .header(http::header::CONTENT_TYPE, "application/json-seq")
-        .body(body)?)
+        .body(Body::wrap_stream(ReceiverStream::new(body)))
+        .map_err(Error::from)
 }
 
 fn handle_sync_stream_as_json_seq(
@@ -527,7 +568,7 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
     info: &'static ApiMethod,
     formatter: Option<&'static dyn OutputFormatter>,
     parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     uri_param: HashMap<String, String, S>,
 ) -> Result<Response<Body>, Error> {
     let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
@@ -630,9 +671,10 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
             );
             resp.map(|body| {
                 Body::wrap_stream(
-                    DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
-                        proxmox_lang::io_format_err!("error during compression: {}", err)
-                    }))
+                    DeflateEncoder::builder(TryStreamExt::map_err(
+                        BodyDataStream::new(body),
+                        |err| proxmox_lang::io_format_err!("error during compression: {}", err),
+                    ))
                     .zlib(true)
                     .flush_window(is_streaming.then_some(64 * 1024))
                     .build(),
@@ -796,7 +838,7 @@ fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMe
 impl ApiConfig {
     pub async fn handle_request(
         self: Arc<ApiConfig>,
-        req: Request<Body>,
+        req: Request<Incoming>,
         peer: &std::net::SocketAddr,
     ) -> Result<Response<Body>, Error> {
         let (parts, body) = req.into_parts();
@@ -808,7 +850,7 @@ impl ApiConfig {
         if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
             return Ok(Response::builder()
                 .status(StatusCode::URI_TOO_LONG)
-                .body("".into())
+                .body(Body::empty())
                 .unwrap());
         }
 
@@ -907,7 +949,7 @@ impl Action {
 
 pub struct ApiRequestData<'a> {
     parts: Parts,
-    body: Body,
+    body: Incoming,
     peer: &'a std::net::SocketAddr,
     config: &'a ApiConfig,
     full_path: &'a str,
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 15/17] proxmox-rest-server: fix and extend example
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (13 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: " Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 16/17] proxmox-auth-api: update to hyper 1.0 Fabian Grünbichler
                   ` (9 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

enabling logging while we are at it (useful when debugging things like
the graceful shutdown implementation).

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-rest-server/Cargo.toml                |  2 +-
 .../examples/minimal-rest-server.rs           | 43 ++++++++++++++++---
 2 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index ee253b4f..4a65415c 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -31,7 +31,7 @@ percent-encoding.workspace = true
 regex.workspace = true
 serde = { workspace = true, features = [ "derive" ] }
 serde_json.workspace = true
-tokio = { workspace = true, features = ["signal", "process"] }
+tokio = { workspace = true, features = ["macros", "signal", "process"] }
 tokio-openssl.workspace = true
 tokio-stream.workspace = true
 tracing.workspace = true
diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
index 454430fb..7ff2008a 100644
--- a/proxmox-rest-server/examples/minimal-rest-server.rs
+++ b/proxmox-rest-server/examples/minimal-rest-server.rs
@@ -1,20 +1,24 @@
 use std::collections::HashMap;
 use std::future::Future;
 use std::pin::Pin;
-use std::sync::{LazyLock, Mutex};
+use std::sync::{Arc, LazyLock, Mutex};
 
 use anyhow::{bail, format_err, Error};
+use futures::future;
 use http::request::Parts;
 use http::HeaderMap;
 use hyper::{Method, Response};
 
+use hyper_util::server::graceful::GracefulShutdown;
 use proxmox_http::Body;
+use proxmox_log::LevelFilter;
 use proxmox_router::{
     list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
 };
 use proxmox_schema::api;
 
 use proxmox_rest_server::{ApiConfig, AuthError, RestEnvironment, RestServer};
+use tokio::net::TcpListener;
 
 // Create a Dummy User information system
 struct DummyUserInfo;
@@ -191,21 +195,46 @@ const ROUTER: Router = Router::new()
 async fn run() -> Result<(), Error> {
     // we first have to configure the api environment (basedir etc.)
 
+    proxmox_log::Logger::from_env("RUST_LOG", LevelFilter::INFO)
+        .stderr()
+        .init()?;
+
     let config = ApiConfig::new("/var/tmp/", RpcEnvironmentType::PUBLIC)
         .default_api2_handler(&ROUTER)
         .auth_handler_func(check_auth)
         .index_handler_func(get_index);
     let rest_server = RestServer::new(config);
 
+    proxmox_daemon::catch_shutdown_signal(future::pending())?;
+
+    log::info!("creating server..");
+
     // then we have to create a daemon that listens, accepts and serves the api to clients
     proxmox_daemon::server::create_daemon(
         ([127, 0, 0, 1], 65000).into(),
-        move |listener| {
-            let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
-
+        move |listener: TcpListener| {
             Ok(async move {
-                hyper::Server::builder(incoming).serve(rest_server).await?;
-
+                let graceful = Arc::new(GracefulShutdown::new());
+                loop {
+                    let graceful2 = Arc::clone(&graceful);
+                    tokio::select! {
+                        incoming = listener.accept() => {
+                            log::info!("accepted new connection!");
+                            let (conn, _) = incoming?;
+                            let api_service = rest_server.api_service(&conn)?;
+                            tokio::spawn(async move { let res = api_service.serve(conn, Some(graceful2)).await; log::info!("connection finished: {res:?}") });
+                        },
+                        _shutdown = proxmox_daemon::shutdown_future() => {
+                            log::info!("shutdown future triggered!");
+                            break;
+                        }
+                    }
+                }
+                log::info!("count {}", Arc::strong_count(&graceful));
+                if let Some(shutdown) = Arc::into_inner(graceful) {
+                    log::info!("shutting down..");
+                    shutdown.shutdown().await
+                }
                 Ok(())
             })
         },
@@ -213,6 +242,8 @@ async fn run() -> Result<(), Error> {
     )
     .await?;
 
+    log::info!("done - exit server");
+
     Ok(())
 }
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 16/17] proxmox-auth-api: update to hyper 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (14 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 15/17] proxmox-rest-server: fix and extend example Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 17/17] proxmox-acme-api: " Fabian Grünbichler
                   ` (8 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

and switch to proxmox-http's Body type

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-auth-api/Cargo.toml        | 2 ++
 proxmox-auth-api/src/api/access.rs | 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/proxmox-auth-api/Cargo.toml b/proxmox-auth-api/Cargo.toml
index c5468ca7..a25b4832 100644
--- a/proxmox-auth-api/Cargo.toml
+++ b/proxmox-auth-api/Cargo.toml
@@ -32,6 +32,7 @@ serde = { workspace = true, optional = true, features = [ "derive" ] }
 serde_json = { workspace = true, optional = true }
 serde_plain = { workspace = true, optional = true }
 
+proxmox-http = { workspace = true, optional = true, features = ["body"] }
 proxmox-product-config = { workspace = true, optional = true }
 proxmox-rest-server = { workspace = true, optional = true }
 proxmox-router = { workspace = true, optional = true }
@@ -53,6 +54,7 @@ api = [
     "dep:hyper",
     "dep:serde_json",
 
+    "dep:proxmox-http",
     "dep:proxmox-rest-server",
     "dep:proxmox-router",
     "dep:proxmox-tfa",
diff --git a/proxmox-auth-api/src/api/access.rs b/proxmox-auth-api/src/api/access.rs
index 396935f5..8557362e 100644
--- a/proxmox-auth-api/src/api/access.rs
+++ b/proxmox-auth-api/src/api/access.rs
@@ -3,10 +3,10 @@
 use anyhow::{bail, format_err, Error};
 use http::request::Parts;
 use http::Response;
-use hyper::Body;
 use openssl::hash::MessageDigest;
 use serde_json::{json, Value};
 
+use proxmox_http::Body;
 use proxmox_rest_server::{extract_cookie, RestEnvironment};
 use proxmox_router::{
     http_err, ApiHandler, ApiMethod, ApiResponseFuture, Permission, RpcEnvironment,
@@ -173,7 +173,7 @@ fn create_ticket_http_only(
             }
         }
 
-        Ok(response.body(Body::from(json!({"data": ticket_response }).to_string()))?)
+        Ok(response.body(json!({"data": ticket_response }).to_string().into())?)
     })
 }
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox 17/17] proxmox-acme-api: update to hyper 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (15 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 16/17] proxmox-auth-api: update to hyper 1.0 Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 1/6] Revert "h2: switch to legacy feature" Fabian Grünbichler
                   ` (7 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

since this hyper-based server is not related to the rest of the stack
and has very simple requirements, it is implemented using just
hyper(-util) and not proxmox-http.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-acme-api/Cargo.toml         |  4 ++
 proxmox-acme-api/src/acme_plugin.rs | 63 ++++++++++++++++++++---------
 2 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/proxmox-acme-api/Cargo.toml b/proxmox-acme-api/Cargo.toml
index f54cba6d..3f7a8c6a 100644
--- a/proxmox-acme-api/Cargo.toml
+++ b/proxmox-acme-api/Cargo.toml
@@ -17,7 +17,9 @@ base64 = { workspace = true, optional = true }
 futures = { workspace = true, optional = true }
 hex = { workspace = true, optional = true }
 http = { workspace = true, optional = true }
+http-body-util = { workspace = true, optional = true }
 hyper = { workspace = true, optional = true }
+hyper-util = { workspace = true, optional = true, features = ["server", "http1"] }
 serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
 tokio = { workspace = true, optional = true, features = ["fs"] }
@@ -47,7 +49,9 @@ impl = [
     "dep:futures",
     "dep:hex",
     "dep:http",
+    "dep:http-body-util",
     "dep:hyper",
+    "dep:hyper-util",
     "dep:libc",
     "dep:openssl",
     "dep:tokio",
diff --git a/proxmox-acme-api/src/acme_plugin.rs b/proxmox-acme-api/src/acme_plugin.rs
index cd4012f5..54e6c159 100644
--- a/proxmox-acme-api/src/acme_plugin.rs
+++ b/proxmox-acme-api/src/acme_plugin.rs
@@ -7,8 +7,15 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{bail, format_err, Error};
-use hyper::{Body, Request, Response};
+use futures::TryFutureExt;
+use http::{Request, Response};
+use http_body_util::Full;
+use hyper::body::{Bytes, Incoming};
+use hyper::server::conn::http1;
+use hyper_util::rt::TokioIo;
+use std::net::{IpAddr, SocketAddr};
 use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
+use tokio::net::TcpListener;
 use tokio::process::Command;
 
 use proxmox_acme::async_client::AcmeClient;
@@ -236,14 +243,14 @@ impl StandaloneServer {
 }
 
 async fn standalone_respond(
-    req: Request<Body>,
+    req: Request<Incoming>,
     path: Arc<String>,
     key_auth: Arc<String>,
-) -> Result<Response<Body>, hyper::Error> {
+) -> Result<Response<Full<Bytes>>, hyper::Error> {
     if req.method() == hyper::Method::GET && req.uri().path() == path.as_str() {
         Ok(Response::builder()
             .status(http::StatusCode::OK)
-            .body(key_auth.as_bytes().to_vec().into())
+            .body(Full::from(Bytes::from(key_auth.as_bytes().to_owned())))
             .unwrap())
     } else {
         Ok(Response::builder()
@@ -261,8 +268,7 @@ impl AcmePlugin for StandaloneServer {
         _domain: &'d AcmeDomain,
         _task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
-        use hyper::server::conn::AddrIncoming;
-        use hyper::service::{make_service_fn, service_fn};
+        use hyper::service::service_fn;
 
         Box::pin(async move {
             self.stop();
@@ -274,21 +280,40 @@ impl AcmePlugin for StandaloneServer {
             let key_auth = Arc::new(client.key_authorization(token)?);
             let path = Arc::new(format!("/.well-known/acme-challenge/{}", token));
 
-            let service = make_service_fn(move |_| {
-                let path = Arc::clone(&path);
-                let key_auth = Arc::clone(&key_auth);
-                async move {
-                    Ok::<_, hyper::Error>(service_fn(move |request| {
-                        standalone_respond(request, Arc::clone(&path), Arc::clone(&key_auth))
-                    }))
-                }
-            });
-
             // `[::]:80` first, then `*:80`
-            let incoming = AddrIncoming::bind(&(([0u16; 8], 80).into()))
-                .or_else(|_| AddrIncoming::bind(&(([0u8; 4], 80).into())))?;
+            let dual = SocketAddr::new(IpAddr::from([0u16; 8]), 80);
+            let ipv4 = SocketAddr::new(IpAddr::from([0u8; 4]), 80);
+            let incoming = TcpListener::bind(dual)
+                .or_else(|_| TcpListener::bind(ipv4))
+                .await?;
 
-            let server = hyper::Server::builder(incoming).serve(service);
+            let server = async move {
+                loop {
+                    let key_auth = Arc::clone(&key_auth);
+                    let path = Arc::clone(&path);
+                    match incoming.accept().await {
+                        Ok((tcp, _)) => {
+                            let io = TokioIo::new(tcp);
+                            let service = service_fn(move |request| {
+                                standalone_respond(
+                                    request,
+                                    Arc::clone(&path),
+                                    Arc::clone(&key_auth),
+                                )
+                            });
+
+                            tokio::task::spawn(async move {
+                                if let Err(err) =
+                                    http1::Builder::new().serve_connection(io, service).await
+                                {
+                                    println!("Error serving connection: {err:?}");
+                                }
+                            });
+                        }
+                        Err(err) => println!("Error accepting connection: {err:?}"),
+                    }
+                }
+            };
 
             let (future, abort) = futures::future::abortable(server);
             self.abort_handle = Some(abort);
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 1/6] Revert "h2: switch to legacy feature"
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (16 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 17/17] proxmox-acme-api: " Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-client: adapt http client to hyper/http 1.0 Fabian Grünbichler
                   ` (6 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

This reverts commit 168ed370263e84a6235968c615b856b9280debe1.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 Cargo.toml                       |  2 +-
 examples/h2client.rs             |  6 +++---
 examples/h2s-client.rs           |  6 +++---
 pbs-client/src/backup_writer.rs  |  8 ++++----
 pbs-client/src/http_client.rs    | 12 +++++-------
 pbs-client/src/pipe_to_stream.rs |  2 +-
 6 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 306d50e92..d52566809 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -124,7 +124,7 @@ env_logger = "0.11"
 flate2 = "1.0"
 foreign-types = "0.3"
 futures = "0.3"
-h2 = { version = "0.4", features = [ "legacy", "stream" ] }
+h2 = { version = "0.4", features = [ "stream" ] }
 handlebars = "3.0"
 hex = "0.4.3"
 hyper = { version = "0.14", features = [ "backports", "deprecated", "full" ] }
diff --git a/examples/h2client.rs b/examples/h2client.rs
index e44c43fa4..1dcb44987 100644
--- a/examples/h2client.rs
+++ b/examples/h2client.rs
@@ -10,7 +10,7 @@ use tokio::net::TcpStream;
 // Simple H2 client to test H2 download speed using h2server.rs
 
 struct Process {
-    body: h2::legacy::RecvStream,
+    body: h2::RecvStream,
     trailers: bool,
     bytes: usize,
 }
@@ -50,7 +50,7 @@ impl Future for Process {
 }
 
 fn send_request(
-    mut client: h2::legacy::client::SendRequest<bytes::Bytes>,
+    mut client: h2::client::SendRequest<bytes::Bytes>,
 ) -> impl Future<Output = Result<usize, Error>> {
     println!("sending request");
 
@@ -78,7 +78,7 @@ async fn run() -> Result<(), Error> {
     let conn = TcpStream::connect(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?;
     conn.set_nodelay(true).unwrap();
 
-    let (client, h2) = h2::legacy::client::Builder::new()
+    let (client, h2) = h2::client::Builder::new()
         .initial_connection_window_size(1024 * 1024 * 1024)
         .initial_window_size(1024 * 1024 * 1024)
         .max_frame_size(4 * 1024 * 1024)
diff --git a/examples/h2s-client.rs b/examples/h2s-client.rs
index 86b3a9312..a12b5a484 100644
--- a/examples/h2s-client.rs
+++ b/examples/h2s-client.rs
@@ -10,7 +10,7 @@ use tokio::net::TcpStream;
 // Simple H2 client to test H2 download speed using h2s-server.rs
 
 struct Process {
-    body: h2::legacy::RecvStream,
+    body: h2::RecvStream,
     trailers: bool,
     bytes: usize,
 }
@@ -50,7 +50,7 @@ impl Future for Process {
 }
 
 fn send_request(
-    mut client: h2::legacy::client::SendRequest<bytes::Bytes>,
+    mut client: h2::client::SendRequest<bytes::Bytes>,
 ) -> impl Future<Output = Result<usize, Error>> {
     println!("sending request");
 
@@ -94,7 +94,7 @@ async fn run() -> Result<(), Error> {
         .await
         .map_err(|err| format_err!("connect failed - {}", err))?;
 
-    let (client, h2) = h2::legacy::client::Builder::new()
+    let (client, h2) = h2::client::Builder::new()
         .initial_connection_window_size(1024 * 1024 * 1024)
         .initial_window_size(1024 * 1024 * 1024)
         .max_frame_size(4 * 1024 * 1024)
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 325425069..1253ef561 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -56,7 +56,7 @@ pub struct UploadOptions {
 }
 
 struct ChunkUploadResponse {
-    future: h2::legacy::client::ResponseFuture,
+    future: h2::client::ResponseFuture,
     size: usize,
 }
 
@@ -143,7 +143,7 @@ impl BackupWriter {
         param: Option<Value>,
         content_type: &str,
         data: Vec<u8>,
-    ) -> Result<h2::legacy::client::ResponseFuture, Error> {
+    ) -> Result<h2::client::ResponseFuture, Error> {
         let request =
             H2Client::request_builder("localhost", method, path, param, Some(content_type))
                 .unwrap();
@@ -514,7 +514,7 @@ impl BackupWriter {
     }
 
     fn response_queue() -> (
-        mpsc::Sender<h2::legacy::client::ResponseFuture>,
+        mpsc::Sender<h2::client::ResponseFuture>,
         oneshot::Receiver<Result<(), Error>>,
     ) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
@@ -537,7 +537,7 @@ impl BackupWriter {
         tokio::spawn(
             ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
-                .try_for_each(move |response: h2::legacy::client::ResponseFuture| {
+                .try_for_each(move |response: h2::client::ResponseFuture| {
                     response
                         .map_err(Error::from)
                         .and_then(H2Client::h2api_response)
diff --git a/pbs-client/src/http_client.rs b/pbs-client/src/http_client.rs
index 8a89031c8..612e3b303 100644
--- a/pbs-client/src/http_client.rs
+++ b/pbs-client/src/http_client.rs
@@ -790,7 +790,7 @@ impl HttpClient {
 
         let max_window_size = (1 << 31) - 2;
 
-        let (h2, connection) = h2::legacy::client::Builder::new()
+        let (h2, connection) = h2::client::Builder::new()
             .initial_connection_window_size(max_window_size)
             .initial_window_size(max_window_size)
             .max_frame_size(4 * 1024 * 1024)
@@ -935,11 +935,11 @@ impl Drop for HttpClient {
 
 #[derive(Clone)]
 pub struct H2Client {
-    h2: h2::legacy::client::SendRequest<bytes::Bytes>,
+    h2: h2::client::SendRequest<bytes::Bytes>,
 }
 
 impl H2Client {
-    pub fn new(h2: h2::legacy::client::SendRequest<bytes::Bytes>) -> Self {
+    pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
         Self { h2 }
     }
 
@@ -1019,7 +1019,7 @@ impl H2Client {
         &self,
         request: Request<()>,
         data: Option<bytes::Bytes>,
-    ) -> impl Future<Output = Result<h2::legacy::client::ResponseFuture, Error>> {
+    ) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> {
         self.h2
             .clone()
             .ready()
@@ -1036,9 +1036,7 @@ impl H2Client {
             })
     }
 
-    pub async fn h2api_response(
-        response: Response<h2::legacy::RecvStream>,
-    ) -> Result<Value, Error> {
+    pub async fn h2api_response(response: Response<h2::RecvStream>) -> Result<Value, Error> {
         let status = response.status();
 
         let (_head, mut body) = response.into_parts();
diff --git a/pbs-client/src/pipe_to_stream.rs b/pbs-client/src/pipe_to_stream.rs
index 3fc942d35..ae6898514 100644
--- a/pbs-client/src/pipe_to_stream.rs
+++ b/pbs-client/src/pipe_to_stream.rs
@@ -8,7 +8,7 @@ use std::task::{Context, Poll};
 use anyhow::{format_err, Error};
 use bytes::Bytes;
 use futures::{ready, Future};
-use h2::legacy::SendStream;
+use h2::SendStream;
 
 pub struct PipeToSendStream {
     body_tx: SendStream<Bytes>,
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 2/6] pbs-client: adapt http client to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (17 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 1/6] Revert "h2: switch to legacy feature" Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt " Fabian Grünbichler
                   ` (5 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

similar changes to proxmox-http:
- Body to Incoming for incoming requests
- Body to proxmox-http's Body for everything else
- switch to "legacy" pooling client from hyper-util

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 Cargo.toml                    |  8 ++++++--
 pbs-client/Cargo.toml         |  4 +++-
 pbs-client/src/http_client.rs | 30 +++++++++++++++++-------------
 3 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d52566809..c30bad4fc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -127,7 +127,9 @@ futures = "0.3"
 h2 = { version = "0.4", features = [ "stream" ] }
 handlebars = "3.0"
 hex = "0.4.3"
-hyper = { version = "0.14", features = [ "backports", "deprecated", "full" ] }
+http-body-util = "0.1"
+hyper-util = "0.1"
+hyper = { version = "1", features = [ "full" ] }
 libc = "0.2"
 log = "0.4.17"
 nix = "0.26.1"
@@ -172,7 +174,9 @@ endian_trait.workspace = true
 futures.workspace = true
 h2.workspace = true
 hex.workspace = true
+http-body-util.workspace = true
 hyper.workspace = true
+hyper-util = { workspace = true, features = ["server", "server-auto", "server-graceful"] }
 libc.workspace = true
 log.workspace = true
 nix.workspace = true
@@ -208,7 +212,7 @@ proxmox-auth-api = { workspace = true, features = [ "api", "pam-authenticator" ]
 proxmox-compression.workspace = true
 proxmox-config-digest.workspace = true
 proxmox-daemon.workspace = true
-proxmox-http = { workspace = true, features = [ "client-trait", "proxmox-async", "rate-limited-stream" ] } # pbs-client doesn't use these
+proxmox-http = { workspace = true, features = [ "body", "client-trait", "proxmox-async", "rate-limited-stream" ] } # pbs-client doesn't use these
 proxmox-human-byte.workspace = true
 proxmox-io.workspace = true
 proxmox-lang.workspace = true
diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml
index c28fe87ca..84e73e7af 100644
--- a/pbs-client/Cargo.toml
+++ b/pbs-client/Cargo.toml
@@ -12,6 +12,8 @@ bytes.workspace = true
 futures.workspace = true
 h2.workspace = true
 hex.workspace = true
+http-body-util.workspace = true
+hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "http2", "tokio" ]}
 hyper.workspace = true
 libc.workspace = true
 nix.workspace = true
@@ -33,7 +35,7 @@ pathpatterns.workspace = true
 proxmox-async.workspace = true
 proxmox-auth-api.workspace = true
 proxmox-compression.workspace = true
-proxmox-http = { workspace = true, features = [ "rate-limiter" ] }
+proxmox-http = { workspace = true, features = [ "body", "rate-limiter" ] }
 proxmox-human-byte.workspace = true
 proxmox-io = { workspace = true, features = [ "tokio" ] }
 proxmox-log = { workspace = true }
diff --git a/pbs-client/src/http_client.rs b/pbs-client/src/http_client.rs
index 612e3b303..848575bb9 100644
--- a/pbs-client/src/http_client.rs
+++ b/pbs-client/src/http_client.rs
@@ -3,12 +3,15 @@ use std::sync::{Arc, Mutex, RwLock};
 use std::time::Duration;
 
 use anyhow::{bail, format_err, Error};
+use bytes::Bytes;
 use futures::*;
-use hyper::client::{Client, HttpConnector};
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper::body::Incoming;
 use hyper::http::header::HeaderValue;
 use hyper::http::Uri;
 use hyper::http::{Request, Response};
-use hyper::{body::HttpBody, Body};
+use hyper_util::client::legacy::{connect::HttpConnector, Client};
+use hyper_util::rt::{TokioExecutor, TokioIo};
 use openssl::{
     ssl::{SslConnector, SslMethod},
     x509::X509StoreContextRef,
@@ -24,6 +27,7 @@ use proxmox_sys::linux::tty;
 use proxmox_async::broadcast_future::BroadcastFuture;
 use proxmox_http::client::HttpsConnector;
 use proxmox_http::uri::{build_authority, json_object_to_query};
+use proxmox_http::Body;
 use proxmox_http::{ProxyConfig, RateLimiter};
 use proxmox_log::{error, info, warn};
 
@@ -134,7 +138,7 @@ impl Default for HttpClientOptions {
 
 /// HTTP(S) API client
 pub struct HttpClient {
-    client: Client<HttpsConnector>,
+    client: Client<HttpsConnector, Body>,
     server: String,
     port: u16,
     fingerprint: Arc<Mutex<Option<String>>>,
@@ -398,7 +402,7 @@ impl HttpClient {
             https.set_proxy(config);
         }
 
-        let client = Client::builder()
+        let client = Client::builder(TokioExecutor::new())
             //.http2_initial_stream_window_size( (1 << 31) - 2)
             //.http2_initial_connection_window_size( (1 << 31) - 2)
             .build::<_, Body>(https);
@@ -706,7 +710,7 @@ impl HttpClient {
                 .map(|_| Err(format_err!("unknown error")))
                 .await?
         } else {
-            futures::TryStreamExt::map_err(resp.into_body(), Error::from)
+            futures::TryStreamExt::map_err(BodyDataStream::new(resp.into_body()), Error::from)
                 .try_fold(output, move |acc, chunk| async move {
                     acc.write_all(&chunk)?;
                     Ok::<_, Error>(acc)
@@ -786,7 +790,7 @@ impl HttpClient {
             bail!("unknown error");
         }
 
-        let upgraded = hyper::upgrade::on(resp).await?;
+        let upgraded = TokioIo::new(hyper::upgrade::on(resp).await?);
 
         let max_window_size = (1 << 31) - 2;
 
@@ -814,7 +818,7 @@ impl HttpClient {
     }
 
     async fn credentials(
-        client: Client<HttpsConnector>,
+        client: Client<HttpsConnector, Body>,
         server: String,
         port: u16,
         username: Userid,
@@ -841,9 +845,9 @@ impl HttpClient {
         Ok(auth)
     }
 
-    async fn api_response(response: Response<Body>) -> Result<Value, Error> {
+    async fn api_response(response: Response<Incoming>) -> Result<Value, Error> {
         let status = response.status();
-        let data = HttpBody::collect(response.into_body()).await?.to_bytes();
+        let data = response.into_body().collect().await?.to_bytes();
 
         let text = String::from_utf8(data.to_vec()).unwrap();
         if status.is_success() {
@@ -859,7 +863,7 @@ impl HttpClient {
     }
 
     async fn api_request(
-        client: Client<HttpsConnector>,
+        client: Client<HttpsConnector, Body>,
         req: Request<Body>,
     ) -> Result<Value, Error> {
         Self::api_response(
@@ -894,7 +898,7 @@ impl HttpClient {
                     .uri(url)
                     .header("User-Agent", "proxmox-backup-client/1.0")
                     .header(hyper::header::CONTENT_TYPE, "application/json")
-                    .body(Body::from(data.to_string()))?;
+                    .body(data.to_string().into())?;
                 Ok(request)
             } else {
                 let query = json_object_to_query(data)?;
@@ -935,11 +939,11 @@ impl Drop for HttpClient {
 
 #[derive(Clone)]
 pub struct H2Client {
-    h2: h2::client::SendRequest<bytes::Bytes>,
+    h2: h2::client::SendRequest<Bytes>,
 }
 
 impl H2Client {
-    pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
+    pub fn new(h2: h2::client::SendRequest<Bytes>) -> Self {
         Self { h2 }
     }
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (18 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-client: adapt http client to hyper/http 1.0 Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 4/6] restore daemon: " Fabian Grünbichler
                   ` (4 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

similar to the http one:
- Body to Incoming for incoming requests
- Body to proxmox-http's Body for everything else
- use legacy client
- use wrappers for hyper<->tower and hyper<->tokio

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 pbs-client/src/vsock_client.rs | 27 +++++++++++++++------------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/pbs-client/src/vsock_client.rs b/pbs-client/src/vsock_client.rs
index 5c18c6f3b..578433b79 100644
--- a/pbs-client/src/vsock_client.rs
+++ b/pbs-client/src/vsock_client.rs
@@ -3,17 +3,20 @@ use std::task::{Context, Poll};
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
-use hyper::client::connect::{Connected, Connection};
-use hyper::client::Client;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper::body::Incoming;
 use hyper::http::Uri;
 use hyper::http::{Request, Response};
-use hyper::{body::HttpBody, Body};
+use hyper_util::client::legacy::connect::{Connected, Connection};
+use hyper_util::client::legacy::Client;
+use hyper_util::rt::{TokioExecutor, TokioIo};
 use pin_project_lite::pin_project;
 use serde_json::Value;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
 use tokio::net::UnixStream;
 
 use proxmox_http::uri::json_object_to_query;
+use proxmox_http::Body;
 use proxmox_router::HttpError;
 
 pub const DEFAULT_VSOCK_PORT: u16 = 807;
@@ -30,9 +33,9 @@ pin_project! {
 }
 
 impl tower_service::Service<Uri> for VsockConnector {
-    type Response = UnixConnection;
+    type Response = TokioIo<UnixConnection>;
     type Error = Error;
-    type Future = Pin<Box<dyn Future<Output = Result<UnixConnection, Error>> + Send>>;
+    type Future = Pin<Box<dyn Future<Output = Result<TokioIo<UnixConnection>, Error>> + Send>>;
 
     fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
@@ -82,7 +85,7 @@ impl tower_service::Service<Uri> for VsockConnector {
             let stream = tokio::net::UnixStream::from_std(std_stream)?;
             let connection = UnixConnection { stream };
 
-            Ok(connection)
+            Ok(TokioIo::new(connection))
         })
         // unravel the thread JoinHandle to a usable future
         .map(|res| match res {
@@ -133,7 +136,7 @@ impl AsyncWrite for UnixConnection {
 
 /// Slimmed down version of HttpClient for virtio-vsock connections (file restore daemon)
 pub struct VsockClient {
-    client: Client<VsockConnector>,
+    client: Client<VsockConnector, Body>,
     cid: i32,
     port: u16,
     auth: Option<String>,
@@ -142,7 +145,7 @@ pub struct VsockClient {
 impl VsockClient {
     pub fn new(cid: i32, port: u16, auth: Option<String>) -> Self {
         let conn = VsockConnector {};
-        let client = Client::builder().build::<_, Body>(conn);
+        let client = Client::builder(TokioExecutor::new()).build::<_, Body>(conn);
         Self {
             client,
             cid,
@@ -179,7 +182,7 @@ impl VsockClient {
         if !status.is_success() {
             Self::api_response(resp).await.map(|_| ())?
         } else {
-            futures::TryStreamExt::map_err(resp.into_body(), Error::from)
+            futures::TryStreamExt::map_err(BodyDataStream::new(resp.into_body()), Error::from)
                 .try_fold(output, move |acc, chunk| async move {
                     acc.write_all(&chunk).await?;
                     Ok::<_, Error>(acc)
@@ -189,9 +192,9 @@ impl VsockClient {
         Ok(())
     }
 
-    async fn api_response(response: Response<Body>) -> Result<Value, Error> {
+    async fn api_response(response: Response<Incoming>) -> Result<Value, Error> {
         let status = response.status();
-        let data = HttpBody::collect(response.into_body()).await?.to_bytes();
+        let data = response.into_body().collect().await?.to_bytes();
 
         let text = String::from_utf8(data.to_vec()).unwrap();
         if status.is_success() {
@@ -237,7 +240,7 @@ impl VsockClient {
         if let Some(data) = data {
             if method == "POST" {
                 let builder = make_builder("application/json", &url);
-                let request = builder.body(Body::from(data.to_string()))?;
+                let request = builder.body(data.to_string().into())?;
                 return Ok(request);
             } else {
                 let query = json_object_to_query(data)?;
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 4/6] restore daemon: adapt to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (19 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt " Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 5/6] " Fabian Grünbichler
                   ` (3 subsequent siblings)
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

like pbs-client and proxmox-http.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-restore-daemon/Cargo.toml             |  2 ++
 proxmox-restore-daemon/src/main.rs            | 24 ++++++++++++++-----
 .../src/proxmox_restore_daemon/api.rs         |  6 +++--
 .../src/proxmox_restore_daemon/auth.rs        |  5 ++--
 4 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/proxmox-restore-daemon/Cargo.toml b/proxmox-restore-daemon/Cargo.toml
index bcb50d8ba..4a48518ab 100644
--- a/proxmox-restore-daemon/Cargo.toml
+++ b/proxmox-restore-daemon/Cargo.toml
@@ -12,6 +12,7 @@ base64.workspace = true
 env_logger.workspace = true
 futures.workspace = true
 hyper.workspace = true
+hyper-util = { workspace = true, features = [ "service" ] }
 libc.workspace = true
 log.workspace = true
 nix.workspace = true
@@ -26,6 +27,7 @@ pxar.workspace = true
 
 proxmox-async.workspace = true
 proxmox-compression.workspace = true
+proxmox-http.workspace = true
 proxmox-rest-server.workspace = true
 proxmox-router = { workspace = true, features = [ "cli", "server" ] }
 proxmox-schema = { workspace = true, features = [ "api-macro" ] }
diff --git a/proxmox-restore-daemon/src/main.rs b/proxmox-restore-daemon/src/main.rs
index 7f61faed8..74ba1cd8d 100644
--- a/proxmox-restore-daemon/src/main.rs
+++ b/proxmox-restore-daemon/src/main.rs
@@ -9,6 +9,9 @@ use std::path::Path;
 use std::sync::{Arc, LazyLock, Mutex};
 
 use anyhow::{bail, format_err, Error};
+use futures::StreamExt;
+use hyper_util::rt::TokioIo;
+use hyper_util::service::TowerToHyperService;
 use log::{error, info};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
@@ -114,14 +117,23 @@ async fn run() -> Result<(), Error> {
 
     let vsock_fd = get_vsock_fd()?;
     let connections = accept_vsock_connections(vsock_fd);
-    let receiver_stream = ReceiverStream::new(connections);
-    let acceptor = hyper::server::accept::from_stream(receiver_stream);
+    let mut receiver_stream = ReceiverStream::new(connections);
 
     let hyper_future = async move {
-        hyper::Server::builder(acceptor)
-            .serve(rest_server)
-            .await
-            .map_err(|err| format_err!("hyper finished with error: {}", err))
+        while let Some(conn) = receiver_stream.next().await {
+            let conn = conn?;
+
+            let api_service = TowerToHyperService::new(rest_server.api_service(&conn)?);
+
+            let conn = hyper::server::conn::http1::Builder::new()
+                .serve_connection(TokioIo::new(conn), api_service);
+
+            tokio::spawn(async move {
+                conn.await
+                    .map_err(|err| format_err!("hyper finished with error: {}", err))
+            });
+        }
+        Ok(())
     };
 
     tokio::try_join!(init_future, hyper_future)?;
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index 8955772bc..0d8569402 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -6,14 +6,16 @@ use std::path::{Path, PathBuf};
 
 use anyhow::{bail, Error};
 use futures::FutureExt;
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
-use hyper::{header, Body, Response, StatusCode};
+use hyper::{header, Response, StatusCode};
 use log::error;
 use serde_json::Value;
 use tokio::sync::Semaphore;
 
 use pathpatterns::{MatchEntry, MatchPattern, MatchType, Pattern};
 use proxmox_compression::{tar::tar_directory, zip::zip_directory, zstd::ZstdEncoder};
+use proxmox_http::Body;
 use proxmox_router::{
     list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router,
     RpcEnvironment, SubdirMap,
@@ -264,7 +266,7 @@ pub const API_METHOD_EXTRACT: ApiMethod = ApiMethod::new(
 
 fn extract(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/auth.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/auth.rs
index 8173d48a0..e346aebd3 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/auth.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/auth.rs
@@ -7,8 +7,9 @@ use std::sync::Arc;
 
 use anyhow::{bail, format_err, Error};
 use hyper::http::HeaderMap;
-use hyper::{Body, Method, Response, StatusCode};
+use hyper::{Method, Response, StatusCode};
 
+use proxmox_http::Body;
 use proxmox_router::UserInformation;
 
 use proxmox_rest_server::AuthError;
@@ -69,7 +70,7 @@ pub fn get_index() -> Pin<Box<dyn Future<Output = hyper::http::Response<Body>> +
         Response::builder()
             .status(StatusCode::OK)
             .header(hyper::header::CONTENT_TYPE, "text/html")
-            .body(index.into())
+            .body(index.to_owned().into())
             .unwrap()
     })
 }
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 5/6] adapt to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (20 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 4/6] restore daemon: " Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-04-02 13:36   ` Max Carrara
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 6/6] adapt examples " Fabian Grünbichler
                   ` (2 subsequent siblings)
  24 siblings, 1 reply; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

similar to the other changes:
- Body to Incoming or proxmox-http's Body
- use adapters between hyper<->tower and hyper<->tokio
- adapt to new proxmox-rest-server interfaces

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 proxmox-backup-client/Cargo.toml      |  1 +
 proxmox-backup-client/src/snapshot.rs |  2 +-
 src/acme/client.rs                    |  6 ++-
 src/acme/plugin.rs                    | 62 ++++++++++++++++--------
 src/api2/admin/datastore.rs           | 20 +++-----
 src/api2/backup/environment.rs        |  3 +-
 src/api2/backup/mod.rs                | 10 ++--
 src/api2/backup/upload_chunk.rs       | 47 ++++++++++--------
 src/api2/helpers.rs                   |  3 +-
 src/api2/node/mod.rs                  |  7 +--
 src/api2/node/tasks.rs                |  7 +--
 src/api2/reader/mod.rs                | 17 ++++---
 src/bin/proxmox-backup-api.rs         | 40 ++++++++++-----
 src/bin/proxmox-backup-proxy.rs       | 70 ++++++++++++++++++++++-----
 14 files changed, 197 insertions(+), 98 deletions(-)

diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
index a91a4908b..5f0140e78 100644
--- a/proxmox-backup-client/Cargo.toml
+++ b/proxmox-backup-client/Cargo.toml
@@ -24,6 +24,7 @@ pxar.workspace = true
 
 proxmox-async.workspace = true
 proxmox-human-byte.workspace = true
+proxmox-http = { workspace = true, features = [ "body" ] }
 proxmox-log.workspace = true
 proxmox-io.workspace = true
 proxmox-router = { workspace = true, features = [ "cli" ] }
diff --git a/proxmox-backup-client/src/snapshot.rs b/proxmox-backup-client/src/snapshot.rs
index f195c23b7..f1569db2e 100644
--- a/proxmox-backup-client/src/snapshot.rs
+++ b/proxmox-backup-client/src/snapshot.rs
@@ -271,7 +271,7 @@ async fn upload_log(param: Value) -> Result<Value, Error> {
     );
 
     let args = snapshot_args(&backup_ns, &snapshot)?;
-    let body = hyper::Body::from(raw_data);
+    let body = proxmox_http::Body::from(raw_data);
 
     client
         .upload("application/octet-stream", body, &path, Some(args))
diff --git a/src/acme/client.rs b/src/acme/client.rs
index 97f628e37..4e55393e4 100644
--- a/src/acme/client.rs
+++ b/src/acme/client.rs
@@ -6,8 +6,10 @@ use std::os::unix::fs::OpenOptionsExt;
 
 use anyhow::{bail, format_err};
 use bytes::Bytes;
-use hyper::{body::HttpBody, Body, Request};
+use http_body_util::BodyExt;
+use hyper::Request;
 use nix::sys::stat::Mode;
+use proxmox_http::Body;
 use serde::{Deserialize, Serialize};
 
 use proxmox_acme::account::AccountCreator;
@@ -618,7 +620,7 @@ impl AcmeClient {
             response.json()?,
         ));
 
-        Ok((directory.as_ref().unwrap(), nonce.as_deref()))
+        Ok((directory.as_mut().unwrap(), nonce.as_deref()))
     }
 
     /// Like `get_directory`, but if the directory provides no nonce, also performs a `HEAD`
diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
index c33cfe405..9141670e7 100644
--- a/src/acme/plugin.rs
+++ b/src/acme/plugin.rs
@@ -1,12 +1,21 @@
 use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
 use std::pin::Pin;
 use std::process::Stdio;
 use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{bail, format_err, Error};
-use hyper::{Body, Request, Response};
+use bytes::Bytes;
+use futures::TryFutureExt;
+use http_body_util::Full;
+use hyper::body::Incoming;
+use hyper::server::conn::http1;
+use hyper::service::service_fn;
+use hyper::{Request, Response};
+use hyper_util::rt::TokioIo;
 use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
+use tokio::net::TcpListener;
 use tokio::process::Command;
 
 use proxmox_acme::{Authorization, Challenge};
@@ -235,10 +244,10 @@ impl StandaloneServer {
 }
 
 async fn standalone_respond(
-    req: Request<Body>,
+    req: Request<Incoming>,
     path: Arc<String>,
     key_auth: Arc<String>,
-) -> Result<Response<Body>, hyper::Error> {
+) -> Result<Response<Full<Bytes>>, hyper::Error> {
     if req.method() == hyper::Method::GET && req.uri().path() == path.as_str() {
         Ok(Response::builder()
             .status(hyper::http::StatusCode::OK)
@@ -260,9 +269,6 @@ impl AcmePlugin for StandaloneServer {
         _domain: &'d AcmeDomain,
         _task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
-        use hyper::server::conn::AddrIncoming;
-        use hyper::service::{make_service_fn, service_fn};
-
         Box::pin(async move {
             self.stop();
 
@@ -273,22 +279,40 @@ impl AcmePlugin for StandaloneServer {
             let key_auth = Arc::new(client.key_authorization(token)?);
             let path = Arc::new(format!("/.well-known/acme-challenge/{}", token));
 
-            let service = make_service_fn(move |_| {
-                let path = Arc::clone(&path);
-                let key_auth = Arc::clone(&key_auth);
-                async move {
-                    Ok::<_, hyper::Error>(service_fn(move |request| {
-                        standalone_respond(request, Arc::clone(&path), Arc::clone(&key_auth))
-                    }))
-                }
-            });
-
             // `[::]:80` first, then `*:80`
-            let incoming = AddrIncoming::bind(&(([0u16; 8], 80).into()))
-                .or_else(|_| AddrIncoming::bind(&(([0u8; 4], 80).into())))?;
+            let dual = SocketAddr::new(IpAddr::from([0u16; 8]), 80);
+            let ipv4 = SocketAddr::new(IpAddr::from([0u8; 4]), 80);
+            let incoming = TcpListener::bind(dual)
+                .or_else(|_| TcpListener::bind(ipv4))
+                .await?;
 
-            let server = hyper::Server::builder(incoming).serve(service);
+            let server = async move {
+                loop {
+                    let key_auth = Arc::clone(&key_auth);
+                    let path = Arc::clone(&path);
+                    match incoming.accept().await {
+                        Ok((tcp, _)) => {
+                            let io = TokioIo::new(tcp);
+                            let service = service_fn(move |request| {
+                                standalone_respond(
+                                    request,
+                                    Arc::clone(&path),
+                                    Arc::clone(&key_auth),
+                                )
+                            });
 
+                            tokio::task::spawn(async move {
+                                if let Err(err) =
+                                    http1::Builder::new().serve_connection(io, service).await
+                                {
+                                    println!("Error serving connection: {err:?}");
+                                }
+                            });
+                        }
+                        Err(err) => println!("Error accepting connection: {err:?}"),
+                    }
+                }
+            };
             let (future, abort) = futures::future::abortable(server);
             self.abort_handle = Some(abort);
             tokio::spawn(future);
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 483e595c1..7aba5d313 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -9,8 +9,10 @@ use std::sync::Arc;
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
+use http_body_util::BodyExt;
 use hyper::http::request::Parts;
-use hyper::{header, Body, Response, StatusCode};
+use hyper::{body::Incoming, header, Response, StatusCode};
+use proxmox_http::Body;
 use serde::Deserialize;
 use serde_json::{json, Value};
 use tokio_stream::wrappers::ReceiverStream;
@@ -1387,7 +1389,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
 
 pub fn download_file(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -1472,7 +1474,7 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
 
 pub fn download_file_decoded(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -1598,7 +1600,7 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
 
 pub fn upload_backup_log(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -1636,13 +1638,7 @@ pub fn upload_backup_log(
             file_name = file_name.deref(),
         );
 
-        let data = req_body
-            .map_err(Error::from)
-            .try_fold(Vec::new(), |mut acc, chunk| {
-                acc.extend_from_slice(&chunk);
-                future::ok::<_, Error>(acc)
-            })
-            .await?;
+        let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
 
         // always verify blob/CRC at server side
         let blob = DataBlob::load_from_reader(&mut &data[..])?;
@@ -1815,7 +1811,7 @@ fn get_local_pxar_reader(
 
 pub fn pxar_file_download(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 99d885e2e..8a2e9ddcb 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -7,6 +7,7 @@ use tracing::info;
 use ::serde::Serialize;
 use serde_json::{json, Value};
 
+use proxmox_http::Body;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
 use proxmox_sys::fs::{lock_dir_noblock_shared, replace_file, CreateOptions};
 
@@ -19,7 +20,7 @@ use proxmox_rest_server::{formatter::*, WorkerTask};
 
 use crate::backup::verify_backup_dir_with_lock;
 
-use hyper::{Body, Response};
+use hyper::Response;
 
 #[derive(Copy, Clone, Serialize)]
 struct UploadStatistic {
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index efc97a1fb..f4378e185 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -5,11 +5,13 @@ use futures::*;
 use hex::FromHex;
 use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
 use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming, Request, Response, StatusCode};
+use hyper_util::service::TowerToHyperService;
 use serde::Deserialize;
 use serde_json::{json, Value};
 use tracing::warn;
 
+use proxmox_http::Body;
 use proxmox_rest_server::{H2Service, WorkerTask};
 use proxmox_router::{http_err, list_subdirs_api_method};
 use proxmox_router::{
@@ -70,7 +72,7 @@ pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error>
 
 fn upgrade_to_backup_protocol(
     parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -247,7 +249,7 @@ fn upgrade_to_backup_protocol(
                         http.max_frame_size(4 * 1024 * 1024);
 
                         let env3 = env2.clone();
-                        http.serve_connection(conn, service).map(move |result| {
+                        http.serve_connection(conn, TowerToHyperService::new(service)).map(move |result| {
                             match result {
                                 Err(err) => {
                                     // Avoid  Transport endpoint is not connected (os error 107)
@@ -824,7 +826,7 @@ pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
 
 fn download_previous(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660a..2c66c2855 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -5,8 +5,9 @@ use std::task::{Context, Poll};
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use hex::FromHex;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
-use hyper::Body;
 use serde_json::{json, Value};
 
 use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
@@ -21,7 +22,7 @@ use pbs_tools::json::{required_integer_param, required_string_param};
 use super::environment::*;
 
 pub struct UploadChunk {
-    stream: Body,
+    stream: BodyDataStream<Incoming>,
     store: Arc<DataStore>,
     digest: [u8; 32],
     size: u32,
@@ -31,7 +32,7 @@ pub struct UploadChunk {
 
 impl UploadChunk {
     pub fn new(
-        stream: Body,
+        stream: BodyDataStream<Incoming>,
         store: Arc<DataStore>,
         digest: [u8; 32],
         size: u32,
@@ -146,7 +147,7 @@ pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
 
 fn upload_fixed_chunk(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -161,8 +162,14 @@ fn upload_fixed_chunk(
 
         let env: &BackupEnvironment = rpcenv.as_ref();
 
-        let (digest, size, compressed_size, is_duplicate) =
-            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
+        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
+            BodyDataStream::new(req_body),
+            env.datastore.clone(),
+            digest,
+            size,
+            encoded_size,
+        )
+        .await?;
 
         env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
         let digest_str = hex::encode(digest);
@@ -215,7 +222,7 @@ pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
 
 fn upload_dynamic_chunk(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -230,8 +237,14 @@ fn upload_dynamic_chunk(
 
         let env: &BackupEnvironment = rpcenv.as_ref();
 
-        let (digest, size, compressed_size, is_duplicate) =
-            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
+        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
+            BodyDataStream::new(req_body),
+            env.datastore.clone(),
+            digest,
+            size,
+            encoded_size,
+        )
+        .await?;
 
         env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
         let digest_str = hex::encode(digest);
@@ -250,13 +263,13 @@ pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
 
 fn upload_speedtest(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     _param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let result = req_body
+        let result = BodyDataStream::new(req_body)
             .map_err(Error::from)
             .try_fold(0, |size: usize, chunk| {
                 let sum = size + chunk.len();
@@ -303,7 +316,7 @@ pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
 
 fn upload_blob(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -318,13 +331,7 @@ fn upload_blob(
             bail!("wrong blob file extension: '{}'", file_name);
         }
 
-        let data = req_body
-            .map_err(Error::from)
-            .try_fold(Vec::new(), |mut acc, chunk| {
-                acc.extend_from_slice(&chunk);
-                future::ok::<_, Error>(acc)
-            })
-            .await?;
+        let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
 
         if encoded_size != data.len() {
             bail!(
@@ -334,7 +341,7 @@ fn upload_blob(
             );
         }
 
-        env.add_blob(&file_name, data)?;
+        env.add_blob(&file_name, data.to_vec())?;
 
         Ok(env.format_response(Ok(Value::Null)))
     }
diff --git a/src/api2/helpers.rs b/src/api2/helpers.rs
index 3dc1befc1..f346b0cca 100644
--- a/src/api2/helpers.rs
+++ b/src/api2/helpers.rs
@@ -2,8 +2,9 @@ use std::path::PathBuf;
 
 use anyhow::Error;
 use futures::stream::TryStreamExt;
-use hyper::{header, Body, Response, StatusCode};
+use hyper::{header, Response, StatusCode};
 
+use proxmox_http::Body;
 use proxmox_router::http_bail;
 
 pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
index 62b447096..e7c6213c1 100644
--- a/src/api2/node/mod.rs
+++ b/src/api2/node/mod.rs
@@ -5,10 +5,11 @@ use std::os::unix::io::AsRawFd;
 
 use anyhow::{bail, format_err, Error};
 use futures::future::{FutureExt, TryFutureExt};
-use hyper::body::Body;
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
 use hyper::upgrade::Upgraded;
 use hyper::Request;
+use hyper_util::rt::TokioIo;
 use serde_json::{json, Value};
 use tokio::io::{AsyncBufReadExt, BufReader};
 
@@ -267,7 +268,7 @@ pub const API_METHOD_WEBSOCKET: ApiMethod = ApiMethod::new(
 
 fn upgrade_to_websocket(
     parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -306,7 +307,7 @@ fn upgrade_to_websocket(
             };
 
             let local = tokio::net::TcpStream::connect(format!("localhost:{}", port)).await?;
-            ws.serve_connection(conn, local).await
+            ws.serve_connection(TokioIo::new(conn), local).await
         });
 
         Ok(response)
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index cad740559..bd6763069 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -3,9 +3,10 @@ use std::io::{BufRead, BufReader};
 
 use anyhow::{bail, Error};
 use futures::FutureExt;
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
 use hyper::http::{header, Response, StatusCode};
-use hyper::Body;
+use proxmox_http::Body;
 use serde_json::{json, Value};
 
 use proxmox_async::stream::AsyncReaderStream;
@@ -321,7 +322,7 @@ pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
 );
 fn read_task_log(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -404,7 +405,7 @@ fn read_task_log(
         Ok(Response::builder()
             .status(StatusCode::OK)
             .header(header::CONTENT_TYPE, "application/json")
-            .body(Body::from(json.to_string()))
+            .body(json.to_string().into())
             .unwrap())
     }
     .boxed()
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 1713f182b..b69000087 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -3,12 +3,15 @@
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use hex::FromHex;
+use hyper::body::Incoming;
 use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
 use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
+use hyper_util::service::TowerToHyperService;
 use serde::Deserialize;
 use serde_json::Value;
 
+use proxmox_http::Body;
 use proxmox_rest_server::{H2Service, WorkerTask};
 use proxmox_router::{
     http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
@@ -68,7 +71,7 @@ pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
 
 fn upgrade_to_backup_reader_protocol(
     parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -190,7 +193,7 @@ fn upgrade_to_backup_reader_protocol(
                     http.initial_connection_window_size(window_size);
                     http.max_frame_size(4 * 1024 * 1024);
 
-                    http.serve_connection(conn, service)
+                    http.serve_connection(conn, TowerToHyperService::new(service))
                         .map_err(Error::from)
                         .await
                 };
@@ -244,7 +247,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
 
 fn download_file(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -300,7 +303,7 @@ pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
 
 fn download_chunk(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -348,7 +351,7 @@ fn download_chunk(
 /* this is too slow
 fn download_chunk_old(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -393,7 +396,7 @@ pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
 
 fn speedtest(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     _param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 7b4187550..438fd9d7e 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -1,12 +1,15 @@
 use std::future::Future;
 use std::pin::{pin, Pin};
+use std::sync::Arc;
 
 use anyhow::{bail, Error};
-use futures::*;
 use hyper::http::Response;
-use hyper::{Body, StatusCode};
+use hyper::StatusCode;
+use hyper_util::server::graceful::GracefulShutdown;
+use tokio::net::TcpListener;
 use tracing::level_filters::LevelFilter;
 
+use proxmox_http::Body;
 use proxmox_lang::try_block;
 use proxmox_rest_server::{ApiConfig, RestServer};
 use proxmox_router::RpcEnvironmentType;
@@ -34,7 +37,7 @@ fn get_index() -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
         Response::builder()
             .status(StatusCode::OK)
             .header(hyper::header::CONTENT_TYPE, "text/html")
-            .body(index.into())
+            .body(index.to_string().into())
             .unwrap()
     })
 }
@@ -108,17 +111,28 @@ async fn run() -> Result<(), Error> {
     // http server future:
     let server = proxmox_daemon::server::create_daemon(
         ([127, 0, 0, 1], 82).into(),
-        move |listener| {
-            let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
-
-            Ok(async {
+        move |listener: TcpListener| {
+            Ok(async move {
                 proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
-
-                hyper::Server::builder(incoming)
-                    .serve(rest_server)
-                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
-                    .map_err(Error::from)
-                    .await
+                let graceful = Arc::new(GracefulShutdown::new());
+                loop {
+                    let graceful2 = Arc::clone(&graceful);
+                    tokio::select! {
+                        incoming = listener.accept() => {
+                            let (conn, _) = incoming?;
+                            let api_service = rest_server.api_service(&conn)?;
+                            tokio::spawn(async move { api_service.serve(conn, Some(graceful2)).await });
+                        },
+                        _shutdown = proxmox_daemon::shutdown_future() => {
+                            break;
+                        },
+                    }
+                }
+                if let Some(shutdown) = Arc::into_inner(graceful) {
+                    log::info!("shutting down..");
+                    shutdown.shutdown().await
+                }
+                Ok(())
             })
         },
         Some(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN),
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index c9a6032e6..8ee537207 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -7,7 +7,8 @@ use futures::*;
 use hyper::header;
 use hyper::http::request::Parts;
 use hyper::http::Response;
-use hyper::{Body, StatusCode};
+use hyper::StatusCode;
+use hyper_util::server::graceful::GracefulShutdown;
 use tracing::level_filters::LevelFilter;
 use tracing::{info, warn};
 use url::form_urlencoded;
@@ -15,6 +16,7 @@ use url::form_urlencoded;
 use openssl::ssl::SslAcceptor;
 use serde_json::{json, Value};
 
+use proxmox_http::Body;
 use proxmox_lang::try_block;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
 use proxmox_sys::fs::CreateOptions;
@@ -289,27 +291,71 @@ async fn run() -> Result<(), Error> {
     let server = proxmox_daemon::server::create_daemon(
         ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
         move |listener| {
-            let (secure_connections, insecure_connections) =
+            let (mut secure_connections, mut insecure_connections) =
                 connections.accept_tls_optional(listener, acceptor);
 
             Ok(async {
                 proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
 
-                let secure_server = hyper::Server::builder(secure_connections)
-                    .serve(rest_server)
-                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
-                    .map_err(Error::from);
+                let secure_server = async move {
+                    let graceful = Arc::new(GracefulShutdown::new());
+                    loop {
+                        let graceful2 = Arc::clone(&graceful);
+                        tokio::select! {
+                            Some(conn) = secure_connections.next() => {
+                                match conn {
+                                    Ok(conn) => {
+                                        let api_service = rest_server.api_service(&conn)?;
+                                        tokio::spawn(async move {
+                                            api_service.serve(conn, Some(graceful2)).await
+                                        });
+                                    },
+                                    Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
+                                }
+                            },
+                            _shutdown = proxmox_daemon::shutdown_future() => {
+                                break;
+                            }
+                        }
+                    }
+                    if let Some(shutdown) = Arc::into_inner(graceful) {
+                        shutdown.shutdown().await
+                    }
+                    Ok::<(), Error>(())
+                };
 
-                let insecure_server = hyper::Server::builder(insecure_connections)
-                    .serve(redirector)
-                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
-                    .map_err(Error::from);
+                let insecure_server = async move {
+                    let graceful = Arc::new(GracefulShutdown::new());
+                    loop {
+                        let graceful2 = Arc::clone(&graceful);
+                        tokio::select! {
+                            Some(conn) = insecure_connections.next() => {
+                                match conn {
+                                    Ok(conn) => {
+                                        let redirect_service = redirector.redirect_service();
+                                        tokio::spawn(async move {
+                                            redirect_service.serve(conn, Some(graceful2)).await
+                                        });
+                                    },
+                                    Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
+                                }
+                            },
+                            _shutdown = proxmox_daemon::shutdown_future() => {
+                                break;
+                            }
+                        }
+                    }
+                    if let Some(shutdown) = Arc::into_inner(graceful) {
+                        shutdown.shutdown().await
+                    }
+                    Ok::<(), Error>(())
+                };
 
                 let (secure_res, insecure_res) =
                     try_join!(tokio::spawn(secure_server), tokio::spawn(insecure_server))
                         .context("failed to complete REST server task")?;
 
-                let results = [secure_res, insecure_res];
+                let results: [Result<(), Error>; 2] = [secure_res, insecure_res];
 
                 if results.iter().any(Result::is_err) {
                     let cat_errors = results
@@ -321,7 +367,7 @@ async fn run() -> Result<(), Error> {
                     bail!(cat_errors);
                 }
 
-                Ok(())
+                Ok::<(), Error>(())
             })
         },
         Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 6/6] adapt examples to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (21 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 5/6] " Fabian Grünbichler
@ 2025-03-26 15:23 ` Fabian Grünbichler
  2025-04-02 13:53 ` [pbs-devel] [RFC proxmox 00/23] upgrade " Max Carrara
  2025-04-02 14:39 ` Thomas Lamprecht
  24 siblings, 0 replies; 32+ messages in thread
From: Fabian Grünbichler @ 2025-03-26 15:23 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 examples/h2s-server.rs | 28 +++++++++-------------------
 examples/h2server.rs   | 28 ++++++++--------------------
 2 files changed, 17 insertions(+), 39 deletions(-)

diff --git a/examples/h2s-server.rs b/examples/h2s-server.rs
index 0f4c0c145..2f15b0127 100644
--- a/examples/h2s-server.rs
+++ b/examples/h2s-server.rs
@@ -1,26 +1,16 @@
 use std::sync::Arc;
 
 use anyhow::{format_err, Error};
-use futures::*;
-use hyper::{Body, Request, Response};
+use bytes::Bytes;
+use futures::{future, FutureExt, TryFutureExt};
+use http_body_util::Full;
+use hyper::{body::Incoming, Request, Response};
+use hyper_util::rt::{TokioExecutor, TokioIo};
 use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
 use tokio::net::{TcpListener, TcpStream};
 
 use pbs_buildcfg::configdir;
 
-#[derive(Clone, Copy)]
-struct H2SExecutor;
-
-impl<Fut> hyper::rt::Executor<Fut> for H2SExecutor
-where
-    Fut: Future + Send + 'static,
-    Fut::Output: Send,
-{
-    fn execute(&self, fut: Fut) {
-        tokio::spawn(fut);
-    }
-}
-
 fn main() -> Result<(), Error> {
     proxmox_async::runtime::main(run())
 }
@@ -63,16 +53,16 @@ async fn handle_connection(socket: TcpStream, acceptor: Arc<SslAcceptor>) -> Res
 
     stream.as_mut().accept().await?;
 
-    let mut http = hyper::server::conn::http2::Builder::new(H2SExecutor);
+    let mut http = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
     // increase window size: todo - find optiomal size
     let max_window_size = (1 << 31) - 2;
     http.initial_stream_window_size(max_window_size);
     http.initial_connection_window_size(max_window_size);
 
-    let service = hyper::service::service_fn(|_req: Request<Body>| {
+    let service = hyper::service::service_fn(|_req: Request<Incoming>| {
         println!("Got request");
         let buffer = vec![65u8; 4 * 1024 * 1024]; // nonsense [A,A,A,A...]
-        let body = Body::from(buffer);
+        let body = Full::<Bytes>::from(buffer);
 
         let response = Response::builder()
             .status(hyper::http::StatusCode::OK)
@@ -85,7 +75,7 @@ async fn handle_connection(socket: TcpStream, acceptor: Arc<SslAcceptor>) -> Res
         future::ok::<_, Error>(response)
     });
 
-    http.serve_connection(stream, service)
+    http.serve_connection(TokioIo::new(stream), service)
         .map_err(Error::from)
         .await?;
 
diff --git a/examples/h2server.rs b/examples/h2server.rs
index 6b286e787..2c368810d 100644
--- a/examples/h2server.rs
+++ b/examples/h2server.rs
@@ -1,24 +1,12 @@
-use std::future::Future;
-
 use anyhow::Error;
+use bytes::Bytes;
 use futures::*;
-use hyper::{Body, Request, Response};
+use http_body_util::Full;
+use hyper::{body::Incoming, Request, Response};
 
+use hyper_util::rt::{TokioExecutor, TokioIo};
 use tokio::net::{TcpListener, TcpStream};
 
-#[derive(Clone, Copy)]
-struct H2Executor;
-
-impl<Fut> hyper::rt::Executor<Fut> for H2Executor
-where
-    Fut: Future + Send + 'static,
-    Fut::Output: Send,
-{
-    fn execute(&self, fut: Fut) {
-        tokio::spawn(fut);
-    }
-}
-
 fn main() -> Result<(), Error> {
     proxmox_async::runtime::main(run())
 }
@@ -41,16 +29,16 @@ async fn run() -> Result<(), Error> {
 async fn handle_connection(socket: TcpStream) -> Result<(), Error> {
     socket.set_nodelay(true).unwrap();
 
-    let mut http = hyper::server::conn::http2::Builder::new(H2Executor);
+    let mut http = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
     // increase window size: todo - find optiomal size
     let max_window_size = (1 << 31) - 2;
     http.initial_stream_window_size(max_window_size);
     http.initial_connection_window_size(max_window_size);
 
-    let service = hyper::service::service_fn(|_req: Request<Body>| {
+    let service = hyper::service::service_fn(|_req: Request<Incoming>| {
         println!("Got request");
         let buffer = vec![65u8; 4 * 1024 * 1024]; // nonsense [A,A,A,A...]
-        let body = Body::from(buffer);
+        let body = Full::<Bytes>::from(buffer);
 
         let response = Response::builder()
             .status(hyper::http::StatusCode::OK)
@@ -63,7 +51,7 @@ async fn handle_connection(socket: TcpStream) -> Result<(), Error> {
         future::ok::<_, Error>(response)
     });
 
-    http.serve_connection(socket, service)
+    http.serve_connection(TokioIo::new(socket), service)
         .map_err(Error::from)
         .await?;
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* Re: [pbs-devel] [PATCH proxmox 04/17] http: adapt connector to hyper 1.x
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 04/17] http: adapt connector " Fabian Grünbichler
@ 2025-04-02 13:31   ` Max Carrara
  0 siblings, 0 replies; 32+ messages in thread
From: Max Carrara @ 2025-04-02 13:31 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> by switching to tower's Service and wrapping in TokioIo as needed. hyper
> now uses their own Service type to not expose tower in their public API,
> and their own Async IO traits, but they provide wrappers to not require
> too many changes for crates like ours here that already used hyper 0.14.
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
>  proxmox-http/Cargo.toml              |  9 ++++--
>  proxmox-http/src/client/connector.rs | 44 ++++++++++++++++++----------
>  2 files changed, 36 insertions(+), 17 deletions(-)
>
> diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
> index c5137e2a..4ec142c9 100644
> --- a/proxmox-http/Cargo.toml
> +++ b/proxmox-http/Cargo.toml
> @@ -25,6 +25,7 @@ tokio = { workspace = true, features = [], optional = true }
>  tokio-openssl = { workspace = true, optional = true }
>  ureq = { version = "2.4", features = ["native-certs", "native-tls"], optional = true, default-features = false }
>  url = { workspace = true, optional = true }
> +tower-service = { workspace = true, optional = true }
>  
>  proxmox-async = { workspace = true, optional = true }
>  proxmox-sys = { workspace = true, optional = true }
> @@ -52,15 +53,19 @@ rate-limited-stream = [
>  client = [
>      "dep:futures",
>      "dep:hyper",
> +    "dep:hyper-util",
>      "dep:openssl",
>      "dep:proxmox-compression",
>      "dep:tokio",
>      "dep:tokio-openssl",
> +    "dep:tower-service",
>      "hyper?/client",
>      "hyper?/http1",
>      "hyper?/http2",
> -    "hyper?/stream",
> -    "hyper?/tcp",
> +    "hyper-util?/client",
> +    "hyper-util?/client-legacy",
> +    "hyper-util?/http1",
> +    "hyper-util?/tokio",
>      "tokio?/io-util",
>      "http-helpers",
>      "rate-limited-stream",
> diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
> index 63b9d10c..70421793 100644
> --- a/proxmox-http/src/client/connector.rs
> +++ b/proxmox-http/src/client/connector.rs
> @@ -6,7 +6,8 @@ use std::task::{Context, Poll};
>  
>  use futures::*;
>  use http::Uri;
> -use hyper::client::HttpConnector;
> +use hyper_util::client::legacy::connect::HttpConnector;
> +use hyper_util::rt::TokioIo;
>  use openssl::ssl::SslConnector;
>  use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
>  use tokio::net::TcpStream;
> @@ -122,8 +123,8 @@ impl HttpsConnector {
>      }
>  }
>  
> -impl hyper::service::Service<Uri> for HttpsConnector {
> -    type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
> +impl tower_service::Service<Uri> for HttpsConnector {
> +    type Response = TokioIo<MaybeTlsStream<RateLimitedStream<TcpStream>>>;
>      type Error = Error;
>      #[allow(clippy::type_complexity)]
>      type Future =
> @@ -171,9 +172,13 @@ impl hyper::service::Service<Uri> for HttpsConnector {
>              if use_connect {
>                  async move {
>                      use std::fmt::Write as _;
> -                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
> -                        format_err!("error connecting to {} - {}", proxy_authority, err)

The call to `format_err!` above can be shortened by inlining the
parameters, i.e.:

    format_err!("error connecting to {proxy_authority} - {err}")

> -                    })?;
> +                    let tcp_stream = connector
> +                        .call(proxy_uri)
> +                        .await
> +                        .map_err(|err| {
> +                            format_err!("error connecting to {} - {}", proxy_authority, err)

Same here.

> +                        })?
> +                        .into_inner();
>  
>                      let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>  
> @@ -196,24 +201,30 @@ impl hyper::service::Service<Uri> for HttpsConnector {
>                      Self::parse_connect_response(&mut tcp_stream).await?;
>  
>                      if is_https {
> -                        Self::secure_stream(tcp_stream, &ssl_connector, &host).await
> +                        Self::secure_stream(tcp_stream, &ssl_connector, &host)
> +                            .await
> +                            .map(TokioIo::new)
>                      } else {
> -                        Ok(MaybeTlsStream::Normal(tcp_stream))
> +                        Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
>                      }
>                  }
>                  .boxed()
>              } else {
>                  async move {
> -                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
> -                        format_err!("error connecting to {} - {}", proxy_authority, err)

As well as here.

> -                    })?;
> +                    let tcp_stream = connector
> +                        .call(proxy_uri)
> +                        .await
> +                        .map_err(|err| {
> +                            format_err!("error connecting to {} - {}", proxy_authority, err)

And here.

> +                        })?
> +                        .into_inner();
>  
>                      let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>  
>                      let tcp_stream =
>                          RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
>  
> -                    Ok(MaybeTlsStream::Proxied(tcp_stream))
> +                    Ok(TokioIo::new(MaybeTlsStream::Proxied(tcp_stream)))
>                  }
>                  .boxed()
>              }
> @@ -223,7 +234,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
>                  let tcp_stream = connector
>                      .call(dst)
>                      .await
> -                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
> +                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?

And here too.

> +                    .into_inner();
>  
>                  let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>  
> @@ -231,9 +243,11 @@ impl hyper::service::Service<Uri> for HttpsConnector {
>                      RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
>  
>                  if is_https {
> -                    Self::secure_stream(tcp_stream, &ssl_connector, &host).await
> +                    Self::secure_stream(tcp_stream, &ssl_connector, &host)
> +                        .await
> +                        .map(TokioIo::new)
>                  } else {
> -                    Ok(MaybeTlsStream::Normal(tcp_stream))
> +                    Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
>                  }
>              }
>              .boxed()



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* Re: [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation Fabian Grünbichler
@ 2025-04-02 13:31   ` Max Carrara
  0 siblings, 0 replies; 32+ messages in thread
From: Max Carrara @ 2025-04-02 13:31 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> hyper/http 1.0 now only have a Body trait and some implementations for
> specific use cases. following reqwest's lead (and copying some parts of
> its implementation), implement our own Body struct for the two common
> use cases:
> - a body instance containing the full body data as Bytes
> - a streaming body instance
>
> together with the most common helper methods (empty body, convert, wrap
> existing stream as body) this should make the rest of the upgrade fairly
> straight-forward.
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
>  proxmox-http/Cargo.toml  |  10 +++
>  proxmox-http/src/body.rs | 133 +++++++++++++++++++++++++++++++++++++++
>  proxmox-http/src/lib.rs  |   5 ++
>  3 files changed, 148 insertions(+)
>  create mode 100644 proxmox-http/src/body.rs
>
> diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
> index 4ec142c9..1fbc70a8 100644
> --- a/proxmox-http/Cargo.toml
> +++ b/proxmox-http/Cargo.toml
> @@ -40,6 +40,15 @@ flate2 = { workspace = true }
>  [features]
>  default = []
>  
> +body = [
> +    "dep:bytes",
> +    "dep:futures",
> +    "dep:http-body",
> +    "dep:http-body-util",
> +    "dep:hyper",
> +    "dep:sync_wrapper",
> +    "sync_wrapper?/futures",
> +]
>  rate-limiter = ["dep:hyper"]
>  rate-limited-stream = [
>      "dep:tokio",
> @@ -67,6 +76,7 @@ client = [
>      "hyper-util?/http1",
>      "hyper-util?/tokio",
>      "tokio?/io-util",
> +    "body",
>      "http-helpers",
>      "rate-limited-stream",
>  ]
> diff --git a/proxmox-http/src/body.rs b/proxmox-http/src/body.rs
> new file mode 100644
> index 00000000..3eb17355
> --- /dev/null
> +++ b/proxmox-http/src/body.rs
> @@ -0,0 +1,133 @@
> +use std::{pin::Pin, task::Poll};
> +
> +use anyhow::Error;

Eh, not really too much of a fan of anyhow here because the pedant in me
doesn't like it in library-esque code, but given that `anyhow::Error`
has proliferated so much in our remaining code, adapting all the call
sites is a lot of churn. So, this is fine by me, esp. since this is a
sort of "compat layer" for hyper/1.0 and our existing code anyways.

For future stuff that's more isolated we should restrict ourselves to
using `Box<dyn std::error::Error + Send + Sync + 'static>` and
variations thereof, as that can just be tossed to anyhow while also
remaining agnostic to it. That's off-topic, though; I digress. :P

> +use bytes::Bytes;
> +
> +use futures::ready;
> +use http_body_util::combinators::BoxBody;
> +use hyper::body::{Body as HyperBody, Frame, SizeHint};
> +
> +// Partially copied and heavily based on reqwest 0.12 Body implementation from src/async_impl/body.rs
> +// Copyright (c) 2016-2025 Sean McArthur
> +
> +/// Custom implementation of hyper::body::Body supporting either a "full" body that can return its
> +/// contents as byte sequence in one go, or "streaming" body that can be polled.
> +pub struct Body {
> +    inner: InnerBody,
> +}
> +
> +enum InnerBody {
> +    Full(Bytes),
> +    Streaming(BoxBody<Bytes, Error>),
> +}
> +
> +impl Body {
> +    /// Shortcut for creating an empty body instance with no data.
> +    pub fn empty() -> Self {
> +        Bytes::new().into()
> +    }
> +
> +    /// Returns the body contents if it is a "full" body, None otherwise.
> +    pub fn as_bytes(&self) -> Option<&[u8]> {
> +        match self.inner {
> +            InnerBody::Full(ref bytes) => Some(bytes),
> +            InnerBody::Streaming(_) => None,
> +        }
> +    }
> +
> +    pub fn wrap_stream<S>(stream: S) -> Body
> +    where
> +        S: futures::stream::TryStream + Send + 'static,
> +
> +        S::Error: Into<Error>,
> +
> +        Bytes: From<S::Ok>,
> +    {
> +        Body::stream(stream)
> +    }
> +
> +    pub(crate) fn stream<S>(stream: S) -> Body
> +    where
> +        S: futures::stream::TryStream + Send + 'static,
> +
> +        S::Error: Into<Error>,
> +
> +        Bytes: From<S::Ok>,
> +    {
> +        use futures::TryStreamExt;
> +
> +        use http_body::Frame;
> +
> +        use http_body_util::StreamBody;
> +
> +        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
> +            stream
> +                .map_ok(|d| Frame::data(Bytes::from(d)))
> +                .map_err(Into::into),
> +        )));
> +
> +        Body {
> +            inner: InnerBody::Streaming(body),
> +        }
> +    }
> +}
> +
> +impl HyperBody for Body {
> +    type Data = Bytes;
> +
> +    type Error = Error;
> +
> +    fn poll_frame(
> +        mut self: std::pin::Pin<&mut Self>,
> +        cx: &mut std::task::Context<'_>,
> +    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
> +        match self.inner {
> +            InnerBody::Full(ref mut bytes) => {
> +                let res = bytes.split_off(0);
> +                if res.is_empty() {
> +                    return Poll::Ready(None);
> +                } else {
> +                    return Poll::Ready(Some(Ok(Frame::data(res))));
> +                }

The `return` statements above are superfluous; you can just

    if res.is_empty() {
        return Poll::Ready(None);
    } else {
        return Poll::Ready(Some(Ok(Frame::data(res))));
    }

> +            }
> +            InnerBody::Streaming(ref mut body) => Poll::Ready(
> +                ready!(Pin::new(body).poll_frame(cx))
> +                    .map(|opt_chunk| opt_chunk.map_err(Error::from)),

The `map_err` call here is redundant.

> +            ),
> +        }
> +    }
> +
> +    fn is_end_stream(&self) -> bool {
> +        match self.inner {
> +            InnerBody::Full(ref bytes) => bytes.is_empty(),
> +            InnerBody::Streaming(ref box_body) => box_body.is_end_stream(),
> +        }
> +    }
> +
> +    fn size_hint(&self) -> hyper::body::SizeHint {
> +        match self.inner {
> +            InnerBody::Full(ref bytes) => SizeHint::with_exact(bytes.len() as u64),
> +            InnerBody::Streaming(ref box_body) => box_body.size_hint(),
> +        }
> +    }
> +}
> +
> +impl From<Bytes> for Body {
> +    fn from(value: Bytes) -> Self {
> +        Self {
> +            inner: InnerBody::Full(value),
> +        }
> +    }
> +}
> +
> +impl From<Vec<u8>> for Body {
> +    fn from(value: Vec<u8>) -> Self {
> +        Bytes::from(value).into()
> +    }
> +}
> +
> +impl From<String> for Body {
> +    fn from(value: String) -> Self {
> +        Bytes::copy_from_slice(value.as_bytes()).into()
> +    }
> +}
> diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
> index 4770aaf4..8b6953b0 100644
> --- a/proxmox-http/src/lib.rs
> +++ b/proxmox-http/src/lib.rs
> @@ -35,3 +35,8 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
>  mod rate_limited_stream;
>  #[cfg(feature = "rate-limited-stream")]
>  pub use rate_limited_stream::RateLimitedStream;
> +
> +#[cfg(feature = "body")]
> +mod body;
> +#[cfg(feature = "body")]
> +pub use body::Body;



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* Re: [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0 Fabian Grünbichler
@ 2025-04-02 13:31   ` Max Carrara
  0 siblings, 0 replies; 32+ messages in thread
From: Max Carrara @ 2025-04-02 13:31 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
>  proxmox-acme/Cargo.toml          |  3 ++-
>  proxmox-acme/src/async_client.rs | 11 +++++++----
>  2 files changed, 9 insertions(+), 5 deletions(-)
>
> diff --git a/proxmox-acme/Cargo.toml b/proxmox-acme/Cargo.toml
> index f6dbe481..26b92f98 100644
> --- a/proxmox-acme/Cargo.toml
> +++ b/proxmox-acme/Cargo.toml
> @@ -26,6 +26,7 @@ proxmox-schema = { workspace = true, optional = true, features = [ "api-macro" ]
>  proxmox-http = { workspace = true, optional = true, features = [ "client" ] }
>  anyhow = { workspace = true, optional = true }
>  bytes = { workspace = true, optional = true }
> +http-body-util = { workspace = true, optional = true }
>  hyper = { workspace = true, optional = true }
>  
>  [dependencies.ureq]
> @@ -39,7 +40,7 @@ default = [ "impl" ]
>  api-types = [ "dep:proxmox-schema" ]
>  impl = [ "api-types", "dep:openssl" ]
>  client = [ "impl", "dep:ureq", "dep:native-tls"]
> -async-client = [ "impl", "dep:hyper", "dep:proxmox-http", "dep:anyhow", "dep:bytes" ]
> +async-client = [ "impl", "dep:http-body-util", "dep:hyper", "dep:proxmox-http", "dep:anyhow", "dep:bytes" ]
>  
>  [dev-dependencies]
>  anyhow.workspace = true
> diff --git a/proxmox-acme/src/async_client.rs b/proxmox-acme/src/async_client.rs
> index 6e38570f..a29b6f91 100644
> --- a/proxmox-acme/src/async_client.rs
> +++ b/proxmox-acme/src/async_client.rs
> @@ -2,10 +2,11 @@
>  
>  use anyhow::format_err;
>  use bytes::Bytes;
> -use hyper::{Body, Request};
> +use http_body_util::BodyExt;
> +use hyper::Request;
>  use serde::{Deserialize, Serialize};
>  
> -use proxmox_http::client::Client;
> +use proxmox_http::{client::Client, Body};
>  
>  use crate::account::AccountCreator;
>  use crate::order::{Order, OrderData};
> @@ -400,9 +401,11 @@ impl AcmeClient {
>          let (parts, body) = response.into_parts();
>  
>          let status = parts.status.as_u16();
> -        let body = hyper::body::to_bytes(body)
> +        let body = body
> +            .collect()
>              .await
> -            .map_err(|err| Error::Custom(format!("failed to retrieve response body: {}", err)))?;
> +            .map_err(|err| Error::Custom(format!("failed to retrieve response body: {}", err)))?

Could inline `err` into the `format!` call here too.

> +            .to_bytes();
>  
>          let got_nonce = if let Some(new_nonce) = parts.headers.get(crate::REPLAY_NONCE) {
>              let new_nonce = new_nonce.to_str().map_err(|err| {



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* Re: [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: update to hyper 1.0
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: " Fabian Grünbichler
@ 2025-04-02 13:34   ` Max Carrara
  0 siblings, 0 replies; 32+ messages in thread
From: Max Carrara @ 2025-04-02 13:34 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> and switch to proxmox-http's Body implementation.
>
> hyper now has a special (opaque) Body implementation called Incoming
> which is used for incoming request bodies on the server side, and
> incoming response bodies on the client side, so our API handler's now
> consume an instance of this type.
>
> the Accept trait previously offered by hyper is gone in 1.0, and needs
> to be replaced with an accept loop. our corresponding Acceptor
> implementations have been dropped as well.
>
> hyper now has its own Service and async Read/Write traits, but helpfully
> provides wrappers for tower's Service and tokio's AsyncRead/AsyncWrite
> variants.
>
> graceful shutdown handling is now exposed differently on the hyper side,
> and to allow usage with upgradable connections the variant form
> hyper-util needs to be used, as the one straight from hyper doesn't
> support it (yet).
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
>  proxmox-rest-server/Cargo.toml                |   7 +-
>  .../examples/minimal-rest-server.rs           |   5 +-
>  proxmox-rest-server/src/api_config.rs         |  44 ++---
>  proxmox-rest-server/src/connection.rs         |  14 +-
>  proxmox-rest-server/src/formatter.rs          |   8 +-
>  proxmox-rest-server/src/h2service.rs          |  15 +-
>  proxmox-rest-server/src/lib.rs                |   2 +-
>  proxmox-rest-server/src/rest.rs               | 164 +++++++++++-------
>  8 files changed, 142 insertions(+), 117 deletions(-)
>
> diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
> index ffbd925a..ee253b4f 100644
> --- a/proxmox-rest-server/Cargo.toml
> +++ b/proxmox-rest-server/Cargo.toml
> @@ -20,7 +20,9 @@ anyhow.workspace = true
>  futures.workspace = true
>  handlebars = { workspace = true, optional = true }
>  http.workspace = true
> +http-body-util.workspace = true
>  hyper = { workspace = true, features = [ "full" ] }
> +hyper-util = { workspace = true, features = [ "client", "client-legacy", "http1", "server", "server-auto", "server-graceful", "service", "tokio" ]}
>  libc.workspace = true
>  log.workspace = true
>  nix.workspace = true
> @@ -39,7 +41,7 @@ url.workspace = true
>  proxmox-async.workspace = true
>  proxmox-compression.workspace = true
>  proxmox-daemon.workspace = true
> -proxmox-http = { workspace = true, optional = true }
> +proxmox-http = { workspace = true, features = ["body"] }
>  proxmox-lang.workspace = true
>  proxmox-log.workspace = true
>  proxmox-router.workspace = true
> @@ -52,6 +54,5 @@ proxmox-worker-task.workspace = true
>  default = []
>  templates = ["dep:handlebars"]
>  rate-limited-stream = [
> -    "dep:proxmox-http",
> -    "proxmox-http?/rate-limited-stream",
> +    "proxmox-http/rate-limited-stream",
>  ]
> diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
> index 23be586c..454430fb 100644
> --- a/proxmox-rest-server/examples/minimal-rest-server.rs
> +++ b/proxmox-rest-server/examples/minimal-rest-server.rs
> @@ -6,8 +6,9 @@ use std::sync::{LazyLock, Mutex};
>  use anyhow::{bail, format_err, Error};
>  use http::request::Parts;
>  use http::HeaderMap;
> -use hyper::{Body, Method, Response};
> +use hyper::{Method, Response};
>  
> +use proxmox_http::Body;
>  use proxmox_router::{
>      list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
>  };
> @@ -57,7 +58,7 @@ fn get_index(
>      Box::pin(async move {
>          // build an index page
>          http::Response::builder()
> -            .body("hello world".into())
> +            .body("hello world".to_owned().into_bytes().into())
>              .unwrap()
>      })
>  }
> diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
> index b20b2da0..0b847a0c 100644
> --- a/proxmox-rest-server/src/api_config.rs
> +++ b/proxmox-rest-server/src/api_config.rs
> @@ -9,10 +9,12 @@ use std::task::{Context, Poll};
>  use anyhow::{format_err, Error};
>  use http::{HeaderMap, Method, Uri};
>  use hyper::http::request::Parts;
> -use hyper::{Body, Response};
> +use hyper::Response;
> +use hyper_util::rt::TokioIo;
>  use tower_service::Service;
>  
>  use proxmox_daemon::command_socket::CommandSocket;
> +use proxmox_http::Body;
>  use proxmox_log::{FileLogOptions, FileLogger};
>  use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
>  use proxmox_sys::fs::{create_path, CreateOptions};
> @@ -107,7 +109,7 @@ impl ApiConfig {
>      ) -> Response<Body> {
>          match self.index_handler.as_ref() {
>              Some(handler) => (handler.func)(rest_env, parts).await,
> -            None => Response::builder().status(404).body("".into()).unwrap(),
> +            None => Response::builder().status(404).body(Body::empty()).unwrap(),
>          }
>      }
>  
> @@ -511,7 +513,7 @@ impl From<std::os::unix::net::SocketAddr> for PrivilegedAddr {
>  }
>  
>  impl Service<Uri> for PrivilegedAddr {
> -    type Response = PrivilegedSocket;
> +    type Response = TokioIo<PrivilegedSocket>;
>      type Error = io::Error;
>      type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
>  
> @@ -527,6 +529,7 @@ impl Service<Uri> for PrivilegedAddr {
>                      tokio::net::TcpStream::connect(addr)
>                          .await
>                          .map(PrivilegedSocket::Tcp)
> +                        .map(TokioIo::new)
>                  })
>              }
>              PrivilegedAddr::Unix(addr) => {
> @@ -537,6 +540,7 @@ impl Service<Uri> for PrivilegedAddr {
>                      })?)
>                      .await
>                      .map(PrivilegedSocket::Unix)
> +                    .map(TokioIo::new)
>                  })
>              }
>          }
> @@ -607,39 +611,11 @@ impl tokio::io::AsyncWrite for PrivilegedSocket {
>      }
>  }
>  
> -impl hyper::client::connect::Connection for PrivilegedSocket {
> -    fn connected(&self) -> hyper::client::connect::Connected {
> +impl hyper_util::client::legacy::connect::Connection for PrivilegedSocket {
> +    fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
>          match self {
>              Self::Tcp(s) => s.connected(),
> -            Self::Unix(_) => hyper::client::connect::Connected::new(),
> +            Self::Unix(_) => hyper_util::client::legacy::connect::Connected::new(),
>          }
>      }
>  }
> -
> -/// Implements hyper's `Accept` for `UnixListener`s.
> -pub struct UnixAcceptor {
> -    listener: tokio::net::UnixListener,
> -}
> -
> -impl From<tokio::net::UnixListener> for UnixAcceptor {
> -    fn from(listener: tokio::net::UnixListener) -> Self {
> -        Self { listener }
> -    }
> -}
> -
> -impl hyper::server::accept::Accept for UnixAcceptor {
> -    type Conn = tokio::net::UnixStream;
> -    type Error = io::Error;
> -
> -    fn poll_accept(
> -        self: Pin<&mut Self>,
> -        cx: &mut Context<'_>,
> -    ) -> Poll<Option<io::Result<Self::Conn>>> {
> -        Pin::new(&mut self.get_mut().listener)
> -            .poll_accept(cx)
> -            .map(|res| match res {
> -                Ok((stream, _addr)) => Some(Ok(stream)),
> -                Err(err) => Some(Err(err)),
> -            })
> -    }
> -}
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 526555ae..a65ef398 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -14,7 +14,6 @@ use std::time::Duration;
>  
>  use anyhow::{format_err, Context, Error};
>  use futures::FutureExt;
> -use hyper::server::accept;
>  use openssl::ec::{EcGroup, EcKey};
>  use openssl::nid::Nid;
>  use openssl::pkey::{PKey, Private};
> @@ -226,12 +225,13 @@ impl AcceptBuilder {
>          self,
>          listener: TcpListener,
>          acceptor: Arc<Mutex<SslAcceptor>>,
> -    ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
> +        // FIXME: replace return value with own trait? see now removed UnixAcceptor

Yeah, we probably should provide our own trait for this and have it
implement `futures_core::Stream` (or `std::async_iter::AsyncIterator`
once that's stabilized). For now the below is fine IMO, as we can cook
up a trait later as well. Switching over to hyper/1.0 isn't too pretty,
but it is what it is :s

> +    ) -> ReceiverStream<Result<ClientStreamResult, Error>> {
>          let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
>  
>          tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
>  
> -        accept::from_stream(ReceiverStream::new(secure_receiver))
> +        ReceiverStream::new(secure_receiver)
>      }
>  
>      pub fn accept_tls_optional(
> @@ -239,8 +239,8 @@ impl AcceptBuilder {
>          listener: TcpListener,
>          acceptor: Arc<Mutex<SslAcceptor>>,
>      ) -> (
> -        impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
> -        impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
> +        ReceiverStream<Result<ClientStreamResult, Error>>,
> +        ReceiverStream<Result<InsecureClientStreamResult, Error>>,
>      ) {
>          let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
>          let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
> @@ -252,8 +252,8 @@ impl AcceptBuilder {
>          ));
>  
>          (
> -            accept::from_stream(ReceiverStream::new(secure_receiver)),
> -            accept::from_stream(ReceiverStream::new(insecure_receiver)),
> +            ReceiverStream::new(secure_receiver),
> +            ReceiverStream::new(insecure_receiver),
>          )
>      }
>  }
> diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
> index 32ca9936..9ce87205 100644
> --- a/proxmox-rest-server/src/formatter.rs
> +++ b/proxmox-rest-server/src/formatter.rs
> @@ -5,12 +5,14 @@ use anyhow::Error;
>  use serde_json::{json, Value};
>  
>  use hyper::header;
> -use hyper::{Body, Response, StatusCode};
> +use hyper::{Response, StatusCode};
>  
> +use proxmox_http::Body;
>  use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
>  use proxmox_schema::ParameterError;
>  
>  /// Extension to set error message for server side logging
> +#[derive(Clone)]
>  pub(crate) struct ErrorMessageExtension(pub String);
>  
>  /// Methods to format data and errors
> @@ -168,11 +170,11 @@ impl OutputFormatter for JsonFormatter {
>  
>  pub(crate) fn error_to_response(err: Error) -> Response<Body> {
>      let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
> -        let mut resp = Response::new(Body::from(apierr.message.clone()));
> +        let mut resp = Response::new(apierr.message.clone().into());
>          *resp.status_mut() = apierr.code;
>          resp
>      } else {
> -        let mut resp = Response::new(Body::from(err.to_string()));
> +        let mut resp = Response::new(err.to_string().into());
>          *resp.status_mut() = StatusCode::BAD_REQUEST;
>          resp
>      };
> diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
> index db6e3b0a..18258e14 100644
> --- a/proxmox-rest-server/src/h2service.rs
> +++ b/proxmox-rest-server/src/h2service.rs
> @@ -6,8 +6,10 @@ use std::sync::Arc;
>  use std::task::{Context, Poll};
>  
>  use futures::*;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::body::Incoming;
> +use hyper::{Request, Response, StatusCode};
>  
> +use proxmox_http::Body;
>  use proxmox_router::http_err;
>  use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
>  
> @@ -19,6 +21,7 @@ use crate::{normalize_path_with_components, WorkerTask};
>  /// We use this kind of service to handle backup protocol
>  /// connections. State is stored inside the generic ``rpcenv``. Logs
>  /// goes into the ``WorkerTask`` log.
> +#[derive(Clone)]
>  pub struct H2Service<E> {
>      router: &'static Router,
>      rpcenv: E,
> @@ -42,7 +45,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
>          }
>      }
>  
> -    fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
> +    fn handle_request(&self, req: Request<Incoming>) -> ApiResponseFuture {
>          let (parts, body) = req.into_parts();
>  
>          let method = parts.method.clone();
> @@ -103,7 +106,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
>      }
>  }
>  
> -impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
> +impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Incoming>> for H2Service<E> {
>      type Response = Response<Body>;
>      type Error = Error;
>      #[allow(clippy::type_complexity)]
> @@ -113,7 +116,7 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
>          Poll::Ready(Ok(()))
>      }
>  
> -    fn call(&mut self, req: Request<Body>) -> Self::Future {
> +    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
>          let path = req.uri().path().to_owned();
>          let method = req.method().clone();
>          let worker = self.worker.clone();
> @@ -126,14 +129,14 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
>                  }
>                  Err(err) => {
>                      if let Some(apierr) = err.downcast_ref::<HttpError>() {
> -                        let mut resp = Response::new(Body::from(apierr.message.clone()));
> +                        let mut resp = Response::new(apierr.message.clone().into());
>                          resp.extensions_mut()
>                              .insert(ErrorMessageExtension(apierr.message.clone()));
>                          *resp.status_mut() = apierr.code;
>                          Self::log_response(worker, method, &path, &resp);
>                          Ok(resp)
>                      } else {
> -                        let mut resp = Response::new(Body::from(err.to_string()));
> +                        let mut resp = Response::new(err.to_string().into());
>                          resp.extensions_mut()
>                              .insert(ErrorMessageExtension(err.to_string()));
>                          *resp.status_mut() = StatusCode::BAD_REQUEST;
> diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
> index 43dafa91..5ddd3667 100644
> --- a/proxmox-rest-server/src/lib.rs
> +++ b/proxmox-rest-server/src/lib.rs
> @@ -34,7 +34,7 @@ mod environment;
>  pub use environment::*;
>  
>  mod api_config;
> -pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
> +pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
>  
>  mod rest;
>  pub use rest::{Redirector, RestServer};
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index f5a72052..f902592d 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -10,17 +10,25 @@ use std::task::{Context, Poll};
>  use anyhow::{bail, format_err, Error};
>  use futures::future::FutureExt;
>  use futures::stream::TryStreamExt;
> -use hyper::body::HttpBody;
> +use http_body_util::{BodyDataStream, BodyStream};
> +use hyper::body::{Body as HyperBody, Incoming};
>  use hyper::header::{self, HeaderMap};
>  use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{Request, Response, StatusCode};
> +use hyper_util::rt::{TokioExecutor, TokioIo};
> +use hyper_util::server::conn;
> +use hyper_util::server::graceful::GracefulShutdown;
> +use hyper_util::service::TowerToHyperService;
>  use regex::Regex;
>  use serde_json::Value;
>  use tokio::fs::File;
> +use tokio::io::{AsyncRead, AsyncWrite};
>  use tokio::time::Instant;
> +use tokio_stream::wrappers::ReceiverStream;
>  use tower_service::Service;
>  use url::form_urlencoded;
>  
> +use proxmox_http::Body;
>  use proxmox_router::{
>      check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
>      RpcEnvironmentType, UserInformation,
> @@ -40,6 +48,7 @@ unsafe extern "C" {
>      fn tzset();
>  }
>  
> +#[derive(Clone)]
>  struct AuthStringExtension(String);
>  
>  pub(crate) struct EmptyUserInformation {}
> @@ -74,24 +83,11 @@ impl RestServer {
>              api_config: Arc::new(api_config),
>          }
>      }
> -}
>  
> -impl<T: PeerAddress> Service<&T> for RestServer {
> -    type Response = ApiService;
> -    type Error = Error;
> -    type Future = std::future::Ready<Result<ApiService, Error>>;
> -
> -    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
> -        Poll::Ready(Ok(()))
> -    }
> -
> -    fn call(&mut self, ctx: &T) -> Self::Future {
> -        std::future::ready(match ctx.peer_addr() {
> -            Err(err) => Err(format_err!("unable to get peer address - {}", err)),
> -            Ok(peer) => Ok(ApiService {
> -                peer,
> -                api_config: Arc::clone(&self.api_config),
> -            }),
> +    pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> +        Ok(ApiService {
> +            peer: peer.peer_addr()?,
> +            api_config: Arc::clone(&self.api_config),
>          })
>      }
>  }
> @@ -108,25 +104,40 @@ impl Redirector {
>      pub fn new() -> Self {
>          Self {}
>      }
> -}
>  
> -impl<T> Service<&T> for Redirector {
> -    type Response = RedirectService;
> -    type Error = Error;
> -    type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
> -
> -    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
> -        Poll::Ready(Ok(()))
> -    }
> -
> -    fn call(&mut self, _ctx: &T) -> Self::Future {
> -        std::future::ready(Ok(RedirectService {}))
> +    pub fn redirect_service(&self) -> RedirectService {
> +        RedirectService {}
>      }
>  }
>  
> +#[derive(Clone)]
>  pub struct RedirectService;
>  
> -impl Service<Request<Body>> for RedirectService {
> +impl RedirectService {
> +    pub async fn serve<S>(
> +        self,
> +        conn: S,
> +        mut graceful: Option<Arc<GracefulShutdown>>,
> +    ) -> Result<(), Error>
> +    where
> +        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> +    {
> +        let api_service = TowerToHyperService::new(self);
> +        let io = TokioIo::new(conn);
> +        let api_conn = conn::auto::Builder::new(TokioExecutor::new());
> +        let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
> +        if let Some(graceful) = graceful.take() {
> +            let api_conn = graceful.watch(api_conn);
> +            drop(graceful);
> +            api_conn.await
> +        } else {
> +            api_conn.await
> +        }
> +        .map_err(|err| format_err!("error serving redirect connection: {err}"))
> +    }
> +}
> +
> +impl Service<Request<Incoming>> for RedirectService {
>      type Response = Response<Body>;
>      type Error = anyhow::Error;
>      type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
> @@ -135,7 +146,7 @@ impl Service<Request<Body>> for RedirectService {
>          Poll::Ready(Ok(()))
>      }
>  
> -    fn call(&mut self, req: Request<Body>) -> Self::Future {
> +    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
>          let future = async move {
>              let header_host_value = req
>                  .headers()
> @@ -194,12 +205,6 @@ impl PeerAddress for tokio::net::TcpStream {
>      }
>  }
>  
> -impl PeerAddress for hyper::server::conn::AddrStream {
> -    fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
> -        Ok(self.remote_addr())
> -    }
> -}
> -
>  impl PeerAddress for tokio::net::UnixStream {
>      fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
>          // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
> @@ -223,11 +228,36 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>  // Rust wants this type 'pub' here (else we get 'private type `ApiService`
>  // in public interface'). The type is still private because the crate does
>  // not export it.
> +#[derive(Clone)]
>  pub struct ApiService {
>      pub peer: std::net::SocketAddr,
>      pub api_config: Arc<ApiConfig>,
>  }
>  
> +impl ApiService {
> +    pub async fn serve<S>(
> +        self,
> +        conn: S,
> +        mut graceful: Option<Arc<GracefulShutdown>>,
> +    ) -> Result<(), Error>
> +    where
> +        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> +    {
> +        let api_service = TowerToHyperService::new(self);
> +        let io = TokioIo::new(conn);
> +        let api_conn = conn::auto::Builder::new(TokioExecutor::new());
> +        let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
> +        if let Some(graceful) = graceful.take() {
> +            let api_conn = graceful.watch(api_conn);
> +            drop(graceful);
> +            api_conn.await
> +        } else {
> +            api_conn.await
> +        }
> +        .map_err(|err| format_err!("error serving connection: {err}"))
> +    }
> +}
> +
>  fn log_response(
>      logfile: Option<&Arc<Mutex<FileLogger>>>,
>      peer: &std::net::SocketAddr,
> @@ -307,7 +337,7 @@ fn get_user_agent(headers: &HeaderMap) -> Option<String> {
>          .ok()
>  }
>  
> -impl Service<Request<Body>> for ApiService {
> +impl Service<Request<Incoming>> for ApiService {
>      type Response = Response<Body>;
>      type Error = Error;
>      #[allow(clippy::type_complexity)]
> @@ -317,7 +347,7 @@ impl Service<Request<Body>> for ApiService {
>          Poll::Ready(Ok(()))
>      }
>  
> -    fn call(&mut self, req: Request<Body>) -> Self::Future {
> +    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
>          let path = req.uri().path_and_query().unwrap().as_str().to_owned();
>          let method = req.method().clone();
>          let user_agent = get_user_agent(req.headers());
> @@ -384,7 +414,7 @@ fn parse_query_parameters<S: 'static + BuildHasher + Send>(
>  async fn get_request_parameters<S: 'static + BuildHasher + Send>(
>      param_schema: ParameterSchema,
>      parts: &Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      uri_param: HashMap<String, String, S>,
>  ) -> Result<Value, Error> {
>      let mut is_json = false;
> @@ -401,13 +431,17 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
>          }
>      }
>  
> -    let body = TryStreamExt::map_err(req_body, |err| {
> +    let stream_body = BodyStream::new(req_body);
> +    let body = TryStreamExt::map_err(stream_body, |err| {
>          http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
>      })
> -    .try_fold(Vec::new(), |mut acc, chunk| async move {
> +    .try_fold(Vec::new(), |mut acc, frame| async move {
>          // FIXME: max request body size?
> -        if acc.len() + chunk.len() < 64 * 1024 {
> -            acc.extend_from_slice(&chunk);
> +        let frame = frame
> +            .into_data()
> +            .map_err(|err| format_err!("Failed to read request body frame - {err:?}"))?;
> +        if acc.len() + frame.len() < 64 * 1024 {
> +            acc.extend_from_slice(&frame);
>              Ok(acc)
>          } else {
>              Err(http_err!(BAD_REQUEST, "Request body too large"))
> @@ -437,13 +471,14 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
>      }
>  }
>  
> +#[derive(Clone)]
>  struct NoLogExtension();
>  
>  async fn proxy_protected_request(
>      config: &ApiConfig,
>      info: &ApiMethod,
>      mut parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      peer: &std::net::SocketAddr,
>  ) -> Result<Response<Body>, Error> {
>      let mut uri_parts = parts.uri.clone().into_parts();
> @@ -463,9 +498,14 @@ async fn proxy_protected_request(
>      let reload_timezone = info.reload_timezone;
>  
>      let mut resp = match config.privileged_addr.clone() {
> -        None => hyper::client::Client::new().request(request).await?,
> +        None => {
> +            hyper_util::client::legacy::Client::builder(TokioExecutor::new())
> +                .build_http()
> +                .request(request)
> +                .await?
> +        }
>          Some(addr) => {
> -            hyper::client::Client::builder()
> +            hyper_util::client::legacy::Client::builder(TokioExecutor::new())
>                  .build(addr)
>                  .request(request)
>                  .await?
> @@ -479,7 +519,7 @@ async fn proxy_protected_request(
>          }
>      }
>  
> -    Ok(resp)
> +    Ok(resp.map(|b| Body::wrap_stream(BodyDataStream::new(b))))
>  }
>  
>  fn delay_unauth_time() -> std::time::Instant {
> @@ -491,22 +531,23 @@ fn access_forbidden_time() -> std::time::Instant {
>  }
>  
>  fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
> -    let (mut send, body) = hyper::Body::channel();
> +    let (send, body) = tokio::sync::mpsc::channel::<Result<Vec<u8>, Error>>(1);
>      tokio::spawn(async move {
>          use futures::StreamExt;
>  
>          let mut stream = stream.into_inner();
>          while let Some(record) = stream.next().await {
> -            if send.send_data(record.to_bytes().into()).await.is_err() {
> +            if send.send(Ok(record.to_bytes())).await.is_err() {
>                  break;
>              }
>          }
>      });
>  
> -    Ok(Response::builder()
> +    Response::builder()
>          .status(http::StatusCode::OK)
>          .header(http::header::CONTENT_TYPE, "application/json-seq")
> -        .body(body)?)
> +        .body(Body::wrap_stream(ReceiverStream::new(body)))
> +        .map_err(Error::from)
>  }
>  
>  fn handle_sync_stream_as_json_seq(
> @@ -527,7 +568,7 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
>      info: &'static ApiMethod,
>      formatter: Option<&'static dyn OutputFormatter>,
>      parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      uri_param: HashMap<String, String, S>,
>  ) -> Result<Response<Body>, Error> {
>      let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
> @@ -630,9 +671,10 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
>              );
>              resp.map(|body| {
>                  Body::wrap_stream(
> -                    DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
> -                        proxmox_lang::io_format_err!("error during compression: {}", err)
> -                    }))
> +                    DeflateEncoder::builder(TryStreamExt::map_err(
> +                        BodyDataStream::new(body),
> +                        |err| proxmox_lang::io_format_err!("error during compression: {}", err),

Could inline the `err` above into the args of `io_format_err` since
you're touching that part (unless our macro doesn't support that, but I
assume it does).

> +                    ))
>                      .zlib(true)
>                      .flush_window(is_streaming.then_some(64 * 1024))
>                      .build(),
> @@ -796,7 +838,7 @@ fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMe
>  impl ApiConfig {
>      pub async fn handle_request(
>          self: Arc<ApiConfig>,
> -        req: Request<Body>,
> +        req: Request<Incoming>,
>          peer: &std::net::SocketAddr,
>      ) -> Result<Response<Body>, Error> {
>          let (parts, body) = req.into_parts();
> @@ -808,7 +850,7 @@ impl ApiConfig {
>          if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
>              return Ok(Response::builder()
>                  .status(StatusCode::URI_TOO_LONG)
> -                .body("".into())
> +                .body(Body::empty())
>                  .unwrap());
>          }
>  
> @@ -907,7 +949,7 @@ impl Action {
>  
>  pub struct ApiRequestData<'a> {
>      parts: Parts,
> -    body: Body,
> +    body: Incoming,
>      peer: &'a std::net::SocketAddr,
>      config: &'a ApiConfig,
>      full_path: &'a str,



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 5/6] adapt to hyper/http 1.0
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 5/6] " Fabian Grünbichler
@ 2025-04-02 13:36   ` Max Carrara
  0 siblings, 0 replies; 32+ messages in thread
From: Max Carrara @ 2025-04-02 13:36 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> similar to the other changes:
> - Body to Incoming or proxmox-http's Body
> - use adapters between hyper<->tower and hyper<->tokio
> - adapt to new proxmox-rest-server interfaces
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
> ---
>  proxmox-backup-client/Cargo.toml      |  1 +
>  proxmox-backup-client/src/snapshot.rs |  2 +-
>  src/acme/client.rs                    |  6 ++-
>  src/acme/plugin.rs                    | 62 ++++++++++++++++--------
>  src/api2/admin/datastore.rs           | 20 +++-----
>  src/api2/backup/environment.rs        |  3 +-
>  src/api2/backup/mod.rs                | 10 ++--
>  src/api2/backup/upload_chunk.rs       | 47 ++++++++++--------
>  src/api2/helpers.rs                   |  3 +-
>  src/api2/node/mod.rs                  |  7 +--
>  src/api2/node/tasks.rs                |  7 +--
>  src/api2/reader/mod.rs                | 17 ++++---
>  src/bin/proxmox-backup-api.rs         | 40 ++++++++++-----
>  src/bin/proxmox-backup-proxy.rs       | 70 ++++++++++++++++++++++-----
>  14 files changed, 197 insertions(+), 98 deletions(-)
>
> diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
> index a91a4908b..5f0140e78 100644
> --- a/proxmox-backup-client/Cargo.toml
> +++ b/proxmox-backup-client/Cargo.toml
> @@ -24,6 +24,7 @@ pxar.workspace = true
>  
>  proxmox-async.workspace = true
>  proxmox-human-byte.workspace = true
> +proxmox-http = { workspace = true, features = [ "body" ] }
>  proxmox-log.workspace = true
>  proxmox-io.workspace = true
>  proxmox-router = { workspace = true, features = [ "cli" ] }
> diff --git a/proxmox-backup-client/src/snapshot.rs b/proxmox-backup-client/src/snapshot.rs
> index f195c23b7..f1569db2e 100644
> --- a/proxmox-backup-client/src/snapshot.rs
> +++ b/proxmox-backup-client/src/snapshot.rs
> @@ -271,7 +271,7 @@ async fn upload_log(param: Value) -> Result<Value, Error> {
>      );
>  
>      let args = snapshot_args(&backup_ns, &snapshot)?;
> -    let body = hyper::Body::from(raw_data);
> +    let body = proxmox_http::Body::from(raw_data);
>  
>      client
>          .upload("application/octet-stream", body, &path, Some(args))
> diff --git a/src/acme/client.rs b/src/acme/client.rs
> index 97f628e37..4e55393e4 100644
> --- a/src/acme/client.rs
> +++ b/src/acme/client.rs
> @@ -6,8 +6,10 @@ use std::os::unix::fs::OpenOptionsExt;
>  
>  use anyhow::{bail, format_err};
>  use bytes::Bytes;
> -use hyper::{body::HttpBody, Body, Request};
> +use http_body_util::BodyExt;
> +use hyper::Request;
>  use nix::sys::stat::Mode;
> +use proxmox_http::Body;
>  use serde::{Deserialize, Serialize};
>  
>  use proxmox_acme::account::AccountCreator;
> @@ -618,7 +620,7 @@ impl AcmeClient {
>              response.json()?,
>          ));
>  
> -        Ok((directory.as_ref().unwrap(), nonce.as_deref()))
> +        Ok((directory.as_mut().unwrap(), nonce.as_deref()))
>      }
>  
>      /// Like `get_directory`, but if the directory provides no nonce, also performs a `HEAD`
> diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
> index c33cfe405..9141670e7 100644
> --- a/src/acme/plugin.rs
> +++ b/src/acme/plugin.rs
> @@ -1,12 +1,21 @@
>  use std::future::Future;
> +use std::net::{IpAddr, SocketAddr};
>  use std::pin::Pin;
>  use std::process::Stdio;
>  use std::sync::Arc;
>  use std::time::Duration;
>  
>  use anyhow::{bail, format_err, Error};
> -use hyper::{Body, Request, Response};
> +use bytes::Bytes;
> +use futures::TryFutureExt;
> +use http_body_util::Full;
> +use hyper::body::Incoming;
> +use hyper::server::conn::http1;
> +use hyper::service::service_fn;
> +use hyper::{Request, Response};
> +use hyper_util::rt::TokioIo;
>  use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
> +use tokio::net::TcpListener;
>  use tokio::process::Command;
>  
>  use proxmox_acme::{Authorization, Challenge};
> @@ -235,10 +244,10 @@ impl StandaloneServer {
>  }
>  
>  async fn standalone_respond(
> -    req: Request<Body>,
> +    req: Request<Incoming>,
>      path: Arc<String>,
>      key_auth: Arc<String>,
> -) -> Result<Response<Body>, hyper::Error> {
> +) -> Result<Response<Full<Bytes>>, hyper::Error> {
>      if req.method() == hyper::Method::GET && req.uri().path() == path.as_str() {
>          Ok(Response::builder()
>              .status(hyper::http::StatusCode::OK)
> @@ -260,9 +269,6 @@ impl AcmePlugin for StandaloneServer {
>          _domain: &'d AcmeDomain,
>          _task: Arc<WorkerTask>,
>      ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
> -        use hyper::server::conn::AddrIncoming;
> -        use hyper::service::{make_service_fn, service_fn};
> -
>          Box::pin(async move {
>              self.stop();
>  
> @@ -273,22 +279,40 @@ impl AcmePlugin for StandaloneServer {
>              let key_auth = Arc::new(client.key_authorization(token)?);
>              let path = Arc::new(format!("/.well-known/acme-challenge/{}", token));
>  
> -            let service = make_service_fn(move |_| {
> -                let path = Arc::clone(&path);
> -                let key_auth = Arc::clone(&key_auth);
> -                async move {
> -                    Ok::<_, hyper::Error>(service_fn(move |request| {
> -                        standalone_respond(request, Arc::clone(&path), Arc::clone(&key_auth))
> -                    }))
> -                }
> -            });
> -
>              // `[::]:80` first, then `*:80`
> -            let incoming = AddrIncoming::bind(&(([0u16; 8], 80).into()))
> -                .or_else(|_| AddrIncoming::bind(&(([0u8; 4], 80).into())))?;
> +            let dual = SocketAddr::new(IpAddr::from([0u16; 8]), 80);
> +            let ipv4 = SocketAddr::new(IpAddr::from([0u8; 4]), 80);
> +            let incoming = TcpListener::bind(dual)
> +                .or_else(|_| TcpListener::bind(ipv4))
> +                .await?;
>  
> -            let server = hyper::Server::builder(incoming).serve(service);
> +            let server = async move {
> +                loop {
> +                    let key_auth = Arc::clone(&key_auth);
> +                    let path = Arc::clone(&path);
> +                    match incoming.accept().await {
> +                        Ok((tcp, _)) => {
> +                            let io = TokioIo::new(tcp);
> +                            let service = service_fn(move |request| {
> +                                standalone_respond(
> +                                    request,
> +                                    Arc::clone(&path),
> +                                    Arc::clone(&key_auth),
> +                                )
> +                            });
>  
> +                            tokio::task::spawn(async move {
> +                                if let Err(err) =
> +                                    http1::Builder::new().serve_connection(io, service).await
> +                                {
> +                                    println!("Error serving connection: {err:?}");
> +                                }
> +                            });
> +                        }
> +                        Err(err) => println!("Error accepting connection: {err:?}"),
> +                    }
> +                }
> +            };
>              let (future, abort) = futures::future::abortable(server);
>              self.abort_handle = Some(abort);
>              tokio::spawn(future);
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 483e595c1..7aba5d313 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -9,8 +9,10 @@ use std::sync::Arc;
>  
>  use anyhow::{bail, format_err, Error};
>  use futures::*;
> +use http_body_util::BodyExt;
>  use hyper::http::request::Parts;
> -use hyper::{header, Body, Response, StatusCode};
> +use hyper::{body::Incoming, header, Response, StatusCode};
> +use proxmox_http::Body;
>  use serde::Deserialize;
>  use serde_json::{json, Value};
>  use tokio_stream::wrappers::ReceiverStream;
> @@ -1387,7 +1389,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
>  
>  pub fn download_file(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -1472,7 +1474,7 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
>  
>  pub fn download_file_decoded(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -1598,7 +1600,7 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
>  
>  pub fn upload_backup_log(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -1636,13 +1638,7 @@ pub fn upload_backup_log(
>              file_name = file_name.deref(),
>          );
>  
> -        let data = req_body
> -            .map_err(Error::from)
> -            .try_fold(Vec::new(), |mut acc, chunk| {
> -                acc.extend_from_slice(&chunk);
> -                future::ok::<_, Error>(acc)
> -            })
> -            .await?;
> +        let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
>  
>          // always verify blob/CRC at server side
>          let blob = DataBlob::load_from_reader(&mut &data[..])?;
> @@ -1815,7 +1811,7 @@ fn get_local_pxar_reader(
>  
>  pub fn pxar_file_download(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 99d885e2e..8a2e9ddcb 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -7,6 +7,7 @@ use tracing::info;
>  use ::serde::Serialize;
>  use serde_json::{json, Value};
>  
> +use proxmox_http::Body;
>  use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
>  use proxmox_sys::fs::{lock_dir_noblock_shared, replace_file, CreateOptions};
>  
> @@ -19,7 +20,7 @@ use proxmox_rest_server::{formatter::*, WorkerTask};
>  
>  use crate::backup::verify_backup_dir_with_lock;
>  
> -use hyper::{Body, Response};
> +use hyper::Response;
>  
>  #[derive(Copy, Clone, Serialize)]
>  struct UploadStatistic {
> diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
> index efc97a1fb..f4378e185 100644
> --- a/src/api2/backup/mod.rs
> +++ b/src/api2/backup/mod.rs
> @@ -5,11 +5,13 @@ use futures::*;
>  use hex::FromHex;
>  use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
>  use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{body::Incoming, Request, Response, StatusCode};
> +use hyper_util::service::TowerToHyperService;
>  use serde::Deserialize;
>  use serde_json::{json, Value};
>  use tracing::warn;
>  
> +use proxmox_http::Body;
>  use proxmox_rest_server::{H2Service, WorkerTask};
>  use proxmox_router::{http_err, list_subdirs_api_method};
>  use proxmox_router::{
> @@ -70,7 +72,7 @@ pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error>
>  
>  fn upgrade_to_backup_protocol(
>      parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -247,7 +249,7 @@ fn upgrade_to_backup_protocol(
>                          http.max_frame_size(4 * 1024 * 1024);
>  
>                          let env3 = env2.clone();
> -                        http.serve_connection(conn, service).map(move |result| {
> +                        http.serve_connection(conn, TowerToHyperService::new(service)).map(move |result| {
>                              match result {
>                                  Err(err) => {
>                                      // Avoid  Transport endpoint is not connected (os error 107)
> @@ -824,7 +826,7 @@ pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
>  
>  fn download_previous(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
> index 20259660a..2c66c2855 100644
> --- a/src/api2/backup/upload_chunk.rs
> +++ b/src/api2/backup/upload_chunk.rs
> @@ -5,8 +5,9 @@ use std::task::{Context, Poll};
>  use anyhow::{bail, format_err, Error};
>  use futures::*;
>  use hex::FromHex;
> +use http_body_util::{BodyDataStream, BodyExt};
> +use hyper::body::Incoming;
>  use hyper::http::request::Parts;
> -use hyper::Body;
>  use serde_json::{json, Value};
>  
>  use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
> @@ -21,7 +22,7 @@ use pbs_tools::json::{required_integer_param, required_string_param};
>  use super::environment::*;
>  
>  pub struct UploadChunk {
> -    stream: Body,
> +    stream: BodyDataStream<Incoming>,
>      store: Arc<DataStore>,
>      digest: [u8; 32],
>      size: u32,
> @@ -31,7 +32,7 @@ pub struct UploadChunk {
>  
>  impl UploadChunk {
>      pub fn new(
> -        stream: Body,
> +        stream: BodyDataStream<Incoming>,
>          store: Arc<DataStore>,
>          digest: [u8; 32],
>          size: u32,
> @@ -146,7 +147,7 @@ pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
>  
>  fn upload_fixed_chunk(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -161,8 +162,14 @@ fn upload_fixed_chunk(
>  
>          let env: &BackupEnvironment = rpcenv.as_ref();
>  
> -        let (digest, size, compressed_size, is_duplicate) =
> -            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
> +        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> +            BodyDataStream::new(req_body),
> +            env.datastore.clone(),
> +            digest,
> +            size,
> +            encoded_size,
> +        )
> +        .await?;
>  
>          env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
>          let digest_str = hex::encode(digest);
> @@ -215,7 +222,7 @@ pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
>  
>  fn upload_dynamic_chunk(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -230,8 +237,14 @@ fn upload_dynamic_chunk(
>  
>          let env: &BackupEnvironment = rpcenv.as_ref();
>  
> -        let (digest, size, compressed_size, is_duplicate) =
> -            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
> +        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> +            BodyDataStream::new(req_body),
> +            env.datastore.clone(),
> +            digest,
> +            size,
> +            encoded_size,
> +        )
> +        .await?;
>  
>          env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
>          let digest_str = hex::encode(digest);
> @@ -250,13 +263,13 @@ pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
>  
>  fn upload_speedtest(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      _param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
>  ) -> ApiResponseFuture {
>      async move {
> -        let result = req_body
> +        let result = BodyDataStream::new(req_body)
>              .map_err(Error::from)
>              .try_fold(0, |size: usize, chunk| {
>                  let sum = size + chunk.len();
> @@ -303,7 +316,7 @@ pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
>  
>  fn upload_blob(
>      _parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -318,13 +331,7 @@ fn upload_blob(
>              bail!("wrong blob file extension: '{}'", file_name);
>          }
>  
> -        let data = req_body
> -            .map_err(Error::from)
> -            .try_fold(Vec::new(), |mut acc, chunk| {
> -                acc.extend_from_slice(&chunk);
> -                future::ok::<_, Error>(acc)
> -            })
> -            .await?;
> +        let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
>  
>          if encoded_size != data.len() {
>              bail!(
> @@ -334,7 +341,7 @@ fn upload_blob(
>              );
>          }
>  
> -        env.add_blob(&file_name, data)?;
> +        env.add_blob(&file_name, data.to_vec())?;
>  
>          Ok(env.format_response(Ok(Value::Null)))
>      }
> diff --git a/src/api2/helpers.rs b/src/api2/helpers.rs
> index 3dc1befc1..f346b0cca 100644
> --- a/src/api2/helpers.rs
> +++ b/src/api2/helpers.rs
> @@ -2,8 +2,9 @@ use std::path::PathBuf;
>  
>  use anyhow::Error;
>  use futures::stream::TryStreamExt;
> -use hyper::{header, Body, Response, StatusCode};
> +use hyper::{header, Response, StatusCode};
>  
> +use proxmox_http::Body;
>  use proxmox_router::http_bail;
>  
>  pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
> diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
> index 62b447096..e7c6213c1 100644
> --- a/src/api2/node/mod.rs
> +++ b/src/api2/node/mod.rs
> @@ -5,10 +5,11 @@ use std::os::unix::io::AsRawFd;
>  
>  use anyhow::{bail, format_err, Error};
>  use futures::future::{FutureExt, TryFutureExt};
> -use hyper::body::Body;
> +use hyper::body::Incoming;
>  use hyper::http::request::Parts;
>  use hyper::upgrade::Upgraded;
>  use hyper::Request;
> +use hyper_util::rt::TokioIo;
>  use serde_json::{json, Value};
>  use tokio::io::{AsyncBufReadExt, BufReader};
>  
> @@ -267,7 +268,7 @@ pub const API_METHOD_WEBSOCKET: ApiMethod = ApiMethod::new(
>  
>  fn upgrade_to_websocket(
>      parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -306,7 +307,7 @@ fn upgrade_to_websocket(
>              };
>  
>              let local = tokio::net::TcpStream::connect(format!("localhost:{}", port)).await?;
> -            ws.serve_connection(conn, local).await
> +            ws.serve_connection(TokioIo::new(conn), local).await
>          });
>  
>          Ok(response)
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index cad740559..bd6763069 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -3,9 +3,10 @@ use std::io::{BufRead, BufReader};
>  
>  use anyhow::{bail, Error};
>  use futures::FutureExt;
> +use hyper::body::Incoming;
>  use hyper::http::request::Parts;
>  use hyper::http::{header, Response, StatusCode};
> -use hyper::Body;
> +use proxmox_http::Body;
>  use serde_json::{json, Value};
>  
>  use proxmox_async::stream::AsyncReaderStream;
> @@ -321,7 +322,7 @@ pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
>  );
>  fn read_task_log(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -404,7 +405,7 @@ fn read_task_log(
>          Ok(Response::builder()
>              .status(StatusCode::OK)
>              .header(header::CONTENT_TYPE, "application/json")
> -            .body(Body::from(json.to_string()))
> +            .body(json.to_string().into())
>              .unwrap())
>      }
>      .boxed()
> diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
> index 1713f182b..b69000087 100644
> --- a/src/api2/reader/mod.rs
> +++ b/src/api2/reader/mod.rs
> @@ -3,12 +3,15 @@
>  use anyhow::{bail, format_err, Error};
>  use futures::*;
>  use hex::FromHex;
> +use hyper::body::Incoming;
>  use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
>  use hyper::http::request::Parts;
> -use hyper::{Body, Request, Response, StatusCode};
> +use hyper::{Request, Response, StatusCode};
> +use hyper_util::service::TowerToHyperService;
>  use serde::Deserialize;
>  use serde_json::Value;
>  
> +use proxmox_http::Body;
>  use proxmox_rest_server::{H2Service, WorkerTask};
>  use proxmox_router::{
>      http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
> @@ -68,7 +71,7 @@ pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
>  
>  fn upgrade_to_backup_reader_protocol(
>      parts: Parts,
> -    req_body: Body,
> +    req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -190,7 +193,7 @@ fn upgrade_to_backup_reader_protocol(
>                      http.initial_connection_window_size(window_size);
>                      http.max_frame_size(4 * 1024 * 1024);
>  
> -                    http.serve_connection(conn, service)
> +                    http.serve_connection(conn, TowerToHyperService::new(service))
>                          .map_err(Error::from)
>                          .await
>                  };
> @@ -244,7 +247,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
>  
>  fn download_file(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -300,7 +303,7 @@ pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
>  
>  fn download_chunk(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -348,7 +351,7 @@ fn download_chunk(
>  /* this is too slow
>  fn download_chunk_old(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      param: Value,
>      _info: &ApiMethod,
>      rpcenv: Box<dyn RpcEnvironment>,
> @@ -393,7 +396,7 @@ pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
>  
>  fn speedtest(
>      _parts: Parts,
> -    _req_body: Body,
> +    _req_body: Incoming,
>      _param: Value,
>      _info: &ApiMethod,
>      _rpcenv: Box<dyn RpcEnvironment>,
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 7b4187550..438fd9d7e 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -1,12 +1,15 @@
>  use std::future::Future;
>  use std::pin::{pin, Pin};
> +use std::sync::Arc;
>  
>  use anyhow::{bail, Error};
> -use futures::*;
>  use hyper::http::Response;
> -use hyper::{Body, StatusCode};
> +use hyper::StatusCode;
> +use hyper_util::server::graceful::GracefulShutdown;
> +use tokio::net::TcpListener;
>  use tracing::level_filters::LevelFilter;
>  
> +use proxmox_http::Body;
>  use proxmox_lang::try_block;
>  use proxmox_rest_server::{ApiConfig, RestServer};
>  use proxmox_router::RpcEnvironmentType;
> @@ -34,7 +37,7 @@ fn get_index() -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
>          Response::builder()
>              .status(StatusCode::OK)
>              .header(hyper::header::CONTENT_TYPE, "text/html")
> -            .body(index.into())
> +            .body(index.to_string().into())
>              .unwrap()
>      })
>  }
> @@ -108,17 +111,28 @@ async fn run() -> Result<(), Error> {
>      // http server future:
>      let server = proxmox_daemon::server::create_daemon(
>          ([127, 0, 0, 1], 82).into(),
> -        move |listener| {
> -            let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
> -
> -            Ok(async {
> +        move |listener: TcpListener| {
> +            Ok(async move {
>                  proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
> -
> -                hyper::Server::builder(incoming)
> -                    .serve(rest_server)
> -                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> -                    .map_err(Error::from)
> -                    .await
> +                let graceful = Arc::new(GracefulShutdown::new());
> +                loop {
> +                    let graceful2 = Arc::clone(&graceful);
> +                    tokio::select! {
> +                        incoming = listener.accept() => {
> +                            let (conn, _) = incoming?;
> +                            let api_service = rest_server.api_service(&conn)?;
> +                            tokio::spawn(async move { api_service.serve(conn, Some(graceful2)).await });
> +                        },
> +                        _shutdown = proxmox_daemon::shutdown_future() => {
> +                            break;
> +                        },
> +                    }
> +                }
> +                if let Some(shutdown) = Arc::into_inner(graceful) {
> +                    log::info!("shutting down..");
> +                    shutdown.shutdown().await
> +                }
> +                Ok(())
>              })
>          },
>          Some(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN),
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index c9a6032e6..8ee537207 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -7,7 +7,8 @@ use futures::*;
>  use hyper::header;
>  use hyper::http::request::Parts;
>  use hyper::http::Response;
> -use hyper::{Body, StatusCode};
> +use hyper::StatusCode;
> +use hyper_util::server::graceful::GracefulShutdown;
>  use tracing::level_filters::LevelFilter;
>  use tracing::{info, warn};
>  use url::form_urlencoded;
> @@ -15,6 +16,7 @@ use url::form_urlencoded;
>  use openssl::ssl::SslAcceptor;
>  use serde_json::{json, Value};
>  
> +use proxmox_http::Body;
>  use proxmox_lang::try_block;
>  use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
>  use proxmox_sys::fs::CreateOptions;
> @@ -289,27 +291,71 @@ async fn run() -> Result<(), Error> {
>      let server = proxmox_daemon::server::create_daemon(
>          ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
>          move |listener| {
> -            let (secure_connections, insecure_connections) =
> +            let (mut secure_connections, mut insecure_connections) =
>                  connections.accept_tls_optional(listener, acceptor);
>  
>              Ok(async {
>                  proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
>  
> -                let secure_server = hyper::Server::builder(secure_connections)
> -                    .serve(rest_server)
> -                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> -                    .map_err(Error::from);
> +                let secure_server = async move {
> +                    let graceful = Arc::new(GracefulShutdown::new());
> +                    loop {
> +                        let graceful2 = Arc::clone(&graceful);
> +                        tokio::select! {
> +                            Some(conn) = secure_connections.next() => {
> +                                match conn {
> +                                    Ok(conn) => {
> +                                        let api_service = rest_server.api_service(&conn)?;
> +                                        tokio::spawn(async move {
> +                                            api_service.serve(conn, Some(graceful2)).await
> +                                        });
> +                                    },
> +                                    Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
> +                                }
> +                            },
> +                            _shutdown = proxmox_daemon::shutdown_future() => {
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if let Some(shutdown) = Arc::into_inner(graceful) {
> +                        shutdown.shutdown().await
> +                    }
> +                    Ok::<(), Error>(())
> +                };
>  
> -                let insecure_server = hyper::Server::builder(insecure_connections)
> -                    .serve(redirector)
> -                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
> -                    .map_err(Error::from);
> +                let insecure_server = async move {
> +                    let graceful = Arc::new(GracefulShutdown::new());
> +                    loop {
> +                        let graceful2 = Arc::clone(&graceful);
> +                        tokio::select! {
> +                            Some(conn) = insecure_connections.next() => {
> +                                match conn {
> +                                    Ok(conn) => {
> +                                        let redirect_service = redirector.redirect_service();
> +                                        tokio::spawn(async move {
> +                                            redirect_service.serve(conn, Some(graceful2)).await
> +                                        });
> +                                    },
> +                                    Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
> +                                }
> +                            },
> +                            _shutdown = proxmox_daemon::shutdown_future() => {
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if let Some(shutdown) = Arc::into_inner(graceful) {
> +                        shutdown.shutdown().await
> +                    }
> +                    Ok::<(), Error>(())
> +                };

The new `secure_server` and `insecure_server` above aren't really that
pretty; perhaps we can clean them up once we got a trait for the return
types of `accept_tls_optional()` and similar :s

In general I'd suggest breaking up the returned coroutine since it's
gotten quite large now, perhaps explicitly defining separate coroutines
for `secure_server` and `insecure_server` each, instead of using `async`
blocks, or explicitly defining the returned coroutine as a whole.
No hard feelings if you don't, though.

>  
>                  let (secure_res, insecure_res) =
>                      try_join!(tokio::spawn(secure_server), tokio::spawn(insecure_server))
>                          .context("failed to complete REST server task")?;
>  
> -                let results = [secure_res, insecure_res];
> +                let results: [Result<(), Error>; 2] = [secure_res, insecure_res];
>  
>                  if results.iter().any(Result::is_err) {
>                      let cat_errors = results
> @@ -321,7 +367,7 @@ async fn run() -> Result<(), Error> {
>                      bail!(cat_errors);
>                  }
>  
> -                Ok(())
> +                Ok::<(), Error>(())
>              })
>          },
>          Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* Re: [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (22 preceding siblings ...)
  2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 6/6] adapt examples " Fabian Grünbichler
@ 2025-04-02 13:53 ` Max Carrara
  2025-04-03 13:32   ` Max Carrara
  2025-04-02 14:39 ` Thomas Lamprecht
  24 siblings, 1 reply; 32+ messages in thread
From: Max Carrara @ 2025-04-02 13:53 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> this RFC series adapts proxmox and proxmox-backup to hyper/http 1.0. I
> also have similar patches for PDM, but those require an update of gloo
> and proxmox-yew-comp and the basic approach is the same as with the
> patches here, and since I expect some feedback to incorporate anyway I
> saved those for the first "proper" version.
>
> hyper 1.0 came with a lot of changes, the most notable ones:
>
> Body is now a trait, not a struct
> - there's a new Incoming impl for incoming requests on the server side,
> and incoming responses on the client side
> - http-body-util has some more impls
> - proxmox-http has a new impl covering our two common use cases, see
> the patch there for details
>
> hyper now doesn't expose tower's Service or tokio's
> AsyncRead/AsyncWrite, but has its own variants for both with
> corresponding wrappers/adapters.
>
> the previous Accept trait for translation from a listening socket to
> connections is gone, an accept loop should be used instead.
>
> the pooling client is moved from hyper to hyper-util. despite its
> "legacy" label we still use it, as we'd need to either implement a ton
> of code ourself or switch to reqwest otherwise.
>
> graceful shutdown of connections is handled differently, so are
> connection ugprades.
>
> I did some rough testing of the usual things without noticing any
> breakage, but I am sure I missed some parts. there's also room for
> improvement for sure, in particular surrounding the rest-server and
> connection accepting part - suggestions welcome!

All the changes seem pretty solid to me; switching to hyper/1.0 was
never really going to be pretty, but at least hyper-util provides a lot
of the compat wrappers for that.

(Note: The proxmox-backup patches only apply with `git am -3` for me.)

I will give this a spin on my PBS dev VM and let it run on there for a
while to see if I notice anything off. I haven't spotted anything sus in
the code, so I don't expect anything to come up, but still.

Also, I haven't really had any concrete ideas yet for the return value
trait of `accept_tls_optional()` and family, but I'll let you know once
(or if) I do. Previously, all of the individual parts could just be
nicely composed together (incoming connection receiver, connection
handler, etc.), so perhaps we could cook up something similar. The new
pattern with the explicitly defined loop makes that a little hard,
though.

Apart from that, there are a few comments inline which can IMO be
addressed in a follow-up (if at all). Nothing major. This otherwise
looks pretty good to me, nice work! Especially the solution with our
"own" `Body` implementation is pretty nice and a good middle ground. 

Should nothing else come up and this be merged without a refresh,
consider:

Reviewed-by: Max Carrara <m.carrara@proxmox.com>


Feel free to ping me for a re-review / more testing, should you refresh
this series.

>
> proxmox workspace:
>
> Fabian Grünbichler (17):
>   http: order feature values
>   http: rate-limited-stream: update to hyper/http 1.0
>   http: adapt MaybeTlsStream to hyper 1.x
>   http: adapt connector to hyper 1.x
>   http: add Body implementation
>   http: adapt simple client to hyper 1.x
>   http: websocket: update to http/hyper 1
>   openid: use http 0.2 to avoid openidconnect update
>   proxmox-login: switch to http 1.x
>   client: switch to hyper/http 1.0
>   metrics: update to hyper/http 1.0
>   acme: switch to http/hyper 1.0
>   proxmox-router: update to hyper 1.0
>   proxmox-rest-server: update to hyper 1.0
>   proxmox-rest-server: fix and extend example
>   proxmox-auth-api: update to hyper 1.0
>   proxmox-acme-api: update to hyper 1.0
>
>  Cargo.toml                                    |   8 +-
>  proxmox-acme-api/Cargo.toml                   |   4 +
>  proxmox-acme-api/src/acme_plugin.rs           |  63 +++++--
>  proxmox-acme/Cargo.toml                       |   3 +-
>  proxmox-acme/src/async_client.rs              |  11 +-
>  proxmox-auth-api/Cargo.toml                   |   2 +
>  proxmox-auth-api/src/api/access.rs            |   4 +-
>  proxmox-client/Cargo.toml                     |   1 +
>  proxmox-client/src/client.rs                  |  22 +--
>  proxmox-http/Cargo.toml                       |  45 +++--
>  proxmox-http/src/body.rs                      | 133 ++++++++++++++
>  proxmox-http/src/client/connector.rs          |  44 +++--
>  proxmox-http/src/client/simple.rs             |  93 +++++++---
>  proxmox-http/src/client/tls.rs                |   2 +-
>  proxmox-http/src/lib.rs                       |   5 +
>  proxmox-http/src/rate_limited_stream.rs       |   2 +-
>  proxmox-http/src/websocket/mod.rs             |   6 +-
>  proxmox-login/Cargo.toml                      |   2 +-
>  proxmox-metrics/src/influxdb/http.rs          |   5 +-
>  proxmox-openid/Cargo.toml                     |   3 +-
>  proxmox-rest-server/Cargo.toml                |   9 +-
>  .../examples/minimal-rest-server.rs           |  48 ++++-
>  proxmox-rest-server/src/api_config.rs         |  44 ++---
>  proxmox-rest-server/src/connection.rs         |  14 +-
>  proxmox-rest-server/src/formatter.rs          |   8 +-
>  proxmox-rest-server/src/h2service.rs          |  15 +-
>  proxmox-rest-server/src/lib.rs                |   2 +-
>  proxmox-rest-server/src/rest.rs               | 164 +++++++++++-------
>  proxmox-router/Cargo.toml                     |   6 +-
>  proxmox-router/src/router.rs                  |  19 +-
>  proxmox-router/src/stream/parsing.rs          |  16 +-
>  31 files changed, 567 insertions(+), 236 deletions(-)
>  create mode 100644 proxmox-http/src/body.rs
>
> proxmox-backup:
>
> Fabian Grünbichler (6):
>   Revert "h2: switch to legacy feature"
>   pbs-client: adapt http client to hyper/http 1.0
>   pbs-client: vsock: adapt to hyper/http 1.0
>   restore daemon: adapt to hyper/http 1.0
>   adapt to hyper/http 1.0
>   adapt examples to hyper/http 1.0
>
>  Cargo.toml                                    | 10 ++-
>  examples/h2client.rs                          |  6 +-
>  examples/h2s-client.rs                        |  6 +-
>  examples/h2s-server.rs                        | 28 +++-----
>  examples/h2server.rs                          | 28 +++-----
>  pbs-client/Cargo.toml                         |  4 +-
>  pbs-client/src/backup_writer.rs               |  8 +--
>  pbs-client/src/http_client.rs                 | 38 +++++-----
>  pbs-client/src/pipe_to_stream.rs              |  2 +-
>  pbs-client/src/vsock_client.rs                | 27 +++----
>  proxmox-backup-client/Cargo.toml              |  1 +
>  proxmox-backup-client/src/snapshot.rs         |  2 +-
>  proxmox-restore-daemon/Cargo.toml             |  2 +
>  proxmox-restore-daemon/src/main.rs            | 24 +++++--
>  .../src/proxmox_restore_daemon/api.rs         |  6 +-
>  .../src/proxmox_restore_daemon/auth.rs        |  5 +-
>  src/acme/client.rs                            |  6 +-
>  src/acme/plugin.rs                            | 62 +++++++++++-----
>  src/api2/admin/datastore.rs                   | 20 +++---
>  src/api2/backup/environment.rs                |  3 +-
>  src/api2/backup/mod.rs                        | 10 +--
>  src/api2/backup/upload_chunk.rs               | 47 +++++++------
>  src/api2/helpers.rs                           |  3 +-
>  src/api2/node/mod.rs                          |  7 +-
>  src/api2/node/tasks.rs                        |  7 +-
>  src/api2/reader/mod.rs                        | 17 +++--
>  src/bin/proxmox-backup-api.rs                 | 40 +++++++----
>  src/bin/proxmox-backup-proxy.rs               | 70 +++++++++++++++----
>  28 files changed, 297 insertions(+), 192 deletions(-)



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* Re: [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0
  2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
                   ` (23 preceding siblings ...)
  2025-04-02 13:53 ` [pbs-devel] [RFC proxmox 00/23] upgrade " Max Carrara
@ 2025-04-02 14:39 ` Thomas Lamprecht
  24 siblings, 0 replies; 32+ messages in thread
From: Thomas Lamprecht @ 2025-04-02 14:39 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Fabian Grünbichler

Am 26.03.25 um 16:23 schrieb Fabian Grünbichler:
> this RFC series adapts proxmox and proxmox-backup to hyper/http 1.0. I
> also have similar patches for PDM, but those require an update of gloo
> and proxmox-yew-comp and the basic approach is the same as with the
> patches here, and since I expect some feedback to incorporate anyway I
> saved those for the first "proper" version.

W.r.t. applying something like this I'd wait after the next point releases
but would like to still have this applied to PBS 3 to avoid having to much
difference on such core crates for the time PBS 3 and PBS 4 will be both
supported.

I do not care much if that means that it has to be cherry-picked later as
the master branch already deviated too much due to PBS 4 stuff or if it
gets applied before doing other such changes, but the latter might be
slightly less work.


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

* Re: [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0
  2025-04-02 13:53 ` [pbs-devel] [RFC proxmox 00/23] upgrade " Max Carrara
@ 2025-04-03 13:32   ` Max Carrara
  0 siblings, 0 replies; 32+ messages in thread
From: Max Carrara @ 2025-04-03 13:32 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On Wed Apr 2, 2025 at 3:53 PM CEST, Max Carrara wrote:
> On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> > this RFC series adapts proxmox and proxmox-backup to hyper/http 1.0. I
> > also have similar patches for PDM, but those require an update of gloo
> > and proxmox-yew-comp and the basic approach is the same as with the
> > patches here, and since I expect some feedback to incorporate anyway I
> > saved those for the first "proper" version.
> >
> > hyper 1.0 came with a lot of changes, the most notable ones:
> >
> > Body is now a trait, not a struct
> > - there's a new Incoming impl for incoming requests on the server side,
> > and incoming responses on the client side
> > - http-body-util has some more impls
> > - proxmox-http has a new impl covering our two common use cases, see
> > the patch there for details
> >
> > hyper now doesn't expose tower's Service or tokio's
> > AsyncRead/AsyncWrite, but has its own variants for both with
> > corresponding wrappers/adapters.
> >
> > the previous Accept trait for translation from a listening socket to
> > connections is gone, an accept loop should be used instead.
> >
> > the pooling client is moved from hyper to hyper-util. despite its
> > "legacy" label we still use it, as we'd need to either implement a ton
> > of code ourself or switch to reqwest otherwise.
> >
> > graceful shutdown of connections is handled differently, so are
> > connection ugprades.
> >
> > I did some rough testing of the usual things without noticing any
> > breakage, but I am sure I missed some parts. there's also room for
> > improvement for sure, in particular surrounding the rest-server and
> > connection accepting part - suggestions welcome!
>
> All the changes seem pretty solid to me; switching to hyper/1.0 was
> never really going to be pretty, but at least hyper-util provides a lot
> of the compat wrappers for that.
>
> (Note: The proxmox-backup patches only apply with `git am -3` for me.)
>
> I will give this a spin on my PBS dev VM and let it run on there for a
> while to see if I notice anything off. I haven't spotted anything sus in
> the code, so I don't expect anything to come up, but still.
>
> Also, I haven't really had any concrete ideas yet for the return value
> trait of `accept_tls_optional()` and family, but I'll let you know once
> (or if) I do. Previously, all of the individual parts could just be
> nicely composed together (incoming connection receiver, connection
> handler, etc.), so perhaps we could cook up something similar. The new
> pattern with the explicitly defined loop makes that a little hard,
> though.
>
> Apart from that, there are a few comments inline which can IMO be
> addressed in a follow-up (if at all). Nothing major. This otherwise
> looks pretty good to me, nice work! Especially the solution with our
> "own" `Body` implementation is pretty nice and a good middle ground. 
>
> Should nothing else come up and this be merged without a refresh,
> consider:
>
> Reviewed-by: Max Carrara <m.carrara@proxmox.com>
>
>
> Feel free to ping me for a re-review / more testing, should you refresh
> this series.

Gave this a more thorough test run today and didn't encounter anything.

- Ran some backups
- Restored a VM
- Restored an individual file
- Tried out various things in the UI, such as forgetting backup groups,
  viewing logs, running various things (console, verify job, GC job, ..)
- Set up ACME using my personal domain on OVH + LetsEncrypt Staging

Everything seems to work just fine. I'll keep the binaries built with
this series on my VM for a couple more days in case anything pops up,
but everything I have tried so far seems to work, even ACME.

So, unless something comes up, consider:

Reviewed-by: Max Carrara <m.carrara@proxmox.com>
Tested-by: Max Carrara <m.carrara@proxmox.com>

Just to note again, feel free to ping me for a re-review / more testing,
should you refresh this series.

Nice work! 🫡

>
> >
> > proxmox workspace:
> >
> > Fabian Grünbichler (17):
> >   http: order feature values
> >   http: rate-limited-stream: update to hyper/http 1.0
> >   http: adapt MaybeTlsStream to hyper 1.x
> >   http: adapt connector to hyper 1.x
> >   http: add Body implementation
> >   http: adapt simple client to hyper 1.x
> >   http: websocket: update to http/hyper 1
> >   openid: use http 0.2 to avoid openidconnect update
> >   proxmox-login: switch to http 1.x
> >   client: switch to hyper/http 1.0
> >   metrics: update to hyper/http 1.0
> >   acme: switch to http/hyper 1.0
> >   proxmox-router: update to hyper 1.0
> >   proxmox-rest-server: update to hyper 1.0
> >   proxmox-rest-server: fix and extend example
> >   proxmox-auth-api: update to hyper 1.0
> >   proxmox-acme-api: update to hyper 1.0
> >
> >  Cargo.toml                                    |   8 +-
> >  proxmox-acme-api/Cargo.toml                   |   4 +
> >  proxmox-acme-api/src/acme_plugin.rs           |  63 +++++--
> >  proxmox-acme/Cargo.toml                       |   3 +-
> >  proxmox-acme/src/async_client.rs              |  11 +-
> >  proxmox-auth-api/Cargo.toml                   |   2 +
> >  proxmox-auth-api/src/api/access.rs            |   4 +-
> >  proxmox-client/Cargo.toml                     |   1 +
> >  proxmox-client/src/client.rs                  |  22 +--
> >  proxmox-http/Cargo.toml                       |  45 +++--
> >  proxmox-http/src/body.rs                      | 133 ++++++++++++++
> >  proxmox-http/src/client/connector.rs          |  44 +++--
> >  proxmox-http/src/client/simple.rs             |  93 +++++++---
> >  proxmox-http/src/client/tls.rs                |   2 +-
> >  proxmox-http/src/lib.rs                       |   5 +
> >  proxmox-http/src/rate_limited_stream.rs       |   2 +-
> >  proxmox-http/src/websocket/mod.rs             |   6 +-
> >  proxmox-login/Cargo.toml                      |   2 +-
> >  proxmox-metrics/src/influxdb/http.rs          |   5 +-
> >  proxmox-openid/Cargo.toml                     |   3 +-
> >  proxmox-rest-server/Cargo.toml                |   9 +-
> >  .../examples/minimal-rest-server.rs           |  48 ++++-
> >  proxmox-rest-server/src/api_config.rs         |  44 ++---
> >  proxmox-rest-server/src/connection.rs         |  14 +-
> >  proxmox-rest-server/src/formatter.rs          |   8 +-
> >  proxmox-rest-server/src/h2service.rs          |  15 +-
> >  proxmox-rest-server/src/lib.rs                |   2 +-
> >  proxmox-rest-server/src/rest.rs               | 164 +++++++++++-------
> >  proxmox-router/Cargo.toml                     |   6 +-
> >  proxmox-router/src/router.rs                  |  19 +-
> >  proxmox-router/src/stream/parsing.rs          |  16 +-
> >  31 files changed, 567 insertions(+), 236 deletions(-)
> >  create mode 100644 proxmox-http/src/body.rs
> >
> > proxmox-backup:
> >
> > Fabian Grünbichler (6):
> >   Revert "h2: switch to legacy feature"
> >   pbs-client: adapt http client to hyper/http 1.0
> >   pbs-client: vsock: adapt to hyper/http 1.0
> >   restore daemon: adapt to hyper/http 1.0
> >   adapt to hyper/http 1.0
> >   adapt examples to hyper/http 1.0
> >
> >  Cargo.toml                                    | 10 ++-
> >  examples/h2client.rs                          |  6 +-
> >  examples/h2s-client.rs                        |  6 +-
> >  examples/h2s-server.rs                        | 28 +++-----
> >  examples/h2server.rs                          | 28 +++-----
> >  pbs-client/Cargo.toml                         |  4 +-
> >  pbs-client/src/backup_writer.rs               |  8 +--
> >  pbs-client/src/http_client.rs                 | 38 +++++-----
> >  pbs-client/src/pipe_to_stream.rs              |  2 +-
> >  pbs-client/src/vsock_client.rs                | 27 +++----
> >  proxmox-backup-client/Cargo.toml              |  1 +
> >  proxmox-backup-client/src/snapshot.rs         |  2 +-
> >  proxmox-restore-daemon/Cargo.toml             |  2 +
> >  proxmox-restore-daemon/src/main.rs            | 24 +++++--
> >  .../src/proxmox_restore_daemon/api.rs         |  6 +-
> >  .../src/proxmox_restore_daemon/auth.rs        |  5 +-
> >  src/acme/client.rs                            |  6 +-
> >  src/acme/plugin.rs                            | 62 +++++++++++-----
> >  src/api2/admin/datastore.rs                   | 20 +++---
> >  src/api2/backup/environment.rs                |  3 +-
> >  src/api2/backup/mod.rs                        | 10 +--
> >  src/api2/backup/upload_chunk.rs               | 47 +++++++------
> >  src/api2/helpers.rs                           |  3 +-
> >  src/api2/node/mod.rs                          |  7 +-
> >  src/api2/node/tasks.rs                        |  7 +-
> >  src/api2/reader/mod.rs                        | 17 +++--
> >  src/bin/proxmox-backup-api.rs                 | 40 +++++++----
> >  src/bin/proxmox-backup-proxy.rs               | 70 +++++++++++++++----
> >  28 files changed, 297 insertions(+), 192 deletions(-)
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 32+ messages in thread

end of thread, other threads:[~2025-04-03 13:33 UTC | newest]

Thread overview: 32+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-03-26 15:23 [pbs-devel] [RFC proxmox 00/23] upgrade to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 01/17] http: order feature values Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 02/17] http: rate-limited-stream: update to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 03/17] http: adapt MaybeTlsStream to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 04/17] http: adapt connector " Fabian Grünbichler
2025-04-02 13:31   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation Fabian Grünbichler
2025-04-02 13:31   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 07/17] http: websocket: update to http/hyper 1 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 08/17] openid: use http 0.2 to avoid openidconnect update Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 09/17] proxmox-login: switch to http 1.x Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 10/17] client: switch to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 11/17] metrics: update " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 12/17] acme: switch to http/hyper 1.0 Fabian Grünbichler
2025-04-02 13:31   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 13/17] proxmox-router: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: " Fabian Grünbichler
2025-04-02 13:34   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 15/17] proxmox-rest-server: fix and extend example Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 16/17] proxmox-auth-api: update to hyper 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox 17/17] proxmox-acme-api: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 1/6] Revert "h2: switch to legacy feature" Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-client: adapt http client to hyper/http 1.0 Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 4/6] restore daemon: " Fabian Grünbichler
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 5/6] " Fabian Grünbichler
2025-04-02 13:36   ` Max Carrara
2025-03-26 15:23 ` [pbs-devel] [PATCH proxmox-backup 6/6] adapt examples " Fabian Grünbichler
2025-04-02 13:53 ` [pbs-devel] [RFC proxmox 00/23] upgrade " Max Carrara
2025-04-03 13:32   ` Max Carrara
2025-04-02 14:39 ` Thomas Lamprecht

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal