From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation
Date: Wed, 26 Mar 2025 16:23:09 +0100
Message-ID: <20250326152327.332179-6-f.gruenbichler@proxmox.com>
In-Reply-To: <20250326152327.332179-1-f.gruenbichler@proxmox.com>

hyper/http 1.0 now only provide a Body trait and a few implementations for
specific use cases. Following reqwest's lead (and copying some parts of
its implementation), implement our own Body struct for the two common
use cases:
- a body instance containing the full body data as Bytes
- a streaming body instance

Together with the most common helper methods (empty body, convert, wrap
existing stream as body), this should make the rest of the upgrade fairly
straightforward.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
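Illustration only, not part of the patch: a minimal std-only sketch of the
"full" vs. "streaming" split described above. It assumes a synchronous
stand-in for poll_frame and uses hypothetical names (ModelBody, ChunkSource,
next_frame, exact_size, collect_frames); the real Body is built on Bytes,
BoxBody and anyhow::Error and is polled asynchronously.

```rust
// Simplified model of the two-variant body design. Every name in this
// block is hypothetical and exists only for illustration; none of it is
// the proxmox-http API.

/// Error type of the model (the patch itself uses anyhow::Error).
type ModelError = String;

/// Boxed source of chunks, standing in for the boxed streaming body.
type ChunkSource = Box<dyn Iterator<Item = Result<Vec<u8>, ModelError>>>;

enum ModelBody {
    /// All data is already in memory.
    Full(Vec<u8>),
    /// Data is produced chunk by chunk.
    Streaming(ChunkSource),
}

impl ModelBody {
    /// Empty body: a "full" body without any data.
    fn empty() -> Self {
        ModelBody::Full(Vec::new())
    }

    /// Contents of a "full" body, None for a streaming one.
    fn as_bytes(&self) -> Option<&[u8]> {
        match self {
            ModelBody::Full(bytes) => Some(bytes.as_slice()),
            ModelBody::Streaming(_) => None,
        }
    }

    /// Synchronous stand-in for poll_frame: a full body hands out all of
    /// its data as a single frame and is empty afterwards.
    fn next_frame(&mut self) -> Option<Result<Vec<u8>, ModelError>> {
        match self {
            ModelBody::Full(bytes) => {
                let data = std::mem::take(bytes);
                if data.is_empty() {
                    None
                } else {
                    Some(Ok(data))
                }
            }
            ModelBody::Streaming(source) => source.next(),
        }
    }

    /// A full body is finished once its data has been taken. The model
    /// cannot tell for a streaming body and conservatively says "no".
    fn is_end_stream(&self) -> bool {
        match self {
            ModelBody::Full(bytes) => bytes.is_empty(),
            ModelBody::Streaming(_) => false,
        }
    }

    /// Exact size for a full body, unknown for a streaming one.
    fn exact_size(&self) -> Option<u64> {
        match self {
            ModelBody::Full(bytes) => Some(bytes.len() as u64),
            ModelBody::Streaming(_) => None,
        }
    }
}

/// Drain a body frame by frame, stopping at the first error.
fn collect_frames(mut body: ModelBody) -> Result<Vec<Vec<u8>>, ModelError> {
    let mut frames = Vec::new();
    while let Some(frame) = body.next_frame() {
        frames.push(frame?);
    }
    Ok(frames)
}
```

In this model, callers that need the payload up front can check as_bytes()
first and only fall back to draining frames for the streaming variant.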
proxmox-http/Cargo.toml | 10 +++
proxmox-http/src/body.rs | 133 +++++++++++++++++++++++++++++++++++++++
proxmox-http/src/lib.rs | 5 ++
3 files changed, 148 insertions(+)
create mode 100644 proxmox-http/src/body.rs
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 4ec142c9..1fbc70a8 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -40,6 +40,15 @@ flate2 = { workspace = true }
 [features]
 default = []
+body = [
+    "dep:bytes",
+    "dep:futures",
+    "dep:http-body",
+    "dep:http-body-util",
+    "dep:hyper",
+    "dep:sync_wrapper",
+    "sync_wrapper?/futures",
+]
 rate-limiter = ["dep:hyper"]
 rate-limited-stream = [
     "dep:tokio",
@@ -67,6 +76,7 @@ client = [
     "hyper-util?/http1",
     "hyper-util?/tokio",
     "tokio?/io-util",
+    "body",
     "http-helpers",
     "rate-limited-stream",
 ]
diff --git a/proxmox-http/src/body.rs b/proxmox-http/src/body.rs
new file mode 100644
index 00000000..3eb17355
--- /dev/null
+++ b/proxmox-http/src/body.rs
@@ -0,0 +1,133 @@
+use std::{pin::Pin, task::Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+
+use futures::ready;
+use http_body_util::combinators::BoxBody;
+use hyper::body::{Body as HyperBody, Frame, SizeHint};
+
+// Partially copied and heavily based on reqwest 0.12 Body implementation from src/async_impl/body.rs
+// Copyright (c) 2016-2025 Sean McArthur
+
+/// Custom implementation of hyper::body::Body supporting either a "full" body that can return its
+/// contents as byte sequence in one go, or "streaming" body that can be polled.
+pub struct Body {
+    inner: InnerBody,
+}
+
+enum InnerBody {
+    Full(Bytes),
+    Streaming(BoxBody<Bytes, Error>),
+}
+
+impl Body {
+    /// Shortcut for creating an empty body instance with no data.
+    pub fn empty() -> Self {
+        Bytes::new().into()
+    }
+
+    /// Returns the body contents if it is a "full" body, None otherwise.
+    pub fn as_bytes(&self) -> Option<&[u8]> {
+        match self.inner {
+            InnerBody::Full(ref bytes) => Some(bytes),
+            InnerBody::Streaming(_) => None,
+        }
+    }
+
+    pub fn wrap_stream<S>(stream: S) -> Body
+    where
+        S: futures::stream::TryStream + Send + 'static,
+
+        S::Error: Into<Error>,
+
+        Bytes: From<S::Ok>,
+    {
+        Body::stream(stream)
+    }
+
+    pub(crate) fn stream<S>(stream: S) -> Body
+    where
+        S: futures::stream::TryStream + Send + 'static,
+
+        S::Error: Into<Error>,
+
+        Bytes: From<S::Ok>,
+    {
+        use futures::TryStreamExt;
+
+        use http_body::Frame;
+
+        use http_body_util::StreamBody;
+
+        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
+            stream
+                .map_ok(|d| Frame::data(Bytes::from(d)))
+                .map_err(Into::into),
+        )));
+
+        Body {
+            inner: InnerBody::Streaming(body),
+        }
+    }
+}
+
+impl HyperBody for Body {
+    type Data = Bytes;
+
+    type Error = Error;
+
+    fn poll_frame(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
+        match self.inner {
+            InnerBody::Full(ref mut bytes) => {
+                let res = bytes.split_off(0);
+                if res.is_empty() {
+                    return Poll::Ready(None);
+                } else {
+                    return Poll::Ready(Some(Ok(Frame::data(res))));
+                }
+            }
+            InnerBody::Streaming(ref mut body) => Poll::Ready(
+                ready!(Pin::new(body).poll_frame(cx))
+                    .map(|opt_chunk| opt_chunk.map_err(Error::from)),
+            ),
+        }
+    }
+
+    fn is_end_stream(&self) -> bool {
+        match self.inner {
+            InnerBody::Full(ref bytes) => bytes.is_empty(),
+            InnerBody::Streaming(ref box_body) => box_body.is_end_stream(),
+        }
+    }
+
+    fn size_hint(&self) -> hyper::body::SizeHint {
+        match self.inner {
+            InnerBody::Full(ref bytes) => SizeHint::with_exact(bytes.len() as u64),
+            InnerBody::Streaming(ref box_body) => box_body.size_hint(),
+        }
+    }
+}
+
+impl From<Bytes> for Body {
+    fn from(value: Bytes) -> Self {
+        Self {
+            inner: InnerBody::Full(value),
+        }
+    }
+}
+
+impl From<Vec<u8>> for Body {
+    fn from(value: Vec<u8>) -> Self {
+        Bytes::from(value).into()
+    }
+}
+
+impl From<String> for Body {
+    fn from(value: String) -> Self {
+        Bytes::from(value).into()
+    }
+}
diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 4770aaf4..8b6953b0 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -35,3 +35,8 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
mod rate_limited_stream;
#[cfg(feature = "rate-limited-stream")]
pub use rate_limited_stream::RateLimitedStream;
+
+#[cfg(feature = "body")]
+mod body;
+#[cfg(feature = "body")]
+pub use body::Body;
--
2.39.5