* [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation
@ 2021-11-09 6:52 Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
` (15 more replies)
0 siblings, 16 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
This implements a simple Token Bucket based rate limiter. That filter
can be used to:
- limit http client connection
- limit speed at server side (proxmox-backup-proxy)
There is also a new configuration file to configure the server side
rate limits: /etc/proxmox-backup/traffic-control.cfg
The server side filter dynamically updates when the user changes
configuration (even for existing connections).
Dietmar Maurer (proxmox/7):
Implement a rate limiting stream (AsyncRead, AsyncWrite)
RateLimitedStream: implement poll_write_vectored
HttpsConnector: use RateLimitedStream
RateLimitedStream: allow periodic limiter updates
RateLimiter: avoid panic in time computations
RateLimitedStream: implement peer_addr
RateLimiter: add update_rate method
proxmox-http/src/client/connector.rs | 51 +++-
proxmox-http/src/client/mod.rs | 6 +
.../src/client/rate_limited_stream.rs | 233 ++++++++++++++++++
proxmox-http/src/client/rate_limiter.rs | 84 +++++++
4 files changed, 366 insertions(+), 8 deletions(-)
create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
create mode 100644 proxmox-http/src/client/rate_limiter.rs
Dietmar Maurer (proxmox-backup/9):
pbs-client: add option to use the new RateLimiter
proxmox-backup-client: add rate/burst parameter to backup CLI
implement Servive for RateLimitedStream
New DailyDuration type with nom parser
DailyDuration: implement time_match()
Add traffic control configuration config with API
traffic_control: use Memcom to track. config versions
implement a traffic control cache for fast rate control limiter
lockups
proxmox-backup-proxy: implement traffic control
Cargo.toml | 1 +
pbs-api-types/src/lib.rs | 7 +
pbs-api-types/src/traffic_control.rs | 81 +++++
pbs-client/src/http_client.rs | 24 +-
pbs-client/src/tools/mod.rs | 23 +-
pbs-config/src/lib.rs | 3 +-
pbs-config/src/memcom.rs | 14 +
pbs-config/src/traffic_control.rs | 98 ++++++
proxmox-backup-client/src/main.rs | 19 +-
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/rest.rs | 28 ++
proxmox-systemd/src/daily_duration.rs | 152 ++++++++++
proxmox-systemd/src/lib.rs | 1 +
proxmox-systemd/src/parse_time.rs | 56 ++++
proxmox-systemd/src/time.rs | 2 +-
src/api2/config/mod.rs | 2 +
src/api2/config/traffic_control.rs | 283 ++++++++++++++++++
src/bin/proxmox-backup-manager.rs | 1 +
src/bin/proxmox-backup-proxy.rs | 25 +-
src/bin/proxmox_backup_manager/mod.rs | 2 +
.../proxmox_backup_manager/traffic_control.rs | 105 +++++++
src/cached_traffic_control.rs | 240 +++++++++++++++
src/lib.rs | 3 +
23 files changed, 1161 insertions(+), 10 deletions(-)
create mode 100644 pbs-api-types/src/traffic_control.rs
create mode 100644 pbs-config/src/traffic_control.rs
create mode 100644 proxmox-systemd/src/daily_duration.rs
create mode 100644 src/api2/config/traffic_control.rs
create mode 100644 src/bin/proxmox_backup_manager/traffic_control.rs
create mode 100644 src/cached_traffic_control.rs
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite)
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
` (14 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/mod.rs | 6 +
.../src/client/rate_limited_stream.rs | 144 ++++++++++++++++++
proxmox-http/src/client/rate_limiter.rs | 75 +++++++++
3 files changed, 225 insertions(+)
create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
create mode 100644 proxmox-http/src/client/rate_limiter.rs
diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs
index b6ee4b0..30e66d5 100644
--- a/proxmox-http/src/client/mod.rs
+++ b/proxmox-http/src/client/mod.rs
@@ -2,6 +2,12 @@
//!
//! Contains a lightweight wrapper around `hyper` with support for TLS connections.
+mod rate_limiter;
+pub use rate_limiter::RateLimiter;
+
+mod rate_limited_stream;
+pub use rate_limited_stream::RateLimitedStream;
+
mod connector;
pub use connector::HttpsConnector;
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
new file mode 100644
index 0000000..a11b59e
--- /dev/null
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -0,0 +1,144 @@
+use std::pin::Pin;
+use std::marker::Unpin;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::Future;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
+use tokio::time::Sleep;
+
+use std::task::{Context, Poll};
+
+use super::RateLimiter;
+
+/// A rate limited stream using [RateLimiter]
+pub struct RateLimitedStream<S> {
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ read_delay: Option<Pin<Box<Sleep>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_delay: Option<Pin<Box<Sleep>>>,
+ stream: S,
+}
+
+impl <S> RateLimitedStream<S> {
+
+ const MIN_DELAY: Duration = Duration::from_millis(20);
+
+ /// Creates a new instance with reads and writes limited to the same `rate`.
+ pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
+ let now = Instant::now();
+ let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+ let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+ Self::with_limiter(stream, Some(read_limiter), Some(write_limiter))
+ }
+
+ /// Creates a new instance with specified [RateLimiters] for reads and writes.
+ pub fn with_limiter(
+ stream: S,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ ) -> Self {
+ Self {
+ read_limiter,
+ read_delay: None,
+ write_limiter,
+ write_delay: None,
+ stream,
+ }
+ }
+}
+
+impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
+
+ fn poll_write(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.write_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
+
+ if let Some(ref write_limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = &result {
+ let now = Instant::now();
+ let delay = write_limiter.lock().unwrap()
+ .register_traffic(now, *count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.write_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_flush(ctx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_shutdown(ctx)
+ }
+}
+
+impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.read_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.read_delay = None;
+
+ let filled_len = buf.filled().len();
+ let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
+
+ if let Some(ref read_limiter) = this.read_limiter {
+ if let Poll::Ready(Ok(())) = &result {
+ let count = buf.filled().len() - filled_len;
+ let now = Instant::now();
+ let delay = read_limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.read_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+}
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
new file mode 100644
index 0000000..4742387
--- /dev/null
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -0,0 +1,75 @@
+use std::time::{Duration, Instant};
+use std::convert::TryInto;
+
+/// Token bucket based rate limiter
+pub struct RateLimiter {
+ rate: u64, // tokens/second
+ start_time: Instant,
+ traffic: u64, // overall traffic
+ bucket_size: u64,
+ last_update: Instant,
+ consumed_tokens: u64,
+}
+
+impl RateLimiter {
+
+ const NO_DELAY: Duration = Duration::from_millis(0);
+
+ /// Creates a new instance, using [Instant::now] as start time.
+ pub fn new(rate: u64, bucket_size: u64) -> Self {
+ let start_time = Instant::now();
+ Self::with_start_time(rate, bucket_size, start_time)
+ }
+
+ /// Creates a new instance with specified `rate`, `bucket_size` and `start_time`.
+ pub fn with_start_time(rate: u64, bucket_size: u64, start_time: Instant) -> Self {
+ Self {
+ rate,
+ start_time,
+ traffic: 0,
+ bucket_size,
+ last_update: start_time,
+ // start with empty bucket (all tokens consumed)
+ consumed_tokens: bucket_size,
+ }
+ }
+
+ /// Returns the average rate (since `start_time`)
+ pub fn average_rate(&self, current_time: Instant) -> f64 {
+ let time_diff = (current_time - self.start_time).as_secs_f64();
+ if time_diff <= 0.0 {
+ 0.0
+ } else {
+ (self.traffic as f64) / time_diff
+ }
+ }
+
+ fn refill_bucket(&mut self, current_time: Instant) {
+ let time_diff = (current_time - self.last_update).as_nanos();
+
+ if time_diff <= 0 {
+ //log::error!("update_time: got negative time diff");
+ return;
+ }
+
+ self.last_update = current_time;
+
+ let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000)
+ .try_into().unwrap_or(u64::MAX);
+
+ self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
+ }
+
+ /// Register traffic, returning a proposed delay to reach the expected rate.
+ pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
+ self.refill_bucket(current_time);
+
+ self.traffic += data_len;
+ self.consumed_tokens += data_len;
+
+ if self.consumed_tokens <= self.bucket_size {
+ return Self::NO_DELAY;
+ }
+ Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate)
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
` (13 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
pbs-client/src/http_client.rs | 24 ++++++++++++++++++++++--
pbs-client/src/tools/mod.rs | 23 ++++++++++++++++++++---
2 files changed, 42 insertions(+), 5 deletions(-)
diff --git a/pbs-client/src/http_client.rs b/pbs-client/src/http_client.rs
index 73c83f7a..defaef8a 100644
--- a/pbs-client/src/http_client.rs
+++ b/pbs-client/src/http_client.rs
@@ -20,7 +20,7 @@ use proxmox::{
};
use proxmox_router::HttpError;
-use proxmox_http::client::HttpsConnector;
+use proxmox_http::client::{HttpsConnector, RateLimiter};
use proxmox_http::uri::build_authority;
use pbs_api_types::{Authid, Userid};
@@ -51,6 +51,8 @@ pub struct HttpClientOptions {
ticket_cache: bool,
fingerprint_cache: bool,
verify_cert: bool,
+ rate_limit: Option<u64>,
+ bucket_size: Option<u64>,
}
impl HttpClientOptions {
@@ -109,6 +111,16 @@ impl HttpClientOptions {
self.verify_cert = verify_cert;
self
}
+
+ pub fn rate_limit(mut self, rate_limit: Option<u64>) -> Self {
+ self.rate_limit = rate_limit;
+ self
+ }
+
+ pub fn bucket_size(mut self, bucket_size: Option<u64>) -> Self {
+ self.bucket_size = bucket_size;
+ self
+ }
}
impl Default for HttpClientOptions {
@@ -121,6 +133,8 @@ impl Default for HttpClientOptions {
ticket_cache: false,
fingerprint_cache: false,
verify_cert: true,
+ rate_limit: None,
+ bucket_size: None,
}
}
}
@@ -343,7 +357,13 @@ impl HttpClient {
httpc.enforce_http(false); // we want https...
httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
- let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+ let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+ if let Some(rate_limit) = options.rate_limit {
+ let bucket_size = options.bucket_size.unwrap_or_else(|| rate_limit*3);
+ https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
+ https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
+ }
let client = Client::builder()
//.http2_initial_stream_window_size( (1 << 31) - 2)
diff --git a/pbs-client/src/tools/mod.rs b/pbs-client/src/tools/mod.rs
index a12635cf..539ad662 100644
--- a/pbs-client/src/tools/mod.rs
+++ b/pbs-client/src/tools/mod.rs
@@ -135,15 +135,32 @@ pub fn extract_repository_from_map(param: &HashMap<String, String>) -> Option<Ba
}
pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> {
- connect_do(repo.host(), repo.port(), repo.auth_id())
+ connect_do(repo.host(), repo.port(), repo.auth_id(), None, None)
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
}
-fn connect_do(server: &str, port: u16, auth_id: &Authid) -> Result<HttpClient, Error> {
+pub fn connect_rate_limited(
+ repo: &BackupRepository,
+ rate: Option<u64>,
+ bucket_size: Option<u64>,
+) -> Result<HttpClient, Error> {
+ connect_do(repo.host(), repo.port(), repo.auth_id(), rate, bucket_size)
+ .map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
+}
+
+fn connect_do(
+ server: &str,
+ port: u16,
+ auth_id: &Authid,
+ rate_limit: Option<u64>,
+ bucket_size: Option<u64>,
+) -> Result<HttpClient, Error> {
let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
- let options = HttpClientOptions::new_interactive(password, fingerprint);
+ let options = HttpClientOptions::new_interactive(password, fingerprint)
+ .rate_limit(rate_limit)
+ .bucket_size(bucket_size);
HttpClient::new(server, port, auth_id, options)
}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
` (12 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
.../src/client/rate_limited_stream.rs | 92 +++++++++++++------
1 file changed, 62 insertions(+), 30 deletions(-)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index a11b59e..0cc0ebb 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::pin::Pin;
use std::marker::Unpin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
+use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
@@ -22,8 +23,6 @@ pub struct RateLimitedStream<S> {
impl <S> RateLimitedStream<S> {
- const MIN_DELAY: Duration = Duration::from_millis(20);
-
/// Creates a new instance with reads and writes limited to the same `rate`.
pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
let now = Instant::now();
@@ -48,6 +47,33 @@ impl <S> RateLimitedStream<S> {
}
}
+fn register_traffic(
+ limiter: &Mutex<RateLimiter>,
+ count: usize,
+) -> Option<Pin<Box<Sleep>>>{
+
+ const MIN_DELAY: Duration = Duration::from_millis(10);
+
+ let now = Instant::now();
+ let delay = limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ Some(Box::pin(sleep))
+ } else {
+ None
+ }
+}
+
+fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
+ match delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ }
+}
+
impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
fn poll_write(
@@ -57,12 +83,7 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.write_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -70,15 +91,37 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
- if let Some(ref write_limiter) = this.write_limiter {
- if let Poll::Ready(Ok(count)) = &result {
- let now = Instant::now();
- let delay = write_limiter.lock().unwrap()
- .register_traffic(now, *count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.write_delay = Some(Box::pin(sleep));
- }
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
+ }
+ }
+
+ result
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.stream.is_write_vectored()
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
+
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
}
}
@@ -111,12 +154,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.read_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.read_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -128,13 +166,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
if let Some(ref read_limiter) = this.read_limiter {
if let Poll::Ready(Ok(())) = &result {
let count = buf.filled().len() - filled_len;
- let now = Instant::now();
- let delay = read_limiter.lock().unwrap()
- .register_traffic(now, count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.read_delay = Some(Box::pin(sleep));
- }
+ this.read_delay = register_traffic(read_limiter, count);
}
}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (2 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
` (11 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-backup-client/src/main.rs | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index cb083006..d81271d0 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -45,7 +45,7 @@ use pbs_client::tools::{
complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
complete_img_archive_name, complete_pxar_archive_name, complete_repository, connect,
- extract_repository_from_value,
+ connect_rate_limited, extract_repository_from_value,
key_source::{
crypto_parameters, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
KEYFILE_SCHEMA, MASTER_PUBKEY_FD_SCHEMA, MASTER_PUBKEY_FILE_SCHEMA,
@@ -582,6 +582,18 @@ fn spawn_catalog_upload(
schema: CHUNK_SIZE_SCHEMA,
optional: true,
},
+ rate: {
+ type: u64,
+ description: "Rate limit for TBF in bytes/second.",
+ optional: true,
+ minimum: 1,
+ },
+ burst: {
+ type: u64,
+ description: "Size of the TBF bucket, in bytes.",
+ optional: true,
+ minimum: 1,
+ },
"exclude": {
type: Array,
description: "List of paths or patterns for matching files to exclude.",
@@ -630,6 +642,9 @@ async fn create_backup(
verify_chunk_size(size)?;
}
+ let rate_limit = param["rate"].as_u64();
+ let bucket_size = param["burst"].as_u64();
+
let crypto = crypto_parameters(¶m)?;
let backup_id = param["backup-id"].as_str().unwrap_or(&proxmox::tools::nodename());
@@ -724,7 +739,7 @@ async fn create_backup(
let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
- let client = connect(&repo)?;
+ let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
record_repository(&repo);
println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?);
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (3 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream Dietmar Maurer
` (10 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
So that we can limit used bandwidth.
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-http/src/client/connector.rs | 51 ++++++++++++++++---
.../src/client/rate_limited_stream.rs | 8 +++
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
use anyhow::{bail, format_err, Error};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::*;
use http::Uri;
use hyper::client::HttpConnector;
use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_openssl::SslStream;
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
use crate::tls::MaybeTlsStream;
use crate::uri::build_authority;
+use super::{RateLimiter, RateLimitedStream};
+
#[derive(Clone)]
pub struct HttpsConnector {
connector: HttpConnector,
ssl_connector: Arc<SslConnector>,
proxy: Option<ProxyConfig>,
tcp_keepalive: u32,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
}
impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
ssl_connector: Arc::new(ssl_connector),
proxy: None,
tcp_keepalive,
+ read_limiter: None,
+ write_limiter: None,
}
}
@@ -45,13 +51,21 @@ impl HttpsConnector {
self.proxy = Some(proxy);
}
- async fn secure_stream(
- tcp_stream: TcpStream,
+ pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.read_limiter = limiter;
+ }
+
+ pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ self.write_limiter = limiter;
+ }
+
+ async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+ tcp_stream: S,
ssl_connector: &SslConnector,
host: &str,
- ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+ ) -> Result<MaybeTlsStream<S>, Error> {
let config = ssl_connector.configure()?;
- let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+ let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
Pin::new(&mut conn).connect().await?;
Ok(MaybeTlsStream::Secured(conn))
}
@@ -107,7 +121,7 @@ impl HttpsConnector {
}
impl hyper::service::Service<Uri> for HttpsConnector {
- type Response = MaybeTlsStream<TcpStream>;
+ type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
};
let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
let keepalive = self.tcp_keepalive;
+ let read_limiter = self.read_limiter.clone();
+ let write_limiter = self.write_limiter.clone();
+
if let Some(ref proxy) = self.proxy {
let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
if use_connect {
async move {
- let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+ let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
format_err!("error connecting to {} - {}", proxy_authority, err)
})?;
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let mut tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
if let Some(authorization) = authorization {
connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
Ok(MaybeTlsStream::Proxied(tcp_stream))
}
.boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+ let tcp_stream = RateLimitedStream::with_limiter(
+ tcp_stream,
+ read_limiter,
+ write_limiter,
+ );
+
if is_https {
Self::secure_stream(tcp_stream, &ssl_connector, &host).await
} else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 0cc0ebb..00ba066 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
use std::task::{Context, Poll};
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
}
}
+
+// we need this for the hyper http client
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+ fn connected(&self) -> Connected {
+ self.stream.connected()
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (4 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
` (9 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/rest.rs | 28 ++++++++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 1fa76f21..b88e5d12 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -33,6 +33,7 @@ url = "2.1"
proxmox = "0.15.0"
proxmox-io = "1"
proxmox-lang = "1"
+proxmox-http = { version = "0.5.0", features = [ "client" ] }
proxmox-router = "1.1"
proxmox-schema = { version = "1", features = [ "api-macro", "upid-api-impl" ] }
proxmox-time = "1"
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 74bc8bb1..f27f703d 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -31,6 +31,8 @@ use proxmox_schema::{
ParameterSchema,
};
+use proxmox_http::client::RateLimitedStream;
+
use pbs_tools::compression::{DeflateEncoder, Level};
use pbs_tools::stream::AsyncReaderStream;
@@ -73,6 +75,32 @@ impl RestServer {
}
}
+impl Service<&Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>>
+ for RestServer
+{
+ type Response = ApiService;
+ type Error = Error;
+ type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
+
+ fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(
+ &mut self,
+ ctx: &Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>,
+ ) -> Self::Future {
+ match ctx.get_ref().peer_addr() {
+ Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
+ Ok(peer) => future::ok(ApiService {
+ peer,
+ api_config: self.api_config.clone(),
+ })
+ .boxed(),
+ }
+ }
+}
+
impl Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>>
for RestServer
{
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (5 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
` (8 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
We will use this to specify timesframes for network rate limits (only
apply limite when inside the time frame).
Note: This is not systemd related, but we can reuse some of the parser
method.
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
proxmox-systemd/src/daily_duration.rs | 92 +++++++++++++++++++++++++++
proxmox-systemd/src/lib.rs | 1 +
proxmox-systemd/src/parse_time.rs | 56 ++++++++++++++++
proxmox-systemd/src/time.rs | 2 +-
4 files changed, 150 insertions(+), 1 deletion(-)
create mode 100644 proxmox-systemd/src/daily_duration.rs
diff --git a/proxmox-systemd/src/daily_duration.rs b/proxmox-systemd/src/daily_duration.rs
new file mode 100644
index 00000000..bed4eb47
--- /dev/null
+++ b/proxmox-systemd/src/daily_duration.rs
@@ -0,0 +1,92 @@
+use std::cmp::{Ordering, PartialOrd};
+
+use super::time::{WeekDays};
+
+pub use super::parse_time::parse_daily_duration;
+
+/// Time of Day (hour with minute)
+#[derive(Default, PartialEq, Clone, Debug)]
+pub struct HmTime {
+ pub hour: u32,
+ pub minute: u32,
+}
+
+impl PartialOrd for HmTime {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ let mut order = self.hour.cmp(&other.hour);
+ if order == Ordering::Equal {
+ order = self.minute.cmp(&other.minute);
+ }
+ Some(order)
+ }
+}
+
+#[derive(Default, Clone, Debug)]
+pub struct DailyDuration {
+ /// the days in a week this duration should trigger
+ pub days: WeekDays,
+ pub start: HmTime,
+ pub end: HmTime,
+}
+
+#[cfg(test)]
+mod test {
+
+ use anyhow::{bail, Error};
+
+ use super::*;
+
+ fn test_parse(
+ duration_str: &str,
+ start_h: u32, start_m: u32,
+ end_h: u32, end_m: u32,
+ days: &[usize],
+ ) -> Result<(), Error> {
+ let mut day_bits = 0;
+ for day in days { day_bits |= 1<<day; }
+ let expected_days = WeekDays::from_bits(day_bits).unwrap();
+
+ let duration = parse_daily_duration(duration_str)?;
+
+ if duration.start.hour != start_h {
+ bail!("start hour missmatch, extected {}, got {:?}", start_h, duration);
+ }
+ if duration.start.minute != start_m {
+ bail!("start minute missmatch, extected {}, got {:?}", start_m, duration);
+ }
+ if duration.end.hour != end_h {
+ bail!("end hour missmatch, extected {}, got {:?}", end_h, duration);
+ }
+ if duration.end.minute != end_m {
+ bail!("end minute missmatch, extected {}, got {:?}", end_m, duration);
+ }
+
+ if duration.days != expected_days {
+ bail!("weekday missmatch, extected {:?}, got {:?}", expected_days, duration);
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_daily_duration_parser() -> Result<(), Error> {
+
+ assert!(parse_daily_duration("").is_err());
+ assert!(parse_daily_duration(" 8-12").is_err());
+ assert!(parse_daily_duration("8:60-12").is_err());
+ assert!(parse_daily_duration("8-25").is_err());
+ assert!(parse_daily_duration("12-8").is_err());
+
+ test_parse("8-12", 8, 0, 12, 0, &[])?;
+ test_parse("8:0-12:0", 8, 0, 12, 0, &[])?;
+ test_parse("8:00-12:00", 8, 0, 12, 0, &[])?;
+ test_parse("8:05-12:20", 8, 5, 12, 20, &[])?;
+ test_parse("8:05 - 12:20", 8, 5, 12, 20, &[])?;
+
+ test_parse("mon 8-12", 8, 0, 12, 0, &[0])?;
+ test_parse("tue..fri 8-12", 8, 0, 12, 0, &[1,2,3,4])?;
+ test_parse("sat,tue..thu,fri 8-12", 8, 0, 12, 0, &[1,2,3,4,5])?;
+
+ Ok(())
+ }
+}
diff --git a/proxmox-systemd/src/lib.rs b/proxmox-systemd/src/lib.rs
index b4ab4b72..7c2b1f90 100644
--- a/proxmox-systemd/src/lib.rs
+++ b/proxmox-systemd/src/lib.rs
@@ -1,4 +1,5 @@
pub mod time;
+pub mod daily_duration;
mod parse_time;
mod unit;
diff --git a/proxmox-systemd/src/parse_time.rs b/proxmox-systemd/src/parse_time.rs
index ba9449b1..d212e264 100644
--- a/proxmox-systemd/src/parse_time.rs
+++ b/proxmox-systemd/src/parse_time.rs
@@ -4,6 +4,7 @@ use anyhow::{bail, Error};
use lazy_static::lazy_static;
use super::time::*;
+use super::daily_duration::*;
use nom::{
error::{context, ParseError, VerboseError},
@@ -452,3 +453,58 @@ fn parse_time_span_incomplete(mut i: &str) -> IResult<&str, TimeSpan> {
Ok((i, ts))
}
+
+pub fn parse_daily_duration(i: &str) -> Result<DailyDuration, Error> {
+ parse_complete_line("daily duration", i, parse_daily_duration_incomplete)
+}
+
+fn parse_daily_duration_incomplete(mut i: &str) -> IResult<&str, DailyDuration> {
+
+ let mut duration = DailyDuration::default();
+
+ if i.starts_with(|c: char| char::is_ascii_alphabetic(&c)) {
+
+ let (n, range_list) = context(
+ "weekday range list",
+ separated_nonempty_list(tag(","), parse_weekdays_range)
+ )(i)?;
+
+ i = space0(n)?.0;
+
+ for range in range_list { duration.days.insert(range); }
+ }
+
+ let (i, start) = parse_hm_time(i)?;
+
+ let i = space0(i)?.0;
+
+ let (i, _) = tag("-")(i)?;
+
+ let i = space0(i)?.0;
+
+ let end_time_start = i;
+
+ let (i, end) = parse_hm_time(i)?;
+
+ if start > end {
+ return Err(parse_error(end_time_start, "end time before start time"));
+ }
+
+ duration.start = start;
+ duration.end = end;
+
+ Ok((i, duration))
+}
+
+fn parse_hm_time(i: &str) -> IResult<&str, HmTime> {
+
+ let (i, (hour, opt_minute)) = tuple((
+ parse_time_comp(24),
+ opt(preceded(tag(":"), parse_time_comp(60))),
+ ))(i)?;
+
+ match opt_minute {
+ Some(minute) => Ok((i, HmTime { hour, minute })),
+ None => Ok((i, HmTime { hour, minute: 0})),
+ }
+}
diff --git a/proxmox-systemd/src/time.rs b/proxmox-systemd/src/time.rs
index b81e970e..e5fe7965 100644
--- a/proxmox-systemd/src/time.rs
+++ b/proxmox-systemd/src/time.rs
@@ -5,7 +5,7 @@ use bitflags::bitflags;
use proxmox_time::TmEditor;
-pub use super::parse_time::*;
+pub use super::parse_time::{parse_calendar_event, parse_time_span};
bitflags!{
#[derive(Default)]
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (6 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
` (7 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
.../src/client/rate_limited_stream.rs | 43 +++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 00ba066..ea99383 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -19,6 +19,8 @@ pub struct RateLimitedStream<S> {
read_delay: Option<Pin<Box<Sleep>>>,
write_limiter: Option<Arc<Mutex<RateLimiter>>>,
write_delay: Option<Pin<Box<Sleep>>>,
+ update_limiter_cb: Option<Box<dyn Fn() -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) + Send>>,
+ last_limiter_update: Instant,
stream: S,
}
@@ -43,9 +45,44 @@ impl <S> RateLimitedStream<S> {
read_delay: None,
write_limiter,
write_delay: None,
+ update_limiter_cb: None,
+ last_limiter_update: Instant::now(),
stream,
}
}
+
+ /// Creates a new instance with limiter update callback.
+ ///
+ /// The fuction is called every minute to update/change the used limiters.
+ ///
+ /// Note: This function is called within an async context, so it
+ /// should be fast and must not block.
+ pub fn with_limiter_update_cb<F: Fn() -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) + Send + 'static>(
+ stream: S,
+ update_limiter_cb: F,
+ ) -> Self {
+ let (read_limiter, write_limiter) = update_limiter_cb();
+ Self {
+ read_limiter,
+ read_delay: None,
+ write_limiter,
+ write_delay: None,
+ update_limiter_cb: Some(Box::new(update_limiter_cb)),
+ last_limiter_update: Instant::now(),
+ stream,
+ }
+ }
+
+ fn update_limiters(&mut self) {
+ if let Some(ref update_limiter_cb) = self.update_limiter_cb {
+ if self.last_limiter_update.elapsed().as_secs() >= 5 {
+ self.last_limiter_update = Instant::now();
+ let (read_limiter, write_limiter) = update_limiter_cb();
+ self.read_limiter = read_limiter;
+ self.write_limiter = write_limiter;
+ }
+ }
+ }
}
fn register_traffic(
@@ -90,6 +127,8 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
this.write_delay = None;
+ this.update_limiters();
+
let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
if let Some(ref limiter) = this.write_limiter {
@@ -118,6 +157,8 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
this.write_delay = None;
+ this.update_limiters();
+
let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
if let Some(ref limiter) = this.write_limiter {
@@ -161,6 +202,8 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
this.read_delay = None;
+ this.update_limiters();
+
let filled_len = buf.filled().len();
let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match()
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (7 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
` (6 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
proxmox-systemd/src/daily_duration.rs | 60 +++++++++++++++++++++++++++
1 file changed, 60 insertions(+)
diff --git a/proxmox-systemd/src/daily_duration.rs b/proxmox-systemd/src/daily_duration.rs
index bed4eb47..25157b25 100644
--- a/proxmox-systemd/src/daily_duration.rs
+++ b/proxmox-systemd/src/daily_duration.rs
@@ -1,4 +1,9 @@
use std::cmp::{Ordering, PartialOrd};
+use std::convert::TryInto;
+
+use anyhow::Error;
+
+use proxmox_time::TmEditor;
use super::time::{WeekDays};
@@ -29,6 +34,30 @@ pub struct DailyDuration {
pub end: HmTime,
}
+impl DailyDuration {
+
+ // Test it time is within this frame
+ pub fn time_match(&self, epoch: i64, utc: bool) -> Result<bool, Error> {
+
+ let t = TmEditor::with_epoch(epoch, utc)?;
+
+ let all_days = self.days.is_empty() || self.days.is_all();
+
+ if !all_days { // match day first
+ let day_num: u32 = t.day_num().try_into()?;
+ let day = WeekDays::from_bits(1<<day_num).unwrap();
+ if !self.days.contains(day) { return Ok(false); }
+ }
+
+ let hour = t.hour().try_into()?;
+ let minute = t.min().try_into()?;
+
+ let ctime = HmTime { hour, minute };
+
+ Ok(ctime >= self.start && ctime < self.end)
+ }
+}
+
#[cfg(test)]
mod test {
@@ -68,6 +97,10 @@ mod test {
Ok(())
}
+ const fn make_test_time(mday: i32, hour: i32, min: i32) -> i64 {
+ (mday*3600*24 + hour*3600 + min*60) as i64
+ }
+
#[test]
fn test_daily_duration_parser() -> Result<(), Error> {
@@ -89,4 +122,31 @@ mod test {
Ok(())
}
+
+ #[test]
+ fn test_time_match() -> Result<(), Error> {
+ const THURSDAY_80_00: i64 = make_test_time(0, 8, 0);
+ const THURSDAY_12_00: i64 = make_test_time(0, 12, 0);
+ const DAY: i64 = 3600*24;
+
+ let duration = parse_daily_duration("thu..fri 8:05-12")?;
+
+ assert!(!duration.time_match(THURSDAY_80_00, true)?);
+ assert!(!duration.time_match(THURSDAY_80_00 + DAY, true)?);
+ assert!(!duration.time_match(THURSDAY_80_00 + 2*DAY, true)?);
+
+ assert!(duration.time_match(THURSDAY_80_00 + 5*60, true)?);
+ assert!(duration.time_match(THURSDAY_80_00 + 5*60 + DAY, true)?);
+ assert!(!duration.time_match(THURSDAY_80_00 + 5*60 + 2*DAY, true)?);
+
+ assert!(duration.time_match(THURSDAY_12_00 - 1, true)?);
+ assert!(duration.time_match(THURSDAY_12_00 - 1 + DAY, true)?);
+ assert!(!duration.time_match(THURSDAY_12_00 - 1 + 2*DAY, true)?);
+
+ assert!(!duration.time_match(THURSDAY_12_00, true)?);
+ assert!(!duration.time_match(THURSDAY_12_00 + DAY, true)?);
+ assert!(!duration.time_match(THURSDAY_12_00 + 2*DAY, true)?);
+
+ Ok(())
+ }
}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (8 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
` (5 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
proxmox-http/src/client/rate_limiter.rs | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
index 4742387..677dfb1 100644
--- a/proxmox-http/src/client/rate_limiter.rs
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -36,7 +36,7 @@ impl RateLimiter {
/// Returns the average rate (since `start_time`)
pub fn average_rate(&self, current_time: Instant) -> f64 {
- let time_diff = (current_time - self.start_time).as_secs_f64();
+ let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
if time_diff <= 0.0 {
0.0
} else {
@@ -45,12 +45,15 @@ impl RateLimiter {
}
fn refill_bucket(&mut self, current_time: Instant) {
- let time_diff = (current_time - self.last_update).as_nanos();
+ let time_diff = match current_time.checked_duration_since(self.last_update) {
+ Some(duration) => duration.as_nanos(),
+ None => {
+ //log::error!("update_time: got negative time diff");
+ return;
+ }
+ };
- if time_diff <= 0 {
- //log::error!("update_time: got negative time diff");
- return;
- }
+ if time_diff == 0 { return; }
self.last_update = current_time;
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (9 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
` (4 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
pbs-api-types/src/lib.rs | 7 +
pbs-api-types/src/traffic_control.rs | 81 +++++
pbs-config/src/lib.rs | 1 +
pbs-config/src/traffic_control.rs | 91 ++++++
src/api2/config/mod.rs | 2 +
src/api2/config/traffic_control.rs | 283 ++++++++++++++++++
src/bin/proxmox-backup-manager.rs | 1 +
src/bin/proxmox_backup_manager/mod.rs | 2 +
.../proxmox_backup_manager/traffic_control.rs | 105 +++++++
9 files changed, 573 insertions(+)
create mode 100644 pbs-api-types/src/traffic_control.rs
create mode 100644 pbs-config/src/traffic_control.rs
create mode 100644 src/api2/config/traffic_control.rs
create mode 100644 src/bin/proxmox_backup_manager/traffic_control.rs
diff --git a/pbs-api-types/src/lib.rs b/pbs-api-types/src/lib.rs
index 96ac657b..a61de960 100644
--- a/pbs-api-types/src/lib.rs
+++ b/pbs-api-types/src/lib.rs
@@ -7,6 +7,7 @@ use proxmox_schema::{
api, const_regex, ApiStringFormat, ApiType, ArraySchema, Schema, StringSchema, ReturnType,
};
use proxmox::{IPRE, IPRE_BRACKET, IPV4OCTET, IPV4RE, IPV6H16, IPV6LS32, IPV6RE};
+use proxmox_systemd::daily_duration::parse_daily_duration;
#[rustfmt::skip]
#[macro_export]
@@ -73,6 +74,9 @@ pub use remote::*;
mod tape;
pub use tape::*;
+mod traffic_control;
+pub use traffic_control::*;
+
mod zfs;
pub use zfs::*;
@@ -152,6 +156,9 @@ pub const HOSTNAME_FORMAT: ApiStringFormat = ApiStringFormat::Pattern(&HOSTNAME_
pub const DNS_ALIAS_FORMAT: ApiStringFormat =
ApiStringFormat::Pattern(&DNS_ALIAS_REGEX);
+pub const DAILY_DURATION_FORMAT: ApiStringFormat =
+ ApiStringFormat::VerifyFn(|s| parse_daily_duration(s).map(drop));
+
pub const SEARCH_DOMAIN_SCHEMA: Schema =
StringSchema::new("Search domain for host-name lookup.").schema();
diff --git a/pbs-api-types/src/traffic_control.rs b/pbs-api-types/src/traffic_control.rs
new file mode 100644
index 00000000..c9fe4765
--- /dev/null
+++ b/pbs-api-types/src/traffic_control.rs
@@ -0,0 +1,81 @@
+use serde::{Deserialize, Serialize};
+
+use proxmox_schema::{api, Schema, StringSchema};
+
+use crate::{
+ CIDR_SCHEMA, DAILY_DURATION_FORMAT,
+ PROXMOX_SAFE_ID_FORMAT, SINGLE_LINE_COMMENT_SCHEMA,
+};
+
+pub const TRAFFIC_CONTROL_TIMEFRAME_SCHEMA: Schema = StringSchema::new(
+ "Timeframe to specify when the rule is actice.")
+ .format(&DAILY_DURATION_FORMAT)
+ .schema();
+
+pub const TRAFFIC_CONTROL_ID_SCHEMA: Schema = StringSchema::new("Rule ID.")
+ .format(&PROXMOX_SAFE_ID_FORMAT)
+ .min_length(3)
+ .max_length(32)
+ .schema();
+
+#[api(
+ properties: {
+ comment: {
+ optional: true,
+ schema: SINGLE_LINE_COMMENT_SCHEMA,
+ },
+ network: {
+ type: Array,
+ items: {
+ schema: CIDR_SCHEMA,
+ },
+ },
+ timeframe: {
+ type: Array,
+ items: {
+ schema: TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+ },
+ optional: true,
+ },
+ },
+)]
+#[derive(Serialize,Deserialize,Default)]
+#[serde(rename_all="kebab-case")]
+/// Network Rate Limit Configuration
+pub struct RateLimitConfig {
+ #[serde(skip_serializing_if="Option::is_none")]
+ pub comment: Option<String>,
+ /// Rule applies to Source IPs within this networks
+ pub network: Vec<String>,
+ /// Maximal rate in bytes/second
+ pub rate: u64,
+ /// Bucket size for TBF in bytes
+ #[serde(skip_serializing_if="Option::is_none")]
+ pub burst: Option<u64>,
+ // fixme: expose this?
+ // /// Bandwidth is shared accross all connections
+ // #[serde(skip_serializing_if="Option::is_none")]
+ // pub shared: Option<bool>,
+ /// Enable the rule at specific times
+ #[serde(skip_serializing_if="Option::is_none")]
+ pub timeframe: Option<Vec<String>>,
+}
+
+#[api(
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ config: {
+ type: RateLimitConfig,
+ },
+ },
+)]
+#[derive(Serialize,Deserialize)]
+#[serde(rename_all = "kebab-case")]
+/// Traffic control rule
+pub struct TrafficControlRule {
+ pub name: String,
+ #[serde(flatten)]
+ pub config: RateLimitConfig,
+}
diff --git a/pbs-config/src/lib.rs b/pbs-config/src/lib.rs
index 8ce84fec..930b5f7b 100644
--- a/pbs-config/src/lib.rs
+++ b/pbs-config/src/lib.rs
@@ -12,6 +12,7 @@ pub mod sync;
pub mod tape_encryption_keys;
pub mod tape_job;
pub mod token_shadow;
+pub mod traffic_control;
pub mod user;
pub mod verify;
diff --git a/pbs-config/src/traffic_control.rs b/pbs-config/src/traffic_control.rs
new file mode 100644
index 00000000..1c04f589
--- /dev/null
+++ b/pbs-config/src/traffic_control.rs
@@ -0,0 +1,91 @@
+//! Traffic Control Settings (Network rate limits)
+use std::collections::HashMap;
+
+use anyhow::Error;
+use lazy_static::lazy_static;
+
+use proxmox_schema::{ApiType, Schema};
+
+use pbs_api_types::{TrafficControlRule, TRAFFIC_CONTROL_ID_SCHEMA};
+
+use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+
+use crate::{open_backup_lockfile, replace_backup_config, BackupLockGuard};
+
+
+lazy_static! {
+ /// Static [`SectionConfig`] to access parser/writer functions.
+ pub static ref CONFIG: SectionConfig = init();
+}
+
+fn init() -> SectionConfig {
+ let mut config = SectionConfig::new(&TRAFFIC_CONTROL_ID_SCHEMA);
+
+ let obj_schema = match TrafficControlRule::API_SCHEMA {
+ Schema::AllOf(ref allof_schema) => allof_schema,
+ _ => unreachable!(),
+ };
+ let plugin = SectionConfigPlugin::new("rule".to_string(), Some("name".to_string()), obj_schema);
+ config.register_plugin(plugin);
+
+ config
+}
+
+/// Configuration file name
+pub const TRAFFIC_CONTROL_CFG_FILENAME: &str = "/etc/proxmox-backup/traffic-control.cfg";
+/// Lock file name (used to prevent concurrent access)
+pub const TRAFFIC_CONTROL_CFG_LOCKFILE: &str = "/etc/proxmox-backup/.traffic-control.lck";
+
+/// Get exclusive lock
+pub fn lock_config() -> Result<BackupLockGuard, Error> {
+ open_backup_lockfile(TRAFFIC_CONTROL_CFG_LOCKFILE, None, true)
+}
+
+/// Read and parse the configuration file
+pub fn config() -> Result<(SectionConfigData, [u8;32]), Error> {
+
+ let content = proxmox::tools::fs::file_read_optional_string(TRAFFIC_CONTROL_CFG_FILENAME)?
+ .unwrap_or_else(|| "".to_string());
+
+ let digest = openssl::sha::sha256(content.as_bytes());
+ let data = CONFIG.parse(TRAFFIC_CONTROL_CFG_FILENAME, &content)?;
+ Ok((data, digest))
+}
+
+/// Save the configuration file
+pub fn save_config(config: &SectionConfigData) -> Result<(), Error> {
+ let raw = CONFIG.write(TRAFFIC_CONTROL_CFG_FILENAME, &config)?;
+ replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())
+}
+
+
+// shell completion helper
+pub fn complete_traffic_control_name(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
+ match config() {
+ Ok((data, _digest)) => data.sections.iter().map(|(id, _)| id.to_string()).collect(),
+ Err(_) => return vec![],
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use anyhow::{Error, bail};
+
+ #[test]
+ fn test1() -> Result<(), Error> {
+ let content = "rule: rule1
+ comment localnet at working hours
+ network 192.168.2.0/24
+ network 192.168.3.0/24
+ rate 50
+ timeframe mon..wed 8:00-16:30
+ timeframe fri 9:00-12:00
+";
+ let data = CONFIG.parse(TRAFFIC_CONTROL_CFG_FILENAME, &content)?;
+ eprintln!("GOT {:?}", data);
+
+ Ok(())
+ }
+
+}
diff --git a/src/api2/config/mod.rs b/src/api2/config/mod.rs
index 473337f5..c256ba64 100644
--- a/src/api2/config/mod.rs
+++ b/src/api2/config/mod.rs
@@ -14,6 +14,7 @@ pub mod changer;
pub mod media_pool;
pub mod tape_encryption_keys;
pub mod tape_backup_job;
+pub mod traffic_control;
const SUBDIRS: SubdirMap = &[
("access", &access::ROUTER),
@@ -26,6 +27,7 @@ const SUBDIRS: SubdirMap = &[
("sync", &sync::ROUTER),
("tape-backup-job", &tape_backup_job::ROUTER),
("tape-encryption-keys", &tape_encryption_keys::ROUTER),
+ ("traffic-control", &traffic_control::ROUTER),
("verify", &verify::ROUTER),
];
diff --git a/src/api2/config/traffic_control.rs b/src/api2/config/traffic_control.rs
new file mode 100644
index 00000000..5d5cc6d0
--- /dev/null
+++ b/src/api2/config/traffic_control.rs
@@ -0,0 +1,283 @@
+use anyhow::{bail, Error};
+use serde_json::Value;
+use ::serde::{Deserialize, Serialize};
+
+use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
+use proxmox_schema::api;
+
+use pbs_api_types::{
+ TrafficControlRule, RateLimitConfig,
+ CIDR_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, SINGLE_LINE_COMMENT_SCHEMA,
+ TRAFFIC_CONTROL_ID_SCHEMA, TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+ PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+#[api(
+ input: {
+ properties: {},
+ },
+ returns: {
+ description: "The list of configured traffic control rules (with config digest).",
+ type: Array,
+ items: { type: TrafficControlRule },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// List traffic control rules
+pub fn list_traffic_controls(
+ _param: Value,
+ _info: &ApiMethod,
+ mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<TrafficControlRule>, Error> {
+ let (config, digest) = pbs_config::traffic_control::config()?;
+
+ let list: Vec<TrafficControlRule> = config.convert_to_typed_array("rule")?;
+
+ rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into();
+
+ Ok(list)
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ config: {
+ type: RateLimitConfig,
+ flatten: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Create new traffic control rule.
+pub fn create_traffic_control(
+ name: String,
+ config: RateLimitConfig,
+) -> Result<(), Error> {
+
+ let _lock = pbs_config::traffic_control::lock_config()?;
+
+ let (mut section_config, _digest) = pbs_config::traffic_control::config()?;
+
+ if section_config.sections.get(&name).is_some() {
+ bail!("traffic control rule '{}' already exists.", name);
+ }
+
+ let rule = TrafficControlRule { name: name.clone(), config };
+
+ section_config.set_data(&name, "rule", &rule)?;
+
+ pbs_config::traffic_control::save_config(§ion_config)?;
+
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ },
+ },
+ returns: { type: TrafficControlRule },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+ }
+)]
+/// Read traffic control configuration data.
+pub fn read_traffic_control(
+ name: String,
+ _info: &ApiMethod,
+ mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<TrafficControlRule, Error> {
+ let (config, digest) = pbs_config::traffic_control::config()?;
+ let data: TrafficControlRule = config.lookup("rule", &name)?;
+ rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into();
+ Ok(data)
+}
+
+#[api()]
+#[derive(Serialize, Deserialize)]
+#[allow(non_camel_case_types)]
+/// Deletable property name
+pub enum DeletableProperty {
+ /// Delete the burst property.
+ burst,
+ /// Delete the comment property.
+ comment,
+ /// Delete the timeframe property
+ timeframe,
+}
+
+// fixme: use TrafficControlUpdater
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ comment: {
+ schema: SINGLE_LINE_COMMENT_SCHEMA,
+ optional: true,
+ },
+ rate: {
+ type: u64,
+ description: "Rate limit for TBF in bytes/second.",
+ optional: true,
+ minimum: 1,
+ },
+ burst: {
+ type: u64,
+ description: "Size of the TBF bucket, in bytes.",
+ optional: true,
+ minimum: 1,
+ },
+ network: {
+ description: "List of networks.",
+ optional: true,
+ type: Array,
+ items: {
+ schema: CIDR_SCHEMA,
+ },
+ },
+ timeframe: {
+ description: "List of time frames.",
+ optional: true,
+ type: Array,
+ items: {
+ schema: TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+ },
+ },
+ delete: {
+ description: "List of properties to delete.",
+ type: Array,
+ optional: true,
+ items: {
+ type: DeletableProperty,
+ }
+ },
+ digest: {
+ optional: true,
+ schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Update traffic control configuration.
+pub fn update_traffic_control(
+ name: String,
+ rate: Option<u64>,
+ burst: Option<u64>,
+ comment: Option<String>,
+ network: Option<Vec<String>>,
+ timeframe: Option<Vec<String>>,
+ delete: Option<Vec<DeletableProperty>>,
+ digest: Option<String>,
+) -> Result<(), Error> {
+
+ let _lock = pbs_config::traffic_control::lock_config()?;
+
+ let (mut config, expected_digest) = pbs_config::traffic_control::config()?;
+
+ if let Some(ref digest) = digest {
+ let digest = proxmox::tools::hex_to_digest(digest)?;
+ crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+ }
+
+ let mut data: TrafficControlRule = config.lookup("rule", &name)?;
+
+ if let Some(delete) = delete {
+ for delete_prop in delete {
+ match delete_prop {
+ DeletableProperty::burst => { data.config.burst = None; },
+ DeletableProperty::comment => { data.config.comment = None; },
+ DeletableProperty::timeframe => { data.config.timeframe = None; },
+ }
+ }
+ }
+
+ if let Some(comment) = comment {
+ let comment = comment.trim().to_string();
+ if comment.is_empty() {
+ data.config.comment = None;
+ } else {
+ data.config.comment = Some(comment);
+ }
+ }
+
+ if let Some(rate) = rate { data.config.rate = rate; }
+
+ if burst.is_some() { data.config.burst = burst; }
+
+ if let Some(network) = network { data.config.network = network; }
+ if timeframe.is_some() { data.config.timeframe = timeframe; }
+
+ config.set_data(&name, "rule", &data)?;
+
+ pbs_config::traffic_control::save_config(&config)?;
+
+ Ok(())
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ digest: {
+ optional: true,
+ schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Remove a traffic control rule from the configuration file.
+pub fn delete_traffic_control(name: String, digest: Option<String>) -> Result<(), Error> {
+
+ let _lock = pbs_config::traffic_control::lock_config()?;
+
+ let (mut config, expected_digest) = pbs_config::traffic_control::config()?;
+
+ if let Some(ref digest) = digest {
+ let digest = proxmox::tools::hex_to_digest(digest)?;
+ crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+ }
+
+ match config.sections.get(&name) {
+ Some(_) => { config.sections.remove(&name); },
+ None => bail!("traffic control rule '{}' does not exist.", name),
+ }
+
+ pbs_config::traffic_control::save_config(&config)?;
+
+ Ok(())
+}
+
+
+const ITEM_ROUTER: Router = Router::new()
+ .get(&API_METHOD_READ_TRAFFIC_CONTROL)
+ .put(&API_METHOD_UPDATE_TRAFFIC_CONTROL)
+ .delete(&API_METHOD_DELETE_TRAFFIC_CONTROL);
+
+pub const ROUTER: Router = Router::new()
+ .get(&API_METHOD_LIST_TRAFFIC_CONTROLS)
+ .post(&API_METHOD_CREATE_TRAFFIC_CONTROL)
+ .match_all("name", &ITEM_ROUTER);
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index 92e6bb2a..26cb5a1f 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -374,6 +374,7 @@ async fn run() -> Result<(), Error> {
.insert("user", user_commands())
.insert("openid", openid_commands())
.insert("remote", remote_commands())
+ .insert("traffic-control", traffic_control_commands())
.insert("garbage-collection", garbage_collection_commands())
.insert("acme", acme_mgmt_cli())
.insert("cert", cert_mgmt_cli())
diff --git a/src/bin/proxmox_backup_manager/mod.rs b/src/bin/proxmox_backup_manager/mod.rs
index a3a16246..a4d224ce 100644
--- a/src/bin/proxmox_backup_manager/mod.rs
+++ b/src/bin/proxmox_backup_manager/mod.rs
@@ -26,3 +26,5 @@ mod node;
pub use node::*;
mod openid;
pub use openid::*;
+mod traffic_control;
+pub use traffic_control::*;
diff --git a/src/bin/proxmox_backup_manager/traffic_control.rs b/src/bin/proxmox_backup_manager/traffic_control.rs
new file mode 100644
index 00000000..34e4a2a5
--- /dev/null
+++ b/src/bin/proxmox_backup_manager/traffic_control.rs
@@ -0,0 +1,105 @@
+use anyhow::Error;
+use serde_json::Value;
+
+use proxmox_router::{cli::*, ApiHandler, RpcEnvironment};
+use proxmox_schema::api;
+
+use pbs_api_types::TRAFFIC_CONTROL_ID_SCHEMA;
+
+use proxmox_backup::api2;
+
+
+#[api(
+ input: {
+ properties: {
+ "output-format": {
+ schema: OUTPUT_FORMAT,
+ optional: true,
+ },
+ }
+ }
+)]
+/// List configured traffic control rules.
+fn list_traffic_controls(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
+
+ let output_format = get_output_format(¶m);
+
+ let info = &api2::config::traffic_control::API_METHOD_LIST_TRAFFIC_CONTROLS;
+ let mut data = match info.handler {
+ ApiHandler::Sync(handler) => (handler)(param, info, rpcenv)?,
+ _ => unreachable!(),
+ };
+
+ let options = default_table_format_options()
+ .column(ColumnConfig::new("name"))
+ .column(ColumnConfig::new("rate"))
+ .column(ColumnConfig::new("burst"))
+ .column(ColumnConfig::new("network"))
+ .column(ColumnConfig::new("timeframe"))
+ .column(ColumnConfig::new("comment"));
+
+ format_and_print_result_full(&mut data, &info.returns, &output_format, &options);
+
+ Ok(Value::Null)
+}
+
+#[api(
+ input: {
+ properties: {
+ name: {
+ schema: TRAFFIC_CONTROL_ID_SCHEMA,
+ },
+ "output-format": {
+ schema: OUTPUT_FORMAT,
+ optional: true,
+ },
+ }
+ }
+)]
+/// Show traffic control configuration
+fn show_traffic_control(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
+
+ let output_format = get_output_format(¶m);
+
+ let info = &api2::config::traffic_control::API_METHOD_READ_TRAFFIC_CONTROL;
+ let mut data = match info.handler {
+ ApiHandler::Sync(handler) => (handler)(param, info, rpcenv)?,
+ _ => unreachable!(),
+ };
+
+ let options = default_table_format_options();
+ format_and_print_result_full(&mut data, &info.returns, &output_format, &options);
+
+ Ok(Value::Null)
+}
+
+pub fn traffic_control_commands() -> CommandLineInterface {
+
+ let cmd_def = CliCommandMap::new()
+ .insert("list", CliCommand::new(&API_METHOD_LIST_TRAFFIC_CONTROLS))
+ .insert(
+ "show",
+ CliCommand::new(&API_METHOD_SHOW_TRAFFIC_CONTROL)
+ .arg_param(&["name"])
+ .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+ )
+ .insert(
+ "create",
+ CliCommand::new(&api2::config::traffic_control::API_METHOD_CREATE_TRAFFIC_CONTROL)
+ .arg_param(&["name"])
+ )
+ .insert(
+ "update",
+ CliCommand::new(&api2::config::traffic_control::API_METHOD_UPDATE_TRAFFIC_CONTROL)
+ .arg_param(&["name"])
+ .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+ )
+ .insert(
+ "remove",
+ CliCommand::new(&api2::config::traffic_control::API_METHOD_DELETE_TRAFFIC_CONTROL)
+ .arg_param(&["name"])
+ .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+ );
+
+ cmd_def.into()
+}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (10 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
` (3 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
proxmox-http/src/client/rate_limited_stream.rs | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index ea99383..865a426 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -24,6 +24,12 @@ pub struct RateLimitedStream<S> {
stream: S,
}
+impl RateLimitedStream<tokio::net::TcpStream> {
+ pub fn peer_addr(&self) -> std::io::Result<std::net::SocketAddr> {
+ self.stream.peer_addr()
+ }
+}
+
impl <S> RateLimitedStream<S> {
/// Creates a new instance with reads and writes limited to the same `rate`.
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (11 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
` (2 subsequent siblings)
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
---
proxmox-http/src/client/rate_limiter.rs | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
index 677dfb1..e669410 100644
--- a/proxmox-http/src/client/rate_limiter.rs
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -34,6 +34,12 @@ impl RateLimiter {
}
}
+ /// Update rate and bucket size
+ pub fn update_rate(&mut self, rate: u64, bucket_size: u64) {
+ self.rate = rate;
+ self.bucket_size = bucket_size;
+ }
+
/// Returns the average rate (since `start_time`)
pub fn average_rate(&self, current_time: Instant) -> f64 {
let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (12 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
pbs-config/src/lib.rs | 2 +-
pbs-config/src/memcom.rs | 14 ++++++++++++++
pbs-config/src/traffic_control.rs | 11 +++++++++--
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/pbs-config/src/lib.rs b/pbs-config/src/lib.rs
index 930b5f7b..bc3b19f0 100644
--- a/pbs-config/src/lib.rs
+++ b/pbs-config/src/lib.rs
@@ -16,7 +16,7 @@ pub mod traffic_control;
pub mod user;
pub mod verify;
-pub(crate) mod memcom;
+pub mod memcom;
use anyhow::{format_err, Error};
diff --git a/pbs-config/src/memcom.rs b/pbs-config/src/memcom.rs
index 4ab07ec9..7b82798b 100644
--- a/pbs-config/src/memcom.rs
+++ b/pbs-config/src/memcom.rs
@@ -23,6 +23,8 @@ pub struct Memcom {
struct Head {
// User (user.cfg) cache generation/version.
user_cache_generation: AtomicUsize,
+ // Traffic control (traffic-control.cfg) generation/version.
+ traffic_control_generation: AtomicUsize,
}
static INSTANCE: OnceCell<Arc<Memcom>> = OnceCell::new();
@@ -81,4 +83,16 @@ impl Memcom {
.user_cache_generation
.fetch_add(1, Ordering::AcqRel);
}
+
+ /// Returns the traffic control generation number.
+ pub fn traffic_control_generation(&self) -> usize {
+ self.head().traffic_control_generation.load(Ordering::Acquire)
+ }
+
+ /// Increase the traffic control generation number.
+ pub fn increase_traffic_control_generation(&self) {
+ self.head()
+ .traffic_control_generation
+ .fetch_add(1, Ordering::AcqRel);
+ }
}
diff --git a/pbs-config/src/traffic_control.rs b/pbs-config/src/traffic_control.rs
index 1c04f589..816bc7a2 100644
--- a/pbs-config/src/traffic_control.rs
+++ b/pbs-config/src/traffic_control.rs
@@ -10,9 +10,9 @@ use pbs_api_types::{TrafficControlRule, TRAFFIC_CONTROL_ID_SCHEMA};
use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+use crate::memcom::Memcom;
use crate::{open_backup_lockfile, replace_backup_config, BackupLockGuard};
-
lazy_static! {
/// Static [`SectionConfig`] to access parser/writer functions.
pub static ref CONFIG: SectionConfig = init();
@@ -55,7 +55,14 @@ pub fn config() -> Result<(SectionConfigData, [u8;32]), Error> {
/// Save the configuration file
pub fn save_config(config: &SectionConfigData) -> Result<(), Error> {
let raw = CONFIG.write(TRAFFIC_CONTROL_CFG_FILENAME, &config)?;
- replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())
+ replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())?;
+
+ // increase traffic control generation
+ // We use this in TrafficControlCache
+ let memcom = Memcom::new()?;
+ memcom.increase_traffic_control_generation();
+
+ Ok(())
}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (13 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
Cargo.toml | 1 +
src/cached_traffic_control.rs | 240 ++++++++++++++++++++++++++++++++++
src/lib.rs | 3 +
3 files changed, 244 insertions(+)
create mode 100644 src/cached_traffic_control.rs
diff --git a/Cargo.toml b/Cargo.toml
index 0f163d65..ac0983b9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,7 @@ apt-pkg-native = "0.3.2"
base64 = "0.12"
bitflags = "1.2.1"
bytes = "1.0"
+cidr = "0.2.1"
crc32fast = "1"
endian_trait = { version = "0.6", features = ["arrays"] }
env_logger = "0.7"
diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs
new file mode 100644
index 00000000..5a7f46da
--- /dev/null
+++ b/src/cached_traffic_control.rs
@@ -0,0 +1,240 @@
+//! Cached traffic control configuration
+use std::sync::{Arc, Mutex};
+use std::collections::HashMap;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+use anyhow::Error;
+use cidr::IpInet;
+
+use proxmox_http::client::RateLimiter;
+use proxmox_time::epoch_i64;
+
+use proxmox_systemd::daily_duration::parse_daily_duration;
+
+use pbs_api_types::TrafficControlRule;
+
+use pbs_config::memcom::Memcom;
+
+pub struct TrafficControlCache {
+ last_update: i64,
+ last_traffic_control_generation: usize,
+ rules: Vec<(TrafficControlRule, Vec<IpInet>)>,
+ limiter_map: HashMap<String, (Arc<Mutex<RateLimiter>>, Arc<Mutex<RateLimiter>>)>,
+}
+
+fn timeframe_match(
+ duration_list: &[String],
+ now: i64,
+) -> Result<bool, Error> {
+
+ for duration_str in duration_list.iter() {
+ let duration = parse_daily_duration(duration_str)?;
+ if duration.time_match(now, false)? {
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
+}
+
+fn network_match_len(
+ networks: &[IpInet],
+ ip: &IpAddr,
+) -> Option<u8> {
+
+ let mut match_len = None;
+
+ for cidr in networks.iter() {
+ if cidr.contains(ip) {
+ let network_length = cidr.network_length();
+ match match_len {
+ Some(len) => {
+ if network_length > len {
+ match_len = Some(network_length);
+ }
+ }
+ None => match_len = Some(network_length),
+ }
+ }
+ }
+ match_len
+}
+
+fn cannonical_ip(ip: IpAddr) -> IpAddr {
+ // TODO: use std::net::IpAddr::to_cananical once stable
+ match ip {
+ IpAddr::V4(addr) => IpAddr::V4(addr),
+ IpAddr::V6(addr) => {
+ match addr.octets() {
+ [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
+ IpAddr::V4(Ipv4Addr::new(a, b, c, d))
+ }
+ _ => IpAddr::V6(addr),
+ }
+ }
+ }
+}
+
+impl TrafficControlCache {
+
+ pub fn new() -> Self {
+ Self {
+ rules: Vec::new(),
+ limiter_map: HashMap::new(),
+ last_traffic_control_generation: 0,
+ last_update: 0,
+ }
+ }
+
+ pub fn reload(&mut self) {
+ let now = epoch_i64();
+
+ let memcom = match Memcom::new() {
+ Ok(memcom) => memcom,
+ Err(err) => {
+ log::error!("TrafficControlCache::reload failed in Memcom::new: {}", err);
+ return;
+ }
+ };
+
+ let traffic_control_generation = memcom.traffic_control_generation();
+
+ if (self.last_update != 0) &&
+ (traffic_control_generation == self.last_traffic_control_generation) &&
+ ((now - self.last_update) < 60) { return; }
+
+ log::debug!("reload traffic control rules");
+
+ self.last_traffic_control_generation = traffic_control_generation;
+ self.last_update = now;
+
+ match self.reload_impl() {
+ Ok(()) => (),
+ Err(err) => {
+ log::error!("TrafficControlCache::reload failed -> {}", err);
+ }
+ }
+ }
+
+ fn reload_impl(&mut self) -> Result<(), Error> {
+ let (config, _) = pbs_config::traffic_control::config()?;
+
+ self.limiter_map.retain(|key, _value| config.sections.contains_key(key));
+
+ let rules: Vec<TrafficControlRule> =
+ config.convert_to_typed_array("rule")?;
+
+ let now = proxmox_time::epoch_i64();
+
+ let mut active_rules = Vec::new();
+
+ for rule in rules {
+ if let Some(ref timeframe) = rule.config.timeframe {
+ if timeframe_match(timeframe, now)? {
+ self.limiter_map.remove(&rule.name);
+ continue;
+ }
+ }
+
+ let rate = rule.config.rate;
+ let burst = rule.config.burst.unwrap_or(rate);
+
+ if let Some(limiter) = self.limiter_map.get(&rule.name) {
+ limiter.0.lock().unwrap().update_rate(rate, burst);
+ limiter.1.lock().unwrap().update_rate(rate, burst);
+ } else {
+
+ let read_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+ let write_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+
+ self.limiter_map.insert(
+ rule.name.clone(),
+ (read_limiter, write_limiter),
+ );
+ }
+
+ let mut networks = Vec::new();
+
+ for network in rule.config.network.iter() {
+ let cidr = match network.parse() {
+ Ok(cidr) => cidr,
+ Err(err) => {
+ log::error!("unable to parse network '{}' - {}", network, err);
+ continue;
+ }
+ };
+ networks.push(cidr);
+ }
+
+ active_rules.push((rule, networks));
+ }
+
+ self.rules = active_rules;
+
+ Ok(())
+ }
+
+ pub fn lookup_rate_limiter(
+ &self,
+ peer: Option<SocketAddr>,
+ ) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+
+ let peer = match peer {
+ None => return (None, None),
+ Some(peer) => peer,
+ };
+
+ let peer_ip = cannonical_ip(peer.ip());
+
+ log::debug!("lookup_rate_limiter {} {:?}", peer_ip.is_ipv4(), peer_ip);
+
+ let mut last_rule_match = None;
+
+ for (rule, networks) in self.rules.iter() {
+ if let Some(match_len) = network_match_len(networks, &peer_ip) {
+ match last_rule_match {
+ None => last_rule_match = Some((rule, match_len)),
+ Some((_, last_len)) => {
+ if match_len > last_len {
+ last_rule_match = Some((rule, match_len));
+ }
+ }
+ }
+ }
+ }
+
+ match last_rule_match {
+ Some((rule, _)) => {
+ match self.limiter_map.get(&rule.name) {
+ Some((read_limiter, write_limiter)) => {
+ (Some(Arc::clone(read_limiter)), Some(Arc::clone(write_limiter)))
+ }
+ None => (None, None), // should never happen
+ }
+ }
+ None => (None, None),
+ }
+ }
+}
+
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn testnetwork_match() -> Result<(), Error> {
+
+ let networks = ["192.168.2.1/24", "127.0.0.0/8"];
+ let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();
+
+ assert_eq!(network_match_len(&networks, &"192.168.2.1".parse()?), Some(24));
+ assert_eq!(network_match_len(&networks, &"192.168.2.254".parse()?), Some(24));
+ assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None);
+ assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8));
+ assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None);
+
+ Ok(())
+
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5f6d5e7e..8f5ed245 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -33,6 +33,9 @@ pub mod client_helpers;
pub mod rrd_cache;
+mod cached_traffic_control;
+pub use cached_traffic_control::TrafficControlCache;
+
/// Get the server's certificate info (from `proxy.pem`).
pub fn cert_info() -> Result<CertInfo, anyhow::Error> {
CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem")))
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
` (14 preceding siblings ...)
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups Dietmar Maurer
@ 2021-11-09 6:52 ` Dietmar Maurer
15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09 6:52 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 25 ++++++++++++++++++++++++-
1 file changed, 24 insertions(+), 1 deletion(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 1589a57d..01308463 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -21,6 +21,7 @@ use proxmox::sys::linux::socket::set_tcp_keepalive;
use proxmox::tools::fs::CreateOptions;
use proxmox_lang::try_block;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
+use proxmox_http::client::{RateLimiter, RateLimitedStream};
use pbs_tools::{task_log, task_warn};
use pbs_datastore::DataStore;
@@ -70,6 +71,7 @@ use proxmox_backup::api2::pull::do_sync_job;
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
use proxmox_backup::server::do_verification_job;
use proxmox_backup::server::do_prune_job;
+use proxmox_backup::TrafficControlCache;
fn main() -> Result<(), Error> {
proxmox_backup::tools::setup_safe_path_env();
@@ -351,7 +353,7 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
}
type ClientStreamResult =
- Result<std::pin::Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>;
+ Result<std::pin::Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>, Error>;
const MAX_PENDING_ACCEPTS: usize = 1024;
fn accept_connections(
@@ -387,6 +389,9 @@ async fn accept_connection(
sock.set_nodelay(true).unwrap();
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+ let peer = sock.peer_addr().ok();
+ let sock = RateLimitedStream::with_limiter_update_cb(sock, move || lookup_rate_limiter(peer));
+
let ssl = { // limit acceptor_guard scope
// Acceptor can be reloaded using the command socket "reload-certificate" command
let acceptor_guard = acceptor.lock().unwrap();
@@ -1075,3 +1080,21 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
}
}
}
+
+// Rate Limiter lookup
+
+// Test WITH
+// proxmox-backup-client restore vm/201/2021-10-22T09:55:56Z drive-scsi0.img img1.img --repository localhost:store2
+
+lazy_static::lazy_static!{
+ static ref TRAFFIC_CONTROL_CACHE: Arc<Mutex<TrafficControlCache>> =
+ Arc::new(Mutex::new(TrafficControlCache::new()));
+}
+
+fn lookup_rate_limiter(
+ peer: Option<std::net::SocketAddr>,
+) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+ let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();
+ cache.reload();
+ cache.lookup_rate_limiter(peer)
+}
--
2.30.2
^ permalink raw reply [flat|nested] 17+ messages in thread
end of thread, other threads:[~2021-11-09 6:53 UTC | newest]
Thread overview: 17+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.