From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 0C5971FF16F for ; Tue, 8 Jul 2025 19:10:09 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 989761D63C; Tue, 8 Jul 2025 19:10:53 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Tue, 8 Jul 2025 19:00:34 +0200 Message-ID: <20250708170114.1556057-7-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.2 In-Reply-To: <20250708170114.1556057-1-c.ebner@proxmox.com> References: <20250708170114.1556057-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.040 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[lib.rs, client.rs, s3.amazonaws.com] Subject: [pbs-devel] [PATCH proxmox v6 6/9] s3 client: implement methods to operate on s3 objects in bucket X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Adds the basic implementation of the client to use s3 object stores as backend for PBS datastores. This implements the basic client actions on a bucket and objects stored within given bucket. This is not feature complete and intended to be extended on a per-demand fashion rather than implementing the whole client at once. Signed-off-by: Christian Ebner --- Cargo.toml | 4 + proxmox-s3-client/Cargo.toml | 7 + proxmox-s3-client/debian/control | 18 + proxmox-s3-client/src/client.rs | 462 ++++++++++++++++++++++- proxmox-s3-client/src/lib.rs | 4 +- proxmox-s3-client/src/response_reader.rs | 321 ++++++++++++++++ 6 files changed, 812 insertions(+), 4 deletions(-) create mode 100644 proxmox-s3-client/src/response_reader.rs diff --git a/Cargo.toml b/Cargo.toml index cf0ef097..d88d8383 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ lettre = "0.11.1" libc = "0.2.107" log = "0.4.17" mail-parser = "0.11" +md5 = "0.7.0" native-tls = "0.2" nix = "0.29" openssl = "0.10" @@ -105,18 +106,21 @@ pam-sys = "0.5" percent-encoding = "2.1" pin-utils = "0.1.0" proc-macro2 = "1.0" +quick-xml = "0.36.1" quote = "1.0" regex = "1.5" serde = "1.0" serde_cbor = "0.11.1" serde_json = "1.0" serde_plain = "1.0" +serde-xml-rs = "0.5" syn = { version = "2", features = [ "full", "visit-mut" ] } sync_wrapper = "1" tar = "0.4" tokio = "1.6" tokio-openssl = "0.6.1" tokio-stream = "0.1.0" +tokio-util = "0.7" tower-service = "0.3.0" tracing = "0.1" 
tracing-journald = "0.3.1" diff --git a/proxmox-s3-client/Cargo.toml b/proxmox-s3-client/Cargo.toml index f5b87493..18bddddd 100644 --- a/proxmox-s3-client/Cargo.toml +++ b/proxmox-s3-client/Cargo.toml @@ -13,16 +13,23 @@ rust-version.workspace = true [dependencies] anyhow.workspace = true +bytes.workspace = true +futures.workspace = true const_format.workspace = true hex = { workspace = true, features = [ "serde" ] } http-body-util.workspace = true hyper-util = { workspace = true, features = [ "client-legacy", "tokio", "http1" ] } hyper.workspace = true iso8601.workspace = true +md5.workspace = true openssl.workspace = true +quick-xml = { workspace = true, features = [ "async-tokio" ] } regex.workspace = true serde.workspace = true serde_plain.workspace = true +serde-xml-rs.workspace = true +tokio.workspace = true +tokio-util = { workspace = true, features = [ "compat" ] } tracing.workspace = true url.workspace = true diff --git a/proxmox-s3-client/debian/control b/proxmox-s3-client/debian/control index f3ffa2d9..f0cd6b0a 100644 --- a/proxmox-s3-client/debian/control +++ b/proxmox-s3-client/debian/control @@ -7,7 +7,9 @@ Build-Depends-Arch: cargo:native , rustc:native (>= 1.82) , libstd-rust-dev , librust-anyhow-1+default-dev , + librust-bytes-1+default-dev , librust-const-format-0.2+default-dev , + librust-futures-0.3+default-dev , librust-hex-0.4+default-dev , librust-hex-0.4+serde-dev , librust-http-body-util-0.1+default-dev , @@ -17,6 +19,7 @@ Build-Depends-Arch: cargo:native , librust-hyper-util-0.1+http1-dev (>= 0.1.12-~~) , librust-hyper-util-0.1+tokio-dev (>= 0.1.12-~~) , librust-iso8601-0.6+default-dev (>= 0.6.1-~~) , + librust-md5-0.7+default-dev , librust-openssl-0.10+default-dev , librust-proxmox-base64-1+default-dev , librust-proxmox-http-1+body-dev , @@ -30,9 +33,15 @@ Build-Depends-Arch: cargo:native , librust-proxmox-serde-1+default-dev , librust-proxmox-serde-1+serde-json-dev , librust-proxmox-time-2+default-dev (>= 2.1.0-~~) , + 
librust-quick-xml-0.36+async-tokio-dev (>= 0.36.1-~~) , + librust-quick-xml-0.36+default-dev (>= 0.36.1-~~) , librust-regex-1+default-dev (>= 1.5-~~) , librust-serde-1+default-dev , librust-serde-plain-1+default-dev , + librust-serde-xml-rs-0.5+default-dev , + librust-tokio-1+default-dev (>= 1.6-~~) , + librust-tokio-util-0.7+compat-dev , + librust-tokio-util-0.7+default-dev , librust-tracing-0.1+default-dev , librust-url-2+default-dev (>= 2.2-~~) Maintainer: Proxmox Support Team @@ -49,7 +58,9 @@ Multi-Arch: same Depends: ${misc:Depends}, librust-anyhow-1+default-dev, + librust-bytes-1+default-dev, librust-const-format-0.2+default-dev, + librust-futures-0.3+default-dev, librust-hex-0.4+default-dev, librust-hex-0.4+serde-dev, librust-http-body-util-0.1+default-dev, @@ -59,6 +70,7 @@ Depends: librust-hyper-util-0.1+http1-dev (>= 0.1.12-~~), librust-hyper-util-0.1+tokio-dev (>= 0.1.12-~~), librust-iso8601-0.6+default-dev (>= 0.6.1-~~), + librust-md5-0.7+default-dev, librust-openssl-0.10+default-dev, librust-proxmox-base64-1+default-dev, librust-proxmox-http-1+body-dev, @@ -72,9 +84,15 @@ Depends: librust-proxmox-serde-1+default-dev, librust-proxmox-serde-1+serde-json-dev, librust-proxmox-time-2+default-dev (>= 2.1.0-~~), + librust-quick-xml-0.36+async-tokio-dev (>= 0.36.1-~~), + librust-quick-xml-0.36+default-dev (>= 0.36.1-~~), librust-regex-1+default-dev (>= 1.5-~~), librust-serde-1+default-dev, librust-serde-plain-1+default-dev, + librust-serde-xml-rs-0.5+default-dev, + librust-tokio-1+default-dev (>= 1.6-~~), + librust-tokio-util-0.7+compat-dev, + librust-tokio-util-0.7+default-dev, librust-tracing-0.1+default-dev, librust-url-2+default-dev (>= 2.2-~~) Provides: diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs index ecc3d103..bd83fad8 100644 --- a/proxmox-s3-client/src/client.rs +++ b/proxmox-s3-client/src/client.rs @@ -1,23 +1,49 @@ +use std::path::Path; +use std::str::FromStr; use std::sync::{Arc, Mutex}; -use std::time::Duration; 
+use std::time::{Duration, Instant}; use anyhow::{bail, format_err, Context, Error}; -use hyper::http::uri::Authority; +use hyper::body::{Bytes, Incoming}; +use hyper::http::method::Method; +use hyper::http::uri::{Authority, Parts, PathAndQuery, Scheme}; +use hyper::http::{header, HeaderValue, StatusCode, Uri}; +use hyper::{Request, Response}; use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::client::legacy::Client; use hyper_util::rt::TokioExecutor; use openssl::hash::MessageDigest; +use openssl::sha::Sha256; use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use openssl::x509::X509StoreContextRef; use tracing::error; use proxmox_http::client::HttpsConnector; -use proxmox_http::{Body, RateLimiter}; +use proxmox_http::{Body, RateLimit, RateLimiter}; use crate::api_types::{S3ClientConfig, S3ClientSecretsConfig}; +use crate::aws_sign_v4::aws_sign_v4_signature; +use crate::aws_sign_v4::AWS_SIGN_V4_DATETIME_FORMAT; +use crate::object_key::S3ObjectKey; +use crate::response_reader::{ + CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse, + ListObjectsV2Response, PutObjectResponse, ResponseReader, +}; const S3_HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); const S3_TCP_KEEPALIVE_TIME: u32 = 120; +const MAX_S3_UPLOAD_RETRY: usize = 3; + +/// S3 object key path prefix without the context prefix as defined by the client options. +/// +/// The client option's context prefix will be pre-pended by the various client methods before +/// sending api requests. 
+pub enum S3PathPrefix { + /// Path prefix relative to client's context prefix + Some(String), + /// No prefix + None, +} /// Configuration options for client pub struct S3ClientOptions { @@ -181,4 +207,434 @@ impl S3Client { "unexpected certificate fingerprint {certificate_fingerprint}" )) } + + /// Prepare API request by adding commonly required headers and perform request signing + async fn prepare(&self, mut request: Request) -> Result, Error> { + let host_header = request + .uri() + .authority() + .ok_or_else(|| format_err!("request missing authority"))? + .to_string(); + + // Content verification for aws s3 signature + let mut hasher = Sha256::new(); + let contents = request + .body() + .as_bytes() + .ok_or_else(|| format_err!("cannot prepare request with streaming body"))?; + hasher.update(contents); + // Use MD5 as upload integrity check, as other methods are not supported by all S3 object + // store providers and might be ignored and this is recommended by AWS as described in + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax + let payload_md5 = md5::compute(contents); + let payload_digest = hex::encode(hasher.finish()); + let payload_len = contents.len(); + + let epoch = proxmox_time::epoch_i64(); + let datetime = proxmox_time::strftime_utc(AWS_SIGN_V4_DATETIME_FORMAT, epoch)?; + + request + .headers_mut() + .insert("x-amz-date", HeaderValue::from_str(&datetime)?); + request + .headers_mut() + .insert("host", HeaderValue::from_str(&host_header)?); + request.headers_mut().insert( + "x-amz-content-sha256", + HeaderValue::from_str(&payload_digest)?, + ); + request.headers_mut().insert( + header::CONTENT_LENGTH, + HeaderValue::from_str(&payload_len.to_string())?, + ); + if payload_len > 0 { + let md5_digest = proxmox_base64::encode(*payload_md5); + request + .headers_mut() + .insert("Content-MD5", HeaderValue::from_str(&md5_digest)?); + } + + let signature = aws_sign_v4_signature(&request, &self.options, epoch, 
&payload_digest)?; + + request + .headers_mut() + .insert(header::AUTHORIZATION, HeaderValue::from_str(&signature)?); + + Ok(request) + } + + /// Send API request to the configured endpoint using the inner https client. + async fn send(&self, request: Request) -> Result, Error> { + let request = self.prepare(request).await?; + if request.method() == Method::PUT { + if let Some(limiter) = &self.put_rate_limiter { + let sleep = { + let mut limiter = limiter.lock().unwrap(); + limiter.register_traffic(Instant::now(), 1) + }; + tokio::time::sleep(sleep).await; + } + } + let response = tokio::time::timeout(S3_HTTP_CONNECT_TIMEOUT, self.client.request(request)) + .await + .context("request timeout")??; + Ok(response) + } + + /// Check if bucket exists and got permissions to access it. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html + pub async fn head_bucket(&self) -> Result<(), Error> { + let request = Request::builder() + .method(Method::HEAD) + .uri(self.build_uri("/", &[])?) + .body(Body::empty())?; + let response = self.send(request).await?; + let (parts, _body) = response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND => { + bail!("bucket does not exist or no permission to access it") + } + status_code => bail!("unexpected status code {status_code}"), + } + + Ok(()) + } + + /// Fetch metadata from an object without returning the object itself. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html + pub async fn head_object( + &self, + object_key: S3ObjectKey, + ) -> Result, Error> { + let object_key = object_key.to_full_key(&self.options.common_prefix); + let request = Request::builder() + .method(Method::HEAD) + .uri(self.build_uri(&object_key, &[])?) 
+ .body(Body::empty())?; + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.head_object_response().await + } + + /// Fetch an object from object store. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html + pub async fn get_object( + &self, + object_key: S3ObjectKey, + ) -> Result, Error> { + let object_key = object_key.to_full_key(&self.options.common_prefix); + let request = Request::builder() + .method(Method::GET) + .uri(self.build_uri(&object_key, &[])?) + .body(Body::empty())?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.get_object_response().await + } + + /// Returns some or all (up to 1,000) of the objects in a bucket with each request. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html + pub async fn list_objects_v2( + &self, + prefix: &S3PathPrefix, + continuation_token: Option<&str>, + ) -> Result { + let mut query = vec![("list-type", "2")]; + let abs_prefix: String; + if let S3PathPrefix::Some(prefix) = prefix { + abs_prefix = if prefix.starts_with("/") { + format!("{}{prefix}", self.options.common_prefix) + } else { + format!("{}/{prefix}", self.options.common_prefix) + }; + query.push(("prefix", &abs_prefix)); + } + if let Some(token) = continuation_token { + query.push(("continuation-token", token)); + } + let request = Request::builder() + .method(Method::GET) + .uri(self.build_uri("/", &query)?) + .body(Body::empty())?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.list_objects_v2_response().await + } + + /// Add a new object to a bucket. + /// + /// Do not reupload if an object with matching key already exists in the bucket if the replace + /// flag is not set. 
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html + pub async fn put_object( + &self, + object_key: S3ObjectKey, + object_data: Body, + replace: bool, + ) -> Result { + let object_key = object_key.to_full_key(&self.options.common_prefix); + let mut request = Request::builder() + .method(Method::PUT) + .uri(self.build_uri(&object_key, &[])?) + .header(header::CONTENT_TYPE, "binary/octet"); + + if !replace { + request = request.header(header::IF_NONE_MATCH, "*"); + } + + let request = request.body(object_data)?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.put_object_response().await + } + + /// Removes an object from a bucket. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html + pub async fn delete_object(&self, object_key: S3ObjectKey) -> Result<(), Error> { + let object_key = object_key.to_full_key(&self.options.common_prefix); + let request = Request::builder() + .method(Method::DELETE) + .uri(self.build_uri(&object_key, &[])?) + .body(Body::empty())?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.delete_object_response().await + } + + /// Delete multiple objects from a bucket using a single HTTP request. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html + pub async fn delete_objects( + &self, + object_keys: &[S3ObjectKey], + ) -> Result { + let mut body = String::from(r#""#); + for object_key in object_keys { + body.push_str(""); + body.push_str(object_key); + body.push_str(""); + } + body.push_str(""); + let request = Request::builder() + .method(Method::POST) + .uri(self.build_uri("/", &[("delete", "")])?) 
+ .body(Body::from(body))?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.delete_objects_response().await + } + + /// Creates a copy of an object that is already stored in Amazon S3. + /// Uses the `x-amz-metadata-directive` set to `REPLACE`, therefore resulting in updated metadata. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html + pub async fn copy_object( + &self, + source_key: S3ObjectKey, + destination_key: S3ObjectKey, + ) -> Result { + let copy_source = source_key.to_copy_source_key(&self.options.bucket, &self.options.common_prefix); + let destination_key = destination_key.to_full_key(&self.options.common_prefix); + let request = Request::builder() + .method(Method::PUT) + .uri(self.build_uri(&destination_key, &[])?) + .header("x-amz-copy-source", HeaderValue::from_str(©_source)?) + .header( + "x-amz-metadata-directive", + HeaderValue::from_str("REPLACE")?, + ) + .body(Body::empty())?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.copy_object_response().await + } + + /// Delete objects by given key prefix. + /// Requires at least 2 api calls. + pub async fn delete_objects_by_prefix(&self, prefix: &S3PathPrefix) -> Result { + // S3 API does not provide a convenient way to delete objects by key prefix. + // List all objects with given group prefix and delete all objects found, so this + // requires at least 2 API calls. 
+ let mut next_continuation_token: Option = None; + let mut delete_errors = false; + loop { + let list_objects_result = self + .list_objects_v2(prefix, next_continuation_token.as_deref()) + .await?; + + let objects_to_delete: Vec = list_objects_result + .contents + .into_iter() + .map(|item| item.key) + .collect(); + + let response = self.delete_objects(&objects_to_delete).await?; + if response.error.is_some() { + delete_errors = true; + } + + if list_objects_result.is_truncated { + next_continuation_token = list_objects_result + .next_continuation_token + .as_ref() + .cloned(); + continue; + } + break; + } + Ok(delete_errors) + } + + /// Delete objects by given key prefix, but exclude items pre-filter based on suffix + /// (including the parent component of the matched suffix). E.g. do not remove items in a + /// snapshot directory, by matching based on the protected file marker (given as suffix). + /// Items matching the suffix provided as `exclude_from_parent` will be excluded in the parent of a matching + /// suffix entry. E.g. owner and notes for a group, if a group snapshot was matched by a + /// protected marker. + /// + /// Requires at least 2 api calls. + pub async fn delete_objects_by_prefix_with_suffix_filter( + &self, + prefix: &S3PathPrefix, + suffix: &str, + exclude_from_parent: &[&str], + ) -> Result { + // S3 API does not provide a convenient way to delete objects by key prefix. + // List all objects with given group prefix and delete all objects found, so this + // requires at least 2 API calls. 
+ let mut next_continuation_token: Option = None; + let mut delete_errors = false; + let mut prefix_filters = Vec::new(); + let mut list_objects = Vec::new(); + loop { + let list_objects_result = self + .list_objects_v2(prefix, next_continuation_token.as_deref()) + .await?; + + let mut prefixes: Vec = list_objects_result + .contents + .iter() + .filter_map(|item| { + let prefix_filter = item.key.strip_suffix(suffix).map(|prefix| { + let path = Path::new(prefix); + if let Some(parent) = path.parent() { + for filter in exclude_from_parent { + let filter = parent.join(filter); + // valid utf-8 as combined from `str` values + prefix_filters.push(filter.to_string_lossy().to_string()); + } + } + prefix.to_string() + }); + if prefix_filter.is_none() { + list_objects.push(item.key.clone()); + } + prefix_filter + }) + .collect(); + prefix_filters.append(&mut prefixes); + + if list_objects_result.is_truncated { + next_continuation_token = list_objects_result + .next_continuation_token + .as_ref() + .cloned(); + continue; + } + break; + } + + let objects_to_delete: Vec = list_objects + .into_iter() + .filter_map(|item| { + for prefix in &prefix_filters { + if item.strip_prefix(prefix).is_some() { + return None; + } + } + Some(item) + }) + .collect(); + + for objects in objects_to_delete.chunks(1000) { + let result = self.delete_objects(objects).await?; + if result.error.is_some() { + delete_errors = true; + } + } + + Ok(delete_errors) + } + + /// Upload the given object via the S3 api, retrying up to 3 times in case of error. 
+ pub async fn upload_with_retry( + &self, + object_key: S3ObjectKey, + object_data: Bytes, + replace: bool, + ) -> Result { + for retry in 0..MAX_S3_UPLOAD_RETRY { + let body = Body::from(object_data.clone()); + match self.put_object(object_key.clone(), body, replace).await { + Ok(PutObjectResponse::Success(_response_body)) => return Ok(false), + Ok(PutObjectResponse::PreconditionFailed) => return Ok(true), + Ok(PutObjectResponse::NeedsRetry) => { + if retry >= MAX_S3_UPLOAD_RETRY - 1 { + bail!("concurrent operation, chunk upload failed") + } + } + Err(err) => { + if retry >= MAX_S3_UPLOAD_RETRY - 1 { + return Err(err.context("chunk upload failed")); + } + } + }; + } + Ok(false) + } + + #[inline(always)] + /// Helper to generate [`Uri`] instance with common properties based on given path and query. + fn build_uri(&self, mut path: &str, query: &[(&str, &str)]) -> Result { + if path.starts_with('/') { + path = &path[1..]; + } + let mut path_and_query = if self.options.path_style { + format!("/{bucket}/{path}", bucket = self.options.bucket) + } else { + format!("/{path}") + }; + + if !query.is_empty() { + path_and_query.push('?'); + // No further input validation as http::uri::Builder will check path and query + let mut query_iter = query.iter().peekable(); + while let Some((key, value)) = query_iter.next() { + path_and_query.push_str(key); + if !value.is_empty() { + path_and_query.push('='); + path_and_query.push_str(value); + } + if query_iter.peek().is_some() { + path_and_query.push('&'); + } + } + } + + let path_and_query = + PathAndQuery::from_str(&path_and_query).context("failed to parse path and query")?; + + let mut uri_parts = Parts::default(); + uri_parts.scheme = Some(Scheme::HTTPS); + uri_parts.authority = Some(self.authority.clone()); + uri_parts.path_and_query = Some(path_and_query); + + Uri::from_parts(uri_parts).context("failed to build uri") + } } diff --git a/proxmox-s3-client/src/lib.rs b/proxmox-s3-client/src/lib.rs index fc314c42..991e1546 100644 
--- a/proxmox-s3-client/src/lib.rs +++ b/proxmox-s3-client/src/lib.rs @@ -13,7 +13,7 @@ pub use aws_sign_v4::uri_decode; #[cfg(feature = "impl")] mod client; #[cfg(feature = "impl")] -pub use client::{S3Client, S3ClientOptions}; +pub use client::{S3Client, S3ClientOptions, S3PathPrefix}; #[cfg(feature = "impl")] mod timestamps; #[cfg(feature = "impl")] @@ -22,3 +22,5 @@ pub use timestamps::*; mod object_key; #[cfg(feature = "impl")] pub use object_key::S3ObjectKey; +#[cfg(feature = "impl")] +mod response_reader; diff --git a/proxmox-s3-client/src/response_reader.rs b/proxmox-s3-client/src/response_reader.rs new file mode 100644 index 00000000..720ed712 --- /dev/null +++ b/proxmox-s3-client/src/response_reader.rs @@ -0,0 +1,321 @@ +use std::str::FromStr; + +use anyhow::{anyhow, bail, Context, Error}; +use http_body_util::BodyExt; +use hyper::body::Incoming; +use hyper::header::HeaderName; +use hyper::http::header; +use hyper::http::StatusCode; +use hyper::{HeaderMap, Response}; +use serde::Deserialize; + +use crate::S3ObjectKey; +use crate::{HttpDate, LastModifiedTimestamp}; + +pub(crate) struct ResponseReader { + response: Response, +} + +#[derive(Debug)] +pub struct ListObjectsV2Response { + pub date: HttpDate, + pub name: String, + pub prefix: String, + pub key_count: u64, + pub max_keys: u64, + pub is_truncated: bool, + pub continuation_token: Option, + pub next_continuation_token: Option, + pub contents: Vec, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +struct ListObjectsV2ResponseBody { + pub name: String, + pub prefix: String, + pub key_count: u64, + pub max_keys: u64, + pub is_truncated: bool, + pub continuation_token: Option, + pub next_continuation_token: Option, + pub contents: Option>, +} + +impl ListObjectsV2ResponseBody { + fn with_date(self, date: HttpDate) -> ListObjectsV2Response { + ListObjectsV2Response { + date, + name: self.name, + prefix: self.prefix, + key_count: self.key_count, + max_keys: self.max_keys, + 
is_truncated: self.is_truncated, + continuation_token: self.continuation_token, + next_continuation_token: self.next_continuation_token, + contents: self.contents.unwrap_or_default(), + } + } +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct ListObjectsV2Contents { + pub key: S3ObjectKey, + pub last_modified: LastModifiedTimestamp, + pub e_tag: String, + pub size: u64, + pub storage_class: String, +} + +#[derive(Debug)] +/// Subset of the head object response (headers only, there is no body) +/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax +pub struct HeadObjectResponse { + pub content_length: u64, + pub content_type: String, + pub date: HttpDate, + pub e_tag: String, + pub last_modified: HttpDate, +} + +/// Subset of the get object response +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html#API_GetObject_ResponseSyntax +pub struct GetObjectResponse { + pub content_length: u64, + pub content_type: String, + pub date: HttpDate, + pub e_tag: String, + pub last_modified: HttpDate, + pub content: Incoming, +} + +/// Subset of the put object response +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_ResponseSyntax +#[derive(Debug)] +pub enum PutObjectResponse { + NeedsRetry, + PreconditionFailed, + Success(String), +} + +/// Subset of the delete objects response +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_ResponseElements +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct DeleteObjectsResponse { + pub deleted: Option>, + pub error: Option>, +} + +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeletedObject.html +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct DeletedObject { + pub delete_marker: Option, + pub delete_marker_version_id: Option, + pub key: Option, + pub version_id: Option, +} + +/// 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_Error.html +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct DeleteObjectError { + pub code: Option, + pub key: Option, + pub message: Option, + pub version_id: Option, +} + +#[derive(Debug)] +pub struct CopyObjectResponse { + pub copy_object_result: CopyObjectResult, + pub x_amz_version_id: Option, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct CopyObjectResult { + pub e_tag: String, + pub last_modified: LastModifiedTimestamp, +} + +impl ResponseReader { + pub(crate) fn new(response: Response) -> Self { + Self { response } + } + + pub(crate) async fn list_objects_v2_response(self) -> Result { + let (parts, body) = self.response.into_parts(); + let body = body.collect().await?.to_bytes(); + + match parts.status { + StatusCode::OK => (), + StatusCode::NOT_FOUND => bail!("bucket does not exist"), + status_code => bail!("unexpected status code {status_code}"), + } + + let body = String::from_utf8(body.to_vec())?; + + let date: HttpDate = Self::parse_header(header::DATE, &parts.headers)?; + + let response: ListObjectsV2ResponseBody = + serde_xml_rs::from_str(&body).context("failed to parse response body")?; + + Ok(response.with_date(date)) + } + + pub(crate) async fn head_object_response(self) -> Result, Error> { + let (parts, body) = self.response.into_parts(); + let body = body.collect().await?.to_bytes(); + + match parts.status { + StatusCode::OK => (), + StatusCode::NOT_FOUND => return Ok(None), + status_code => bail!("unexpected status code {status_code}"), + } + if !body.is_empty() { + bail!("got unexpected non-empty response body"); + } + + let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?; + let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?; + let e_tag = Self::parse_header(header::ETAG, &parts.headers)?; + let date = Self::parse_header(header::DATE, &parts.headers)?; + let 
last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?; + + Ok(Some(HeadObjectResponse { + content_length, + content_type, + date, + e_tag, + last_modified, + })) + } + + pub(crate) async fn get_object_response(self) -> Result, Error> { + let (parts, content) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::NOT_FOUND => return Ok(None), + StatusCode::FORBIDDEN => bail!("object is archived and inaccessible until restored"), + status_code => bail!("unexpected status code {status_code}"), + } + + let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?; + let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?; + let e_tag = Self::parse_header(header::ETAG, &parts.headers)?; + let date = Self::parse_header(header::DATE, &parts.headers)?; + let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?; + + Ok(Some(GetObjectResponse { + content_length, + content_type, + date, + e_tag, + last_modified, + content, + })) + } + + pub(crate) async fn put_object_response(self) -> Result { + let (parts, body) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::PRECONDITION_FAILED => return Ok(PutObjectResponse::PreconditionFailed), + StatusCode::CONFLICT => return Ok(PutObjectResponse::NeedsRetry), + StatusCode::BAD_REQUEST => bail!("invalid request"), + status_code => bail!("unexpected status code {status_code}"), + }; + + let body = body.collect().await?.to_bytes(); + if !body.is_empty() { + bail!("got unexpected non-empty response body"); + } + + let e_tag = Self::parse_header(header::ETAG, &parts.headers)?; + + Ok(PutObjectResponse::Success(e_tag)) + } + + pub(crate) async fn delete_object_response(self) -> Result<(), Error> { + let (parts, _body) = self.response.into_parts(); + + match parts.status { + StatusCode::NO_CONTENT => (), + status_code => bail!("unexpected status code {status_code}"), + }; + + 
Ok(()) + } + + pub(crate) async fn delete_objects_response(self) -> Result { + let (parts, body) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::BAD_REQUEST => bail!("invalid request"), + status_code => bail!("unexpected status code {status_code}"), + }; + + let body = body.collect().await?.to_bytes(); + let body = String::from_utf8(body.to_vec())?; + + let delete_objects_response: DeleteObjectsResponse = + serde_xml_rs::from_str(&body).context("failed to parse response body")?; + + Ok(delete_objects_response) + } + + pub(crate) async fn copy_object_response(self) -> Result { + let (parts, body) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::NOT_FOUND => bail!("object not found"), + StatusCode::FORBIDDEN => bail!("the source object is not in the active tier"), + status_code => bail!("unexpected status code {status_code}"), + } + + let body = body.collect().await?.to_bytes(); + let body = String::from_utf8(body.to_vec())?; + + let x_amz_version_id = match parts.headers.get("x-amz-version-id") { + Some(version_id) => Some( + version_id + .to_str() + .context("failed to parse version id header")? 
+ .to_owned(), + ), + None => None, + }; + + let copy_object_result: CopyObjectResult = + serde_xml_rs::from_str(&body).context("failed to parse response body")?; + + Ok(CopyObjectResponse { + copy_object_result, + x_amz_version_id, + }) + } + + fn parse_header(name: HeaderName, headers: &HeaderMap) -> Result + where + ::Err: Send + Sync + 'static, + Result::Err>: Context::Err>, + { + let header_value = headers + .get(&name) + .ok_or_else(|| anyhow!("missing header '{name}'"))?; + let header_str = header_value + .to_str() + .with_context(|| format!("non UTF-8 header '{name}'"))?; + let value = header_str + .parse() + .with_context(|| format!("failed to parse header '{name}'"))?; + Ok(value) + } +} -- 2.47.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel