From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v3 10/41] s3 client: implement methods to operate on s3 objects in bucket
Date: Mon, 16 Jun 2025 16:21:25 +0200
Message-ID: <20250616142156.413652-13-c.ebner@proxmox.com>
In-Reply-To: <20250616142156.413652-1-c.ebner@proxmox.com>
Adds the basic implementation of the client to use S3 object stores
as backend for PBS datastores.

This implements the basic client actions on a bucket and the objects
stored within it.

This is not feature complete and is intended to be extended on demand
rather than implementing the whole client at once.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
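Note: below a minimal usage sketch of the object operations added by this
patch, for illustration only. It assumes a client constructed as introduced
by patch 05/41, that `RelS3ObjectKey` is re-exported, implements `Clone` and
is obtained via the object key helpers from patch 07/41, and uses a
hypothetical payload:

    use anyhow::Error;
    use hyper::Body;

    use pbs_s3_client::{PutObjectResponse, S3Client};
    // assumption: re-exported by the crate, see patch 07/41
    use pbs_s3_client::RelS3ObjectKey;

    async fn object_roundtrip(client: &S3Client, key: RelS3ObjectKey) -> Result<(), Error> {
        // fail early if the bucket is missing or not accessible
        client.head_bucket().await?;
        match client
            .put_object(key.clone(), Body::from("hypothetical payload"))
            .await?
        {
            PutObjectResponse::Success(e_tag) => println!("uploaded, etag {e_tag}"),
            PutObjectResponse::PreconditionFailed => println!("key already present"),
            PutObjectResponse::NeedsRetry => println!("upload needs retry"),
        }
        if let Some(response) = client.get_object(key.clone()).await? {
            println!("fetched {} bytes", response.content_length);
        }
        client.delete_object(key).await?;
        Ok(())
    }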
Cargo.toml | 3 +
pbs-s3-client/Cargo.toml | 8 +
pbs-s3-client/src/client.rs | 392 ++++++++++++++++++++++++++-
pbs-s3-client/src/lib.rs | 2 +
pbs-s3-client/src/response_reader.rs | 343 +++++++++++++++++++++++
5 files changed, 747 insertions(+), 1 deletion(-)
create mode 100644 pbs-s3-client/src/response_reader.rs
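As a side note on the signing flow: prepare() buffers the whole payload in
order to derive both integrity headers up front. Standalone, the computed
header values boil down to the following sketch (same openssl, md5, hex and
base64 crates as used by the actual code):

    use openssl::sha::Sha256;

    // returns the (x-amz-content-sha256, Content-MD5) header values
    fn payload_integrity_headers(payload: &[u8]) -> (String, String) {
        let mut hasher = Sha256::new();
        hasher.update(payload);
        // hex encoded SHA256 digest, required for the AWS signature v4
        let sha256_hex = hex::encode(hasher.finish());
        // base64 encoded MD5 digest, used as upload integrity check
        let md5_b64 = base64::encode(*md5::compute(payload));
        (sha256_hex, md5_b64)
    }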
diff --git a/Cargo.toml b/Cargo.toml
index b11248f90..2c97e691f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -137,15 +137,18 @@ log = "0.4.17"
nix = "0.26.1"
nom = "7"
num-traits = "0.2"
+md5 = "0.7.0"
once_cell = "1.3.1"
openssl = "0.10.40"
percent-encoding = "2.1"
pin-project-lite = "0.2"
+quick-xml = "0.26"
regex = "1.5.5"
rustyline = "9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_plain = "1.0"
+serde-xml-rs = "0.5"
siphasher = "0.3"
syslog = "6"
tar = "0.4"
diff --git a/pbs-s3-client/Cargo.toml b/pbs-s3-client/Cargo.toml
index 6edd9a248..a6348a0e4 100644
--- a/pbs-s3-client/Cargo.toml
+++ b/pbs-s3-client/Cargo.toml
@@ -8,12 +8,20 @@ rust-version.workspace = true
[dependencies]
anyhow.workspace = true
+base64.workspace = true
+bytes.workspace = true
+futures.workspace = true
hex = { workspace = true, features = [ "serde" ] }
hyper.workspace = true
iso8601.workspace = true
+md5.workspace = true
openssl.workspace = true
+quick-xml = { workspace = true, features = ["async-tokio"] }
serde.workspace = true
serde_plain.workspace = true
+serde-xml-rs.workspace = true
+tokio = { workspace = true, features = [] }
+tokio-util = { workspace = true, features = ["compat"] }
tracing.workspace = true
url.workspace = true
diff --git a/pbs-s3-client/src/client.rs b/pbs-s3-client/src/client.rs
index b886843a3..515b05b9c 100644
--- a/pbs-s3-client/src/client.rs
+++ b/pbs-s3-client/src/client.rs
@@ -1,18 +1,39 @@
+use std::collections::HashMap;
+use std::io::Cursor;
+use std::path::Path;
+use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::{bail, format_err, Context, Error};
+use bytes::{Bytes, BytesMut};
+use hyper::body::HttpBody;
use hyper::client::{Client, HttpConnector};
-use hyper::http::uri::Authority;
+use hyper::http::method::Method;
+use hyper::http::uri::{Authority, Parts, PathAndQuery, Scheme};
+use hyper::http::StatusCode;
+use hyper::http::{header, HeaderValue, Uri};
use hyper::Body;
+use hyper::{Request, Response};
use openssl::hash::MessageDigest;
+use openssl::sha::Sha256;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use openssl::x509::X509StoreContextRef;
+use quick_xml::events::BytesText;
+use quick_xml::writer::Writer;
use tracing::error;
use pbs_api_types::{S3ClientConfig, S3ClientSecretsConfig};
use proxmox_http::client::HttpsConnector;
+use crate::aws_sign_v4::aws_sign_v4_signature;
+use crate::aws_sign_v4::AWS_SIGN_V4_DATETIME_FORMAT;
+use crate::object_key::{RelS3ObjectKey, S3ObjectKey};
+use crate::response_reader::{
+ CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse,
+ ListObjectsV2Response, PutObjectResponse, ResponseReader,
+};
+
const S3_HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const S3_TCP_KEEPALIVE_TIME: u32 = 120;
@@ -167,4 +188,373 @@ impl S3Client {
"unexpected certificate fingerprint {certificate_fingerprint}"
))
}
+
+ async fn prepare(&self, mut request: Request<Body>) -> Result<Request<Body>, Error> {
+ let host_header = request
+ .uri()
+ .authority()
+ .ok_or_else(|| format_err!("request missing authority"))?
+ .to_string();
+
+ // Content verification for AWS S3 signature
+ let mut hasher = Sha256::new();
+ // Load payload into memory, needed as the hash and checksum have to be calculated a priori
+ let buffer: Bytes = {
+ let body = request.body_mut();
+ let mut buf = BytesMut::with_capacity(body.size_hint().lower() as usize);
+ while let Some(chunk) = body.data().await {
+ let chunk = chunk?;
+ hasher.update(&chunk);
+ buf.extend_from_slice(&chunk);
+ }
+ buf.freeze()
+ };
+ // Use MD5 as upload integrity check, as other methods are not supported by all S3 object
+ // store providers and might be ignored. Its use is recommended by AWS, as described in
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
+ let payload_md5 = md5::compute(&buffer);
+ let payload_digest = hex::encode(hasher.finish());
+ let payload_len = buffer.len();
+ *request.body_mut() = Body::from(buffer);
+
+ let epoch = proxmox_time::epoch_i64();
+ let datetime = proxmox_time::strftime_utc(AWS_SIGN_V4_DATETIME_FORMAT, epoch)?;
+
+ request
+ .headers_mut()
+ .insert("x-amz-date", HeaderValue::from_str(&datetime)?);
+ request
+ .headers_mut()
+ .insert("host", HeaderValue::from_str(&host_header)?);
+ request.headers_mut().insert(
+ "x-amz-content-sha256",
+ HeaderValue::from_str(&payload_digest)?,
+ );
+ request.headers_mut().insert(
+ header::CONTENT_LENGTH,
+ HeaderValue::from_str(&payload_len.to_string())?,
+ );
+ if payload_len > 0 {
+ let md5_digest = base64::encode(*payload_md5);
+ request
+ .headers_mut()
+ .insert("Content-MD5", HeaderValue::from_str(&md5_digest)?);
+ }
+
+ let signature = aws_sign_v4_signature(&request, &self.options, epoch, &payload_digest)?;
+
+ request
+ .headers_mut()
+ .insert(header::AUTHORIZATION, HeaderValue::from_str(&signature)?);
+
+ Ok(request)
+ }
+
+ pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
+ let request = self.prepare(request).await?;
+ let response = tokio::time::timeout(S3_HTTP_CONNECT_TIMEOUT, self.client.request(request))
+ .await
+ .context("request timeout")??;
+ Ok(response)
+ }
+
+ /// Check if the bucket exists and we have permission to access it.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
+ pub async fn head_bucket(&self) -> Result<(), Error> {
+ let request = Request::builder()
+ .method(Method::HEAD)
+ .uri(self.build_uri("/", &[])?)
+ .body(Body::empty())?;
+ let response = self.send(request).await?;
+ let (parts, _body) = response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND => {
+ bail!("bucket does not exist or no permission to access it")
+ }
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+
+ Ok(())
+ }
+
+ /// Fetch metadata from an object without returning the object itself.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
+ pub async fn head_object(
+ &self,
+ object_key: RelS3ObjectKey,
+ ) -> Result<Option<HeadObjectResponse>, Error> {
+ let object_key = object_key.to_full_key(&self.options.store_prefix);
+ let request = Request::builder()
+ .method(Method::HEAD)
+ .uri(self.build_uri(&object_key, &[])?)
+ .body(Body::empty())?;
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.head_object_response().await
+ }
+
+ /// Fetch an object from object store.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+ pub async fn get_object(
+ &self,
+ object_key: RelS3ObjectKey,
+ ) -> Result<Option<GetObjectResponse>, Error> {
+ let object_key = object_key.to_full_key(&self.options.store_prefix);
+ let request = Request::builder()
+ .method(Method::GET)
+ .uri(self.build_uri(&object_key, &[])?)
+ .body(Body::empty())?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.get_object_response().await
+ }
+
+ /// Returns some or all (up to 1,000) of the objects in a bucket with each request.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
+ pub async fn list_objects_v2(
+ &self,
+ prefix: &S3PathPrefix,
+ continuation_token: Option<&str>,
+ ) -> Result<ListObjectsV2Response, Error> {
+ let mut query = vec![("list-type", "2")];
+ let abs_prefix: String;
+ if let S3PathPrefix::Some(prefix) = prefix {
+ abs_prefix = if prefix.starts_with("/") {
+ format!("{}{prefix}", self.options.store_prefix)
+ } else {
+ format!("{}/{prefix}", self.options.store_prefix)
+ };
+ query.push(("prefix", &abs_prefix));
+ }
+ if let Some(token) = continuation_token {
+ query.push(("continuation-token", token));
+ }
+ let request = Request::builder()
+ .method(Method::GET)
+ .uri(self.build_uri("/", &query)?)
+ .body(Body::empty())?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.list_objects_v2_response().await
+ }
+
+ /// Add a new object to a bucket.
+ ///
+ /// Do not reupload if an object with a matching key already exists in the bucket.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
+ pub async fn put_object(
+ &self,
+ object_key: RelS3ObjectKey,
+ object_data: Body,
+ ) -> Result<PutObjectResponse, Error> {
+ let object_key = object_key.to_full_key(&self.options.store_prefix);
+ let request = Request::builder()
+ .method(Method::PUT)
+ .uri(self.build_uri(&object_key, &[])?)
+ .header(header::CONTENT_TYPE, "binary/octet")
+ // Never overwrite pre-existing objects with the same key.
+ //.header(header::IF_NONE_MATCH, "*")
+ .body(object_data)?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.put_object_response().await
+ }
+
+ /// Removes an object from a bucket.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
+ pub async fn delete_object(&self, object_key: RelS3ObjectKey) -> Result<(), Error> {
+ let object_key = object_key.to_full_key(&self.options.store_prefix);
+ let request = Request::builder()
+ .method(Method::DELETE)
+ .uri(self.build_uri(&object_key, &[])?)
+ .body(Body::empty())?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.delete_object_response().await
+ }
+
+ /// Delete multiple objects from a bucket using a single HTTP request.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
+ pub async fn delete_objects(
+ &self,
+ object_keys: &[S3ObjectKey],
+ ) -> Result<DeleteObjectsResponse, Error> {
+ let mut body = String::from(r#"<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#);
+ for object_key in object_keys {
+ body.push_str("<Object><Key>");
+ body.push_str(object_key);
+ body.push_str("</Key></Object>");
+ }
+ body.push_str("</Delete>");
+ let request = Request::builder()
+ .method(Method::POST)
+ .uri(self.build_uri("/", &[("delete", "")])?)
+ .body(Body::from(body))?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.delete_objects_response().await
+ }
+
+ /// Delete objects by given key prefix.
+ /// Requires at least 2 API calls.
+ pub async fn delete_objects_by_prefix(&self, prefix: &S3PathPrefix) -> Result<bool, Error> {
+ // S3 API does not provide a convenient way to delete objects by key prefix.
+ // List all objects with given group prefix and delete all objects found, so this
+ // requires at least 2 API calls.
+ let mut next_continuation_token: Option<String> = None;
+ let mut delete_errors = false;
+ loop {
+ let list_objects_result = self
+ .list_objects_v2(prefix, next_continuation_token.as_deref())
+ .await?;
+
+ let objects_to_delete: Vec<S3ObjectKey> = list_objects_result
+ .contents
+ .into_iter()
+ .map(|item| item.key)
+ .collect();
+
+ let response = self.delete_objects(&objects_to_delete).await?;
+ if response.error.is_some() {
+ delete_errors = true;
+ }
+
+ if list_objects_result.is_truncated {
+ next_continuation_token = list_objects_result
+ .next_continuation_token
+ .as_ref()
+ .cloned();
+ continue;
+ }
+ break;
+ }
+ Ok(delete_errors)
+ }
+
+ /// Delete objects by given key prefix, but exclude items based on a suffix pre-filter
+ /// (including the parent component of the matched suffix). E.g. do not remove items in a
+ /// snapshot directory if it contains the protected file marker (given as suffix).
+ /// Items listed in `exclude_from_parent` are additionally excluded in the parent of a
+ /// matching suffix entry, e.g. the owner and notes of a group, if one of the group's
+ /// snapshots was matched by the protected marker.
+ ///
+ /// Requires at least 2 API calls.
+ pub async fn delete_objects_by_prefix_with_suffix_filter(
+ &self,
+ prefix: &S3PathPrefix,
+ suffix: &str,
+ exclude_from_parent: &[&str],
+ ) -> Result<bool, Error> {
+ // S3 API does not provide a convenient way to delete objects by key prefix.
+ // List all objects with given group prefix and delete all objects found, so this
+ // requires at least 2 API calls.
+ let mut next_continuation_token: Option<String> = None;
+ let mut delete_errors = false;
+ let mut prefix_filters = Vec::new();
+ let mut list_objects = Vec::new();
+ loop {
+ let list_objects_result = self
+ .list_objects_v2(prefix, next_continuation_token.as_deref())
+ .await?;
+
+ let mut prefixes: Vec<String> = list_objects_result
+ .contents
+ .iter()
+ .filter_map(|item| {
+ let prefix_filter = item.key.strip_suffix(suffix).map(|prefix| {
+ let path = Path::new(prefix);
+ if let Some(parent) = path.parent() {
+ for filter in exclude_from_parent {
+ let filter = parent.join(filter);
+ // valid utf-8 as combined from `str` values
+ prefix_filters.push(filter.to_string_lossy().to_string());
+ }
+ }
+ prefix.to_string()
+ });
+ if prefix_filter.is_none() {
+ list_objects.push(item.key.clone());
+ }
+ prefix_filter
+ })
+ .collect();
+ prefix_filters.append(&mut prefixes);
+
+ if list_objects_result.is_truncated {
+ next_continuation_token = list_objects_result
+ .next_continuation_token
+ .as_ref()
+ .cloned();
+ continue;
+ }
+ break;
+ }
+
+ let objects_to_delete: Vec<S3ObjectKey> = list_objects
+ .into_iter()
+ .filter_map(|item| {
+ for prefix in &prefix_filters {
+ if item.strip_prefix(prefix).is_some() {
+ return None;
+ }
+ }
+ Some(item)
+ })
+ .collect();
+
+ for objects in objects_to_delete.chunks(1000) {
+ let result = self.delete_objects(objects).await?;
+ if result.error.is_some() {
+ delete_errors = true;
+ }
+ }
+
+ Ok(delete_errors)
+ }
+
+ #[inline(always)]
+ /// Helper to generate [`Uri`] instance with common properties based on given path and query.
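+ /// E.g. object key `some/key` maps to `https://<authority>/<bucket>/some/key`
+ /// with path-style requests, while with vhost-style requests the bucket is
+ /// already part of the authority and the key maps to
+ /// `https://<authority>/some/key`.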
+ fn build_uri(&self, mut path: &str, query: &[(&str, &str)]) -> Result<Uri, Error> {
+ if path.starts_with('/') {
+ path = &path[1..];
+ }
+ let mut path_and_query = if self.options.path_style {
+ format!("/{bucket}/{path}", bucket = self.options.bucket)
+ } else {
+ format!("/{path}")
+ };
+
+ if !query.is_empty() {
+ path_and_query.push('?');
+ // No further input validation as http::uri::Builder will check path and query
+ let mut query_iter = query.iter().peekable();
+ while let Some((key, value)) = query_iter.next() {
+ path_and_query.push_str(key);
+ if !value.is_empty() {
+ path_and_query.push('=');
+ path_and_query.push_str(value);
+ }
+ if query_iter.peek().is_some() {
+ path_and_query.push('&');
+ }
+ }
+ }
+
+ let path_and_query =
+ PathAndQuery::from_str(&path_and_query).context("failed to parse path and query")?;
+
+ let mut uri_parts = Parts::default();
+ uri_parts.scheme = Some(Scheme::HTTPS);
+ uri_parts.authority = Some(self.authority.clone());
+ uri_parts.path_and_query = Some(path_and_query);
+
+ Uri::from_parts(uri_parts).context("failed to build uri")
+ }
}
diff --git a/pbs-s3-client/src/lib.rs b/pbs-s3-client/src/lib.rs
index 8400edf20..a9fa0d847 100644
--- a/pbs-s3-client/src/lib.rs
+++ b/pbs-s3-client/src/lib.rs
@@ -3,6 +3,8 @@ mod client;
pub use client::{S3Client, S3ClientOptions, S3PathPrefix};
mod object_key;
pub use object_key::{S3ObjectKey, S3_CONTENT_PREFIX};
+mod response_reader;
+pub use response_reader::PutObjectResponse;
use std::time::Duration;
diff --git a/pbs-s3-client/src/response_reader.rs b/pbs-s3-client/src/response_reader.rs
new file mode 100644
index 000000000..32ae48869
--- /dev/null
+++ b/pbs-s3-client/src/response_reader.rs
@@ -0,0 +1,343 @@
+use std::str::FromStr;
+
+use anyhow::{anyhow, bail, Context, Error};
+use hyper::body::HttpBody;
+use hyper::header::HeaderName;
+use hyper::http::header;
+use hyper::http::StatusCode;
+use hyper::{Body, HeaderMap, Response};
+use serde::Deserialize;
+
+use crate::S3ObjectKey;
+use crate::{HttpDate, LastModifiedTimestamp};
+
+pub(crate) struct ResponseReader {
+ response: Response<Body>,
+}
+
+#[derive(Debug)]
+pub struct ListObjectsV2Response {
+ pub date: HttpDate,
+ pub name: String,
+ pub prefix: String,
+ pub key_count: u64,
+ pub max_keys: u64,
+ pub is_truncated: bool,
+ pub continuation_token: Option<String>,
+ pub next_continuation_token: Option<String>,
+ pub contents: Vec<ListObjectsV2Contents>,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+struct ListObjectsV2ResponseBody {
+ pub name: String,
+ pub prefix: String,
+ pub key_count: u64,
+ pub max_keys: u64,
+ pub is_truncated: bool,
+ pub continuation_token: Option<String>,
+ pub next_continuation_token: Option<String>,
+ pub contents: Option<Vec<ListObjectsV2Contents>>,
+}
+
+impl ListObjectsV2ResponseBody {
+ fn with_date(self, date: HttpDate) -> ListObjectsV2Response {
+ ListObjectsV2Response {
+ date,
+ name: self.name,
+ prefix: self.prefix,
+ key_count: self.key_count,
+ max_keys: self.max_keys,
+ is_truncated: self.is_truncated,
+ continuation_token: self.continuation_token,
+ next_continuation_token: self.next_continuation_token,
+ contents: self.contents.unwrap_or_default(),
+ }
+ }
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectsV2Contents {
+ pub key: S3ObjectKey,
+ pub last_modified: LastModifiedTimestamp,
+ pub e_tag: String,
+ pub size: u64,
+ pub storage_class: String,
+}
+
+#[derive(Debug)]
+/// Subset of the head object response (headers only, there is no body)
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
+pub struct HeadObjectResponse {
+ pub content_length: u64,
+ pub content_type: String,
+ pub date: HttpDate,
+ pub e_tag: String,
+ pub last_modified: HttpDate,
+}
+
+#[derive(Debug)]
+/// Subset of the get object response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html#API_GetObject_ResponseSyntax
+pub struct GetObjectResponse {
+ pub content_length: u64,
+ pub content_type: String,
+ pub date: HttpDate,
+ pub e_tag: String,
+ pub last_modified: HttpDate,
+ pub content: Body,
+}
+
+#[derive(Debug)]
+pub struct CopyObjectResponse {
+ pub copy_object_result: CopyObjectResult,
+ pub x_amz_version_id: Option<String>,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct CopyObjectResult {
+ pub e_tag: String,
+ pub last_modified: LastModifiedTimestamp,
+}
+
+/// Subset of the put object response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_ResponseSyntax
+#[derive(Debug)]
+pub enum PutObjectResponse {
+ NeedsRetry,
+ PreconditionFailed,
+ Success(String),
+}
+
+/// Subset of the delete objects response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_ResponseElements
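+///
+/// A response body has the form (trimmed, hypothetical key):
+/// `<DeleteResult><Deleted><Key>some/key</Key></Deleted></DeleteResult>`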
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectsResponse {
+ pub deleted: Option<Vec<DeletedObject>>,
+ pub error: Option<Vec<DeleteObjectError>>,
+}
+
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeletedObject.html
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeletedObject {
+ pub delete_marker: Option<bool>,
+ pub delete_marker_version_id: Option<String>,
+ pub key: Option<S3ObjectKey>,
+ pub version_id: Option<String>,
+}
+
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_Error.html
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectError {
+ pub code: Option<String>,
+ pub key: Option<S3ObjectKey>,
+ pub message: Option<String>,
+ pub version_id: Option<String>,
+}
+
+impl ResponseReader {
+ pub(crate) fn new(response: Response<Body>) -> Self {
+ Self { response }
+ }
+
+ pub(crate) async fn list_objects_v2_response(self) -> Result<ListObjectsV2Response, Error> {
+ let (parts, body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::NOT_FOUND => bail!("bucket does not exist"),
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+
+ let body = body.collect().await?.to_bytes();
+ let body = String::from_utf8(body.to_vec())?;
+
+ let date: HttpDate = Self::parse_header(header::DATE, &parts.headers)?;
+
+ let response: ListObjectsV2ResponseBody =
+ serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+ Ok(response.with_date(date))
+ }
+
+ pub(crate) async fn head_object_response(self) -> Result<Option<HeadObjectResponse>, Error> {
+ let (parts, body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::NOT_FOUND => return Ok(None),
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+ let body = body.collect().await?.to_bytes();
+ if !body.is_empty() {
+ bail!("got unexpected non-empty response body");
+ }
+
+ let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?;
+ let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?;
+ let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+ let date = Self::parse_header(header::DATE, &parts.headers)?;
+ let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?;
+
+ Ok(Some(HeadObjectResponse {
+ content_length,
+ content_type,
+ date,
+ e_tag,
+ last_modified,
+ }))
+ }
+
+ pub(crate) async fn get_object_response(self) -> Result<Option<GetObjectResponse>, Error> {
+ let (parts, content) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::NOT_FOUND => return Ok(None),
+ StatusCode::FORBIDDEN => bail!("object is archived and inaccessible until restored"),
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+
+ let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?;
+ let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?;
+ let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+ let date = Self::parse_header(header::DATE, &parts.headers)?;
+ let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?;
+
+ Ok(Some(GetObjectResponse {
+ content_length,
+ content_type,
+ date,
+ e_tag,
+ last_modified,
+ content,
+ }))
+ }
+
+ pub(crate) async fn copy_object_response(self) -> Result<CopyObjectResponse, Error> {
+ let (parts, content) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::NOT_FOUND => bail!("object not found"),
+ StatusCode::FORBIDDEN => bail!("the source object is not in the active tier"),
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+
+ let body = content.collect().await?.to_bytes();
+ let body = String::from_utf8(body.to_vec())?;
+
+ let x_amz_version_id = match parts.headers.get("x-amz-version-id") {
+ Some(version_id) => Some(
+ version_id
+ .to_str()
+ .context("failed to parse version id header")?
+ .to_owned(),
+ ),
+ None => None,
+ };
+
+ let copy_object_result: CopyObjectResult =
+ serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+ Ok(CopyObjectResponse {
+ copy_object_result,
+ x_amz_version_id,
+ })
+ }
+
+ pub(crate) async fn put_object_response(self) -> Result<PutObjectResponse, Error> {
+ let (parts, body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ // If-None-Match precondition failed, an object with same key already present.
+ // FIXME: Should this be dropped in favor of re-uploading and rely on the local
+ // cache to detect duplicates to increase data safety guarantees?
+ StatusCode::PRECONDITION_FAILED => return Ok(PutObjectResponse::PreconditionFailed),
+ StatusCode::CONFLICT => return Ok(PutObjectResponse::NeedsRetry),
+ StatusCode::BAD_REQUEST => bail!("invalid request: {body:?}"),
+ status_code => bail!("unexpected status code {status_code}"),
+ };
+
+ let body = body.collect().await?.to_bytes();
+ if !body.is_empty() {
+ bail!("got unexpected non-empty response body");
+ }
+
+ let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+
+ Ok(PutObjectResponse::Success(e_tag))
+ }
+
+ pub(crate) async fn delete_object_response(self) -> Result<(), Error> {
+ let (parts, _body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::NO_CONTENT => (),
+ status_code => bail!("unexpected status code {status_code}"),
+ };
+
+ Ok(())
+ }
+
+ pub(crate) async fn delete_objects_response(self) -> Result<DeleteObjectsResponse, Error> {
+ let (parts, body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::BAD_REQUEST => bail!("invalid request: {body:?}"),
+ status_code => bail!("unexpected status code {status_code}"),
+ };
+
+ let body = body.collect().await?.to_bytes();
+ let body = String::from_utf8(body.to_vec())?;
+
+ let delete_objects_response: DeleteObjectsResponse =
+ serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+ Ok(delete_objects_response)
+ }
+
+ fn parse_header<T: FromStr>(name: HeaderName, headers: &HeaderMap) -> Result<T, Error>
+ where
+ <T as FromStr>::Err: Send + Sync + 'static,
+ Result<T, <T as FromStr>::Err>: Context<T, <T as FromStr>::Err>,
+ {
+ let header_value = headers
+ .get(&name)
+ .ok_or_else(|| anyhow!("missing header '{name}'"))?;
+ let header_str = header_value
+ .to_str()
+ .with_context(|| format!("non UTF-8 header '{name}'"))?;
+ let value = header_str
+ .parse()
+ .with_context(|| format!("failed to parse header '{name}'"))?;
+ Ok(value)
+ }
+
+ // TODO: Integrity checks via CRC32 or SHA256 currently cannot be performed, since they are
+ // not supported by all S3 object store providers.
+ // See also:
+ // https://tracker.ceph.com/issues/63951
+ // https://tracker.ceph.com/issues/69105
+ // https://www.backblaze.com/docs/cloud-storage-s3-compatible-api
+ fn parse_x_amz_checksum_crc32_header(headers: &HeaderMap) -> Result<Option<u32>, Error> {
+ let x_amz_checksum_crc32 = match headers.get("x-amz-checksum-crc32") {
+ Some(x_amz_checksum_crc32) => x_amz_checksum_crc32,
+ None => return Ok(None),
+ };
+ let x_amz_checksum_crc32 = base64::decode(x_amz_checksum_crc32.to_str()?)?;
+ let x_amz_checksum_crc32: [u8; 4] = x_amz_checksum_crc32
+ .try_into()
+ .map_err(|_e| anyhow!("failed to convert x-amz-checksum-crc32 header"))?;
+ let x_amz_checksum_crc32 = u32::from_be_bytes(x_amz_checksum_crc32);
+ Ok(Some(x_amz_checksum_crc32))
+ }
+}
--
2.39.5