* [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite)
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
` (14 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/mod.rs | 6 +
.../src/client/rate_limited_stream.rs | 144 ++++++++++++++++++
proxmox-http/src/client/rate_limiter.rs | 75 +++++++++
3 files changed, 225 insertions(+)
create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
create mode 100644 proxmox-http/src/client/rate_limiter.rs
diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs
index b6ee4b0..30e66d5 100644
--- a/proxmox-http/src/client/mod.rs
+++ b/proxmox-http/src/client/mod.rs
@@ -2,6 +2,12 @@
//!
//! Contains a lightweight wrapper around `hyper` with support for TLS connections.
+mod rate_limiter;
+pub use rate_limiter::RateLimiter;
+
+mod rate_limited_stream;
+pub use rate_limited_stream::RateLimitedStream;
+
mod connector;
pub use connector::HttpsConnector;
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
new file mode 100644
index 0000000..a11b59e
--- /dev/null
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -0,0 +1,144 @@
+use std::pin::Pin;
+use std::marker::Unpin;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::Future;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
+use tokio::time::Sleep;
+
+use std::task::{Context, Poll};
+
+use super::RateLimiter;
+
+/// A rate limited stream using [RateLimiter]
+pub struct RateLimitedStream<S> {
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ read_delay: Option<Pin<Box<Sleep>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_delay: Option<Pin<Box<Sleep>>>,
+ stream: S,
+}
+
+impl <S> RateLimitedStream<S> {
+
+ const MIN_DELAY: Duration = Duration::from_millis(20);
+
+ /// Creates a new instance with reads and writes limited to the same `rate`.
+ pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
+ let now = Instant::now();
+ let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+ let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+ Self::with_limiter(stream, Some(read_limiter), Some(write_limiter))
+ }
+
+ /// Creates a new instance with specified [RateLimiters] for reads and writes.
+ pub fn with_limiter(
+ stream: S,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ ) -> Self {
+ Self {
+ read_limiter,
+ read_delay: None,
+ write_limiter,
+ write_delay: None,
+ stream,
+ }
+ }
+}
+
+impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
+
+ fn poll_write(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.write_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
+
+ if let Some(ref write_limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = &result {
+ let now = Instant::now();
+ let delay = write_limiter.lock().unwrap()
+ .register_traffic(now, *count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.write_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_flush(ctx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_shutdown(ctx)
+ }
+}
+
+impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.read_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.read_delay = None;
+
+ let filled_len = buf.filled().len();
+ let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
+
+ if let Some(ref read_limiter) = this.read_limiter {
+ if let Poll::Ready(Ok(())) = &result {
+ let count = buf.filled().len() - filled_len;
+ let now = Instant::now();
+ let delay = read_limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.read_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+}
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
new file mode 100644
index 0000000..4742387
--- /dev/null
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -0,0 +1,75 @@
+use std::time::{Duration, Instant};
+use std::convert::TryInto;
+
+/// Token bucket based rate limiter
+pub struct RateLimiter {
+ rate: u64, // tokens/second
+ start_time: Instant,
+ traffic: u64, // overall traffic
+ bucket_size: u64,
+ last_update: Instant,
+ consumed_tokens: u64,
+}
+
+impl RateLimiter {
+
+ const NO_DELAY: Duration = Duration::from_millis(0);
+
+ /// Creates a new instance, using [Instant::now] as start time.
+ pub fn new(rate: u64, bucket_size: u64) -> Self {
+ let start_time = Instant::now();
+ Self::with_start_time(rate, bucket_size, start_time)
+ }
+
+ /// Creates a new instance with specified `rate`, `bucket_size` and `start_time`.
+ pub fn with_start_time(rate: u64, bucket_size: u64, start_time: Instant) -> Self {
+ Self {
+ rate,
+ start_time,
+ traffic: 0,
+ bucket_size,
+ last_update: start_time,
+ // start with empty bucket (all tokens consumed)
+ consumed_tokens: bucket_size,
+ }
+ }
+
+ /// Returns the average rate (since `start_time`)
+ pub fn average_rate(&self, current_time: Instant) -> f64 {
+ let time_diff = (current_time - self.start_time).as_secs_f64();
+ if time_diff <= 0.0 {
+ 0.0
+ } else {
+ (self.traffic as f64) / time_diff
+ }
+ }
+
+ fn refill_bucket(&mut self, current_time: Instant) {
+ let time_diff = (current_time - self.last_update).as_nanos();
+
+ if time_diff <= 0 {
+ //log::error!("update_time: got negative time diff");
+ return;
+ }
+
+ self.last_update = current_time;
+
+ let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000)
+ .try_into().unwrap_or(u64::MAX);
+
+ self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
+ }
+
+ /// Register traffic, returning a proposed delay to reach the expected rate.
+ pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
+ self.refill_bucket(current_time);
+
+ self.traffic += data_len;
+ self.consumed_tokens += data_len;
+
+ if self.consumed_tokens <= self.bucket_size {
+ return Self::NO_DELAY;
+ }
+ Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate)
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
` (13 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
pbs-client/src/http_client.rs | 24 ++++++++++++++++++++++--
pbs-client/src/tools/mod.rs | 23 ++++++++++++++++++++---
2 files changed, 42 insertions(+), 5 deletions(-)
diff --git a/pbs-client/src/http_client.rs b/pbs-client/src/http_client.rs
index 73c83f7a..defaef8a 100644
--- a/pbs-client/src/http_client.rs
+++ b/pbs-client/src/http_client.rs
@@ -20,7 +20,7 @@ use proxmox::{
};
use proxmox_router::HttpError;
-use proxmox_http::client::HttpsConnector;
+use proxmox_http::client::{HttpsConnector, RateLimiter};
use proxmox_http::uri::build_authority;
use pbs_api_types::{Authid, Userid};
@@ -51,6 +51,8 @@ pub struct HttpClientOptions {
ticket_cache: bool,
fingerprint_cache: bool,
verify_cert: bool,
+ rate_limit: Option<u64>,
+ bucket_size: Option<u64>,
}
impl HttpClientOptions {
@@ -109,6 +111,16 @@ impl HttpClientOptions {
self.verify_cert = verify_cert;
self
}
+
+ pub fn rate_limit(mut self, rate_limit: Option<u64>) -> Self {
+ self.rate_limit = rate_limit;
+ self
+ }
+
+ pub fn bucket_size(mut self, bucket_size: Option<u64>) -> Self {
+ self.bucket_size = bucket_size;
+ self
+ }
}
impl Default for HttpClientOptions {
@@ -121,6 +133,8 @@ impl Default for HttpClientOptions {
ticket_cache: false,
fingerprint_cache: false,
verify_cert: true,
+ rate_limit: None,
+ bucket_size: None,
}
}
}
@@ -343,7 +357,13 @@ impl HttpClient {
httpc.enforce_http(false); // we want https...
httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
- let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+ let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+ if let Some(rate_limit) = options.rate_limit {
+ let bucket_size = options.bucket_size.unwrap_or_else(|| rate_limit*3);
+ https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
+ https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
+ }
let client = Client::builder()
//.http2_initial_stream_window_size( (1 << 31) - 2)
diff --git a/pbs-client/src/tools/mod.rs b/pbs-client/src/tools/mod.rs
index a12635cf..539ad662 100644
--- a/pbs-client/src/tools/mod.rs
+++ b/pbs-client/src/tools/mod.rs
@@ -135,15 +135,32 @@ pub fn extract_repository_from_map(param: &HashMap<String, String>) -> Option<Ba
}
pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> {
- connect_do(repo.host(), repo.port(), repo.auth_id())
+ connect_do(repo.host(), repo.port(), repo.auth_id(), None, None)
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
}
-fn connect_do(server: &str, port: u16, auth_id: &Authid) -> Result<HttpClient, Error> {
+pub fn connect_rate_limited(
+ repo: &BackupRepository,
+ rate: Option<u64>,
+ bucket_size: Option<u64>,
+) -> Result<HttpClient, Error> {
+ connect_do(repo.host(), repo.port(), repo.auth_id(), rate, bucket_size)
+ .map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
+}
+
+fn connect_do(
+ server: &str,
+ port: u16,
+ auth_id: &Authid,
+ rate_limit: Option<u64>,
+ bucket_size: Option<u64>,
+) -> Result<HttpClient, Error> {
let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
- let options = HttpClientOptions::new_interactive(password, fingerprint);
+ let options = HttpClientOptions::new_interactive(password, fingerprint)
+ .rate_limit(rate_limit)
+ .bucket_size(bucket_size);
HttpClient::new(server, port, auth_id, options)
}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
` (12 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
.../src/client/rate_limited_stream.rs | 92 +++++++++++++------
1 file changed, 62 insertions(+), 30 deletions(-)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index a11b59e..0cc0ebb 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::pin::Pin;
use std::marker::Unpin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
+use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
@@ -22,8 +23,6 @@ pub struct RateLimitedStream<S> {
impl <S> RateLimitedStream<S> {
- const MIN_DELAY: Duration = Duration::from_millis(20);
-
/// Creates a new instance with reads and writes limited to the same `rate`.
pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
let now = Instant::now();
@@ -48,6 +47,33 @@ impl <S> RateLimitedStream<S> {
}
}
+fn register_traffic(
+ limiter: &Mutex<RateLimiter>,
+ count: usize,
+) -> Option<Pin<Box<Sleep>>>{
+
+ const MIN_DELAY: Duration = Duration::from_millis(10);
+
+ let now = Instant::now();
+ let delay = limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ Some(Box::pin(sleep))
+ } else {
+ None
+ }
+}
+
+fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
+ match delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ }
+}
+
impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
fn poll_write(
@@ -57,12 +83,7 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.write_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -70,15 +91,37 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
- if let Some(ref write_limiter) = this.write_limiter {
- if let Poll::Ready(Ok(count)) = &result {
- let now = Instant::now();
- let delay = write_limiter.lock().unwrap()
- .register_traffic(now, *count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.write_delay = Some(Box::pin(sleep));
- }
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
+ }
+ }
+
+ result
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.stream.is_write_vectored()
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
+
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
}
}
@@ -111,12 +154,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.read_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.read_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -128,13 +166,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
if let Some(ref read_limiter) = this.read_limiter {
if let Poll::Ready(Ok(())) = &result {
let count = buf.filled().len() - filled_len;
- let now = Instant::now();
- let delay = read_limiter.lock().unwrap()
- .register_traffic(now, count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.read_delay = Some(Box::pin(sleep));
- }
+ this.read_delay = register_traffic(read_limiter, count);
}
}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (2 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
` (11 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-backup-client/src/main.rs | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index cb083006..d81271d0 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -45,7 +45,7 @@ use pbs_client::tools::{
complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
complete_img_archive_name, complete_pxar_archive_name, complete_repository, connect,
- extract_repository_from_value,
+ connect_rate_limited, extract_repository_from_value,
key_source::{
crypto_parameters, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
KEYFILE_SCHEMA, MASTER_PUBKEY_FD_SCHEMA, MASTER_PUBKEY_FILE_SCHEMA,
@@ -582,6 +582,18 @@ fn spawn_catalog_upload(
schema: CHUNK_SIZE_SCHEMA,
optional: true,
},
+ rate: {
+ type: u64,
+ description: "Rate limit for TBF in bytes/second.",
+ optional: true,
+ minimum: 1,
+ },
+ burst: {
+ type: u64,
+ description: "Size of the TBF bucket, in bytes.",
+ optional: true,
+ minimum: 1,
+ },
"exclude": {
type: Array,
description: "List of paths or patterns for matching files to exclude.",
@@ -630,6 +642,9 @@ async fn create_backup(
verify_chunk_size(size)?;
}
+ let rate_limit = param["rate"].as_u64();
+ let bucket_size = param["burst"].as_u64();
+
let crypto = crypto_parameters(¶m)?;
let backup_id = param["backup-id"].as_str().unwrap_or(&proxmox::tools::nodename());
@@ -724,7 +739,7 @@ async fn create_backup(
let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
- let client = connect(&repo)?;
+ let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
record_repository(&repo);
println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?);
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (3 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream Dietmar Maurer
` (10 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
So that we can limit used bandwidth.
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/connector.rs | 51 ++++++++++++++++---
.../src/client/rate_limited_stream.rs | 8 +++
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
use anyhow::{bail, format_err, Error};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::*;
use http::Uri;
use hyper::client::HttpConnector;
use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
use crate::tls::MaybeTlsStream;
use crate::uri::build_authority;
+use super::{RateLimiter, RateLimitedStream};
+
#[derive(Clone)]
pub struct HttpsConnector {
connector: HttpConnector,
ssl_connector: Arc<SslConnector>,
proxy: Option<ProxyConfig>,
tcp_keepalive: u32,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
}
impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
ssl_connector: Arc::new(ssl_connector),
proxy: None,
tcp_keepalive,
+ read_limiter: None,
+ write_limiter: None,
}
}
@@ -45,13 +51,21 @@ impl HttpsConnector {
self.proxy = Some(proxy);
}
- async fn secure_stream(
- tcp_stream: TcpStream,
+ pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.read_limiter = limiter;
+ }
+
+ pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.write_limiter = limiter;
+ }
+
+ async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+ tcp_stream: S,
ssl_connector: &SslConnector,
host: &str,
- ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+ ) -> Result<MaybeTlsStream<S>, Error> {
let config = ssl_connector.configure()?;
- let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+ let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
Pin::new(&mut conn).connect().await?;
Ok(MaybeTlsStream::Secured(conn))
}
@@ -107,7 +121,7 @@ impl HttpsConnector {
}
impl hyper::service::Service<Uri> for HttpsConnector {
- type Response = MaybeTlsStream<TcpStream>;
+ type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
};
let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
let keepalive = self.tcp_keepalive;
+ let read_limiter = self.read_limiter.clone();
+ let write_limiter = self.write_limiter.clone();
+
if let Some(ref proxy) = self.proxy {
let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
if use_connect {
async move {
- let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+ let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
format_err!("error connecting to {} - {}", proxy_authority, err)
})?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let mut tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
if let Some(authorization) = authorization {
connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
Ok(MaybeTlsStream::Proxied(tcp_stream))
}
.boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
if is_https {
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
} else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 0cc0ebb..00ba066 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
use std::task::{Context, Poll};
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
}
}
+
+// we need this for the hyper http client
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+ fn connected(&self) -> Connected {
+ self.stream.connected()
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (4 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
` (9 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/rest.rs | 28 ++++++++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 1fa76f21..b88e5d12 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -33,6 +33,7 @@ url = "2.1"
proxmox = "0.15.0"
proxmox-io = "1"
proxmox-lang = "1"
+proxmox-http = { version = "0.5.0", features = [ "client" ] }
proxmox-router = "1.1"
proxmox-schema = { version = "1", features = [ "api-macro", "upid-api-impl" ] }
proxmox-time = "1"
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 74bc8bb1..f27f703d 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -31,6 +31,8 @@ use proxmox_schema::{
ParameterSchema,
};
+use proxmox_http::client::RateLimitedStream;
+
use pbs_tools::compression::{DeflateEncoder, Level};
use pbs_tools::stream::AsyncReaderStream;
@@ -73,6 +75,32 @@ impl RestServer {
}
}
+impl Service<&Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>>
+ for RestServer
+{
+ type Response = ApiService;
+ type Error = Error;
+ type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
+
+ fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(
+ &mut self,
+ ctx: &Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>,
+ ) -> Self::Future {
+ match ctx.get_ref().peer_addr() {
+ Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
+ Ok(peer) => future::ok(ApiService {
+ peer,
+ api_config: self.api_config.clone(),
+ })
+ .boxed(),
+ }
+ }
+}
+
impl Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>>
for RestServer
{
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (5 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
` (8 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
We will use this to specify timesframes for network rate limits (only
apply limite when inside the time frame).
Note: This is not systemd related, but we can reuse some of the parser
method.
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-systemd/src/daily_duration.rs | 92 +++++++++++++++++++++++++++
proxmox-systemd/src/lib.rs | 1 +
proxmox-systemd/src/parse_time.rs | 56 ++++++++++++++++
proxmox-systemd/src/time.rs | 2 +-
4 files changed, 150 insertions(+), 1 deletion(-)
create mode 100644 proxmox-systemd/src/daily_duration.rs
diff --git a/proxmox-systemd/src/daily_duration.rs b/proxmox-systemd/src/daily_duration.rs
new file mode 100644
index 00000000..bed4eb47
--- /dev/null
+++ b/proxmox-systemd/src/daily_duration.rs
@@ -0,0 +1,92 @@
+use std::cmp::{Ordering, PartialOrd};
+
+use super::time::{WeekDays};
+
+pub use super::parse_time::parse_daily_duration;
+
+/// Time of Day (hour with minute)
+#[derive(Default, PartialEq, Clone, Debug)]
+pub struct HmTime {
+ pub hour: u32,
+ pub minute: u32,
+}
+
+impl PartialOrd for HmTime {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ let mut order = self.hour.cmp(&other.hour);
+ if order == Ordering::Equal {
+ order = self.minute.cmp(&other.minute);
+ }
+ Some(order)
+ }
+}
+
+#[derive(Default, Clone, Debug)]
+pub struct DailyDuration {
+ /// the days in a week this duration should trigger
+ pub days: WeekDays,
+ pub start: HmTime,
+ pub end: HmTime,
+}
+
+#[cfg(test)]
+mod test {
+
+ use anyhow::{bail, Error};
+
+ use super::*;
+
+ fn test_parse(
+ duration_str: &str,
+ start_h: u32, start_m: u32,
+ end_h: u32, end_m: u32,
+ days: &[usize],
+ ) -> Result<(), Error> {
+ let mut day_bits = 0;
+ for day in days { day_bits |= 1<<day; }
+ let expected_days = WeekDays::from_bits(day_bits).unwrap();
+
+ let duration = parse_daily_duration(duration_str)?;
+
+ if duration.start.hour != start_h {
+ bail!("start hour missmatch, extected {}, got {:?}", start_h, duration);
+ }
+ if duration.start.minute != start_m {
+ bail!("start minute missmatch, extected {}, got {:?}", start_m, duration);
+ }
+ if duration.end.hour != end_h {
+ bail!("end hour missmatch, extected {}, got {:?}", end_h, duration);
+ }
+ if duration.end.minute != end_m {
+ bail!("end minute missmatch, extected {}, got {:?}", end_m, duration);
+ }
+
+ if duration.days != expected_days {
+ bail!("weekday missmatch, extected {:?}, got {:?}", expected_days, duration);
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_daily_duration_parser() -> Result<(), Error> {
+
+ assert!(parse_daily_duration("").is_err());
+ assert!(parse_daily_duration(" 8-12").is_err());
+ assert!(parse_daily_duration("8:60-12").is_err());
+ assert!(parse_daily_duration("8-25").is_err());
+ assert!(parse_daily_duration("12-8").is_err());
+
+ test_parse("8-12", 8, 0, 12, 0, &[])?;
+ test_parse("8:0-12:0", 8, 0, 12, 0, &[])?;
+ test_parse("8:00-12:00", 8, 0, 12, 0, &[])?;
+ test_parse("8:05-12:20", 8, 5, 12, 20, &[])?;
+ test_parse("8:05 - 12:20", 8, 5, 12, 20, &[])?;
+
+ test_parse("mon 8-12", 8, 0, 12, 0, &[0])?;
+ test_parse("tue..fri 8-12", 8, 0, 12, 0, &[1,2,3,4])?;
+ test_parse("sat,tue..thu,fri 8-12", 8, 0, 12, 0, &[1,2,3,4,5])?;
+
+ Ok(())
+ }
+}
diff --git a/proxmox-systemd/src/lib.rs b/proxmox-systemd/src/lib.rs
index b4ab4b72..7c2b1f90 100644
--- a/proxmox-systemd/src/lib.rs
+++ b/proxmox-systemd/src/lib.rs
@@ -1,4 +1,5 @@
pub mod time;
+pub mod daily_duration;
mod parse_time;
mod unit;
diff --git a/proxmox-systemd/src/parse_time.rs b/proxmox-systemd/src/parse_time.rs
index ba9449b1..d212e264 100644
--- a/proxmox-systemd/src/parse_time.rs
+++ b/proxmox-systemd/src/parse_time.rs
@@ -4,6 +4,7 @@ use anyhow::{bail, Error};
use lazy_static::lazy_static;
use super::time::*;
+use super::daily_duration::*;
use nom::{
error::{context, ParseError, VerboseError},
@@ -452,3 +453,58 @@ fn parse_time_span_incomplete(mut i: &str) -> IResult<&str, TimeSpan> {
Ok((i, ts))
}
+
+pub fn parse_daily_duration(i: &str) -> Result<DailyDuration, Error> {
+ parse_complete_line("daily duration", i, parse_daily_duration_incomplete)
+}
+
+fn parse_daily_duration_incomplete(mut i: &str) -> IResult<&str, DailyDuration> {
+
+ let mut duration = DailyDuration::default();
+
+ if i.starts_with(|c: char| char::is_ascii_alphabetic(&c)) {
+
+ let (n, range_list) = context(
+ "weekday range list",
+ separated_nonempty_list(tag(","), parse_weekdays_range)
+ )(i)?;
+
+ i = space0(n)?.0;
+
+ for range in range_list { duration.days.insert(range); }
+ }
+
+ let (i, start) = parse_hm_time(i)?;
+
+ let i = space0(i)?.0;
+
+ let (i, _) = tag("-")(i)?;
+
+ let i = space0(i)?.0;
+
+ let end_time_start = i;
+
+ let (i, end) = parse_hm_time(i)?;
+
+ if start > end {
+ return Err(parse_error(end_time_start, "end time before start time"));
+ }
+
+ duration.start = start;
+ duration.end = end;
+
+ Ok((i, duration))
+}
+
+fn parse_hm_time(i: &str) -> IResult<&str, HmTime> {
+
+ let (i, (hour, opt_minute)) = tuple((
+ parse_time_comp(24),
+ opt(preceded(tag(":"), parse_time_comp(60))),
+ ))(i)?;
+
+ match opt_minute {
+ Some(minute) => Ok((i, HmTime { hour, minute })),
+ None => Ok((i, HmTime { hour, minute: 0})),
+ }
+}
diff --git a/proxmox-systemd/src/time.rs b/proxmox-systemd/src/time.rs
index b81e970e..e5fe7965 100644
--- a/proxmox-systemd/src/time.rs
+++ b/proxmox-systemd/src/time.rs
@@ -5,7 +5,7 @@ use bitflags::bitflags;
use proxmox_time::TmEditor;
-pub use super::parse_time::*;
+pub use super::parse_time::{parse_calendar_event, parse_time_span};
bitflags!{
#[derive(Default)]
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (6 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
` (7 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
.../src/client/rate_limited_stream.rs | 43 +++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 00ba066..ea99383 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -19,6 +19,8 @@ pub struct RateLimitedStream<S> {
read_delay: Option<Pin<Box<Sleep>>>,
write_limiter: Option<Arc<Mutex<RateLimiter>>>,
write_delay: Option<Pin<Box<Sleep>>>,
+ update_limiter_cb: Option<Box<dyn Fn() -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) + Send>>,
+ last_limiter_update: Instant,
stream: S,
}
@@ -43,9 +45,44 @@ impl <S> RateLimitedStream<S> {
read_delay: None,
write_limiter,
write_delay: None,
+ update_limiter_cb: None,
+ last_limiter_update: Instant::now(),
stream,
}
}
+
+ /// Creates a new instance with limiter update callback.
+ ///
+ /// The fuction is called every minute to update/change the used limiters.
+ ///
+ /// Note: This function is called within an async context, so it
+ /// should be fast and must not block.
+ pub fn with_limiter_update_cb<F: Fn() -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) + Send + 'static>(
+ stream: S,
+ update_limiter_cb: F,
+ ) -> Self {
+ let (read_limiter, write_limiter) = update_limiter_cb();
+ Self {
+ read_limiter,
+ read_delay: None,
+ write_limiter,
+ write_delay: None,
+ update_limiter_cb: Some(Box::new(update_limiter_cb)),
+ last_limiter_update: Instant::now(),
+ stream,
+ }
+ }
+
+ fn update_limiters(&mut self) {
+ if let Some(ref update_limiter_cb) = self.update_limiter_cb {
+ if self.last_limiter_update.elapsed().as_secs() >= 5 {
+ self.last_limiter_update = Instant::now();
+ let (read_limiter, write_limiter) = update_limiter_cb();
+ self.read_limiter = read_limiter;
+ self.write_limiter = write_limiter;
+ }
+ }
+ }
}
fn register_traffic(
@@ -90,6 +127,8 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
this.write_delay = None;
+ this.update_limiters();
+
let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
if let Some(ref limiter) = this.write_limiter {
@@ -118,6 +157,8 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
this.write_delay = None;
+ this.update_limiters();
+
let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
if let Some(ref limiter) = this.write_limiter {
@@ -161,6 +202,8 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
this.read_delay = None;
+ this.update_limiters();
+
let filled_len = buf.filled().len();
let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match()
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (7 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
` (6 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
proxmox-systemd/src/daily_duration.rs | 60 +++++++++++++++++++++++++++
1 file changed, 60 insertions(+)
diff --git a/proxmox-systemd/src/daily_duration.rs b/proxmox-systemd/src/daily_duration.rs
index bed4eb47..25157b25 100644
--- a/proxmox-systemd/src/daily_duration.rs
+++ b/proxmox-systemd/src/daily_duration.rs
@@ -1,4 +1,9 @@
use std::cmp::{Ordering, PartialOrd};
+use std::convert::TryInto;
+
+use anyhow::Error;
+
+use proxmox_time::TmEditor;
use super::time::{WeekDays};
@@ -29,6 +34,30 @@ pub struct DailyDuration {
pub end: HmTime,
}
+impl DailyDuration {
+
+ // Test it time is within this frame
+ pub fn time_match(&self, epoch: i64, utc: bool) -> Result<bool, Error> {
+
+ let t = TmEditor::with_epoch(epoch, utc)?;
+
+ let all_days = self.days.is_empty() || self.days.is_all();
+
+ if !all_days { // match day first
+ let day_num: u32 = t.day_num().try_into()?;
+ let day = WeekDays::from_bits(1<<day_num).unwrap();
+ if !self.days.contains(day) { return Ok(false); }
+ }
+
+ let hour = t.hour().try_into()?;
+ let minute = t.min().try_into()?;
+
+ let ctime = HmTime { hour, minute };
+
+ Ok(ctime >= self.start && ctime < self.end)
+ }
+}
+
#[cfg(test)]
mod test {
@@ -68,6 +97,10 @@ mod test {
Ok(())
}
+ const fn make_test_time(mday: i32, hour: i32, min: i32) -> i64 {
+ (mday*3600*24 + hour*3600 + min*60) as i64
+ }
+
#[test]
fn test_daily_duration_parser() -> Result<(), Error> {
@@ -89,4 +122,31 @@ mod test {
Ok(())
}
+
+ #[test]
+ fn test_time_match() -> Result<(), Error> {
+ const THURSDAY_80_00: i64 = make_test_time(0, 8, 0);
+ const THURSDAY_12_00: i64 = make_test_time(0, 12, 0);
+ const DAY: i64 = 3600*24;
+
+ let duration = parse_daily_duration("thu..fri 8:05-12")?;
+
+ assert!(!duration.time_match(THURSDAY_80_00, true)?);
+ assert!(!duration.time_match(THURSDAY_80_00 + DAY, true)?);
+ assert!(!duration.time_match(THURSDAY_80_00 + 2*DAY, true)?);
+
+ assert!(duration.time_match(THURSDAY_80_00 + 5*60, true)?);
+ assert!(duration.time_match(THURSDAY_80_00 + 5*60 + DAY, true)?);
+ assert!(!duration.time_match(THURSDAY_80_00 + 5*60 + 2*DAY, true)?);
+
+ assert!(duration.time_match(THURSDAY_12_00 - 1, true)?);
+ assert!(duration.time_match(THURSDAY_12_00 - 1 + DAY, true)?);
+ assert!(!duration.time_match(THURSDAY_12_00 - 1 + 2*DAY, true)?);
+
+ assert!(!duration.time_match(THURSDAY_12_00, true)?);
+ assert!(!duration.time_match(THURSDAY_12_00 + DAY, true)?);
+ assert!(!duration.time_match(THURSDAY_12_00 + 2*DAY, true)?);
+
+ Ok(())
+ }
}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (8 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
` (5 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
proxmox-http/src/client/rate_limiter.rs | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
index 4742387..677dfb1 100644
--- a/proxmox-http/src/client/rate_limiter.rs
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -36,7 +36,7 @@ impl RateLimiter {
/// Returns the average rate (since `start_time`)
pub fn average_rate(&self, current_time: Instant) -> f64 {
- let time_diff = (current_time - self.start_time).as_secs_f64();
+ let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
if time_diff <= 0.0 {
0.0
} else {
@@ -45,12 +45,15 @@ impl RateLimiter {
}
fn refill_bucket(&mut self, current_time: Instant) {
- let time_diff = (current_time - self.last_update).as_nanos();
+ let time_diff = match current_time.checked_duration_since(self.last_update) {
+ Some(duration) => duration.as_nanos(),
+ None => {
+ //log::error!("update_time: got negative time diff");
+ return;
+ }
+ };
- if time_diff <= 0 {
- //log::error!("update_time: got negative time diff");
- return;
- }
+ if time_diff == 0 { return; }
self.last_update = current_time;
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (9 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
` (4 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
pbs-api-types/src/lib.rs | 7 +
pbs-api-types/src/traffic_control.rs | 81 +++++
pbs-config/src/lib.rs | 1 +
pbs-config/src/traffic_control.rs | 91 ++++++
src/api2/config/mod.rs | 2 +
src/api2/config/traffic_control.rs | 283 ++++++++++++++++++
src/bin/proxmox-backup-manager.rs | 1 +
src/bin/proxmox_backup_manager/mod.rs | 2 +
.../proxmox_backup_manager/traffic_control.rs | 105 +++++++
9 files changed, 573 insertions(+)
create mode 100644 pbs-api-types/src/traffic_control.rs
create mode 100644 pbs-config/src/traffic_control.rs
create mode 100644 src/api2/config/traffic_control.rs
create mode 100644 src/bin/proxmox_backup_manager/traffic_control.rs
diff --git a/pbs-api-types/src/lib.rs b/pbs-api-types/src/lib.rs
index 96ac657b..a61de960 100644
--- a/pbs-api-types/src/lib.rs
+++ b/pbs-api-types/src/lib.rs
@@ -7,6 +7,7 @@ use proxmox_schema::{
api, const_regex, ApiStringFormat, ApiType, ArraySchema, Schema, StringSchema, ReturnType,
};
use proxmox::{IPRE, IPRE_BRACKET, IPV4OCTET, IPV4RE, IPV6H16, IPV6LS32, IPV6RE};
+use proxmox_systemd::daily_duration::parse_daily_duration;
#[rustfmt::skip]
#[macro_export]
@@ -73,6 +74,9 @@ pub use remote::*;
mod tape;
pub use tape::*;
+mod traffic_control;
+pub use traffic_control::*;
+
mod zfs;
pub use zfs::*;
@@ -152,6 +156,9 @@ pub const HOSTNAME_FORMAT: ApiStringFormat = ApiStringFormat::Pattern(&HOSTNAME_
pub const DNS_ALIAS_FORMAT: ApiStringFormat =
ApiStringFormat::Pattern(&DNS_ALIAS_REGEX);
+pub const DAILY_DURATION_FORMAT: ApiStringFormat =
+ ApiStringFormat::VerifyFn(|s| parse_daily_duration(s).map(drop));
+
pub const SEARCH_DOMAIN_SCHEMA: Schema =
StringSchema::new("Search domain for host-name lookup.").schema();
diff --git a/pbs-api-types/src/traffic_control.rs b/pbs-api-types/src/traffic_control.rs
new file mode 100644
index 00000000..c9fe4765
--- /dev/null
+++ b/pbs-api-types/src/traffic_control.rs
@@ -0,0 +1,81 @@
+use serde::{Deserialize, Serialize};
+
+use proxmox_schema::{api, Schema, StringSchema};
+
+use crate::{
+ CIDR_SCHEMA, DAILY_DURATION_FORMAT,
+ PROXMOX_SAFE_ID_FORMAT, SINGLE_LINE_COMMENT_SCHEMA,
+};
+
+pub const TRAFFIC_CONTROL_TIMEFRAME_SCHEMA: Schema = StringSchema::new(
+ "Timeframe to specify when the rule is actice.")
+ .format(&DAILY_DURATION_FORMAT)
+ .schema();
+
+pub const TRAFFIC_CONTROL_ID_SCHEMA: Schema = StringSchema::new("Rule ID.")
+ .format(&PROXMOX_SAFE_ID_FORMAT)
+ .min_length(3)
+ .max_length(32)
+ .schema();
+
+#[api(
+ properties: {
+ comment: {
+ optional: true,
+ schema: SINGLE_LINE_COMMENT_SCHEMA,
+ },
+ network: {
+ type: Array,
+ items: {
+ schema: CIDR_SCHEMA,
+ },
+ },
+ timeframe: {
+ type: Array,
+ items: {
+ schema: TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+ },
+ optional: true,
+ },
+ },
+)]
+#[derive(Serialize,Deserialize,Default)]
+#[serde(rename_all="kebab-case")]
+/// Network Rate Limit Configuration
+pub struct RateLimitConfig {
+ #[serde(skip_serializing_if="Option::is_none")]
+ pub comment: Option<String>,
+ /// Rule applies to Source IPs within this networks
+ pub network: Vec<String>,
+ /// Maximal rate in bytes/second
+ pub rate: u64,
+ /// Bucket size for TBF in bytes
+ #[serde(skip_serializing_if="Option::is_none")]
+ pub burst: Option<u64>,
+ // fixme: expose this?
+ // /// Bandwidth is shared accross all connections
+ // #[serde(skip_serializing_if="Option::is_none")]
+ // pub shared: Option<bool>,
+ /// Enable the rule at specific times
+ #[serde(skip_serializing_if="Option::is_none")]
+ pub timeframe: Option<Vec<String>>,
+}
+
+#[api(
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ config: {
+ type: RateLimitConfig,
+ },
+ },
+)]
+#[derive(Serialize,Deserialize)]
+#[serde(rename_all = "kebab-case")]
+/// Traffic control rule
+pub struct TrafficControlRule {
+ pub name: String,
+ #[serde(flatten)]
+ pub config: RateLimitConfig,
+}
diff --git a/pbs-config/src/lib.rs b/pbs-config/src/lib.rs
index 8ce84fec..930b5f7b 100644
--- a/pbs-config/src/lib.rs
+++ b/pbs-config/src/lib.rs
@@ -12,6 +12,7 @@ pub mod sync;
pub mod tape_encryption_keys;
pub mod tape_job;
pub mod token_shadow;
+pub mod traffic_control;
pub mod user;
pub mod verify;
diff --git a/pbs-config/src/traffic_control.rs b/pbs-config/src/traffic_control.rs
new file mode 100644
index 00000000..1c04f589
--- /dev/null
+++ b/pbs-config/src/traffic_control.rs
@@ -0,0 +1,91 @@
+//! Traffic Control Settings (Network rate limits)
+use std::collections::HashMap;
+
+use anyhow::Error;
+use lazy_static::lazy_static;
+
+use proxmox_schema::{ApiType, Schema};
+
+use pbs_api_types::{TrafficControlRule, TRAFFIC_CONTROL_ID_SCHEMA};
+
+use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+
+use crate::{open_backup_lockfile, replace_backup_config, BackupLockGuard};
+
+
+lazy_static! {
+ /// Static [`SectionConfig`] to access parser/writer functions.
+ pub static ref CONFIG: SectionConfig = init();
+}
+
+fn init() -> SectionConfig {
+ let mut config = SectionConfig::new(&TRAFFIC_CONTROL_ID_SCHEMA);
+
+ let obj_schema = match TrafficControlRule::API_SCHEMA {
+ Schema::AllOf(ref allof_schema) => allof_schema,
+ _ => unreachable!(),
+ };
+ let plugin = SectionConfigPlugin::new("rule".to_string(), Some("name".to_string()), obj_schema);
+ config.register_plugin(plugin);
+
+ config
+}
+
+/// Configuration file name
+pub const TRAFFIC_CONTROL_CFG_FILENAME: &str = "/etc/proxmox-backup/traffic-control.cfg";
+/// Lock file name (used to prevent concurrent access)
+pub const TRAFFIC_CONTROL_CFG_LOCKFILE: &str = "/etc/proxmox-backup/.traffic-control.lck";
+
+/// Get exclusive lock
+pub fn lock_config() -> Result<BackupLockGuard, Error> {
+ open_backup_lockfile(TRAFFIC_CONTROL_CFG_LOCKFILE, None, true)
+}
+
+/// Read and parse the configuration file
+pub fn config() -> Result<(SectionConfigData, [u8;32]), Error> {
+
+ let content = proxmox::tools::fs::file_read_optional_string(TRAFFIC_CONTROL_CFG_FILENAME)?
+ .unwrap_or_else(|| "".to_string());
+
+ let digest = openssl::sha::sha256(content.as_bytes());
+ let data = CONFIG.parse(TRAFFIC_CONTROL_CFG_FILENAME, &content)?;
+ Ok((data, digest))
+}
+
+/// Save the configuration file
+pub fn save_config(config: &SectionConfigData) -> Result<(), Error> {
+ let raw = CONFIG.write(TRAFFIC_CONTROL_CFG_FILENAME, &config)?;
+ replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())
+}
+
+
+// shell completion helper
+pub fn complete_traffic_control_name(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
+ match config() {
+ Ok((data, _digest)) => data.sections.iter().map(|(id, _)| id.to_string()).collect(),
+ Err(_) => return vec![],
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use anyhow::{Error, bail};
+
+ #[test]
+ fn test1() -> Result<(), Error> {
+ let content = "rule: rule1
+ comment localnet at working hours
+ network 192.168.2.0/24
+ network 192.168.3.0/24
+ rate 50
+ timeframe mon..wed 8:00-16:30
+ timeframe fri 9:00-12:00
+";
+ let data = CONFIG.parse(TRAFFIC_CONTROL_CFG_FILENAME, &content)?;
+ eprintln!("GOT {:?}", data);
+
+ Ok(())
+ }
+
+}
diff --git a/src/api2/config/mod.rs b/src/api2/config/mod.rs
index 473337f5..c256ba64 100644
--- a/src/api2/config/mod.rs
+++ b/src/api2/config/mod.rs
@@ -14,6 +14,7 @@ pub mod changer;
pub mod media_pool;
pub mod tape_encryption_keys;
pub mod tape_backup_job;
+pub mod traffic_control;
const SUBDIRS: SubdirMap = &[
("access", &access::ROUTER),
@@ -26,6 +27,7 @@ const SUBDIRS: SubdirMap = &[
("sync", &sync::ROUTER),
("tape-backup-job", &tape_backup_job::ROUTER),
("tape-encryption-keys", &tape_encryption_keys::ROUTER),
+ ("traffic-control", &traffic_control::ROUTER),
("verify", &verify::ROUTER),
];
diff --git a/src/api2/config/traffic_control.rs b/src/api2/config/traffic_control.rs
new file mode 100644
index 00000000..5d5cc6d0
--- /dev/null
+++ b/src/api2/config/traffic_control.rs
@@ -0,0 +1,283 @@
+use anyhow::{bail, Error};
+use serde_json::Value;
+use ::serde::{Deserialize, Serialize};
+
+use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
+use proxmox_schema::api;
+
+use pbs_api_types::{
+ TrafficControlRule, RateLimitConfig,
+ CIDR_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, SINGLE_LINE_COMMENT_SCHEMA,
+ TRAFFIC_CONTROL_ID_SCHEMA, TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+ PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+#[api(
+ input: {
+ properties: {},
+ },
+ returns: {
+ description: "The list of configured traffic control rules (with config digest).",
+ type: Array,
+ items: { type: TrafficControlRule },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// List traffic control rules
+pub fn list_traffic_controls(
+ _param: Value,
+ _info: &ApiMethod,
+ mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<TrafficControlRule>, Error> {
+ let (config, digest) = pbs_config::traffic_control::config()?;
+
+ let list: Vec<TrafficControlRule> = config.convert_to_typed_array("rule")?;
+
+ rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into();
+
+ Ok(list)
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ config: {
+ type: RateLimitConfig,
+ flatten: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Create new traffic control rule.
+pub fn create_traffic_control(
+ name: String,
+ config: RateLimitConfig,
+) -> Result<(), Error> {
+
+ let _lock = pbs_config::traffic_control::lock_config()?;
+
+ let (mut section_config, _digest) = pbs_config::traffic_control::config()?;
+
+ if section_config.sections.get(&name).is_some() {
+ bail!("traffic control rule '{}' already exists.", name);
+ }
+
+ let rule = TrafficControlRule { name: name.clone(), config };
+
+ section_config.set_data(&name, "rule", &rule)?;
+
+ pbs_config::traffic_control::save_config(§ion_config)?;
+
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ },
+ },
+ returns: { type: TrafficControlRule },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+ }
+)]
+/// Read traffic control configuration data.
+pub fn read_traffic_control(
+ name: String,
+ _info: &ApiMethod,
+ mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<TrafficControlRule, Error> {
+ let (config, digest) = pbs_config::traffic_control::config()?;
+ let data: TrafficControlRule = config.lookup("rule", &name)?;
+ rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into();
+ Ok(data)
+}
+
+#[api()]
+#[derive(Serialize, Deserialize)]
+#[allow(non_camel_case_types)]
+/// Deletable property name
+pub enum DeletableProperty {
+ /// Delete the burst property.
+ burst,
+ /// Delete the comment property.
+ comment,
+ /// Delete the timeframe property
+ timeframe,
+}
+
+// fixme: use TrafficControlUpdater
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ comment: {
+ schema: SINGLE_LINE_COMMENT_SCHEMA,
+ optional: true,
+ },
+ rate: {
+ type: u64,
+ description: "Rate limit for TBF in bytes/second.",
+ optional: true,
+ minimum: 1,
+ },
+ burst: {
+ type: u64,
+ description: "Size of the TBF bucket, in bytes.",
+ optional: true,
+ minimum: 1,
+ },
+ network: {
+ description: "List of networks.",
+ optional: true,
+ type: Array,
+ items: {
+ schema: CIDR_SCHEMA,
+ },
+ },
+ timeframe: {
+ description: "List of time frames.",
+ optional: true,
+ type: Array,
+ items: {
+ schema: TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+ },
+ },
+ delete: {
+ description: "List of properties to delete.",
+ type: Array,
+ optional: true,
+ items: {
+ type: DeletableProperty,
+ }
+ },
+ digest: {
+ optional: true,
+ schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Update traffic control configuration.
+pub fn update_traffic_control(
+ name: String,
+ rate: Option<u64>,
+ burst: Option<u64>,
+ comment: Option<String>,
+ network: Option<Vec<String>>,
+ timeframe: Option<Vec<String>>,
+ delete: Option<Vec<DeletableProperty>>,
+ digest: Option<String>,
+) -> Result<(), Error> {
+
+ let _lock = pbs_config::traffic_control::lock_config()?;
+
+ let (mut config, expected_digest) = pbs_config::traffic_control::config()?;
+
+ if let Some(ref digest) = digest {
+ let digest = proxmox::tools::hex_to_digest(digest)?;
+ crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+ }
+
+ let mut data: TrafficControlRule = config.lookup("rule", &name)?;
+
+ if let Some(delete) = delete {
+ for delete_prop in delete {
+ match delete_prop {
+ DeletableProperty::burst => { data.config.burst = None; },
+ DeletableProperty::comment => { data.config.comment = None; },
+ DeletableProperty::timeframe => { data.config.timeframe = None; },
+ }
+ }
+ }
+
+ if let Some(comment) = comment {
+ let comment = comment.trim().to_string();
+ if comment.is_empty() {
+ data.config.comment = None;
+ } else {
+ data.config.comment = Some(comment);
+ }
+ }
+
+ if let Some(rate) = rate { data.config.rate = rate; }
+
+ if burst.is_some() { data.config.burst = burst; }
+
+ if let Some(network) = network { data.config.network = network; }
+ if timeframe.is_some() { data.config.timeframe = timeframe; }
+
+ config.set_data(&name, "rule", &data)?;
+
+ pbs_config::traffic_control::save_config(&config)?;
+
+ Ok(())
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ digest: {
+ optional: true,
+ schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Remove a traffic control rule from the configuration file.
+pub fn delete_traffic_control(name: String, digest: Option<String>) -> Result<(), Error> {
+
+ let _lock = pbs_config::traffic_control::lock_config()?;
+
+ let (mut config, expected_digest) = pbs_config::traffic_control::config()?;
+
+ if let Some(ref digest) = digest {
+ let digest = proxmox::tools::hex_to_digest(digest)?;
+ crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+ }
+
+ match config.sections.get(&name) {
+ Some(_) => { config.sections.remove(&name); },
+ None => bail!("traffic control rule '{}' does not exist.", name),
+ }
+
+ pbs_config::traffic_control::save_config(&config)?;
+
+ Ok(())
+}
+
+
+const ITEM_ROUTER: Router = Router::new()
+ .get(&API_METHOD_READ_TRAFFIC_CONTROL)
+ .put(&API_METHOD_UPDATE_TRAFFIC_CONTROL)
+ .delete(&API_METHOD_DELETE_TRAFFIC_CONTROL);
+
+pub const ROUTER: Router = Router::new()
+ .get(&API_METHOD_LIST_TRAFFIC_CONTROLS)
+ .post(&API_METHOD_CREATE_TRAFFIC_CONTROL)
+ .match_all("name", &ITEM_ROUTER);
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index 92e6bb2a..26cb5a1f 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -374,6 +374,7 @@ async fn run() -> Result<(), Error> {
.insert("user", user_commands())
.insert("openid", openid_commands())
.insert("remote", remote_commands())
+ .insert("traffic-control", traffic_control_commands())
.insert("garbage-collection", garbage_collection_commands())
.insert("acme", acme_mgmt_cli())
.insert("cert", cert_mgmt_cli())
diff --git a/src/bin/proxmox_backup_manager/mod.rs b/src/bin/proxmox_backup_manager/mod.rs
index a3a16246..a4d224ce 100644
--- a/src/bin/proxmox_backup_manager/mod.rs
+++ b/src/bin/proxmox_backup_manager/mod.rs
@@ -26,3 +26,5 @@ mod node;
pub use node::*;
mod openid;
pub use openid::*;
+mod traffic_control;
+pub use traffic_control::*;
diff --git a/src/bin/proxmox_backup_manager/traffic_control.rs b/src/bin/proxmox_backup_manager/traffic_control.rs
new file mode 100644
index 00000000..34e4a2a5
--- /dev/null
+++ b/src/bin/proxmox_backup_manager/traffic_control.rs
@@ -0,0 +1,105 @@
+use anyhow::Error;
+use serde_json::Value;
+
+use proxmox_router::{cli::*, ApiHandler, RpcEnvironment};
+use proxmox_schema::api;
+
+use pbs_api_types::TRAFFIC_CONTROL_ID_SCHEMA;
+
+use proxmox_backup::api2;
+
+
+#[api(
+ input: {
+ properties: {
+ "output-format": {
+ schema: OUTPUT_FORMAT,
+ optional: true,
+ },
+ }
+ }
+)]
+/// List configured traffic control rules.
+fn list_traffic_controls(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
+
+ let output_format = get_output_format(¶m);
+
+ let info = &api2::config::traffic_control::API_METHOD_LIST_TRAFFIC_CONTROLS;
+ let mut data = match info.handler {
+ ApiHandler::Sync(handler) => (handler)(param, info, rpcenv)?,
+ _ => unreachable!(),
+ };
+
+ let options = default_table_format_options()
+ .column(ColumnConfig::new("name"))
+ .column(ColumnConfig::new("rate"))
+ .column(ColumnConfig::new("burst"))
+ .column(ColumnConfig::new("network"))
+ .column(ColumnConfig::new("timeframe"))
+ .column(ColumnConfig::new("comment"));
+
+ format_and_print_result_full(&mut data, &info.returns, &output_format, &options);
+
+ Ok(Value::Null)
+}
+
+#[api(
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ "output-format": {
+ schema: OUTPUT_FORMAT,
+ optional: true,
+ },
+ }
+ }
+)]
+/// Show traffic control configuration
+fn show_traffic_control(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
+
+ let output_format = get_output_format(¶m);
+
+ let info = &api2::config::traffic_control::API_METHOD_READ_TRAFFIC_CONTROL;
+ let mut data = match info.handler {
+ ApiHandler::Sync(handler) => (handler)(param, info, rpcenv)?,
+ _ => unreachable!(),
+ };
+
+ let options = default_table_format_options();
+ format_and_print_result_full(&mut data, &info.returns, &output_format, &options);
+
+ Ok(Value::Null)
+}
+
+pub fn traffic_control_commands() -> CommandLineInterface {
+
+ let cmd_def = CliCommandMap::new()
+ .insert("list", CliCommand::new(&API_METHOD_LIST_TRAFFIC_CONTROLS))
+ .insert(
+ "show",
+ CliCommand::new(&API_METHOD_SHOW_TRAFFIC_CONTROL)
+ .arg_param(&["name"])
+ .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+ )
+ .insert(
+ "create",
+ CliCommand::new(&api2::config::traffic_control::API_METHOD_CREATE_TRAFFIC_CONTROL)
+ .arg_param(&["name"])
+ )
+ .insert(
+ "update",
+ CliCommand::new(&api2::config::traffic_control::API_METHOD_UPDATE_TRAFFIC_CONTROL)
+ .arg_param(&["name"])
+ .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+ )
+ .insert(
+ "remove",
+ CliCommand::new(&api2::config::traffic_control::API_METHOD_DELETE_TRAFFIC_CONTROL)
+ .arg_param(&["name"])
+ .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+ );
+
+ cmd_def.into()
+}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (10 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
` (3 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
proxmox-http/src/client/rate_limited_stream.rs | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index ea99383..865a426 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -24,6 +24,12 @@ pub struct RateLimitedStream<S> {
stream: S,
}
+impl RateLimitedStream<tokio::net::TcpStream> {
+ pub fn peer_addr(&self) -> std::io::Result<std::net::SocketAddr> {
+ self.stream.peer_addr()
+ }
+}
+
impl <S> RateLimitedStream<S> {
/// Creates a new instance with reads and writes limited to the same `rate`.
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (11 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
` (2 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
proxmox-http/src/client/rate_limiter.rs | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
index 677dfb1..e669410 100644
--- a/proxmox-http/src/client/rate_limiter.rs
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -34,6 +34,12 @@ impl RateLimiter {
}
}
+ /// Update rate and bucket size
+ pub fn update_rate(&mut self, rate: u64, bucket_size: u64) {
+ self.rate = rate;
+ self.bucket_size = bucket_size;
+ }
+
/// Returns the average rate (since `start_time`)
pub fn average_rate(&self, current_time: Instant) -> f64 {
let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (12 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
pbs-config/src/lib.rs | 2 +-
pbs-config/src/memcom.rs | 14 ++++++++++++++
pbs-config/src/traffic_control.rs | 11 +++++++++--
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/pbs-config/src/lib.rs b/pbs-config/src/lib.rs
index 930b5f7b..bc3b19f0 100644
--- a/pbs-config/src/lib.rs
+++ b/pbs-config/src/lib.rs
@@ -16,7 +16,7 @@ pub mod traffic_control;
pub mod user;
pub mod verify;
-pub(crate) mod memcom;
+pub mod memcom;
use anyhow::{format_err, Error};
diff --git a/pbs-config/src/memcom.rs b/pbs-config/src/memcom.rs
index 4ab07ec9..7b82798b 100644
--- a/pbs-config/src/memcom.rs
+++ b/pbs-config/src/memcom.rs
@@ -23,6 +23,8 @@ pub struct Memcom {
struct Head {
// User (user.cfg) cache generation/version.
user_cache_generation: AtomicUsize,
+ // Traffic control (traffic-control.cfg) generation/version.
+ traffic_control_generation: AtomicUsize,
}
static INSTANCE: OnceCell<Arc<Memcom>> = OnceCell::new();
@@ -81,4 +83,16 @@ impl Memcom {
.user_cache_generation
.fetch_add(1, Ordering::AcqRel);
}
+
+ /// Returns the traffic control generation number.
+ pub fn traffic_control_generation(&self) -> usize {
+ self.head().traffic_control_generation.load(Ordering::Acquire)
+ }
+
+ /// Increase the traffic control generation number.
+ pub fn increase_traffic_control_generation(&self) {
+ self.head()
+ .traffic_control_generation
+ .fetch_add(1, Ordering::AcqRel);
+ }
}
diff --git a/pbs-config/src/traffic_control.rs b/pbs-config/src/traffic_control.rs
index 1c04f589..816bc7a2 100644
--- a/pbs-config/src/traffic_control.rs
+++ b/pbs-config/src/traffic_control.rs
@@ -10,9 +10,9 @@ use pbs_api_types::{TrafficControlRule, TRAFFIC_CONTROL_ID_SCHEMA};
use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+use crate::memcom::Memcom;
use crate::{open_backup_lockfile, replace_backup_config, BackupLockGuard};
-
lazy_static! {
/// Static [`SectionConfig`] to access parser/writer functions.
pub static ref CONFIG: SectionConfig = init();
@@ -55,7 +55,14 @@ pub fn config() -> Result<(SectionConfigData, [u8;32]), Error> {
/// Save the configuration file
pub fn save_config(config: &SectionConfigData) -> Result<(), Error> {
let raw = CONFIG.write(TRAFFIC_CONTROL_CFG_FILENAME, &config)?;
- replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())
+ replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())?;
+
+ // increase traffic control generation
+ // We use this in TrafficControlCache
+ let memcom = Memcom::new()?;
+ memcom.increase_traffic_control_generation();
+
+ Ok(())
}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (13 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
Cargo.toml | 1 +
src/cached_traffic_control.rs | 240 ++++++++++++++++++++++++++++++++++
src/lib.rs | 3 +
3 files changed, 244 insertions(+)
create mode 100644 src/cached_traffic_control.rs
diff --git a/Cargo.toml b/Cargo.toml
index 0f163d65..ac0983b9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,7 @@ apt-pkg-native = "0.3.2"
base64 = "0.12"
bitflags = "1.2.1"
bytes = "1.0"
+cidr = "0.2.1"
crc32fast = "1"
endian_trait = { version = "0.6", features = ["arrays"] }
env_logger = "0.7"
diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs
new file mode 100644
index 00000000..5a7f46da
--- /dev/null
+++ b/src/cached_traffic_control.rs
@@ -0,0 +1,240 @@
+//! Cached traffic control configuration
+use std::sync::{Arc, Mutex};
+use std::collections::HashMap;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+use anyhow::Error;
+use cidr::IpInet;
+
+use proxmox_http::client::RateLimiter;
+use proxmox_time::epoch_i64;
+
+use proxmox_systemd::daily_duration::parse_daily_duration;
+
+use pbs_api_types::TrafficControlRule;
+
+use pbs_config::memcom::Memcom;
+
+pub struct TrafficControlCache {
+ last_update: i64,
+ last_traffic_control_generation: usize,
+ rules: Vec<(TrafficControlRule, Vec<IpInet>)>,
+ limiter_map: HashMap<String, (Arc<Mutex<RateLimiter>>, Arc<Mutex<RateLimiter>>)>,
+}
+
+fn timeframe_match(
+ duration_list: &[String],
+ now: i64,
+) -> Result<bool, Error> {
+
+ for duration_str in duration_list.iter() {
+ let duration = parse_daily_duration(duration_str)?;
+ if duration.time_match(now, false)? {
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
+}
+
+fn network_match_len(
+ networks: &[IpInet],
+ ip: &IpAddr,
+) -> Option<u8> {
+
+ let mut match_len = None;
+
+ for cidr in networks.iter() {
+ if cidr.contains(ip) {
+ let network_length = cidr.network_length();
+ match match_len {
+ Some(len) => {
+ if network_length > len {
+ match_len = Some(network_length);
+ }
+ }
+ None => match_len = Some(network_length),
+ }
+ }
+ }
+ match_len
+}
+
+fn cannonical_ip(ip: IpAddr) -> IpAddr {
+ // TODO: use std::net::IpAddr::to_cananical once stable
+ match ip {
+ IpAddr::V4(addr) => IpAddr::V4(addr),
+ IpAddr::V6(addr) => {
+ match addr.octets() {
+ [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
+ IpAddr::V4(Ipv4Addr::new(a, b, c, d))
+ }
+ _ => IpAddr::V6(addr),
+ }
+ }
+ }
+}
+
+impl TrafficControlCache {
+
+ pub fn new() -> Self {
+ Self {
+ rules: Vec::new(),
+ limiter_map: HashMap::new(),
+ last_traffic_control_generation: 0,
+ last_update: 0,
+ }
+ }
+
+ pub fn reload(&mut self) {
+ let now = epoch_i64();
+
+ let memcom = match Memcom::new() {
+ Ok(memcom) => memcom,
+ Err(err) => {
+ log::error!("TrafficControlCache::reload failed in Memcom::new: {}", err);
+ return;
+ }
+ };
+
+ let traffic_control_generation = memcom.traffic_control_generation();
+
+ if (self.last_update != 0) &&
+ (traffic_control_generation == self.last_traffic_control_generation) &&
+ ((now - self.last_update) < 60) { return; }
+
+ log::debug!("reload traffic control rules");
+
+ self.last_traffic_control_generation = traffic_control_generation;
+ self.last_update = now;
+
+ match self.reload_impl() {
+ Ok(()) => (),
+ Err(err) => {
+ log::error!("TrafficControlCache::reload failed -> {}", err);
+ }
+ }
+ }
+
+ fn reload_impl(&mut self) -> Result<(), Error> {
+ let (config, _) = pbs_config::traffic_control::config()?;
+
+ self.limiter_map.retain(|key, _value| config.sections.contains_key(key));
+
+ let rules: Vec<TrafficControlRule> =
+ config.convert_to_typed_array("rule")?;
+
+ let now = proxmox_time::epoch_i64();
+
+ let mut active_rules = Vec::new();
+
+ for rule in rules {
+ if let Some(ref timeframe) = rule.config.timeframe {
+ if timeframe_match(timeframe, now)? {
+ self.limiter_map.remove(&rule.name);
+ continue;
+ }
+ }
+
+ let rate = rule.config.rate;
+ let burst = rule.config.burst.unwrap_or(rate);
+
+ if let Some(limiter) = self.limiter_map.get(&rule.name) {
+ limiter.0.lock().unwrap().update_rate(rate, burst);
+ limiter.1.lock().unwrap().update_rate(rate, burst);
+ } else {
+
+ let read_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+ let write_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+
+ self.limiter_map.insert(
+ rule.name.clone(),
+ (read_limiter, write_limiter),
+ );
+ }
+
+ let mut networks = Vec::new();
+
+ for network in rule.config.network.iter() {
+ let cidr = match network.parse() {
+ Ok(cidr) => cidr,
+ Err(err) => {
+ log::error!("unable to parse network '{}' - {}", network, err);
+ continue;
+ }
+ };
+ networks.push(cidr);
+ }
+
+ active_rules.push((rule, networks));
+ }
+
+ self.rules = active_rules;
+
+ Ok(())
+ }
+
+ pub fn lookup_rate_limiter(
+ &self,
+ peer: Option<SocketAddr>,
+ ) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+
+ let peer = match peer {
+ None => return (None, None),
+ Some(peer) => peer,
+ };
+
+ let peer_ip = cannonical_ip(peer.ip());
+
+ log::debug!("lookup_rate_limiter {} {:?}", peer_ip.is_ipv4(), peer_ip);
+
+ let mut last_rule_match = None;
+
+ for (rule, networks) in self.rules.iter() {
+ if let Some(match_len) = network_match_len(networks, &peer_ip) {
+ match last_rule_match {
+ None => last_rule_match = Some((rule, match_len)),
+ Some((_, last_len)) => {
+ if match_len > last_len {
+ last_rule_match = Some((rule, match_len));
+ }
+ }
+ }
+ }
+ }
+
+ match last_rule_match {
+ Some((rule, _)) => {
+ match self.limiter_map.get(&rule.name) {
+ Some((read_limiter, write_limiter)) => {
+ (Some(Arc::clone(read_limiter)), Some(Arc::clone(write_limiter)))
+ }
+ None => (None, None), // should never happen
+ }
+ }
+ None => (None, None),
+ }
+ }
+}
+
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn testnetwork_match() -> Result<(), Error> {
+
+ let networks = ["192.168.2.1/24", "127.0.0.0/8"];
+ let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();
+
+ assert_eq!(network_match_len(&networks, &"192.168.2.1".parse()?), Some(24));
+ assert_eq!(network_match_len(&networks, &"192.168.2.254".parse()?), Some(24));
+ assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None);
+ assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8));
+ assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None);
+
+ Ok(())
+
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5f6d5e7e..8f5ed245 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -33,6 +33,9 @@ pub mod client_helpers;
pub mod rrd_cache;
+mod cached_traffic_control;
+pub use cached_traffic_control::TrafficControlCache;
+
/// Get the server's certificate info (from `proxy.pem`).
pub fn cert_info() -> Result<CertInfo, anyhow::Error> {
CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem")))
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (14 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 25 ++++++++++++++++++++++++-
1 file changed, 24 insertions(+), 1 deletion(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 1589a57d..01308463 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -21,6 +21,7 @@ use proxmox::sys::linux::socket::set_tcp_keepalive;
use proxmox::tools::fs::CreateOptions;
use proxmox_lang::try_block;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
+use proxmox_http::client::{RateLimiter, RateLimitedStream};
use pbs_tools::{task_log, task_warn};
use pbs_datastore::DataStore;
@@ -70,6 +71,7 @@ use proxmox_backup::api2::pull::do_sync_job;
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
use proxmox_backup::server::do_verification_job;
use proxmox_backup::server::do_prune_job;
+use proxmox_backup::TrafficControlCache;
fn main() -> Result<(), Error> {
proxmox_backup::tools::setup_safe_path_env();
@@ -351,7 +353,7 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
}
type ClientStreamResult =
- Result<std::pin::Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>;
+ Result<std::pin::Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>, Error>;
const MAX_PENDING_ACCEPTS: usize = 1024;
fn accept_connections(
@@ -387,6 +389,9 @@ async fn accept_connection(
sock.set_nodelay(true).unwrap();
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+ let peer = sock.peer_addr().ok();
+ let sock = RateLimitedStream::with_limiter_update_cb(sock, move || lookup_rate_limiter(peer));
+
let ssl = { // limit acceptor_guard scope
// Acceptor can be reloaded using the command socket "reload-certificate" command
let acceptor_guard = acceptor.lock().unwrap();
@@ -1075,3 +1080,21 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
}
}
}
+
+// Rate Limiter lookup
+
+// Test WITH
+// proxmox-backup-client restore vm/201/2021-10-22T09:55:56Z drive-scsi0.img img1.img --repository localhost:store2
+
+lazy_static::lazy_static!{
+ static ref TRAFFIC_CONTROL_CACHE: Arc<Mutex<TrafficControlCache>> =
+ Arc::new(Mutex::new(TrafficControlCache::new()));
+}
+
+fn lookup_rate_limiter(
+ peer: Option<std::net::SocketAddr>,
+) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+ let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();
+ cache.reload();
+ cache.lookup_rate_limiter(peer)
+}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread