public inbox for pbs-devel@lists.proxmox.com
From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 05/17] http: add Body implementation
Date: Wed, 26 Mar 2025 16:23:09 +0100
Message-ID: <20250326152327.332179-6-f.gruenbichler@proxmox.com>
In-Reply-To: <20250326152327.332179-1-f.gruenbichler@proxmox.com>

hyper/http 1.0 now only provide a Body trait and some implementations for
specific use cases. following reqwest's lead (and copying some parts of
its implementation), implement our own Body struct for the two common
use cases:
- a body instance containing the full body data as Bytes
- a streaming body instance

together with the most common helper methods (empty body, conversions,
wrapping an existing stream as a body), this should make the rest of the
upgrade fairly straightforward.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
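note: the following is only a rough, std-only sketch of the "full vs.
streaming" split described above, not the code from this patch. ModelBody,
next_frame and read_all are hypothetical names; Vec<u8> stands in for Bytes,
a boxed iterator stands in for BoxBody, and next_frame is a synchronous
stand-in for poll_frame.

```rust
// Std-only model of the two body kinds. All names are hypothetical and only
// mirror the shape of the patch, they are not part of proxmox-http.

/// A body is either fully buffered or delivered chunk by chunk.
enum ModelBody {
    Full(Vec<u8>),
    Streaming(Box<dyn Iterator<Item = Result<Vec<u8>, String>>>),
}

impl ModelBody {
    /// Empty body: a "full" body without any data.
    fn empty() -> Self {
        ModelBody::Full(Vec::new())
    }

    /// Contents are only available up front for a "full" body.
    fn as_bytes(&self) -> Option<&[u8]> {
        match self {
            ModelBody::Full(bytes) => Some(bytes.as_slice()),
            ModelBody::Streaming(_) => None,
        }
    }

    /// Synchronous analogue of `poll_frame`: a full body hands out all of its
    /// data in a single frame and is empty afterwards.
    fn next_frame(&mut self) -> Option<Result<Vec<u8>, String>> {
        match self {
            ModelBody::Full(bytes) => {
                let data = std::mem::take(bytes);
                if data.is_empty() {
                    None
                } else {
                    Some(Ok(data))
                }
            }
            ModelBody::Streaming(chunks) => chunks.next(),
        }
    }

    /// A full body is finished once its buffer is drained; this model cannot
    /// tell for a streaming body and conservatively answers `false`.
    fn is_end_stream(&self) -> bool {
        match self {
            ModelBody::Full(bytes) => bytes.is_empty(),
            ModelBody::Streaming(_) => false,
        }
    }
}

/// Drain a body frame by frame, stopping at the first error.
fn read_all(mut body: ModelBody) -> Result<Vec<u8>, String> {
    let mut out = Vec::new();
    while let Some(frame) = body.next_frame() {
        out.extend(frame?);
    }
    Ok(out)
}
```

in this model, read_all() on a full body sees exactly one frame, while a
streaming body yields one frame per chunk; callers that only need the data
do not have to care which kind they got.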
 proxmox-http/Cargo.toml  |  10 +++
 proxmox-http/src/body.rs | 133 +++++++++++++++++++++++++++++++++++++++
 proxmox-http/src/lib.rs  |   5 ++
 3 files changed, 148 insertions(+)
 create mode 100644 proxmox-http/src/body.rs

diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 4ec142c9..1fbc70a8 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -40,6 +40,15 @@ flate2 = { workspace = true }
 [features]
 default = []
 
+body = [
+    "dep:bytes",
+    "dep:futures",
+    "dep:http-body",
+    "dep:http-body-util",
+    "dep:hyper",
+    "dep:sync_wrapper",
+    "sync_wrapper?/futures",
+]
 rate-limiter = ["dep:hyper"]
 rate-limited-stream = [
     "dep:tokio",
@@ -67,6 +76,7 @@ client = [
     "hyper-util?/http1",
     "hyper-util?/tokio",
     "tokio?/io-util",
+    "body",
     "http-helpers",
     "rate-limited-stream",
 ]
diff --git a/proxmox-http/src/body.rs b/proxmox-http/src/body.rs
new file mode 100644
index 00000000..3eb17355
--- /dev/null
+++ b/proxmox-http/src/body.rs
@@ -0,0 +1,133 @@
+use std::{pin::Pin, task::Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+
+use futures::ready;
+use http_body_util::combinators::BoxBody;
+use hyper::body::{Body as HyperBody, Frame, SizeHint};
+
+// Partially copied and heavily based on reqwest 0.12 Body implementation from src/async_impl/body.rs
+// Copyright (c) 2016-2025 Sean McArthur
+
+/// Custom implementation of hyper::body::Body supporting either a "full" body that can return its
+/// contents as a byte sequence in one go, or a "streaming" body that can be polled.
+pub struct Body {
+    inner: InnerBody,
+}
+
+enum InnerBody {
+    Full(Bytes),
+    Streaming(BoxBody<Bytes, Error>),
+}
+
+impl Body {
+    /// Shortcut for creating an empty body instance with no data.
+    pub fn empty() -> Self {
+        Bytes::new().into()
+    }
+
+    /// Returns the body contents if it is a "full" body, None otherwise.
+    pub fn as_bytes(&self) -> Option<&[u8]> {
+        match self.inner {
+            InnerBody::Full(ref bytes) => Some(bytes),
+            InnerBody::Streaming(_) => None,
+        }
+    }
+
+    pub fn wrap_stream<S>(stream: S) -> Body
+    where
+        S: futures::stream::TryStream + Send + 'static,
+
+        S::Error: Into<Error>,
+
+        Bytes: From<S::Ok>,
+    {
+        Body::stream(stream)
+    }
+
+    pub(crate) fn stream<S>(stream: S) -> Body
+    where
+        S: futures::stream::TryStream + Send + 'static,
+
+        S::Error: Into<Error>,
+
+        Bytes: From<S::Ok>,
+    {
+        use futures::TryStreamExt;
+
+        use http_body::Frame;
+
+        use http_body_util::StreamBody;
+
+        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
+            stream
+                .map_ok(|d| Frame::data(Bytes::from(d)))
+                .map_err(Into::into),
+        )));
+
+        Body {
+            inner: InnerBody::Streaming(body),
+        }
+    }
+}
+
+impl HyperBody for Body {
+    type Data = Bytes;
+
+    type Error = Error;
+
+    fn poll_frame(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
+        match self.inner {
+            InnerBody::Full(ref mut bytes) => {
+                let res = bytes.split_off(0);
+                if res.is_empty() {
+                    Poll::Ready(None)
+                } else {
+                    Poll::Ready(Some(Ok(Frame::data(res))))
+                }
+            }
+            InnerBody::Streaming(ref mut body) => Poll::Ready(
+                ready!(Pin::new(body).poll_frame(cx))
+                    .map(|opt_chunk| opt_chunk.map_err(Error::from)),
+            ),
+        }
+    }
+
+    fn is_end_stream(&self) -> bool {
+        match self.inner {
+            InnerBody::Full(ref bytes) => bytes.is_empty(),
+            InnerBody::Streaming(ref box_body) => box_body.is_end_stream(),
+        }
+    }
+
+    fn size_hint(&self) -> hyper::body::SizeHint {
+        match self.inner {
+            InnerBody::Full(ref bytes) => SizeHint::with_exact(bytes.len() as u64),
+            InnerBody::Streaming(ref box_body) => box_body.size_hint(),
+        }
+    }
+}
+
+impl From<Bytes> for Body {
+    fn from(value: Bytes) -> Self {
+        Self {
+            inner: InnerBody::Full(value),
+        }
+    }
+}
+
+impl From<Vec<u8>> for Body {
+    fn from(value: Vec<u8>) -> Self {
+        Bytes::from(value).into()
+    }
+}
+
+impl From<String> for Body {
+    fn from(value: String) -> Self {
+        Bytes::from(value).into()
+    }
+}
diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 4770aaf4..8b6953b0 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -35,3 +35,8 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
 mod rate_limited_stream;
 #[cfg(feature = "rate-limited-stream")]
 pub use rate_limited_stream::RateLimitedStream;
+
+#[cfg(feature = "body")]
+mod body;
+#[cfg(feature = "body")]
+pub use body::Body;
-- 
2.39.5
