* [pbs-devel] [PATCH proxmox-backup 1/6] tools: add compression module
2021-03-31 9:44 [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Dominik Csapak
@ 2021-03-31 9:44 ` Dominik Csapak
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 2/6] tools/compression: add DeflateEncoder and helpers Dominik Csapak
` (5 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Dominik Csapak @ 2021-03-31 9:44 UTC (permalink / raw)
To: pbs-devel
only contains a basic enum for the different compression methods
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/tools.rs | 1 +
src/tools/compression.rs | 37 +++++++++++++++++++++++++++++++++++++
2 files changed, 38 insertions(+)
create mode 100644 src/tools/compression.rs
diff --git a/src/tools.rs b/src/tools.rs
index 7e3bff7b..cd1415ec 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -22,6 +22,7 @@ pub mod apt;
pub mod async_io;
pub mod borrow;
pub mod cert;
+pub mod compression;
pub mod daemon;
pub mod disks;
pub mod format;
diff --git a/src/tools/compression.rs b/src/tools/compression.rs
new file mode 100644
index 00000000..fe15e8fc
--- /dev/null
+++ b/src/tools/compression.rs
@@ -0,0 +1,37 @@
+use anyhow::{bail, Error};
+use hyper::header;
+
+/// Possible Compression Methods, order determines preference (later is preferred)
+#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
+pub enum CompressionMethod {
+ Deflate,
+ Gzip,
+ Brotli,
+}
+
+impl CompressionMethod {
+ pub fn content_encoding(&self) -> header::HeaderValue {
+ header::HeaderValue::from_static(self.extension())
+ }
+
+ pub fn extension(&self) -> &'static str {
+ match *self {
+ CompressionMethod::Brotli => "br",
+ CompressionMethod::Gzip => "gzip",
+ CompressionMethod::Deflate => "deflate",
+ }
+ }
+}
+
+impl std::str::FromStr for CompressionMethod {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "br" => Ok(CompressionMethod::Brotli),
+ "gzip" => Ok(CompressionMethod::Brotli),
+ "deflate" => Ok(CompressionMethod::Deflate),
+ _ => bail!("unknown compression format"),
+ }
+ }
+}
--
2.20.1
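
A quick illustration of the preference encoded by the derive order above
(Deflate < Gzip < Brotli): a minimal sketch, assuming it lives next to
CompressionMethod in src/tools/compression.rs; the pick_best helper is
hypothetical and not part of the patch.

    // picks the most preferred method among the encodings a client accepts;
    // max() works because later enum variants compare greater
    fn pick_best(accepted: &[&str]) -> Option<CompressionMethod> {
        accepted
            .iter()
            .filter_map(|s| s.parse::<CompressionMethod>().ok())
            .max()
    }

    // e.g. pick_best(&["gzip", "br"]) == Some(CompressionMethod::Brotli)
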
* [pbs-devel] [PATCH proxmox-backup 2/6] tools/compression: add DeflateEncoder and helpers
2021-03-31 9:44 [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Dominik Csapak
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 1/6] tools: add compression module Dominik Csapak
@ 2021-03-31 9:44 ` Dominik Csapak
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 3/6] add a CACHE_DIR to the created directories on daemon startup Dominik Csapak
` (4 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Dominik Csapak @ 2021-03-31 9:44 UTC (permalink / raw)
To: pbs-devel
implements a deflate encoder that can compress anything implementing
AsyncRead + Unpin into a file via the helper 'compress'
if the inner type is a Stream, the encoder implements Stream itself, so
streaming data can be compressed on the fly
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
Cargo.toml | 1 +
src/tools/compression.rs | 214 +++++++++++++++++++++++++++++++++++++++
2 files changed, 215 insertions(+)
diff --git a/Cargo.toml b/Cargo.toml
index b0ef56bd..0f3005e2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ bitflags = "1.2.1"
bytes = "1.0"
crc32fast = "1"
endian_trait = { version = "0.6", features = ["arrays"] }
+flate2 = "1.0"
anyhow = "1.0"
futures = "0.3"
h2 = { version = "0.3", features = [ "stream" ] }
diff --git a/src/tools/compression.rs b/src/tools/compression.rs
index fe15e8fc..a3fb913d 100644
--- a/src/tools/compression.rs
+++ b/src/tools/compression.rs
@@ -1,5 +1,21 @@
+use std::io;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
use anyhow::{bail, Error};
+use bytes::Bytes;
+use flate2::{Compress, Compression, FlushCompress};
+use futures::ready;
+use futures::stream::Stream;
use hyper::header;
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
+
+use proxmox::http_err;
+use proxmox::io_format_err;
+use proxmox::tools::byte_buffer::ByteBuffer;
/// Possible Compression Methods, order determines preference (later is preferred)
#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
@@ -35,3 +51,201 @@ impl std::str::FromStr for CompressionMethod {
}
}
}
+
+/// compresses the source file into the target file with the given compression
+/// method and returns the compressed file, rewound and ready to read
+pub async fn compress_file(
+ source_path: &PathBuf,
+ target_path: &PathBuf,
+ compression: &CompressionMethod,
+) -> Result<File, Error> {
+ let mut source = File::open(source_path)
+ .await
+ .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
+
+ let mut target = OpenOptions::new()
+ .write(true)
+ .read(true)
+ .create(true)
+ .mode(0o644)
+ .open(target_path)
+ .await
+ .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
+
+ let _lock = proxmox::tools::fs::lock_file(&mut target, true, Some(Duration::new(1, 0)))?;
+
+ let mut target = match compression {
+ CompressionMethod::Deflate => {
+ let mut enc = DeflateEncoder::with_quality(target, Level::Fastest);
+ enc.compress(&mut source).await?;
+ enc.into_inner()
+ }
+ _ => bail!("compression: {:?} not implemented", compression),
+ };
+
+ target.sync_all().await?;
+ target.seek(std::io::SeekFrom::Start(0)).await?;
+
+ Ok(target)
+}
+
+pub enum Level {
+ Fastest,
+ Best,
+ Default,
+ Precise(u32),
+}
+
+#[derive(Eq, PartialEq)]
+enum EncoderState {
+ Reading,
+ Writing,
+ Flushing,
+ Finished,
+}
+
+pub struct DeflateEncoder<T> {
+ inner: T,
+ compressor: Compress,
+ buffer: ByteBuffer,
+ input_buffer: Bytes,
+ state: EncoderState,
+}
+
+impl<T> DeflateEncoder<T> {
+ pub fn new(inner: T) -> Self {
+ Self::with_quality(inner, Level::Default)
+ }
+
+ pub fn with_quality(inner: T, level: Level) -> Self {
+ let level = match level {
+ Level::Fastest => Compression::fast(),
+ Level::Best => Compression::best(),
+ Level::Default => Compression::new(3),
+ Level::Precise(val) => Compression::new(val),
+ };
+
+ Self {
+ inner,
+ compressor: Compress::new(level, false),
+ buffer: ByteBuffer::with_capacity(8192),
+ input_buffer: Bytes::new(),
+ state: EncoderState::Reading,
+ }
+ }
+
+ pub fn total_in(&self) -> u64 {
+ self.compressor.total_in()
+ }
+
+ pub fn total_out(&self) -> u64 {
+ self.compressor.total_out()
+ }
+
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ fn encode(
+ &mut self,
+ inbuf: &[u8],
+ flush: FlushCompress,
+ ) -> Result<(usize, flate2::Status), io::Error> {
+ let old_in = self.compressor.total_in();
+ let old_out = self.compressor.total_out();
+ let res = self
+ .compressor
+ .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
+ let new_in = (self.compressor.total_in() - old_in) as usize;
+ let new_out = (self.compressor.total_out() - old_out) as usize;
+ self.buffer.add_size(new_out);
+
+ Ok((new_in, res))
+ }
+}
+
+impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
+ pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
+ where
+ R: AsyncRead + Unpin,
+ {
+ let mut buffer = ByteBuffer::with_capacity(8192);
+ let mut eof = false;
+ loop {
+ if !eof && !buffer.is_full() {
+ let read = buffer.read_from_async(reader).await?;
+ if read == 0 {
+ eof = true;
+ }
+ }
+ let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
+ buffer.consume(read);
+
+ self.inner.write_all(&self.buffer[..]).await?;
+ self.buffer.clear();
+
+ if buffer.is_empty() && eof {
+ break;
+ }
+ }
+
+ loop {
+ let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
+ self.inner.write_all(&self.buffer[..]).await?;
+ self.buffer.clear();
+ if res == flate2::Status::StreamEnd {
+ break;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl<T: Stream<Item = Result<Bytes, io::Error>> + Unpin> Stream for DeflateEncoder<T> {
+ type Item = Result<Bytes, io::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+
+ loop {
+ match this.state {
+ EncoderState::Reading => {
+ if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+ let buf = res?;
+ this.input_buffer = buf;
+ this.state = EncoderState::Writing;
+ } else {
+ this.state = EncoderState::Flushing;
+ }
+ }
+ EncoderState::Writing => {
+ if this.input_buffer.is_empty() {
+ return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
+ }
+ let mut buf = this.input_buffer.split_off(0);
+ let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
+ this.input_buffer = buf.split_off(read);
+ if this.input_buffer.is_empty() {
+ this.state = EncoderState::Reading;
+ }
+ if this.buffer.is_full() || res == flate2::Status::BufError {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ }
+ EncoderState::Flushing => {
+ let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
+ if !this.buffer.is_empty() {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ if res == flate2::Status::StreamEnd {
+ this.state = EncoderState::Finished;
+ }
+ }
+ EncoderState::Finished => return Poll::Ready(None),
+ }
+ }
+ }
+}
--
2.20.1
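
A minimal usage sketch for the two modes described in the commit message,
assuming a tokio runtime and the crate's tools::compression module; the
deflate_file helper and the file names are placeholders, not part of the
patch.

    use anyhow::Error;
    use crate::tools::compression::{DeflateEncoder, Level};

    // compress an AsyncRead (here a file) into an AsyncWrite via `compress`
    async fn deflate_file(src: &str, dst: &str) -> Result<(), Error> {
        let mut source = tokio::fs::File::open(src).await?;
        let target = tokio::fs::File::create(dst).await?;
        let mut enc = DeflateEncoder::with_quality(target, Level::Fastest);
        enc.compress(&mut source).await?;
        enc.into_inner().sync_all().await?;
        Ok(())
    }

    // alternatively, wrap a Stream of Bytes: the encoder then implements
    // Stream itself and yields compressed chunks on demand, e.g.
    //     let body = Body::wrap_stream(DeflateEncoder::new(byte_stream));
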
* [pbs-devel] [PATCH proxmox-backup 3/6] add a CACHE_DIR to the created directories on daemon startup
2021-03-31 9:44 [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Dominik Csapak
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 1/6] tools: add compression module Dominik Csapak
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 2/6] tools/compression: add DeflateEncoder and helpers Dominik Csapak
@ 2021-03-31 9:44 ` Dominik Csapak
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 4/6] server/rest: add helpers for compression Dominik Csapak
` (3 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Dominik Csapak @ 2021-03-31 9:44 UTC (permalink / raw)
To: pbs-devel
we can use this to provide a semi-persistent file cache
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/bin/proxmox-backup-api.rs | 2 +-
src/buildcfg.rs | 6 ++++++
src/server/worker_task.rs | 9 +++++----
tests/worker-task-abort.rs | 2 +-
4 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 7d800259..6e813414 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -30,7 +30,7 @@ async fn run() -> Result<(), Error> {
bail!("unable to inititialize syslog - {}", err);
}
- server::create_task_log_dirs()?;
+ server::create_dirs()?;
config::create_configdir()?;
diff --git a/src/buildcfg.rs b/src/buildcfg.rs
index 4f333288..40625fad 100644
--- a/src/buildcfg.rs
+++ b/src/buildcfg.rs
@@ -7,12 +7,18 @@ pub const JS_DIR: &str = "/usr/share/javascript/proxmox-backup";
#[macro_export]
macro_rules! PROXMOX_BACKUP_RUN_DIR_M { () => ("/run/proxmox-backup") }
+#[macro_export]
+macro_rules! PROXMOX_BACKUP_CACHE_DIR_M { () => ("/var/cache/proxmox-backup") }
+
#[macro_export]
macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
/// namespaced directory for in-memory (tmpfs) run state
pub const PROXMOX_BACKUP_RUN_DIR: &str = PROXMOX_BACKUP_RUN_DIR_M!();
+/// namespaced directory for on-disk file caches
+pub const PROXMOX_BACKUP_CACHE_DIR: &str = PROXMOX_BACKUP_CACHE_DIR_M!();
+
/// namespaced directory for persistent logging
pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 6c5456c9..8e072526 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -153,8 +153,8 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
}
}
-/// Create task log directory with correct permissions
-pub fn create_task_log_dirs() -> Result<(), Error> {
+/// Create necessary directories with correct permissions
+pub fn create_dirs() -> Result<(), Error> {
try_block!({
let backup_user = crate::backup::backup_user()?;
@@ -164,9 +164,10 @@ pub fn create_task_log_dirs() -> Result<(), Error> {
create_path(buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
- create_path(buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
+ create_path(buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts.clone()))?;
+ create_path(buildcfg::PROXMOX_BACKUP_CACHE_DIR, None, Some(opts))?;
Ok(())
- }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
+ }).map_err(|err: Error| format_err!("unable to create dirs - {}", err))?;
Ok(())
}
diff --git a/tests/worker-task-abort.rs b/tests/worker-task-abort.rs
index 1a0d938d..a68ab1aa 100644
--- a/tests/worker-task-abort.rs
+++ b/tests/worker-task-abort.rs
@@ -32,7 +32,7 @@ fn garbage_collection(worker: &server::WorkerTask) -> Result<(), Error> {
#[test] #[ignore]
fn worker_task_abort() -> Result<(), Error> {
- server::create_task_log_dirs()?;
+ server::create_dirs()?;
use std::sync::{Arc, Mutex};
--
2.20.1
* [pbs-devel] [PATCH proxmox-backup 4/6] server/rest: add helpers for compression
2021-03-31 9:44 [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Dominik Csapak
` (2 preceding siblings ...)
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 3/6] add a CACHE_DIR to the created directories on daemon startup Dominik Csapak
@ 2021-03-31 9:44 ` Dominik Csapak
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 5/6] server/rest: compress api calls Dominik Csapak
` (2 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Dominik Csapak @ 2021-03-31 9:44 UTC (permalink / raw)
To: pbs-devel
these helpers will be used for compressing api calls and static files
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/server/rest.rs | 98 ++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 98 insertions(+)
diff --git a/src/server/rest.rs b/src/server/rest.rs
index 150125ec..bf9aab81 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -39,6 +39,7 @@ use crate::api2::types::{Authid, Userid};
use crate::auth_helpers::*;
use crate::config::cached_user_info::CachedUserInfo;
use crate::tools;
+use crate::tools::compression::{self, CompressionMethod};
use crate::tools::ticket::Ticket;
use crate::tools::FileLogger;
@@ -525,6 +526,81 @@ fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
("application/octet-stream", false)
}
+// returns the compressed file path, and the optional path that needs to be written
+async fn get_compressed_file_path(
+ source: PathBuf,
+ compression: &Option<CompressionMethod>,
+) -> Result<(PathBuf, Option<PathBuf>), Error> {
+ let source_ext = source
+ .extension()
+ .and_then(|osstr| osstr.to_str())
+ .unwrap_or("");
+
+ let target_ext = if let Some(method) = compression {
+ format!("{}.{}", source_ext, method.extension())
+ } else {
+ return Ok((source, None));
+ };
+
+ let target_filename = source
+ .file_name()
+ .and_then(|osstr| osstr.to_str())
+ .ok_or_else(|| format_err!("invalid filename"))?;
+
+ let mut target: PathBuf = PathBuf::from(format!(
+ "{}/{}",
+ crate::buildcfg::PROXMOX_BACKUP_CACHE_DIR,
+ target_filename
+ ));
+ target.set_extension(target_ext);
+
+ let source_metadata = tokio::fs::metadata(&source).await?;
+ let target_metadata = if let Ok(metadata) = tokio::fs::metadata(&target).await {
+ metadata
+ } else {
+ return Ok((source, Some(target)));
+ };
+
+ if source_metadata.modified()? > target_metadata.modified()? {
+ Ok((source, Some(target)))
+ } else {
+ Ok((target, None))
+ }
+}
+
+// returns a handle to either the compressed file, or to the original one
+// if compressing fails (e.g. someone else is compressing it right now);
+// in that case, the compression method is reset to None
+async fn get_possibly_compressed_file(
+ source: PathBuf,
+ compression: &mut Option<CompressionMethod>,
+) -> Result<File, Error> {
+ let (filename, to_write) = get_compressed_file_path(source, compression)
+ .await
+ .map_err(|err| http_err!(BAD_REQUEST, "failed to find compress target: {}", err))?;
+
+ // if compressing fails, return the regular file instead
+ match (&to_write, &compression) {
+ (Some(target), Some(method)) => {
+ match compression::compress_file(&filename, &target, &method).await {
+ Ok(file) => Ok(file),
+ Err(_) => {
+ // we are returning the regular file, reset compression
+ *compression = None;
+ File::open(filename)
+ .await
+ .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))
+ }
+ }
+ }
+ (_, _) => {
+ File::open(filename)
+ .await
+ .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))
+ }
+ }
+}
+
async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
let (content_type, _nocomp) = extension_to_content_type(&filename);
@@ -598,6 +674,28 @@ enum AuthData {
ApiToken(String),
}
+fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMethod> {
+ let mut compression = None;
+ if let Some(raw_encoding) = headers.get(header::ACCEPT_ENCODING) {
+ if let Ok(encoding) = raw_encoding.to_str() {
+ for encoding in encoding.split(&[',', ' '][..]) {
+ if let Ok(method) = encoding.parse() {
+ if method != CompressionMethod::Deflate {
+ // fixme: implement other compressors
+ continue;
+ }
+ let method = Some(method);
+ if method > compression {
+ compression = method;
+ }
+ }
+ }
+ }
+ }
+
+ compression
+}
+
fn extract_auth_data(headers: &http::HeaderMap) -> Option<AuthData> {
if let Some(raw_cookie) = headers.get(header::COOKIE) {
if let Ok(cookie) = raw_cookie.to_str() {
--
2.20.1
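
A small test sketch for the negotiation helper added above; it is
hypothetical and not part of the patch, and assumes it sits next to
extract_compression_method in src/server/rest.rs.

    #[test]
    fn test_extract_compression_method() {
        let mut headers = http::HeaderMap::new();
        headers.insert(
            http::header::ACCEPT_ENCODING,
            http::header::HeaderValue::from_static("gzip, deflate, br"),
        );
        // only deflate is implemented so far, so it is chosen even though
        // the derived preference order would rank brotli higher
        assert_eq!(
            extract_compression_method(&headers),
            Some(CompressionMethod::Deflate)
        );

        // no Accept-Encoding header means no compression
        assert_eq!(extract_compression_method(&http::HeaderMap::new()), None);
    }
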
* [pbs-devel] [PATCH proxmox-backup 5/6] server/rest: compress api calls
2021-03-31 9:44 [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Dominik Csapak
` (3 preceding siblings ...)
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 4/6] server/rest: add helpers for compression Dominik Csapak
@ 2021-03-31 9:44 ` Dominik Csapak
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 6/6] server/rest: compress static files Dominik Csapak
2021-03-31 10:52 ` [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Thomas Lamprecht
6 siblings, 0 replies; 8+ messages in thread
From: Dominik Csapak @ 2021-03-31 9:44 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/server/rest.rs | 25 +++++++++++++++++++++++--
1 file changed, 23 insertions(+), 2 deletions(-)
diff --git a/src/server/rest.rs b/src/server/rest.rs
index bf9aab81..0b641c66 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -39,7 +39,7 @@ use crate::api2::types::{Authid, Userid};
use crate::auth_helpers::*;
use crate::config::cached_user_info::CachedUserInfo;
use crate::tools;
-use crate::tools::compression::{self, CompressionMethod};
+use crate::tools::compression::{self, CompressionMethod, DeflateEncoder, Level};
use crate::tools::ticket::Ticket;
use crate::tools::FileLogger;
@@ -398,6 +398,7 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
uri_param: HashMap<String, String, S>,
) -> Result<Response<Body>, Error> {
let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
+ let compression = extract_compression_method(&parts.headers);
let result = match info.handler {
ApiHandler::AsyncHttp(handler) => {
@@ -418,7 +419,7 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
}
};
- let resp = match result {
+ let mut resp = match result {
Ok(resp) => resp,
Err(err) => {
if let Some(httperr) = err.downcast_ref::<HttpError>() {
@@ -430,6 +431,26 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
}
};
+ let resp = match compression {
+ Some(CompressionMethod::Deflate) => {
+ resp.headers_mut()
+ .insert(header::CONTENT_ENCODING, CompressionMethod::Deflate.content_encoding());
+ resp.map(|body|
+ Body::wrap_stream(DeflateEncoder::with_quality(
+ body.map_err(|err| {
+ proxmox::io_format_err!("error during compression: {}", err)
+ }),
+ Level::Fastest,
+ )),
+ )
+ }
+ Some(_other) => {
+ // fixme: implement other compression algorithms
+ resp
+ }
+ None => resp,
+ };
+
if info.reload_timezone {
unsafe {
tzset();
--
2.20.1
* [pbs-devel] [PATCH proxmox-backup 6/6] server/rest: compress static files
2021-03-31 9:44 [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Dominik Csapak
` (4 preceding siblings ...)
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 5/6] server/rest: compress api calls Dominik Csapak
@ 2021-03-31 9:44 ` Dominik Csapak
2021-03-31 10:52 ` [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Thomas Lamprecht
6 siblings, 0 replies; 8+ messages in thread
From: Dominik Csapak @ 2021-03-31 9:44 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/server/rest.rs | 62 +++++++++++++++++++++++++++++-----------------
1 file changed, 39 insertions(+), 23 deletions(-)
diff --git a/src/server/rest.rs b/src/server/rest.rs
index 0b641c66..2876f31f 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -39,6 +39,7 @@ use crate::api2::types::{Authid, Userid};
use crate::auth_helpers::*;
use crate::config::cached_user_info::CachedUserInfo;
use crate::tools;
+use crate::tools::AsyncReaderStream;
use crate::tools::compression::{self, CompressionMethod, DeflateEncoder, Level};
use crate::tools::ticket::Ticket;
use crate::tools::FileLogger;
@@ -622,14 +623,14 @@ async fn get_possibly_compressed_file(
}
}
-async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
- let (content_type, _nocomp) = extension_to_content_type(&filename);
-
+async fn simple_static_file_download(
+ filename: PathBuf,
+ content_type: &'static str,
+ mut compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
use tokio::io::AsyncReadExt;
- let mut file = File::open(filename)
- .await
- .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
+ let mut file = get_possibly_compressed_file(filename, &mut compression).await?;
let mut data: Vec<u8> = Vec::new();
file.read_to_end(&mut data)
@@ -641,37 +642,51 @@ async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>
header::CONTENT_TYPE,
header::HeaderValue::from_static(content_type),
);
+
+ if let Some(method) = compression {
+ response
+ .headers_mut()
+ .insert(header::CONTENT_ENCODING, method.content_encoding());
+ }
+
Ok(response)
}
-async fn chuncked_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
- let (content_type, _nocomp) = extension_to_content_type(&filename);
+async fn chuncked_static_file_download(
+ filename: PathBuf,
+ content_type: &'static str,
+ mut compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+ let mut resp = Response::builder()
+ .status(StatusCode::OK)
+ .header(header::CONTENT_TYPE, content_type);
- let file = File::open(filename)
- .await
- .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
+ let file = get_possibly_compressed_file(filename, &mut compression).await?;
- let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
- .map_ok(|bytes| bytes.freeze());
- let body = Body::wrap_stream(payload);
+ if let Some(method) = compression {
+ resp = resp.header(header::CONTENT_ENCODING, method.content_encoding());
+ }
- // FIXME: set other headers ?
- Ok(Response::builder()
- .status(StatusCode::OK)
- .header(header::CONTENT_TYPE, content_type)
- .body(body)
+ Ok(resp
+ .body(Body::wrap_stream(AsyncReaderStream::new(file)))
.unwrap())
}
-async fn handle_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
+async fn handle_static_file_download(
+ filename: PathBuf,
+ compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
let metadata = tokio::fs::metadata(filename.clone())
.map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err))
.await?;
+ let (content_type, nocomp) = extension_to_content_type(&filename);
+ let compression = if nocomp { None } else { compression };
+
if metadata.len() < 1024 * 32 {
- simple_static_file_download(filename).await
+ simple_static_file_download(filename, content_type, compression).await
} else {
- chuncked_static_file_download(filename).await
+ chuncked_static_file_download(filename, content_type, compression).await
}
}
@@ -944,7 +959,8 @@ async fn handle_request(
}
} else {
let filename = api.find_alias(&components);
- return handle_static_file_download(filename).await;
+ let compression = extract_compression_method(&parts.headers);
+ return handle_static_file_download(filename, compression).await;
}
}
--
2.20.1
* Re: [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files
2021-03-31 9:44 [pbs-devel] [PATCH proxmox-backup 0/6] add compression to api/static files Dominik Csapak
` (5 preceding siblings ...)
2021-03-31 9:44 ` [pbs-devel] [PATCH proxmox-backup 6/6] server/rest: compress static files Dominik Csapak
@ 2021-03-31 10:52 ` Thomas Lamprecht
6 siblings, 0 replies; 8+ messages in thread
From: Thomas Lamprecht @ 2021-03-31 10:52 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Dominik Csapak
On 31.03.21 11:44, Dominik Csapak wrote:
> by using the flate2 crate
>
> this series will also help send a simpler version of my zip deflate
> encoder patch (i'll send that in the next few days)
high level comment: talked with Dietmar this morning, and we agreed to
avoid a disk cache for now - the static files are seldom served, so they are
probably not worth a more complex caching system that produces disk IO. We
can always add it in the future if the on-demand approach shows problems.
>
> Dominik Csapak (6):
> tools: add compression module
> tools/compression: add DeflateEncoder and helpers
> add a CACHE_DIR to the created directories on daemon startup
> server/rest: add helpers for compression
> server/rest: compress api calls
> server/rest: compress static files
>
> Cargo.toml | 1 +
> src/bin/proxmox-backup-api.rs | 2 +-
> src/buildcfg.rs | 6 +
> src/server/rest.rs | 181 ++++++++++++++++++++----
> src/server/worker_task.rs | 9 +-
> src/tools.rs | 1 +
> src/tools/compression.rs | 251 ++++++++++++++++++++++++++++++++++
> tests/worker-task-abort.rs | 2 +-
> 8 files changed, 424 insertions(+), 29 deletions(-)
> create mode 100644 src/tools/compression.rs
>