From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: <pbs-devel-bounces@lists.proxmox.com> Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id EEE301FF17F for <inbox@lore.proxmox.com>; Mon, 19 May 2025 13:55:07 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id D5ACB91C9; Mon, 19 May 2025 13:55:07 +0200 (CEST) From: Christian Ebner <c.ebner@proxmox.com> To: pbs-devel@lists.proxmox.com Date: Mon, 19 May 2025 13:46:11 +0200 Message-Id: <20250519114640.303640-11-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20250519114640.303640-1-c.ebner@proxmox.com> References: <20250519114640.303640-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.030 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[lib.rs, s3.amazonaws.com, client.rs] Subject: [pbs-devel] [RFC proxmox-backup 10/39] s3 client: implement methods to operate on s3 objects in bucket X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion <pbs-devel.lists.proxmox.com> List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe> List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/> List-Post: <mailto:pbs-devel@lists.proxmox.com> List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help> List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe> Reply-To: Proxmox Backup Server development discussion <pbs-devel@lists.proxmox.com> Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com> Adds the basic implementation of the client to use s3 object stores as backend for PBS datastores. This implements the basic client actions on a bucket and objects stored within given bucket. This is not feature complete and intended to be extended on a per-demand fashion rather than implementing the whole client at once. 
Signed-off-by: Christian Ebner <c.ebner@proxmox.com> --- Cargo.toml | 2 + pbs-s3-client/Cargo.toml | 8 + pbs-s3-client/src/client.rs | 370 +++++++++++++++++++++++++++ pbs-s3-client/src/lib.rs | 2 + pbs-s3-client/src/response_reader.rs | 324 +++++++++++++++++++++++ 5 files changed, 706 insertions(+) create mode 100644 pbs-s3-client/src/response_reader.rs diff --git a/Cargo.toml b/Cargo.toml index 3f51b356c..229ba1692 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,11 +140,13 @@ once_cell = "1.3.1" openssl = "0.10.40" percent-encoding = "2.1" pin-project-lite = "0.2" +quick-xml = "0.26" regex = "1.5.5" rustyline = "9" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_plain = "1.0" +serde-xml-rs = "0.5" siphasher = "0.3" syslog = "6" tar = "0.4" diff --git a/pbs-s3-client/Cargo.toml b/pbs-s3-client/Cargo.toml index 9ee546200..6fd0f7cca 100644 --- a/pbs-s3-client/Cargo.toml +++ b/pbs-s3-client/Cargo.toml @@ -8,11 +8,19 @@ rust-version.workspace = true [dependencies] anyhow.workspace = true +base64.workspace = true +bytes.workspace = true +crc32fast.workspace = true +futures.workspace = true hex = { workspace = true, features = [ "serde" ] } hyper.workspace = true openssl.workspace = true +quick-xml = { workspace = true, features = ["async-tokio"] } serde.workspace = true serde_plain.workspace = true +serde-xml-rs.workspace = true +tokio = { workspace = true, features = [] } +tokio-util = { workspace = true, features = ["compat"] } tracing.workspace = true url.workspace = true diff --git a/pbs-s3-client/src/client.rs b/pbs-s3-client/src/client.rs index e001cc7b0..972578e66 100644 --- a/pbs-s3-client/src/client.rs +++ b/pbs-s3-client/src/client.rs @@ -1,17 +1,36 @@ +use std::collections::HashMap; +use std::io::Cursor; use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{bail, format_err, Context, Error}; +use bytes::{Bytes, BytesMut}; +use hyper::body::HttpBody; use hyper::client::{Client, HttpConnector}; +use 
hyper::http::method::Method; use hyper::http::uri::Authority; +use hyper::http::StatusCode; +use hyper::http::{header, HeaderValue, Uri}; use hyper::Body; +use hyper::{Request, Response}; use openssl::hash::MessageDigest; +use openssl::sha::Sha256; use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use openssl::x509::X509StoreContextRef; +use quick_xml::events::BytesText; +use quick_xml::writer::Writer; use tracing::error; use proxmox_http::client::HttpsConnector; +use crate::aws_sign_v4::aws_sign_v4_signature; +use crate::aws_sign_v4::AWS_SIGN_V4_DATETIME_FORMAT; +use crate::object_key::S3ObjectKey; +use crate::response_reader::{ + CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse, + ListObjectsV2Response, PutObjectResponse, ResponseReader, +}; + const S3_HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); const S3_TCP_KEEPALIVE_TIME: u32 = 120; @@ -128,4 +147,355 @@ impl S3Client { "unexpected certificate fingerprint {certificate_fingerprint}" )) } + + async fn prepare(&self, mut request: Request<Body>) -> Result<Request<Body>, Error> { + let host_header = request + .uri() + .authority() + .ok_or_else(|| format_err!("request missing authority"))? + .to_string(); + + // Calculate the crc32 sum of the whole body, while the DataBlob of a chunk store does skip + // over the DataBlob header, so that must be considered when using this to check for + // changes/integrity. 
+ let mut crc32sum = crc32fast::Hasher::new(); + // Content verification for aws s3 signature + let mut hasher = Sha256::new(); + // Load payload into memory, needed as the hash and checksum have to be calculated a-priori + let buffer: Bytes = { + let body = request.body_mut(); + let mut buf = BytesMut::with_capacity(body.size_hint().lower() as usize); + while let Some(chunk) = body.data().await { + let chunk = chunk?; + hasher.update(&chunk); + crc32sum.update(&chunk); + buf.extend_from_slice(&chunk); + } + buf.freeze() + }; + let payload_digest = hex::encode(hasher.finish()); + let payload_crc32sum = base64::encode(crc32sum.finalize().to_be_bytes()); + let payload_len = buffer.len(); + *request.body_mut() = Body::from(buffer); + + let epoch = proxmox_time::epoch_i64(); + let datetime = proxmox_time::strftime_utc(AWS_SIGN_V4_DATETIME_FORMAT, epoch)?; + + request + .headers_mut() + .insert("x-amz-date", HeaderValue::from_str(&datetime)?); + request + .headers_mut() + .insert("host", HeaderValue::from_str(&host_header)?); + request.headers_mut().insert( + "x-amz-content-sha256", + HeaderValue::from_str(&payload_digest)?, + ); + if payload_len > 0 { + request.headers_mut().insert( + header::CONTENT_LENGTH, + HeaderValue::from_str(&payload_len.to_string())?, + ); + } + if !payload_crc32sum.is_empty() { + request.headers_mut().insert( + "x-amz-checksum-crc32", + HeaderValue::from_str(&payload_crc32sum)?, + ); + } + + let signature = aws_sign_v4_signature(&request, &self.options, epoch, &payload_digest)?; + + request + .headers_mut() + .insert(header::AUTHORIZATION, HeaderValue::from_str(&signature)?); + + Ok(request) + } + + pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>, Error> { + let request = self.prepare(request).await?; + let response = tokio::time::timeout(S3_HTTP_CONNECT_TIMEOUT, self.client.request(request)) + .await + .context("request timeout")??; + Ok(response) + } + + /// Check if bucket exists and got permissions to access it. 
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html + pub async fn head_bucket(&self) -> Result<(), Error> { + let request = Request::builder() + .method(Method::HEAD) + .uri(self.uri_builder("/")?) + .body(Body::empty())?; + let response = self.send(request).await?; + let (parts, _body) = response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND => { + bail!("bucket does not exist or no permission to access it") + } + status_code => bail!("unexpected status code {status_code}"), + } + + Ok(()) + } + + /// Fetch metadata from an object without returning the object itself. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html + pub async fn head_object( + &self, + object_key: S3ObjectKey, + ) -> Result<Option<HeadObjectResponse>, Error> { + let request = Request::builder() + .method(Method::HEAD) + .uri(self.uri_builder(&object_key)?) + .body(Body::empty())?; + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.head_object_response().await + } + + /// Fetch an object from object store. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html + pub async fn get_object( + &self, + object_key: S3ObjectKey, + ) -> Result<Option<GetObjectResponse>, Error> { + let request = Request::builder() + .method(Method::GET) + .uri(self.uri_builder(&object_key)?) + .body(Body::empty())?; + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.get_object_response().await + } + + /// Returns some or all (up to 1,000) of the objects in a bucket with each request. 
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html + pub async fn list_objects_v2( + &self, + prefix: Option<&str>, + max_keys: Option<u64>, + continuation_token: Option<&str>, + ) -> Result<ListObjectsV2Response, Error> { + let mut path_and_query = String::from("/?list-type=2"); + if let Some(prefix) = prefix { + path_and_query.push_str("&prefix="); + path_and_query.push_str(prefix); + } + if let Some(max_keys) = max_keys { + path_and_query.push_str("&max-keys="); + path_and_query.push_str(&max_keys.to_string()); + } + if let Some(token) = continuation_token { + path_and_query.push_str("&continuation-token="); + path_and_query.push_str(token); + } + let request = Request::builder() + .method(Method::GET) + .uri(self.uri_builder(&path_and_query)?) + .body(Body::empty())?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.list_objects_v2_response().await + } + + /// Add a new object to a bucket. + /// + /// Do not reupload if an object with matching key already exists in the bucket. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html + pub async fn put_object( + &self, + object_key: S3ObjectKey, + object_data: Body, + ) -> Result<PutObjectResponse, Error> { + // Assure data integrity after upload by providing a trailing checksum header. This value + // can also be used to compare a local chunk content against an object stored in S3. + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + let request = Request::builder() + .method(Method::PUT) + .uri(self.uri_builder(&object_key)?) + .header(header::CONTENT_TYPE, "binary/octet") + // Never overwrite pre-existing objects with the same key. 
+ //.header(header::IF_NONE_MATCH, "*") + .body(object_data)?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.put_object_response().await + } + + /// Sets the supplied tag-set to an object that already exists in a bucket. A tag is a key-value pair. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html + pub async fn put_object_tagging( + &self, + object_key: S3ObjectKey, + tagset: &HashMap<String, String>, + ) -> Result<bool, Error> { + let mut writer = Writer::new(Cursor::new(Vec::new())); + writer + .create_element("Tagging") + .with_attribute(("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/")) + .write_inner_content(|writer| { + writer + .create_element("TagSet") + .write_inner_content(|writer| { + for (key, value) in tagset.iter() { + writer.create_element("Tag").write_inner_content(|writer| { + writer + .create_element("Key") + .write_text_content(BytesText::new(key))?; + writer + .create_element("Value") + .write_text_content(BytesText::new(value))?; + Ok(()) + })?; + } + Ok(()) + })?; + Ok(()) + })?; + + let body: Body = writer.into_inner().into_inner().into(); + let request = Request::builder() + .method(Method::PUT) + .uri(self.uri_builder(&format!("{object_key}?tagging"))?) + .body(body)?; + + let response = self.send(request).await?; + Ok(response.status().is_success()) + } + + /// Sets the supplied tag to an object that already exists in a bucket. A tag is a key-value pair. + /// Optimized version of the `put_object_tagging` to only set a single tag. 
+ pub async fn put_object_tag( + &self, + object_key: S3ObjectKey, + tag_key: &str, + tag_value: &str, + ) -> Result<bool, Error> { + let body: Body = format!( + r#"<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <TagSet> + <Tag> + <Key>{tag_key}</Key> + <Value>{tag_value}</Value> + </Tag> + </TagSet> + </Tagging>"# + ) + .into(); + + let request = Request::builder() + .method(Method::PUT) + .uri(self.uri_builder(&format!("{object_key}?tagging"))?) + .body(body)?; + + let response = self.send(request).await?; + //TODO: Response and error handling! + Ok(response.status().is_success()) + } + + /// Creates a copy of an object that is already stored in Amazon S3. + /// Uses the `x-amz-metadata-directive` set to `REPLACE`, therefore resulting in updated metadata. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html + pub async fn copy_object( + &self, + destination_key: S3ObjectKey, + source_bucket: &str, + source_key: S3ObjectKey, + ) -> Result<CopyObjectResponse, Error> { + let copy_source = source_key.to_copy_source_key(source_bucket); + let request = Request::builder() + .method(Method::PUT) + .uri(self.uri_builder(&destination_key)?) + .header("x-amz-copy-source", HeaderValue::from_str(©_source)?) + .header( + "x-amz-metadata-directive", + HeaderValue::from_str("REPLACE")?, + ) + .body(Body::empty())?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.copy_object_response().await + } + + /// Helper to update the metadata for an object by copying it to itself. This will not cause + /// any additional costs other than the request cost itself. + /// + /// Note: This will actually create a new object for buckets with versioning enabled. + /// Return with error if that is the case, detected by checking the presence of the + /// `x-amz-version-id` header in the response. 
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html + pub async fn update_object_metadata( + &self, + object_key: S3ObjectKey, + ) -> Result<CopyObjectResponse, Error> { + let response = self + .copy_object(object_key.clone(), &self.options.bucket, object_key) + .await?; + if response.x_amz_version_id.is_some() { + // Return an error if the response contains an `x-amz-version-id`, indicating that the + // bucket has versioning enabled, as that will bloat the bucket size and therefore cost. + bail!("Failed to update object metadata as versioning is enabled"); + } + Ok(response) + } + + /// Delete multiple objects from a bucket using a single HTTP request. + /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html + pub async fn delete_objects( + &self, + object_keys: &[String], + ) -> Result<DeleteObjectsResponse, Error> { + let mut body = String::from(r#"<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#); + for object_key in object_keys { + let object = format!("<Object><Key>{object_key}</Key></Object>"); + body.push_str(&object); + } + body.push_str("</Delete>"); + let request = Request::builder() + .method(Method::POST) + .uri(self.uri_builder("/?delete")?) + .body(Body::from(body))?; + + let response = self.send(request).await?; + let response_reader = ResponseReader::new(response); + response_reader.delete_objects_response().await + } + + /// Delete objects by given key prefix. + /// Requires at least 2 api calls. + pub async fn delete_objects_by_prefix( + &self, + prefix: &str, + ) -> Result<DeleteObjectsResponse, Error> { + // S3 API does not provide a convenient way to delete objects by key prefix. + // List all objects with given group prefix and delete all objects found, so this + // requires at least 2 API calls. + // TODO: fix for more than 1000 response items given by api limit. 
+ let list_objects_result = self.list_objects_v2(Some(prefix), None, None).await?; + let objects_to_delete: Vec<String> = list_objects_result + .contents + .into_iter() + .map(|item| item.key) + .collect(); + self.delete_objects(&objects_to_delete).await + } + + #[inline(always)] + /// Helper to generate [`Uri`] instance with common properties based on given path and query + /// string + fn uri_builder(&self, path_and_query: &str) -> Result<Uri, Error> { + Uri::builder() + .scheme("https") + .authority(self.authority.clone()) + .path_and_query(path_and_query) + .build() + .context("failed to build uri") + } } diff --git a/pbs-s3-client/src/lib.rs b/pbs-s3-client/src/lib.rs index 00fa26455..7cc0ea841 100644 --- a/pbs-s3-client/src/lib.rs +++ b/pbs-s3-client/src/lib.rs @@ -3,6 +3,8 @@ mod client; pub use client::{S3Client, S3ClientOptions}; mod object_key; pub use object_key::{S3ObjectKey, S3_CONTENT_PREFIX}; +mod response_reader; +pub use response_reader::PutObjectResponse; use std::time::Duration; diff --git a/pbs-s3-client/src/response_reader.rs b/pbs-s3-client/src/response_reader.rs new file mode 100644 index 000000000..ed82d77b9 --- /dev/null +++ b/pbs-s3-client/src/response_reader.rs @@ -0,0 +1,324 @@ +use std::str::FromStr; + +use anyhow::{anyhow, bail, Context, Error}; +use hyper::body::HttpBody; +use hyper::header::HeaderName; +use hyper::http::header; +use hyper::http::StatusCode; +use hyper::{Body, HeaderMap, Response}; +use serde::Deserialize; + +use crate::{HttpDate, LastModifiedTimestamp}; + +pub(crate) struct ResponseReader { + response: Response<Body>, +} + +#[derive(Debug)] +pub struct ListObjectsV2Response { + pub date: HttpDate, + pub name: String, + pub prefix: String, + pub key_count: u64, + pub max_keys: u64, + pub is_truncated: bool, + pub continuation_token: Option<String>, + pub next_continuation_token: Option<String>, + pub contents: Vec<ListObjectsV2Contents>, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +struct 
ListObjectsV2ResponseBody { + pub name: String, + pub prefix: String, + pub key_count: u64, + pub max_keys: u64, + pub is_truncated: bool, + pub continuation_token: Option<String>, + pub next_continuation_token: Option<String>, + pub contents: Option<Vec<ListObjectsV2Contents>>, +} + +impl ListObjectsV2ResponseBody { + fn with_date(self, date: HttpDate) -> ListObjectsV2Response { + ListObjectsV2Response { + date, + name: self.name, + prefix: self.prefix, + key_count: self.key_count, + max_keys: self.max_keys, + is_truncated: self.is_truncated, + continuation_token: self.continuation_token, + next_continuation_token: self.next_continuation_token, + contents: self.contents.unwrap_or_else(|| Vec::new()), + } + } +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct ListObjectsV2Contents { + pub key: String, + pub last_modified: LastModifiedTimestamp, + pub e_tag: String, + pub size: u64, + pub storage_class: String, +} + +#[derive(Debug)] +/// Subset of the head object response (headers only, there is no body) +/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax +pub struct HeadObjectResponse { + pub content_length: u64, + pub content_type: String, + pub date: HttpDate, + pub e_tag: String, + pub last_modified: HttpDate, +} + +#[derive(Debug)] +/// Subset of the get object response +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html#API_GetObject_ResponseSyntax +pub struct GetObjectResponse { + pub content_length: u64, + pub content_type: String, + pub date: HttpDate, + pub e_tag: String, + pub last_modified: HttpDate, + pub content: Body, +} + +#[derive(Debug)] +pub struct CopyObjectResponse { + pub copy_object_result: CopyObjectResult, + pub x_amz_version_id: Option<String>, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct CopyObjectResult { + pub e_tag: String, + pub last_modified: LastModifiedTimestamp, +} + +/// Subset of 
the put object response +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_ResponseSyntax +#[derive(Debug)] +pub enum PutObjectResponse { + NeedsRetry, + PreconditionFailed, + Success(String), +} + +/// Subset of the delete objects response +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_ResponseElements +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct DeleteObjectsResponse { + pub deleted: Option<Vec<DeletedObject>>, + pub error: Option<Vec<DeleteObjectError>>, +} + +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeletedObject.html +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct DeletedObject { + pub delete_marker: Option<bool>, + pub delete_marker_version_id: Option<String>, + pub key: Option<String>, + pub version_id: Option<String>, +} + +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_Error.html +#[derive(Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct DeleteObjectError { + pub code: Option<String>, + pub key: Option<String>, + pub message: Option<String>, + pub version_id: Option<String>, +} + +impl ResponseReader { + pub(crate) fn new(response: Response<Body>) -> Self { + Self { response } + } + + pub(crate) async fn list_objects_v2_response(self) -> Result<ListObjectsV2Response, Error> { + let (parts, body) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::NOT_FOUND => bail!("bucket does not exist"), + status_code => bail!("unexpected status code {status_code}"), + } + + let body = body.collect().await?.to_bytes(); + let body = String::from_utf8(body.to_vec())?; + + let date: HttpDate = Self::parse_header(header::DATE, &parts.headers)?; + + let response: ListObjectsV2ResponseBody = + serde_xml_rs::from_str(&body).context("failed to parse response body")?; + + Ok(response.with_date(date)) + } + + pub(crate) async fn 
head_object_response(self) -> Result<Option<HeadObjectResponse>, Error> { + let (parts, body) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::NOT_FOUND => return Ok(None), + status_code => bail!("unexpected status code {status_code}"), + } + let body = body.collect().await?.to_bytes(); + if !body.is_empty() { + bail!("got unexpected non-empty response body"); + } + + let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?; + let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?; + let e_tag = Self::parse_header(header::ETAG, &parts.headers)?; + let date = Self::parse_header(header::DATE, &parts.headers)?; + let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?; + + Ok(Some(HeadObjectResponse { + content_length, + content_type, + date, + e_tag, + last_modified, + })) + } + + pub(crate) async fn get_object_response(self) -> Result<Option<GetObjectResponse>, Error> { + let (parts, content) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::NOT_FOUND => return Ok(None), + StatusCode::FORBIDDEN => bail!("object is archived and inaccessible until restored"), + status_code => bail!("unexpected status code {status_code}"), + } + + let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?; + let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?; + let e_tag = Self::parse_header(header::ETAG, &parts.headers)?; + let date = Self::parse_header(header::DATE, &parts.headers)?; + let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?; + + Ok(Some(GetObjectResponse { + content_length, + content_type, + date, + e_tag, + last_modified, + content, + })) + } + + pub(crate) async fn copy_object_response(self) -> Result<CopyObjectResponse, Error> { + let (parts, content) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + 
StatusCode::NOT_FOUND => bail!("object not found"), + StatusCode::FORBIDDEN => bail!("the source object is not in the active tier"), + status_code => bail!("unexpected status code {status_code}"), + } + + let body = content.collect().await?.to_bytes(); + let body = String::from_utf8(body.to_vec())?; + + let x_amz_version_id = match parts.headers.get("x-amz-version-id") { + Some(version_id) => Some( + version_id + .to_str() + .context("failed to parse version id header")? + .to_owned(), + ), + None => None, + }; + + let copy_object_result: CopyObjectResult = + serde_xml_rs::from_str(&body).context("failed to parse response body")?; + + Ok(CopyObjectResponse { + copy_object_result, + x_amz_version_id, + }) + } + + pub(crate) async fn put_object_response(self) -> Result<PutObjectResponse, Error> { + let (parts, body) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + // If-None-Match precondition failed, an object with same key already present. + // FIXME: Should this be dropped in favor of re-uploading and rely on the local + // cache to detect duplicates to increase data safety guarantees? 
+ StatusCode::PRECONDITION_FAILED => return Ok(PutObjectResponse::PreconditionFailed), + StatusCode::CONFLICT => return Ok(PutObjectResponse::NeedsRetry), + StatusCode::BAD_REQUEST => bail!("invalid request: {body:?}"), + status_code => bail!("unexpected status code {status_code}"), + }; + + let body = body.collect().await?.to_bytes(); + if !body.is_empty() { + bail!("got unexpected non-empty response body"); + } + + let e_tag = Self::parse_header(header::ETAG, &parts.headers)?; + + Ok(PutObjectResponse::Success(e_tag)) + } + + pub(crate) async fn delete_objects_response(self) -> Result<DeleteObjectsResponse, Error> { + let (parts, body) = self.response.into_parts(); + + match parts.status { + StatusCode::OK => (), + StatusCode::BAD_REQUEST => bail!("invalid request: {body:?}"), + status_code => bail!("unexpected status code {status_code}"), + }; + + let body = body.collect().await?.to_bytes(); + let body = String::from_utf8(body.to_vec())?; + + let delete_objects_response: DeleteObjectsResponse = + serde_xml_rs::from_str(&body).context("failed to parse response body")?; + + Ok(delete_objects_response) + } + + fn parse_header<T: FromStr>(name: HeaderName, headers: &HeaderMap) -> Result<T, Error> + where + <T as FromStr>::Err: Send + Sync + 'static, + Result<T, <T as FromStr>::Err>: Context<T, <T as FromStr>::Err>, + { + let header_value = headers + .get(&name) + .ok_or_else(|| anyhow!("missing header '{name}'"))?; + let header_str = header_value + .to_str() + .with_context(|| format!("non UTF-8 header '{name}'"))?; + let value = header_str + .parse() + .with_context(|| format!("failed to parse header '{name}'"))?; + Ok(value) + } + + fn parse_x_amz_checksum_crc32_header(headers: &HeaderMap) -> Result<u32, Error> { + let x_amz_checksum_crc32 = headers + .get("x-amz-checksum-crc32") + .ok_or_else(|| anyhow!("missing header 'x-amz-checksum-crc32'"))?; + let x_amz_checksum_crc32 = base64::decode(x_amz_checksum_crc32.to_str()?)?; + let x_amz_checksum_crc32: [u8; 4] = 
x_amz_checksum_crc32 + .try_into() + .map_err(|_e| anyhow!("failed to convert x-amz-checksum-crc32 header"))?; + let x_amz_checksum_crc32 = u32::from_be_bytes(x_amz_checksum_crc32); + Ok(x_amz_checksum_crc32) + } +} -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel