From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 3ED2E1FF142 for ; Mon, 16 Feb 2026 13:13:49 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 04026109EC; Mon, 16 Feb 2026 13:14:34 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox v2 5/6] s3-client: account for downloaded bytes in incoming response body Date: Mon, 16 Feb 2026 13:13:53 +0100 Message-ID: <20260216121406.99617-6-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260216121406.99617-1-c.ebner@proxmox.com> References: <20260216121406.99617-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1771244054484 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.048 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 2EG2MLVX7WAICXXKX35QSFEOEM5E7L4Z X-Message-ID-Hash: 2EG2MLVX7WAICXXKX35QSFEOEM5E7L4Z X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Keep track of the downloaded contents in get object responses by accounting of passing bytes when collecting the incoming body. To do so, the shared request counters are stored via an atomic reference counter and cloned along to the response reader and a new `Content` type which wraps `Incoming` and implements the `Body`, where the accounting happens. Signed-off-by: Christian Ebner --- changes since version 1: - adapt to relaxed atomic ordering requirement proxmox-s3-client/src/client.rs | 16 ++--- proxmox-s3-client/src/response_reader.rs | 75 ++++++++++++++++++++++-- 2 files changed, 77 insertions(+), 14 deletions(-) diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs index 549fb904..f115e06a 100644 --- a/proxmox-s3-client/src/client.rs +++ b/proxmox-s3-client/src/client.rs @@ -490,7 +490,7 @@ impl S3Client { .uri(self.build_uri("/", &[])?) .body(Body::empty())?; let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?; - let response_reader = ResponseReader::new(response); + let response_reader = ResponseReader::new(response, self.request_counters.clone()); response_reader.list_buckets_response().await } @@ -506,7 +506,7 @@ impl S3Client { .uri(self.build_uri(&object_key, &[])?) .body(Body::empty())?; let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?; - let response_reader = ResponseReader::new(response); + let response_reader = ResponseReader::new(response, self.request_counters.clone()); response_reader.head_object_response().await } @@ -523,7 +523,7 @@ impl S3Client { .body(Body::empty())?; let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?; - let response_reader = ResponseReader::new(response); + let response_reader = ResponseReader::new(response, self.request_counters.clone()); response_reader.get_object_response().await } @@ -553,7 +553,7 @@ impl S3Client { .body(Body::empty())?; let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?; - let response_reader = ResponseReader::new(response); + let response_reader = ResponseReader::new(response, self.request_counters.clone()); response_reader.list_objects_v2_response().await } @@ -590,7 +590,7 @@ impl S3Client { let request = request.body(object_data)?; let response = self.send(request, timeout).await?; - let response_reader = ResponseReader::new(response); + let response_reader = ResponseReader::new(response, self.request_counters.clone()); response_reader.put_object_response().await } @@ -604,7 +604,7 @@ impl S3Client { .body(Body::empty())?; let response = self.send(request, None).await?; - let response_reader = ResponseReader::new(response); + let response_reader = ResponseReader::new(response, self.request_counters.clone()); response_reader.delete_object_response().await } @@ -631,7 +631,7 @@ impl S3Client { .body(Body::from(body))?; let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?; - let response_reader = ResponseReader::new(response); + let response_reader = ResponseReader::new(response, self.request_counters.clone()); response_reader.delete_objects_response().await } @@ -662,7 +662,7 @@ impl S3Client { .body(Body::empty())?; let response = self.send(request, Some(S3_HTTP_REQUEST_TIMEOUT)).await?; - let response_reader = ResponseReader::new(response); + let response_reader = ResponseReader::new(response, self.request_counters.clone()); response_reader.copy_object_response().await } diff --git a/proxmox-s3-client/src/response_reader.rs b/proxmox-s3-client/src/response_reader.rs index e03b3bb0..fa03f045 100644 --- a/proxmox-s3-client/src/response_reader.rs +++ b/proxmox-s3-client/src/response_reader.rs @@ -1,19 +1,24 @@ +use std::pin::Pin; use std::str::FromStr; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::task::{Context as Ctx, Poll}; use anyhow::{anyhow, bail, Context, Error}; use http_body_util::BodyExt; -use hyper::body::{Bytes, Incoming}; +use hyper::body::{Body, Bytes, Frame, Incoming, SizeHint}; use hyper::header::HeaderName; use hyper::http::header; use hyper::http::StatusCode; use hyper::{HeaderMap, Response}; use serde::Deserialize; -use crate::{HttpDate, LastModifiedTimestamp, S3ObjectKey}; +use crate::{HttpDate, LastModifiedTimestamp, S3ObjectKey, SharedRequestCounters}; /// Response reader to check S3 api response status codes and parse response body, if any. pub(crate) struct ResponseReader { response: Response, + request_counters: Option>, } #[derive(Debug)] @@ -105,7 +110,7 @@ pub struct GetObjectResponse { /// Last modified http header. pub last_modified: HttpDate, /// Object content in http response body. - pub content: Incoming, + pub content: Content, } #[derive(Debug)] @@ -226,10 +231,64 @@ pub struct Bucket { pub creation_date: LastModifiedTimestamp, } +/// Response content stream +pub struct Content { + incoming: Incoming, + request_counters: Option>, +} + +impl Body for Content { + type Data = Bytes; + type Error = hyper::Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Ctx<'_>, + ) -> Poll, Self::Error>>> { + let mut this = self.as_mut(); + + let incoming = Pin::new(&mut this.incoming).poll_frame(cx); + + if let Some(counter) = self.request_counters.as_ref() { + match incoming { + Poll::Pending => Poll::Pending, + Poll::Ready(f) => { + if let Some(Ok(frame)) = f { + let bytes = frame + .data_ref() + .map(|bytes| bytes.len() as u64) + .unwrap_or(0); + let _ = counter.add_download_traffic(bytes, Ordering::AcqRel); + Poll::Ready(Some(Ok(frame))) + } else { + Poll::Ready(None) + } + } + } + } else { + return incoming; + } + } + + fn is_end_stream(&self) -> bool { + self.incoming.is_end_stream() + } + + fn size_hint(&self) -> SizeHint { + self.incoming.size_hint() + } +} + impl ResponseReader { /// Create a new response reader to parse given response. - pub(crate) fn new(response: Response) -> Self { - Self { response } + pub(crate) fn new( + response: Response, + request_counters: Option>, + ) -> Self { + Self { + response, + request_counters, + } } /// Read and parse the list object v2 response. @@ -299,7 +358,11 @@ impl ResponseReader { /// Returns with error if the object is not accessible, an unexpected status code is encountered /// or the response headers or body cannot be parsed. pub(crate) async fn get_object_response(self) -> Result, Error> { - let (parts, content) = self.response.into_parts(); + let (parts, incoming) = self.response.into_parts(); + let content = Content { + incoming, + request_counters: self.request_counters.clone(), + }; match parts.status { StatusCode::OK => (), -- 2.47.3