From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox v2 5/6] s3-client: account for downloaded bytes in incoming response body
Date: Mon, 16 Feb 2026 13:13:53 +0100 [thread overview]
Message-ID: <20260216121406.99617-6-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260216121406.99617-1-c.ebner@proxmox.com>
Keep track of the downloaded contents in get object responses by
accounting of passing bytes when collecting the incoming body.
To do so, the shared request counters are stored via an atomic
reference counter and cloned along to the response reader and
a new `Content` type which wraps `Incoming` and implements the
`Body`, where the accounting happens.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 1:
- adapt to relaxed atomic ordering requirement
proxmox-s3-client/src/client.rs | 16 ++---
proxmox-s3-client/src/response_reader.rs | 75 ++++++++++++++++++++++--
2 files changed, 77 insertions(+), 14 deletions(-)
diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs
index 549fb904..f115e06a 100644
--- a/proxmox-s3-client/src/client.rs
+++ b/proxmox-s3-client/src/client.rs
@@ -490,7 +490,7 @@ impl S3Client {
.uri(self.build_uri("/", &[])?)
.body(Body::empty())?;
let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
- let response_reader = ResponseReader::new(response);
+ let response_reader = ResponseReader::new(response, self.request_counters.clone());
response_reader.list_buckets_response().await
}
@@ -506,7 +506,7 @@ impl S3Client {
.uri(self.build_uri(&object_key, &[])?)
.body(Body::empty())?;
let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
- let response_reader = ResponseReader::new(response);
+ let response_reader = ResponseReader::new(response, self.request_counters.clone());
response_reader.head_object_response().await
}
@@ -523,7 +523,7 @@ impl S3Client {
.body(Body::empty())?;
let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
- let response_reader = ResponseReader::new(response);
+ let response_reader = ResponseReader::new(response, self.request_counters.clone());
response_reader.get_object_response().await
}
@@ -553,7 +553,7 @@ impl S3Client {
.body(Body::empty())?;
let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
- let response_reader = ResponseReader::new(response);
+ let response_reader = ResponseReader::new(response, self.request_counters.clone());
response_reader.list_objects_v2_response().await
}
@@ -590,7 +590,7 @@ impl S3Client {
let request = request.body(object_data)?;
let response = self.send(request, timeout).await?;
- let response_reader = ResponseReader::new(response);
+ let response_reader = ResponseReader::new(response, self.request_counters.clone());
response_reader.put_object_response().await
}
@@ -604,7 +604,7 @@ impl S3Client {
.body(Body::empty())?;
let response = self.send(request, None).await?;
- let response_reader = ResponseReader::new(response);
+ let response_reader = ResponseReader::new(response, self.request_counters.clone());
response_reader.delete_object_response().await
}
@@ -631,7 +631,7 @@ impl S3Client {
.body(Body::from(body))?;
let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
- let response_reader = ResponseReader::new(response);
+ let response_reader = ResponseReader::new(response, self.request_counters.clone());
response_reader.delete_objects_response().await
}
@@ -662,7 +662,7 @@ impl S3Client {
.body(Body::empty())?;
let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?;
- let response_reader = ResponseReader::new(response);
+ let response_reader = ResponseReader::new(response, self.request_counters.clone());
response_reader.copy_object_response().await
}
diff --git a/proxmox-s3-client/src/response_reader.rs b/proxmox-s3-client/src/response_reader.rs
index e03b3bb0..fa03f045 100644
--- a/proxmox-s3-client/src/response_reader.rs
+++ b/proxmox-s3-client/src/response_reader.rs
@@ -1,19 +1,24 @@
+use std::pin::Pin;
use std::str::FromStr;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::task::{Context as Ctx, Poll};
use anyhow::{anyhow, bail, Context, Error};
use http_body_util::BodyExt;
-use hyper::body::{Bytes, Incoming};
+use hyper::body::{Body, Bytes, Frame, Incoming, SizeHint};
use hyper::header::HeaderName;
use hyper::http::header;
use hyper::http::StatusCode;
use hyper::{HeaderMap, Response};
use serde::Deserialize;
-use crate::{HttpDate, LastModifiedTimestamp, S3ObjectKey};
+use crate::{HttpDate, LastModifiedTimestamp, S3ObjectKey, SharedRequestCounters};
/// Response reader to check S3 api response status codes and parse response body, if any.
pub(crate) struct ResponseReader {
response: Response<Incoming>,
+ request_counters: Option<Arc<SharedRequestCounters>>,
}
#[derive(Debug)]
@@ -105,7 +110,7 @@ pub struct GetObjectResponse {
/// Last modified http header.
pub last_modified: HttpDate,
/// Object content in http response body.
- pub content: Incoming,
+ pub content: Content,
}
#[derive(Debug)]
@@ -226,10 +231,64 @@ pub struct Bucket {
pub creation_date: LastModifiedTimestamp,
}
+/// Response content stream
+pub struct Content {
+ incoming: Incoming,
+ request_counters: Option<Arc<SharedRequestCounters>>,
+}
+
+impl Body for Content {
+ type Data = Bytes;
+ type Error = hyper::Error;
+
+ fn poll_frame(
+ mut self: Pin<&mut Self>,
+ cx: &mut Ctx<'_>,
+ ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ let mut this = self.as_mut();
+
+ let incoming = Pin::new(&mut this.incoming).poll_frame(cx);
+
+ if let Some(counter) = self.request_counters.as_ref() {
+ match incoming {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(f) => {
+ if let Some(Ok(frame)) = f {
+ let bytes = frame
+ .data_ref()
+ .map(|bytes| bytes.len() as u64)
+ .unwrap_or(0);
+ let _ = counter.add_download_traffic(bytes, Ordering::AcqRel);
+ Poll::Ready(Some(Ok(frame)))
+ } else {
+ Poll::Ready(None)
+ }
+ }
+ }
+ } else {
+ return incoming;
+ }
+ }
+
+ fn is_end_stream(&self) -> bool {
+ self.incoming.is_end_stream()
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ self.incoming.size_hint()
+ }
+}
+
impl ResponseReader {
/// Create a new response reader to parse given response.
- pub(crate) fn new(response: Response<Incoming>) -> Self {
- Self { response }
+ pub(crate) fn new(
+ response: Response<Incoming>,
+ request_counters: Option<Arc<SharedRequestCounters>>,
+ ) -> Self {
+ Self {
+ response,
+ request_counters,
+ }
}
/// Read and parse the list object v2 response.
@@ -299,7 +358,11 @@ impl ResponseReader {
/// Returns with error if the object is not accessible, an unexpected status code is encountered
/// or the response headers or body cannot be parsed.
pub(crate) async fn get_object_response(self) -> Result<Option<GetObjectResponse>, Error> {
- let (parts, content) = self.response.into_parts();
+ let (parts, incoming) = self.response.into_parts();
+ let content = Content {
+ incoming,
+ request_counters: self.request_counters.clone(),
+ };
match parts.status {
StatusCode::OK => (),
--
2.47.3
next prev parent reply other threads:[~2026-02-16 12:13 UTC|newest]
Thread overview: 19+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-02-16 12:13 [PATCH proxmox{,-backup} v2 00/18] partially fix #6563: add s3 request and traffic counter statistics Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox v2 1/6] shared-memory: add method without tmpfs check for mmap file location Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox v2 2/6] s3-client: add persistent shared request counters for client Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox v2 3/6] s3-client: add counters for upload/download traffic Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox v2 4/6] s3-client: account for upload traffic on successful request sending Christian Ebner
2026-02-16 12:13 ` Christian Ebner [this message]
2026-02-16 12:13 ` [PATCH proxmox v2 6/6] pbs-api-types: define api type for s3 request statistics Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox-backup v2 01/12] metrics: split common module imports into individual use statements Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox-backup v2 02/12] datastore: collect request statistics for s3 backed datastores Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox-backup v2 03/12] datastore: expose request counters " Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox-backup v2 04/12] api: s3: add endpoint to reset s3 request counters Christian Ebner
2026-02-16 12:13 ` [PATCH proxmox-backup v2 05/12] bin: s3: expose request counter reset method as cli command Christian Ebner
2026-02-16 12:14 ` [PATCH proxmox-backup v2 06/12] datastore: add helper method to get datastore backend type Christian Ebner
2026-02-16 12:14 ` [PATCH proxmox-backup v2 07/12] ui: improve variable name indirectly fixing typo Christian Ebner
2026-02-16 12:14 ` [PATCH proxmox-backup v2 08/12] ui: datastore summary: move store to be part of summary panel Christian Ebner
2026-02-16 12:14 ` [PATCH proxmox-backup v2 09/12] ui: expose s3 request counter statistics in the datastore summary Christian Ebner
2026-02-16 12:14 ` [PATCH proxmox-backup v2 10/12] metrics: collect s3 datastore statistics as rrd metrics Christian Ebner
2026-02-16 12:14 ` [PATCH proxmox-backup v2 11/12] api: admin: expose s3 statistics in datastore rrd data Christian Ebner
2026-02-16 12:14 ` [PATCH proxmox-backup v2 12/12] partially fix #6563: ui: expose s3 rrd charts in datastore summary Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260216121406.99617-6-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.