all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH v1 5/6] s3-client: account for downloaded bytes in incoming response body
Date: Mon,  9 Feb 2026 10:15:21 +0100	[thread overview]
Message-ID: <20260209091533.156902-6-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260209091533.156902-1-c.ebner@proxmox.com>

Keep track of the downloaded contents in get object responses by
accounting of passing bytes when collecting the incoming body.

To do so, the shared request counters are stored via an atomic
reference counter and cloned along to the response reader and
a new `Content` type which wraps `Incoming` and implements the
`Body`, where the accounting happens.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 proxmox-s3-client/src/client.rs          | 16 ++---
 proxmox-s3-client/src/response_reader.rs | 75 ++++++++++++++++++++++--
 2 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs
index 63adab29..1cb94698 100644
--- a/proxmox-s3-client/src/client.rs
+++ b/proxmox-s3-client/src/client.rs
@@ -490,7 +490,7 @@ impl S3Client {
             .uri(self.build_uri("/", &[])?)
             .body(Body::empty())?;
         let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
-        let response_reader = ResponseReader::new(response);
+        let response_reader = ResponseReader::new(response, self.request_counters.clone());
         response_reader.list_buckets_response().await
     }
 
@@ -506,7 +506,7 @@ impl S3Client {
             .uri(self.build_uri(&object_key, &[])?)
             .body(Body::empty())?;
         let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
-        let response_reader = ResponseReader::new(response);
+        let response_reader = ResponseReader::new(response, self.request_counters.clone());
         response_reader.head_object_response().await
     }
 
@@ -523,7 +523,7 @@ impl S3Client {
             .body(Body::empty())?;
 
         let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
-        let response_reader = ResponseReader::new(response);
+        let response_reader = ResponseReader::new(response, self.request_counters.clone());
         response_reader.get_object_response().await
     }
 
@@ -553,7 +553,7 @@ impl S3Client {
             .body(Body::empty())?;
 
         let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
-        let response_reader = ResponseReader::new(response);
+        let response_reader = ResponseReader::new(response, self.request_counters.clone());
         response_reader.list_objects_v2_response().await
     }
 
@@ -590,7 +590,7 @@ impl S3Client {
         let request = request.body(object_data)?;
 
         let response = self.send(request, timeout).await?;
-        let response_reader = ResponseReader::new(response);
+        let response_reader = ResponseReader::new(response, self.request_counters.clone());
         response_reader.put_object_response().await
     }
 
@@ -604,7 +604,7 @@ impl S3Client {
             .body(Body::empty())?;
 
         let response = self.send(request, None).await?;
-        let response_reader = ResponseReader::new(response);
+        let response_reader = ResponseReader::new(response, self.request_counters.clone());
         response_reader.delete_object_response().await
     }
 
@@ -631,7 +631,7 @@ impl S3Client {
             .body(Body::from(body))?;
 
         let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
-        let response_reader = ResponseReader::new(response);
+        let response_reader = ResponseReader::new(response, self.request_counters.clone());
         response_reader.delete_objects_response().await
     }
 
@@ -662,7 +662,7 @@ impl S3Client {
             .body(Body::empty())?;
 
         let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
-        let response_reader = ResponseReader::new(response);
+        let response_reader = ResponseReader::new(response, self.request_counters.clone());
         response_reader.copy_object_response().await
     }
 
diff --git a/proxmox-s3-client/src/response_reader.rs b/proxmox-s3-client/src/response_reader.rs
index e03b3bb0..5ab82efe 100644
--- a/proxmox-s3-client/src/response_reader.rs
+++ b/proxmox-s3-client/src/response_reader.rs
@@ -1,19 +1,24 @@
+use std::pin::Pin;
 use std::str::FromStr;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::task::{Context as Ctx, Poll};
 
 use anyhow::{anyhow, bail, Context, Error};
 use http_body_util::BodyExt;
-use hyper::body::{Bytes, Incoming};
+use hyper::body::{Body, Bytes, Frame, Incoming, SizeHint};
 use hyper::header::HeaderName;
 use hyper::http::header;
 use hyper::http::StatusCode;
 use hyper::{HeaderMap, Response};
 use serde::Deserialize;
 
-use crate::{HttpDate, LastModifiedTimestamp, S3ObjectKey};
+use crate::{HttpDate, LastModifiedTimestamp, S3ObjectKey, SharedRequestCounters};
 
 /// Response reader to check S3 api response status codes and parse response body, if any.
 pub(crate) struct ResponseReader {
     response: Response<Incoming>,
+    request_counters: Option<Arc<SharedRequestCounters>>,
 }
 
 #[derive(Debug)]
@@ -105,7 +110,7 @@ pub struct GetObjectResponse {
     /// Last modified http header.
     pub last_modified: HttpDate,
     /// Object content in http response body.
-    pub content: Incoming,
+    pub content: Content,
 }
 
 #[derive(Debug)]
@@ -226,10 +231,64 @@ pub struct Bucket {
     pub creation_date: LastModifiedTimestamp,
 }
 
+/// Response content stream
+pub struct Content {
+    incoming: Incoming,
+    request_counters: Option<Arc<SharedRequestCounters>>,
+}
+
+impl Body for Content {
+    type Data = Bytes;
+    type Error = hyper::Error;
+
+    fn poll_frame(
+        mut self: Pin<&mut Self>,
+        cx: &mut Ctx<'_>,
+    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+        let mut this = self.as_mut();
+
+        let incoming = Pin::new(&mut this.incoming).poll_frame(cx);
+
+        if let Some(counter) = self.request_counters.as_ref() {
+            match incoming {
+                Poll::Pending => Poll::Pending,
+                Poll::Ready(f) => {
+                    if let Some(Ok(frame)) = f {
+                        let bytes = frame
+                            .data_ref()
+                            .map(|bytes| bytes.len() as u64)
+                            .unwrap_or(0);
+                        let _ = counter.add_download_traffic(bytes, Ordering::SeqCst);
+                        Poll::Ready(Some(Ok(frame)))
+                    } else {
+                        Poll::Ready(None)
+                    }
+                }
+            }
+        } else {
+            return incoming;
+        }
+    }
+
+    fn is_end_stream(&self) -> bool {
+        self.incoming.is_end_stream()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        self.incoming.size_hint()
+    }
+}
+
 impl ResponseReader {
     /// Create a new response reader to parse given response.
-    pub(crate) fn new(response: Response<Incoming>) -> Self {
-        Self { response }
+    pub(crate) fn new(
+        response: Response<Incoming>,
+        request_counters: Option<Arc<SharedRequestCounters>>,
+    ) -> Self {
+        Self {
+            response,
+            request_counters,
+        }
     }
 
     /// Read and parse the list object v2 response.
@@ -299,7 +358,11 @@ impl ResponseReader {
     /// Returns with error if the object is not accessible, an unexpected status code is encountered
     /// or the response headers or body cannot be parsed.
     pub(crate) async fn get_object_response(self) -> Result<Option<GetObjectResponse>, Error> {
-        let (parts, content) = self.response.into_parts();
+        let (parts, incoming) = self.response.into_parts();
+        let content = Content {
+            incoming,
+            request_counters: self.request_counters.clone(),
+        };
 
         match parts.status {
             StatusCode::OK => (),
-- 
2.47.3





  parent reply	other threads:[~2026-02-09  9:15 UTC|newest]

Thread overview: 26+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-02-09  9:15 [PATCH v1 00/17] partially fix #6563: add s3 request and traffic counter statistics Christian Ebner
2026-02-09  9:15 ` [PATCH v1 1/6] shared-memory: drop check for mmap file being located on tmpfs Christian Ebner
2026-02-09  9:15 ` [PATCH v1 2/6] s3-client: add persistent shared request counters for client Christian Ebner
2026-02-11 12:13   ` Robert Obkircher
2026-02-11 12:41     ` Christian Ebner
2026-02-12  9:55       ` Robert Obkircher
2026-02-12 10:19         ` Christian Ebner
2026-02-13 13:37         ` Christian Ebner
2026-02-09  9:15 ` [PATCH v1 3/6] s3-client: add counters for upload/download traffic Christian Ebner
2026-02-09  9:15 ` [PATCH v1 4/6] s3-client: account for upload traffic on successful request sending Christian Ebner
2026-02-09  9:15 ` Christian Ebner [this message]
2026-02-09  9:15 ` [PATCH v1 6/6] pbs-api-types: define api type for s3 request statistics Christian Ebner
2026-02-09  9:15 ` [PATCH v1 01/11] datastore: collect request statistics for s3 backed datastores Christian Ebner
2026-02-09  9:15 ` [PATCH v1 02/11] datastore: expose request counters " Christian Ebner
2026-02-09  9:15 ` [PATCH v1 03/11] api: s3: add endpoint to reset s3 request counters Christian Ebner
2026-02-09  9:15 ` [PATCH v1 04/11] bin: s3: expose request counter reset method as cli command Christian Ebner
2026-02-09  9:15 ` [PATCH v1 05/11] datastore: add helper method to get datastore backend type Christian Ebner
2026-02-09  9:15 ` [PATCH v1 06/11] ui: improve variable name indirectly fixing typo Christian Ebner
2026-02-09  9:15 ` [PATCH v1 07/11] ui: datastore summary: move store to be part of summary panel Christian Ebner
2026-02-09  9:15 ` [PATCH v1 08/11] ui: expose s3 request counter statistics in the datastore summary Christian Ebner
2026-02-09  9:15 ` [PATCH v1 09/11] metrics: collect s3 datastore statistics as rrd metrics Christian Ebner
2026-02-11 16:29   ` Christian Ebner
2026-02-09  9:15 ` [PATCH v1 10/11] api: admin: expose s3 statistics in datastore rrd data Christian Ebner
2026-02-09  9:15 ` [PATCH v1 11/11] partially fix #6563: ui: expose s3 rrd charts in datastore summary Christian Ebner
2026-02-09  9:39 ` [PATCH v1 00/17] partially fix #6563: add s3 request and traffic counter statistics Christian Ebner
2026-02-16 12:15 ` superseded: " Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260209091533.156902-6-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal