public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation
@ 2021-11-09  6:52 Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
                   ` (15 more replies)
  0 siblings, 16 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC
  To: pbs-devel

This series implements a simple token-bucket rate limiter. The filter
can be used to:

- limit the bandwidth of HTTP client connections
- limit the bandwidth on the server side (proxmox-backup-proxy)

A new configuration file holds the server-side rate limits:
/etc/proxmox-backup/traffic-control.cfg

The server-side filter picks up configuration changes dynamically,
including for connections that are already open.
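
For readers new to the scheme, here is a minimal sketch of the token-bucket
accounting, modelled on the RateLimiter in patch 1/7 but simplified. The
TokenBucket type and its field names are hypothetical, and the saturating
arithmetic and the non-zero rate assertion are assumptions of this sketch,
not something the series is known to do.

```rust
use std::time::{Duration, Instant};

/// Illustrative token bucket (hypothetical type, not the patch's `RateLimiter`).
struct TokenBucket {
    rate: u64,        // tokens (bytes) regained per second
    bucket_size: u64, // burst allowance in tokens
    consumed: u64,    // tokens currently in use
    last_update: Instant,
}

impl TokenBucket {
    fn new(rate: u64, bucket_size: u64, start: Instant) -> Self {
        assert!(rate > 0, "rate must be non-zero");
        // Start with an empty bucket (all tokens consumed), as the patch does.
        Self { rate, bucket_size, consumed: bucket_size, last_update: start }
    }

    /// Accounts for `len` bytes at time `now` and returns the pause needed
    /// to fall back to the configured rate.
    fn register_traffic(&mut self, now: Instant, len: u64) -> Duration {
        // Yields zero instead of panicking if `now` is before `last_update`.
        let elapsed = now.saturating_duration_since(self.last_update);
        if !elapsed.is_zero() {
            self.last_update = now;
        }
        // Tokens regained since the last update: elapsed_ns * rate / 1e9.
        let refill = (elapsed.as_nanos().saturating_mul(self.rate as u128) / 1_000_000_000)
            .min(u64::MAX as u128) as u64;
        self.consumed = self.consumed.saturating_sub(refill).saturating_add(len);

        if self.consumed <= self.bucket_size {
            return Duration::ZERO;
        }
        // Time needed to regain the tokens that exceed the bucket.
        let excess = self.consumed - self.bucket_size;
        Duration::from_nanos(excess.saturating_mul(1_000_000_000) / self.rate)
    }
}
```

With rate = 1000 and bucket_size = 500, registering 100 bytes at the start
time proposes a 100 ms delay, because the bucket starts empty. One second
later the bucket has fully refilled, so 400 bytes pass without delay.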


Dietmar Maurer (proxmox/7):
  Implement a rate limiting stream (AsyncRead, AsyncWrite)
  RateLimitedStream: implement poll_write_vectored
  HttpsConnector: use RateLimitedStream
  RateLimitedStream: allow periodic limiter updates
  RateLimiter: avoid panic in time computations
  RateLimitedStream: implement peer_addr
  RateLimiter: add update_rate method

 proxmox-http/src/client/connector.rs          |  51 +++-
 proxmox-http/src/client/mod.rs                |   6 +
 .../src/client/rate_limited_stream.rs         | 233 ++++++++++++++++++
 proxmox-http/src/client/rate_limiter.rs       |  84 +++++++
 4 files changed, 366 insertions(+), 8 deletions(-)
 create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
 create mode 100644 proxmox-http/src/client/rate_limiter.rs

Dietmar Maurer (proxmox-backup/9):
  pbs-client: add option to use the new RateLimiter
  proxmox-backup-client: add rate/burst parameter to backup CLI
  implement Service for RateLimitedStream
  New DailyDuration type with nom parser
  DailyDuration: implement time_match()
  Add traffic control configuration config with API
  traffic_control: use Memcom to track config versions
  implement a traffic control cache for fast rate control limiter
    lookups
  proxmox-backup-proxy: implement traffic control

 Cargo.toml                                    |   1 +
 pbs-api-types/src/lib.rs                      |   7 +
 pbs-api-types/src/traffic_control.rs          |  81 +++++
 pbs-client/src/http_client.rs                 |  24 +-
 pbs-client/src/tools/mod.rs                   |  23 +-
 pbs-config/src/lib.rs                         |   3 +-
 pbs-config/src/memcom.rs                      |  14 +
 pbs-config/src/traffic_control.rs             |  98 ++++++
 proxmox-backup-client/src/main.rs             |  19 +-
 proxmox-rest-server/Cargo.toml                |   1 +
 proxmox-rest-server/src/rest.rs               |  28 ++
 proxmox-systemd/src/daily_duration.rs         | 152 ++++++++++
 proxmox-systemd/src/lib.rs                    |   1 +
 proxmox-systemd/src/parse_time.rs             |  56 ++++
 proxmox-systemd/src/time.rs                   |   2 +-
 src/api2/config/mod.rs                        |   2 +
 src/api2/config/traffic_control.rs            | 283 ++++++++++++++++++
 src/bin/proxmox-backup-manager.rs             |   1 +
 src/bin/proxmox-backup-proxy.rs               |  25 +-
 src/bin/proxmox_backup_manager/mod.rs         |   2 +
 .../proxmox_backup_manager/traffic_control.rs | 105 +++++++
 src/cached_traffic_control.rs                 | 240 +++++++++++++++
 src/lib.rs                                    |   3 +
 23 files changed, 1161 insertions(+), 10 deletions(-)
 create mode 100644 pbs-api-types/src/traffic_control.rs
 create mode 100644 pbs-config/src/traffic_control.rs
 create mode 100644 proxmox-systemd/src/daily_duration.rs
 create mode 100644 src/api2/config/traffic_control.rs
 create mode 100644 src/bin/proxmox_backup_manager/traffic_control.rs
 create mode 100644 src/cached_traffic_control.rs

-- 
2.30.2

* [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite)
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
                   ` (14 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-http/src/client/mod.rs                |   6 +
 .../src/client/rate_limited_stream.rs         | 144 ++++++++++++++++++
 proxmox-http/src/client/rate_limiter.rs       |  75 +++++++++
 3 files changed, 225 insertions(+)
 create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
 create mode 100644 proxmox-http/src/client/rate_limiter.rs

diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs
index b6ee4b0..30e66d5 100644
--- a/proxmox-http/src/client/mod.rs
+++ b/proxmox-http/src/client/mod.rs
@@ -2,6 +2,12 @@
 //!
 //! Contains a lightweight wrapper around `hyper` with support for TLS connections.
 
+mod rate_limiter;
+pub use rate_limiter::RateLimiter;
+
+mod rate_limited_stream;
+pub use rate_limited_stream::RateLimitedStream;
+
 mod connector;
 pub use connector::HttpsConnector;
 
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
new file mode 100644
index 0000000..a11b59e
--- /dev/null
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -0,0 +1,144 @@
+use std::pin::Pin;
+use std::marker::Unpin;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::Future;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
+use tokio::time::Sleep;
+
+use std::task::{Context, Poll};
+
+use super::RateLimiter;
+
+/// A rate limited stream using [RateLimiter]
+pub struct RateLimitedStream<S> {
+    read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    read_delay: Option<Pin<Box<Sleep>>>,
+    write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    write_delay: Option<Pin<Box<Sleep>>>,
+    stream: S,
+}
+
+impl <S> RateLimitedStream<S> {
+
+    const MIN_DELAY: Duration = Duration::from_millis(20);
+
+    /// Creates a new instance with reads and writes limited to the same `rate`.
+    pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
+        let now = Instant::now();
+        let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+        let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+        Self::with_limiter(stream, Some(read_limiter), Some(write_limiter))
+    }
+
+    /// Creates a new instance with specified [RateLimiter]s for reads and writes.
+    pub fn with_limiter(
+        stream: S,
+        read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+        write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    ) -> Self {
+        Self {
+            read_limiter,
+            read_delay: None,
+            write_limiter,
+            write_delay: None,
+            stream,
+        }
+    }
+}
+
+impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
+
+    fn poll_write(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        buf: &[u8]
+    ) -> Poll<Result<usize, std::io::Error>> {
+        let this = self.get_mut();
+
+        let is_ready = match this.write_delay {
+            Some(ref mut future) => {
+                future.as_mut().poll(ctx).is_ready()
+            }
+            None => true,
+        };
+
+        if !is_ready { return Poll::Pending; }
+
+        this.write_delay = None;
+
+        let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
+
+        if let Some(ref write_limiter) = this.write_limiter {
+            if let Poll::Ready(Ok(count)) = &result {
+                let now = Instant::now();
+                let delay = write_limiter.lock().unwrap()
+                    .register_traffic(now, *count as u64);
+                if delay >= Self::MIN_DELAY {
+                    let sleep = tokio::time::sleep(delay);
+                    this.write_delay = Some(Box::pin(sleep));
+                }
+            }
+        }
+
+        result
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>
+    ) -> Poll<Result<(), std::io::Error>> {
+        let this = self.get_mut();
+        Pin::new(&mut this.stream).poll_flush(ctx)
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>
+    ) -> Poll<Result<(), std::io::Error>> {
+        let this = self.get_mut();
+        Pin::new(&mut this.stream).poll_shutdown(ctx)
+    }
+}
+
+impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
+
+    fn poll_read(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        let this = self.get_mut();
+
+        let is_ready = match this.read_delay {
+            Some(ref mut future) => {
+                future.as_mut().poll(ctx).is_ready()
+            }
+            None => true,
+        };
+
+        if !is_ready { return Poll::Pending; }
+
+        this.read_delay = None;
+
+        let filled_len = buf.filled().len();
+        let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
+
+        if let Some(ref read_limiter) = this.read_limiter {
+            if let Poll::Ready(Ok(())) = &result {
+                let count = buf.filled().len() - filled_len;
+                let now = Instant::now();
+                let delay = read_limiter.lock().unwrap()
+                    .register_traffic(now, count as u64);
+                if delay >= Self::MIN_DELAY {
+                    let sleep = tokio::time::sleep(delay);
+                    this.read_delay = Some(Box::pin(sleep));
+                }
+            }
+        }
+
+        result
+    }
+
+}
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
new file mode 100644
index 0000000..4742387
--- /dev/null
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -0,0 +1,75 @@
+use std::time::{Duration, Instant};
+use std::convert::TryInto;
+
+/// Token bucket based rate limiter
+pub struct RateLimiter {
+    rate: u64, // tokens/second
+    start_time: Instant,
+    traffic: u64, // overall traffic
+    bucket_size: u64,
+    last_update: Instant,
+    consumed_tokens: u64,
+}
+
+impl RateLimiter {
+
+    const NO_DELAY: Duration = Duration::from_millis(0);
+
+    /// Creates a new instance, using [Instant::now] as start time.
+    pub fn new(rate: u64, bucket_size: u64) -> Self {
+        let start_time = Instant::now();
+        Self::with_start_time(rate, bucket_size, start_time)
+    }
+
+    /// Creates a new instance with specified `rate`, `bucket_size` and `start_time`.
+    pub fn with_start_time(rate: u64, bucket_size: u64, start_time: Instant) -> Self {
+        Self {
+            rate,
+            start_time,
+            traffic: 0,
+            bucket_size,
+            last_update: start_time,
+            // start with empty bucket (all tokens consumed)
+            consumed_tokens: bucket_size,
+        }
+    }
+
+    /// Returns the average rate (since `start_time`)
+    pub fn average_rate(&self, current_time: Instant) -> f64 {
+        let time_diff = (current_time - self.start_time).as_secs_f64();
+        if time_diff <= 0.0 {
+            0.0
+        } else {
+            (self.traffic as f64) / time_diff
+        }
+    }
+
+    fn refill_bucket(&mut self, current_time: Instant) {
+        let time_diff = (current_time - self.last_update).as_nanos();
+
+        if time_diff <= 0 {
+            //log::error!("update_time: got negative time diff");
+            return;
+        }
+
+        self.last_update = current_time;
+
+        let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000)
+            .try_into().unwrap_or(u64::MAX);
+
+        self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
+    }
+
+    /// Register traffic, returning a proposed delay to reach the expected rate.
+    pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
+        self.refill_bucket(current_time);
+
+        self.traffic += data_len;
+        self.consumed_tokens += data_len;
+
+        if self.consumed_tokens <= self.bucket_size {
+            return Self::NO_DELAY;
+        }
+        Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate)
+    }
+}
-- 
2.30.2

* [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
                   ` (13 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 pbs-client/src/http_client.rs | 24 ++++++++++++++++++++++--
 pbs-client/src/tools/mod.rs   | 23 ++++++++++++++++++++---
 2 files changed, 42 insertions(+), 5 deletions(-)
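
As the diff below shows, the limiters are only installed when a rate is
given, and the bucket size defaults to three times the rate when it is not
specified. A minimal sketch of that resolution logic follows. The helper name
effective_limits is hypothetical, and saturating_mul is an assumption of this
sketch (the patch multiplies directly).

```rust
/// Hypothetical helper: resolves the optional rate/burst pair into the values
/// a limiter would be built with, or `None` when no limit is requested.
fn effective_limits(rate_limit: Option<u64>, bucket_size: Option<u64>) -> Option<(u64, u64)> {
    // Without a rate there is nothing to limit; a lone bucket size is ignored.
    let rate = rate_limit?;
    // Default burst: three seconds' worth of traffic at the configured rate.
    let bucket = bucket_size.unwrap_or_else(|| rate.saturating_mul(3));
    Some((rate, bucket))
}
```

For example, a rate of 10_000_000 bytes/second with no explicit bucket size
resolves to a bucket of 30_000_000 bytes.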

diff --git a/pbs-client/src/http_client.rs b/pbs-client/src/http_client.rs
index 73c83f7a..defaef8a 100644
--- a/pbs-client/src/http_client.rs
+++ b/pbs-client/src/http_client.rs
@@ -20,7 +20,7 @@ use proxmox::{
 };
 use proxmox_router::HttpError;
 
-use proxmox_http::client::HttpsConnector;
+use proxmox_http::client::{HttpsConnector, RateLimiter};
 use proxmox_http::uri::build_authority;
 
 use pbs_api_types::{Authid, Userid};
@@ -51,6 +51,8 @@ pub struct HttpClientOptions {
     ticket_cache: bool,
     fingerprint_cache: bool,
     verify_cert: bool,
+    rate_limit: Option<u64>,
+    bucket_size: Option<u64>,
 }
 
 impl HttpClientOptions {
@@ -109,6 +111,16 @@ impl HttpClientOptions {
         self.verify_cert = verify_cert;
         self
     }
+
+    pub fn rate_limit(mut self, rate_limit:  Option<u64>) -> Self {
+        self.rate_limit = rate_limit;
+        self
+    }
+
+    pub fn bucket_size(mut self, bucket_size:  Option<u64>) -> Self {
+        self.bucket_size = bucket_size;
+        self
+    }
 }
 
 impl Default for HttpClientOptions {
@@ -121,6 +133,8 @@ impl Default for HttpClientOptions {
             ticket_cache: false,
             fingerprint_cache: false,
             verify_cert: true,
+            rate_limit: None,
+            bucket_size: None,
         }
     }
 }
@@ -343,7 +357,13 @@ impl HttpClient {
         httpc.enforce_http(false); // we want https...
 
         httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
-        let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+        let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+        if let Some(rate_limit) = options.rate_limit {
+            let bucket_size = options.bucket_size.unwrap_or_else(|| rate_limit*3);
+            https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
+            https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
+        }
 
         let client = Client::builder()
         //.http2_initial_stream_window_size( (1 << 31) - 2)
diff --git a/pbs-client/src/tools/mod.rs b/pbs-client/src/tools/mod.rs
index a12635cf..539ad662 100644
--- a/pbs-client/src/tools/mod.rs
+++ b/pbs-client/src/tools/mod.rs
@@ -135,15 +135,32 @@ pub fn extract_repository_from_map(param: &HashMap<String, String>) -> Option<Ba
 }
 
 pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> {
-    connect_do(repo.host(), repo.port(), repo.auth_id())
+    connect_do(repo.host(), repo.port(), repo.auth_id(), None, None)
         .map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
 }
 
-fn connect_do(server: &str, port: u16, auth_id: &Authid) -> Result<HttpClient, Error> {
+pub fn connect_rate_limited(
+    repo: &BackupRepository,
+    rate: Option<u64>,
+    bucket_size: Option<u64>,
+) -> Result<HttpClient, Error> {
+    connect_do(repo.host(), repo.port(), repo.auth_id(), rate, bucket_size)
+        .map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
+}
+
+fn connect_do(
+    server: &str,
+    port: u16,
+    auth_id: &Authid,
+    rate_limit: Option<u64>,
+    bucket_size: Option<u64>,
+) -> Result<HttpClient, Error> {
     let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
 
     let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
-    let options = HttpClientOptions::new_interactive(password, fingerprint);
+    let options = HttpClientOptions::new_interactive(password, fingerprint)
+        .rate_limit(rate_limit)
+        .bucket_size(bucket_size);
 
     HttpClient::new(server, port, auth_id, options)
 }
-- 
2.30.2

* [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
                   ` (12 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 .../src/client/rate_limited_stream.rs         | 92 +++++++++++++------
 1 file changed, 62 insertions(+), 30 deletions(-)
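
Besides adding poll_write_vectored, this patch moves the delay handling into
free helper functions. The threshold below which no sleep is scheduled is
10 ms in the new helper; the constant it replaces was 20 ms. A rough sketch of
that threshold decision, using a hypothetical pause_for function and plain
Duration values in place of the tokio Sleep future:

```rust
use std::time::Duration;

/// Delays shorter than this are not worth arming a timer for.
const MIN_DELAY: Duration = Duration::from_millis(10);

/// Hypothetical helper: returns the pause to schedule, or `None` when the
/// proposed delay is below the threshold.
fn pause_for(delay: Duration) -> Option<Duration> {
    if delay >= MIN_DELAY {
        Some(delay)
    } else {
        None
    }
}
```

A skipped delay is not forgotten: the limiter still counts the tokens as
consumed, so the shortfall is included in the delay proposed for a later call.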

diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index a11b59e..0cc0ebb 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::pin::Pin;
 use std::marker::Unpin;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
+use std::io::IoSlice;
 
 use futures::Future;
 use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
@@ -22,8 +23,6 @@ pub struct RateLimitedStream<S> {
 
 impl <S> RateLimitedStream<S> {
 
-    const MIN_DELAY: Duration = Duration::from_millis(20);
-
     /// Creates a new instance with reads and writes limited to the same `rate`.
     pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
         let now = Instant::now();
@@ -48,6 +47,33 @@ impl <S> RateLimitedStream<S> {
     }
 }
 
+fn register_traffic(
+    limiter: &Mutex<RateLimiter>,
+    count: usize,
+) -> Option<Pin<Box<Sleep>>>{
+
+    const MIN_DELAY: Duration = Duration::from_millis(10);
+
+    let now = Instant::now();
+    let delay = limiter.lock().unwrap()
+        .register_traffic(now, count as u64);
+    if delay >= MIN_DELAY {
+        let sleep = tokio::time::sleep(delay);
+        Some(Box::pin(sleep))
+    } else {
+        None
+    }
+}
+
+fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
+    match delay {
+        Some(ref mut future) => {
+            future.as_mut().poll(ctx).is_ready()
+        }
+        None => true,
+    }
+}
+
 impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
 
     fn poll_write(
@@ -57,12 +83,7 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
     ) -> Poll<Result<usize, std::io::Error>> {
         let this = self.get_mut();
 
-        let is_ready = match this.write_delay {
-            Some(ref mut future) => {
-                future.as_mut().poll(ctx).is_ready()
-            }
-            None => true,
-        };
+        let is_ready = delay_is_ready(&mut this.write_delay, ctx);
 
         if !is_ready { return Poll::Pending; }
 
@@ -70,15 +91,37 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
 
         let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
 
-        if let Some(ref write_limiter) = this.write_limiter {
-            if let Poll::Ready(Ok(count)) = &result {
-                let now = Instant::now();
-                let delay = write_limiter.lock().unwrap()
-                    .register_traffic(now, *count as u64);
-                if delay >= Self::MIN_DELAY {
-                    let sleep = tokio::time::sleep(delay);
-                    this.write_delay = Some(Box::pin(sleep));
-                }
+        if let Some(ref limiter) = this.write_limiter {
+            if let Poll::Ready(Ok(count)) = result {
+                this.write_delay = register_traffic(limiter, count);
+            }
+        }
+
+        result
+    }
+
+    fn is_write_vectored(&self) -> bool {
+        self.stream.is_write_vectored()
+    }
+
+    fn poll_write_vectored(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        bufs: &[IoSlice<'_>]
+    ) -> Poll<Result<usize, std::io::Error>> {
+        let this = self.get_mut();
+
+        let is_ready = delay_is_ready(&mut this.write_delay, ctx);
+
+        if !is_ready { return Poll::Pending; }
+
+        this.write_delay = None;
+
+        let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
+
+        if let Some(ref limiter) = this.write_limiter {
+            if let Poll::Ready(Ok(count)) = result {
+                this.write_delay = register_traffic(limiter, count);
             }
         }
 
@@ -111,12 +154,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
     ) -> Poll<Result<(), std::io::Error>> {
         let this = self.get_mut();
 
-        let is_ready = match this.read_delay {
-            Some(ref mut future) => {
-                future.as_mut().poll(ctx).is_ready()
-            }
-            None => true,
-        };
+        let is_ready = delay_is_ready(&mut this.read_delay, ctx);
 
         if !is_ready { return Poll::Pending; }
 
@@ -128,13 +166,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
         if let Some(ref read_limiter) = this.read_limiter {
             if let Poll::Ready(Ok(())) = &result {
                 let count = buf.filled().len() - filled_len;
-                let now = Instant::now();
-                let delay = read_limiter.lock().unwrap()
-                    .register_traffic(now, count as u64);
-                if delay >= Self::MIN_DELAY {
-                    let sleep = tokio::time::sleep(delay);
-                    this.read_delay = Some(Box::pin(sleep));
-                }
+                this.read_delay = register_traffic(read_limiter, count);
             }
         }
 
-- 
2.30.2

* [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (2 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
                   ` (11 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-backup-client/src/main.rs | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index cb083006..d81271d0 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -45,7 +45,7 @@ use pbs_client::tools::{
     complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
     complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
     complete_img_archive_name, complete_pxar_archive_name, complete_repository, connect,
-    extract_repository_from_value,
+    connect_rate_limited, extract_repository_from_value,
     key_source::{
         crypto_parameters, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
         KEYFILE_SCHEMA, MASTER_PUBKEY_FD_SCHEMA, MASTER_PUBKEY_FILE_SCHEMA,
@@ -582,6 +582,18 @@ fn spawn_catalog_upload(
                schema: CHUNK_SIZE_SCHEMA,
                optional: true,
            },
+           rate: {
+               type: u64,
+               description: "Rate limit for TBF in bytes/second.",
+               optional: true,
+               minimum: 1,
+           },
+           burst: {
+               type: u64,
+               description: "Size of the TBF bucket, in bytes.",
+               optional: true,
+               minimum: 1,
+           },
            "exclude": {
                type: Array,
                description: "List of paths or patterns for matching files to exclude.",
@@ -630,6 +642,9 @@ async fn create_backup(
         verify_chunk_size(size)?;
     }
 
+    let rate_limit = param["rate"].as_u64();
+    let bucket_size = param["burst"].as_u64();
+
     let crypto = crypto_parameters(&param)?;
 
     let backup_id = param["backup-id"].as_str().unwrap_or(&proxmox::tools::nodename());
@@ -724,7 +739,7 @@ async fn create_backup(
 
     let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
 
-    let client = connect(&repo)?;
+    let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
     record_repository(&repo);
 
     println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?);
-- 
2.30.2

* [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (3 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Service for RateLimitedStream Dietmar Maurer
                   ` (10 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC
  To: pbs-devel

Wrap the TCP stream in a RateLimitedStream so that the connector can
limit the bandwidth it uses.

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-http/src/client/connector.rs          | 51 ++++++++++++++++---
 .../src/client/rate_limited_stream.rs         |  8 +++
 2 files changed, 51 insertions(+), 8 deletions(-)
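
The connector stores each limiter as an Option<Arc<Mutex<RateLimiter>>> and
clones the Arc for every new connection, so all streams opened through one
connector appear to draw from the same limiter. The sketch below illustrates
that sharing with std types only. ByteCounter and Conn are hypothetical
stand-ins for RateLimiter and RateLimitedStream, not types from the patch.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for the limiter: it only counts the bytes it has seen.
#[derive(Default)]
struct ByteCounter {
    traffic: u64,
}

/// Stand-in for a wrapped connection holding an optional shared limiter.
struct Conn {
    limiter: Option<Arc<Mutex<ByteCounter>>>,
}

impl Conn {
    /// Pretends to write `len` bytes and reports them to the limiter, if any.
    fn write(&self, len: u64) {
        if let Some(limiter) = &self.limiter {
            limiter.lock().unwrap().traffic += len;
        }
    }
}

/// Opens two "connections" that share one limiter and returns the total
/// traffic the limiter has observed.
fn shared_total() -> u64 {
    let limiter = Arc::new(Mutex::new(ByteCounter::default()));
    let a = Conn { limiter: Some(Arc::clone(&limiter)) };
    let b = Conn { limiter: Some(Arc::clone(&limiter)) };
    a.write(100);
    b.write(250);
    let total = limiter.lock().unwrap().traffic;
    total
}
```

Because both connections report to the same counter, the configured rate
acts as a budget shared across connections, not a per-connection allowance.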

diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
 use anyhow::{bail, format_err, Error};
 use std::os::unix::io::AsRawFd;
 use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
 use futures::*;
 use http::Uri;
 use hyper::client::HttpConnector;
 use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::net::TcpStream;
 use tokio_openssl::SslStream;
 
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
 use crate::tls::MaybeTlsStream;
 use crate::uri::build_authority;
 
+use super::{RateLimiter, RateLimitedStream};
+
 #[derive(Clone)]
 pub struct HttpsConnector {
     connector: HttpConnector,
     ssl_connector: Arc<SslConnector>,
     proxy: Option<ProxyConfig>,
     tcp_keepalive: u32,
+    read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    write_limiter: Option<Arc<Mutex<RateLimiter>>>,
 }
 
 impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
             ssl_connector: Arc::new(ssl_connector),
             proxy: None,
             tcp_keepalive,
+            read_limiter: None,
+            write_limiter: None,
         }
     }
 
@@ -45,13 +51,21 @@ impl HttpsConnector {
         self.proxy = Some(proxy);
     }
 
-    async fn secure_stream(
-        tcp_stream: TcpStream,
+    pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+        self.read_limiter = limiter;
+    }
+
+    pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+        self.write_limiter = limiter;
+    }
+
+    async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+        tcp_stream: S,
         ssl_connector: &SslConnector,
         host: &str,
-    ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+    ) -> Result<MaybeTlsStream<S>, Error> {
         let config = ssl_connector.configure()?;
-        let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+        let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
         Pin::new(&mut conn).connect().await?;
         Ok(MaybeTlsStream::Secured(conn))
     }
@@ -107,7 +121,7 @@ impl HttpsConnector {
 }
 
 impl hyper::service::Service<Uri> for HttpsConnector {
-    type Response = MaybeTlsStream<TcpStream>;
+    type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
     type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
         };
         let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
         let keepalive = self.tcp_keepalive;
+        let read_limiter = self.read_limiter.clone();
+        let write_limiter = self.write_limiter.clone();
+
 
         if let Some(ref proxy) = self.proxy {
             let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
             if use_connect {
                 async move {
-                    let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
                         format_err!("error connecting to {} - {}", proxy_authority, err)
                     })?;
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                    let mut tcp_stream = RateLimitedStream::with_limiter(
+                        tcp_stream,
+                        read_limiter,
+                        write_limiter,
+                    );
+
                     let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
                     if let Some(authorization) = authorization {
                         connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                    let tcp_stream = RateLimitedStream::with_limiter(
+                        tcp_stream,
+                        read_limiter,
+                        write_limiter,
+                    );
+
                     Ok(MaybeTlsStream::Proxied(tcp_stream))
                 }
                 .boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
                 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                let tcp_stream = RateLimitedStream::with_limiter(
+                    tcp_stream,
+                    read_limiter,
+                    write_limiter,
+                );
+
                 if is_https {
                     Self::secure_stream(tcp_stream, &ssl_connector, &host).await
                 } else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 0cc0ebb..00ba066 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
 use futures::Future;
 use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
 use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
 
 use std::task::{Context, Poll};
 
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
     }
 
 }
+
+// we need this for the hyper http client
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+    fn connected(&self) -> Connected {
+        self.stream.connected()
+    }
+}
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup 3/9] implement Service for RateLimitedStream
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (4 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
                   ` (9 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-rest-server/Cargo.toml  |  1 +
 proxmox-rest-server/src/rest.rs | 28 ++++++++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 1fa76f21..b88e5d12 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -33,6 +33,7 @@ url = "2.1"
 proxmox = "0.15.0"
 proxmox-io = "1"
 proxmox-lang = "1"
+proxmox-http = { version = "0.5.0", features = [ "client" ] }
 proxmox-router = "1.1"
 proxmox-schema = { version = "1", features = [ "api-macro", "upid-api-impl" ] }
 proxmox-time = "1"
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 74bc8bb1..f27f703d 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -31,6 +31,8 @@ use proxmox_schema::{
     ParameterSchema,
 };
 
+use proxmox_http::client::RateLimitedStream;
+
 use pbs_tools::compression::{DeflateEncoder, Level};
 use pbs_tools::stream::AsyncReaderStream;
 
@@ -73,6 +75,32 @@ impl RestServer {
     }
 }
 
+impl Service<&Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>>
+    for RestServer
+{
+    type Response = ApiService;
+    type Error = Error;
+    type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(
+        &mut self,
+        ctx: &Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>,
+    ) -> Self::Future {
+        match ctx.get_ref().peer_addr() {
+            Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
+            Ok(peer) => future::ok(ApiService {
+                peer,
+                api_config: self.api_config.clone(),
+            })
+            .boxed(),
+        }
+    }
+}
+
 impl Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>>
     for RestServer
 {
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (5 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Service for RateLimitedStream Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
                   ` (8 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

We will use this to specify time frames for network rate limits (the
limits only apply while inside the time frame).

Note: This is not systemd related, but we can reuse some of the parser
methods.

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 proxmox-systemd/src/daily_duration.rs | 92 +++++++++++++++++++++++++++
 proxmox-systemd/src/lib.rs            |  1 +
 proxmox-systemd/src/parse_time.rs     | 56 ++++++++++++++++
 proxmox-systemd/src/time.rs           |  2 +-
 4 files changed, 150 insertions(+), 1 deletion(-)
 create mode 100644 proxmox-systemd/src/daily_duration.rs

diff --git a/proxmox-systemd/src/daily_duration.rs b/proxmox-systemd/src/daily_duration.rs
new file mode 100644
index 00000000..bed4eb47
--- /dev/null
+++ b/proxmox-systemd/src/daily_duration.rs
@@ -0,0 +1,92 @@
+use std::cmp::{Ordering, PartialOrd};
+
+use super::time::{WeekDays};
+
+pub use super::parse_time::parse_daily_duration;
+
+/// Time of Day (hour with minute)
+#[derive(Default, PartialEq, Clone, Debug)]
+pub struct HmTime {
+    pub hour: u32,
+    pub minute: u32,
+}
+
+impl PartialOrd for HmTime {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let mut order = self.hour.cmp(&other.hour);
+        if order == Ordering::Equal {
+            order =  self.minute.cmp(&other.minute);
+        }
+        Some(order)
+    }
+}
+
+#[derive(Default, Clone, Debug)]
+pub struct DailyDuration {
+    /// the days in a week this duration should trigger
+    pub days: WeekDays,
+    pub start: HmTime,
+    pub end: HmTime,
+}
+
+#[cfg(test)]
+mod test {
+
+    use anyhow::{bail, Error};
+
+    use super::*;
+
+    fn test_parse(
+        duration_str: &str,
+        start_h: u32, start_m: u32,
+        end_h: u32, end_m: u32,
+        days: &[usize],
+    ) -> Result<(), Error> {
+        let mut day_bits = 0;
+        for day in days { day_bits |= 1<<day; }
+        let expected_days = WeekDays::from_bits(day_bits).unwrap();
+
+        let duration = parse_daily_duration(duration_str)?;
+
+        if duration.start.hour != start_h {
+            bail!("start hour mismatch, expected {}, got {:?}", start_h, duration);
+        }
+        if duration.start.minute != start_m {
+            bail!("start minute mismatch, expected {}, got {:?}", start_m, duration);
+        }
+        if duration.end.hour != end_h {
+            bail!("end hour mismatch, expected {}, got {:?}", end_h, duration);
+        }
+        if duration.end.minute != end_m {
+            bail!("end minute mismatch, expected {}, got {:?}", end_m, duration);
+        }
+
+        if duration.days != expected_days {
+            bail!("weekday mismatch, expected {:?}, got {:?}", expected_days, duration);
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_daily_duration_parser() -> Result<(), Error> {
+
+        assert!(parse_daily_duration("").is_err());
+        assert!(parse_daily_duration(" 8-12").is_err());
+        assert!(parse_daily_duration("8:60-12").is_err());
+        assert!(parse_daily_duration("8-25").is_err());
+        assert!(parse_daily_duration("12-8").is_err());
+
+        test_parse("8-12", 8, 0, 12, 0, &[])?;
+        test_parse("8:0-12:0", 8, 0, 12, 0, &[])?;
+        test_parse("8:00-12:00", 8, 0, 12, 0, &[])?;
+        test_parse("8:05-12:20", 8, 5, 12, 20, &[])?;
+        test_parse("8:05 - 12:20", 8, 5, 12, 20, &[])?;
+
+        test_parse("mon 8-12", 8, 0, 12, 0, &[0])?;
+        test_parse("tue..fri 8-12", 8, 0, 12, 0, &[1,2,3,4])?;
+        test_parse("sat,tue..thu,fri 8-12", 8, 0, 12, 0, &[1,2,3,4,5])?;
+
+        Ok(())
+    }
+}
diff --git a/proxmox-systemd/src/lib.rs b/proxmox-systemd/src/lib.rs
index b4ab4b72..7c2b1f90 100644
--- a/proxmox-systemd/src/lib.rs
+++ b/proxmox-systemd/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod time;
+pub mod daily_duration;
 
 mod parse_time;
 mod unit;
diff --git a/proxmox-systemd/src/parse_time.rs b/proxmox-systemd/src/parse_time.rs
index ba9449b1..d212e264 100644
--- a/proxmox-systemd/src/parse_time.rs
+++ b/proxmox-systemd/src/parse_time.rs
@@ -4,6 +4,7 @@ use anyhow::{bail, Error};
 use lazy_static::lazy_static;
 
 use super::time::*;
+use super::daily_duration::*;
 
 use nom::{
     error::{context, ParseError, VerboseError},
@@ -452,3 +453,58 @@ fn parse_time_span_incomplete(mut i: &str) -> IResult<&str, TimeSpan> {
 
     Ok((i, ts))
 }
+
+pub fn parse_daily_duration(i: &str) -> Result<DailyDuration, Error> {
+    parse_complete_line("daily duration", i, parse_daily_duration_incomplete)
+}
+
+fn parse_daily_duration_incomplete(mut i: &str) -> IResult<&str, DailyDuration> {
+
+    let mut duration = DailyDuration::default();
+
+    if i.starts_with(|c: char| char::is_ascii_alphabetic(&c)) {
+
+        let (n, range_list) =  context(
+            "weekday range list",
+            separated_nonempty_list(tag(","), parse_weekdays_range)
+        )(i)?;
+
+        i = space0(n)?.0;
+
+        for range in range_list  { duration.days.insert(range); }
+    }
+
+    let (i, start) = parse_hm_time(i)?;
+
+    let i = space0(i)?.0;
+
+    let (i, _) = tag("-")(i)?;
+
+    let i = space0(i)?.0;
+
+    let end_time_start = i;
+
+    let (i, end) = parse_hm_time(i)?;
+
+    if start > end {
+        return Err(parse_error(end_time_start, "end time before start time"));
+    }
+
+    duration.start = start;
+    duration.end = end;
+
+    Ok((i, duration))
+}
+
+fn parse_hm_time(i: &str) -> IResult<&str, HmTime> {
+
+    let (i, (hour, opt_minute)) = tuple((
+        parse_time_comp(24),
+        opt(preceded(tag(":"), parse_time_comp(60))),
+    ))(i)?;
+
+    match opt_minute {
+        Some(minute) => Ok((i, HmTime { hour, minute })),
+        None => Ok((i, HmTime { hour, minute: 0})),
+    }
+}
diff --git a/proxmox-systemd/src/time.rs b/proxmox-systemd/src/time.rs
index b81e970e..e5fe7965 100644
--- a/proxmox-systemd/src/time.rs
+++ b/proxmox-systemd/src/time.rs
@@ -5,7 +5,7 @@ use bitflags::bitflags;
 
 use proxmox_time::TmEditor;
 
-pub use super::parse_time::*;
+pub use super::parse_time::{parse_calendar_event, parse_time_span};
 
 bitflags!{
     #[derive(Default)]
-- 
2.30.2
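
Side note on the HmTime ordering in the patch above: the hand-written
partial_cmp() compares hour first and minute second. A minimal sketch,
assuming nothing beyond std, of how the same lexicographic order can be
obtained by deriving the comparison traits (derive compares fields in
declaration order). The helper is_valid_frame() is hypothetical and only
mirrors the parser's "end time before start time" check.

```rust
/// Time of day (hour and minute). The derived ordering compares `hour`
/// first and `minute` second, because that is the field declaration order.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HmTime {
    pub hour: u32,
    pub minute: u32,
}

/// Hypothetical helper: a frame is accepted unless it ends before it starts
/// (an empty frame with start == end is allowed, as in the parser check).
pub fn is_valid_frame(start: HmTime, end: HmTime) -> bool {
    start <= end
}
```

With this, a frame such as 8:05-12:20 is accepted and 12-8 is rejected,
which matches the expectations in test_daily_duration_parser().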






* [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (6 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
                   ` (7 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

---
 .../src/client/rate_limited_stream.rs         | 43 +++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 00ba066..ea99383 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -19,6 +19,8 @@ pub struct RateLimitedStream<S> {
     read_delay: Option<Pin<Box<Sleep>>>,
     write_limiter: Option<Arc<Mutex<RateLimiter>>>,
     write_delay: Option<Pin<Box<Sleep>>>,
+    update_limiter_cb: Option<Box<dyn Fn() -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) + Send>>,
+    last_limiter_update: Instant,
     stream: S,
 }
 
@@ -43,9 +45,44 @@ impl <S> RateLimitedStream<S> {
             read_delay: None,
             write_limiter,
             write_delay: None,
+            update_limiter_cb: None,
+            last_limiter_update: Instant::now(),
             stream,
         }
     }
+
+    /// Creates a new instance with limiter update callback.
+    ///
+    /// The function is called at most once every 5 seconds to update/change the used limiters.
+    ///
+    /// Note: This function is called within an async context, so it
+    /// should be fast and must not block.
+    pub fn with_limiter_update_cb<F: Fn() -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) + Send + 'static>(
+        stream: S,
+        update_limiter_cb: F,
+    ) -> Self {
+        let (read_limiter, write_limiter) = update_limiter_cb();
+        Self {
+            read_limiter,
+            read_delay: None,
+            write_limiter,
+            write_delay: None,
+            update_limiter_cb: Some(Box::new(update_limiter_cb)),
+            last_limiter_update: Instant::now(),
+            stream,
+        }
+    }
+
+    fn update_limiters(&mut self) {
+        if let Some(ref update_limiter_cb) = self.update_limiter_cb {
+            if self.last_limiter_update.elapsed().as_secs() >= 5 {
+                self.last_limiter_update = Instant::now();
+                let (read_limiter, write_limiter) = update_limiter_cb();
+                self.read_limiter = read_limiter;
+                self.write_limiter = write_limiter;
+            }
+        }
+    }
 }
 
 fn register_traffic(
@@ -90,6 +127,8 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
 
         this.write_delay = None;
 
+        this.update_limiters();
+
         let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
 
         if let Some(ref limiter) = this.write_limiter {
@@ -118,6 +157,8 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
 
         this.write_delay = None;
 
+        this.update_limiters();
+
         let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
 
         if let Some(ref limiter) = this.write_limiter {
@@ -161,6 +202,8 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
 
         this.read_delay = None;
 
+        this.update_limiters();
+
         let filled_len = buf.filled().len();
         let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
 
-- 
2.30.2
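
The update mechanism in this patch boils down to "re-run a callback on
the I/O path once enough time has passed". A minimal standalone sketch
of that pattern follows. The type Refreshing and its methods are
hypothetical names for illustration and are not part of proxmox-http.
The interval is a parameter here, whereas the patch hard-codes 5 seconds.

```rust
use std::time::{Duration, Instant};

/// Hypothetical holder for a value that is re-fetched from a callback
/// once `interval` has elapsed since the last refresh.
pub struct Refreshing<T> {
    value: T,
    update_cb: Box<dyn Fn() -> T + Send>,
    interval: Duration,
    last_update: Instant,
}

impl<T> Refreshing<T> {
    /// Calls the callback once up front to get the initial value.
    pub fn new<F: Fn() -> T + Send + 'static>(interval: Duration, update_cb: F) -> Self {
        let value = update_cb();
        Self {
            value,
            update_cb: Box::new(update_cb),
            interval,
            last_update: Instant::now(),
        }
    }

    /// Cheap check meant to run on every poll; the callback itself must
    /// be fast and must not block, since it runs in an async context.
    pub fn get(&mut self) -> &T {
        if self.last_update.elapsed() >= self.interval {
            self.last_update = Instant::now();
            self.value = (self.update_cb)();
        }
        &self.value
    }
}
```

Because the check only happens when the stream is polled, an idle
connection does not trigger the callback at all.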






* [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match()
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (7 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
                   ` (6 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

---
 proxmox-systemd/src/daily_duration.rs | 60 +++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/proxmox-systemd/src/daily_duration.rs b/proxmox-systemd/src/daily_duration.rs
index bed4eb47..25157b25 100644
--- a/proxmox-systemd/src/daily_duration.rs
+++ b/proxmox-systemd/src/daily_duration.rs
@@ -1,4 +1,9 @@
 use std::cmp::{Ordering, PartialOrd};
+use std::convert::TryInto;
+
+use anyhow::Error;
+
+use proxmox_time::TmEditor;
 
 use super::time::{WeekDays};
 
@@ -29,6 +34,30 @@ pub struct DailyDuration {
     pub end: HmTime,
 }
 
+impl DailyDuration {
+
+    /// Test if time is within this frame
+    pub fn time_match(&self, epoch: i64, utc: bool) -> Result<bool, Error> {
+
+        let t = TmEditor::with_epoch(epoch, utc)?;
+
+        let all_days = self.days.is_empty() || self.days.is_all();
+
+        if !all_days { // match day first
+            let day_num: u32 = t.day_num().try_into()?;
+            let day = WeekDays::from_bits(1<<day_num).unwrap();
+            if !self.days.contains(day) { return Ok(false); }
+        }
+
+        let hour = t.hour().try_into()?;
+        let minute = t.min().try_into()?;
+
+        let ctime = HmTime { hour, minute };
+
+        Ok(ctime >= self.start && ctime < self.end)
+    }
+}
+
 #[cfg(test)]
 mod test {
 
@@ -68,6 +97,10 @@ mod test {
         Ok(())
     }
 
+    const fn make_test_time(mday: i32, hour: i32, min: i32) -> i64 {
+        (mday*3600*24 + hour*3600 + min*60) as i64
+    }
+
     #[test]
     fn test_daily_duration_parser() -> Result<(), Error> {
 
@@ -89,4 +122,31 @@ mod test {
 
         Ok(())
     }
+
+    #[test]
+    fn test_time_match() -> Result<(), Error> {
+        const THURSDAY_08_00: i64 = make_test_time(0, 8, 0);
+        const THURSDAY_12_00: i64 = make_test_time(0, 12, 0);
+        const DAY: i64 = 3600*24;
+
+        let duration = parse_daily_duration("thu..fri 8:05-12")?;
+
+        assert!(!duration.time_match(THURSDAY_08_00, true)?);
+        assert!(!duration.time_match(THURSDAY_08_00 + DAY, true)?);
+        assert!(!duration.time_match(THURSDAY_08_00 + 2*DAY, true)?);
+
+        assert!(duration.time_match(THURSDAY_08_00 + 5*60, true)?);
+        assert!(duration.time_match(THURSDAY_08_00 + 5*60 + DAY, true)?);
+        assert!(!duration.time_match(THURSDAY_08_00 + 5*60 + 2*DAY, true)?);
+
+        assert!(duration.time_match(THURSDAY_12_00 - 1, true)?);
+        assert!(duration.time_match(THURSDAY_12_00 - 1 + DAY, true)?);
+        assert!(!duration.time_match(THURSDAY_12_00 - 1 + 2*DAY, true)?);
+
+        assert!(!duration.time_match(THURSDAY_12_00, true)?);
+        assert!(!duration.time_match(THURSDAY_12_00 + DAY, true)?);
+        assert!(!duration.time_match(THURSDAY_12_00 + 2*DAY, true)?);
+
+        Ok(())
+    }
 }
-- 
2.30.2
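
For readers without proxmox_time at hand, here is a rough sketch of the
matching logic for the UTC case, using plain integer arithmetic instead
of TmEditor. The function names and the u8 day mask are assumptions made
for this example. The mask uses Monday = bit 0, as the parser tests in
patch 4/9 suggest. It relies on 1970-01-01 having been a Thursday, which
is also why the tests above treat epoch 0 as Thursday.

```rust
/// Splits a UTC epoch into (weekday, hour, minute), with Monday = 0.
pub fn utc_day_hour_minute(epoch: i64) -> (u32, u32, u32) {
    const DAY: i64 = 24 * 3600;
    // Euclidean division keeps the result sane for negative epochs too.
    let days = epoch.div_euclid(DAY);
    let secs = epoch.rem_euclid(DAY);
    // 1970-01-01 was a Thursday, i.e. index 3 when Monday is 0.
    let weekday = (days + 3).rem_euclid(7) as u32;
    (weekday, (secs / 3600) as u32, ((secs % 3600) / 60) as u32)
}

/// Half-open match: `start` is inside the frame, `end` is not.
pub fn in_frame(epoch: i64, days_mask: u8, start: (u32, u32), end: (u32, u32)) -> bool {
    let (weekday, hour, minute) = utc_day_hour_minute(epoch);
    // An empty mask means "every day", mirroring the patch.
    if days_mask != 0 && (days_mask & (1u8 << weekday)) == 0 {
        return false;
    }
    // Tuples compare lexicographically: hour first, then minute.
    let now = (hour, minute);
    now >= start && now < end
}
```

The half-open interval is what makes 11:59:59 match and 12:00:00 not
match in test_time_match().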






* [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (8 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
                   ` (5 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

---
 proxmox-http/src/client/rate_limiter.rs | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
index 4742387..677dfb1 100644
--- a/proxmox-http/src/client/rate_limiter.rs
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -36,7 +36,7 @@ impl RateLimiter {
 
     /// Returns the average rate (since `start_time`)
     pub fn average_rate(&self, current_time: Instant) -> f64 {
-        let time_diff = (current_time - self.start_time).as_secs_f64();
+        let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
         if time_diff <= 0.0 {
             0.0
         } else {
@@ -45,12 +45,15 @@ impl RateLimiter {
     }
 
     fn refill_bucket(&mut self, current_time: Instant) {
-        let time_diff = (current_time - self.last_update).as_nanos();
+        let time_diff = match current_time.checked_duration_since(self.last_update) {
+            Some(duration) => duration.as_nanos(),
+            None => {
+                //log::error!("update_time: got negative time diff");
+                return;
+            }
+        };
 
-        if time_diff <= 0 {
-            //log::error!("update_time: got negative time diff");
-            return;
-        }
+        if time_diff == 0 { return; }
 
         self.last_update = current_time;
 
-- 
2.30.2
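
Some background on the two std calls used above, as a small sketch.
Instant::checked_duration_since() returns None when the argument is
later than self, and Instant::saturating_duration_since() returns a zero
duration in that case, so neither can panic. The helper names below are
made up for this example and do not exist in rate_limiter.rs.

```rust
use std::time::Instant;

/// Elapsed nanoseconds from `last` to `now`, or `None` if `now` is earlier.
pub fn elapsed_nanos(now: Instant, last: Instant) -> Option<u128> {
    now.checked_duration_since(last).map(|d| d.as_nanos())
}

/// Elapsed seconds, clamped to zero if `now` is earlier than `start`.
pub fn elapsed_secs(now: Instant, start: Instant) -> f64 {
    now.saturating_duration_since(start).as_secs_f64()
}
```

The checked variant fits refill_bucket(), where a reversed time pair
should skip the refill, and the saturating variant fits average_rate(),
where a zero difference already has a defined result.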






* [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (9 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
                   ` (4 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 pbs-api-types/src/lib.rs                      |   7 +
 pbs-api-types/src/traffic_control.rs          |  81 +++++
 pbs-config/src/lib.rs                         |   1 +
 pbs-config/src/traffic_control.rs             |  91 ++++++
 src/api2/config/mod.rs                        |   2 +
 src/api2/config/traffic_control.rs            | 283 ++++++++++++++++++
 src/bin/proxmox-backup-manager.rs             |   1 +
 src/bin/proxmox_backup_manager/mod.rs         |   2 +
 .../proxmox_backup_manager/traffic_control.rs | 105 +++++++
 9 files changed, 573 insertions(+)
 create mode 100644 pbs-api-types/src/traffic_control.rs
 create mode 100644 pbs-config/src/traffic_control.rs
 create mode 100644 src/api2/config/traffic_control.rs
 create mode 100644 src/bin/proxmox_backup_manager/traffic_control.rs

diff --git a/pbs-api-types/src/lib.rs b/pbs-api-types/src/lib.rs
index 96ac657b..a61de960 100644
--- a/pbs-api-types/src/lib.rs
+++ b/pbs-api-types/src/lib.rs
@@ -7,6 +7,7 @@ use proxmox_schema::{
     api, const_regex, ApiStringFormat, ApiType, ArraySchema, Schema, StringSchema, ReturnType,
 };
 use proxmox::{IPRE, IPRE_BRACKET, IPV4OCTET, IPV4RE, IPV6H16, IPV6LS32, IPV6RE};
+use proxmox_systemd::daily_duration::parse_daily_duration;
 
 #[rustfmt::skip]
 #[macro_export]
@@ -73,6 +74,9 @@ pub use remote::*;
 mod tape;
 pub use tape::*;
 
+mod traffic_control;
+pub use traffic_control::*;
+
 mod zfs;
 pub use zfs::*;
 
@@ -152,6 +156,9 @@ pub const HOSTNAME_FORMAT: ApiStringFormat = ApiStringFormat::Pattern(&HOSTNAME_
 pub const DNS_ALIAS_FORMAT: ApiStringFormat =
     ApiStringFormat::Pattern(&DNS_ALIAS_REGEX);
 
+pub const DAILY_DURATION_FORMAT: ApiStringFormat =
+    ApiStringFormat::VerifyFn(|s| parse_daily_duration(s).map(drop));
+
 pub const SEARCH_DOMAIN_SCHEMA: Schema =
     StringSchema::new("Search domain for host-name lookup.").schema();
 
diff --git a/pbs-api-types/src/traffic_control.rs b/pbs-api-types/src/traffic_control.rs
new file mode 100644
index 00000000..c9fe4765
--- /dev/null
+++ b/pbs-api-types/src/traffic_control.rs
@@ -0,0 +1,81 @@
+use serde::{Deserialize, Serialize};
+
+use proxmox_schema::{api, Schema, StringSchema};
+
+use crate::{
+    CIDR_SCHEMA, DAILY_DURATION_FORMAT,
+    PROXMOX_SAFE_ID_FORMAT, SINGLE_LINE_COMMENT_SCHEMA,
+};
+
+pub const TRAFFIC_CONTROL_TIMEFRAME_SCHEMA: Schema = StringSchema::new(
+    "Timeframe to specify when the rule is active.")
+    .format(&DAILY_DURATION_FORMAT)
+    .schema();
+
+pub const TRAFFIC_CONTROL_ID_SCHEMA: Schema = StringSchema::new("Rule ID.")
+    .format(&PROXMOX_SAFE_ID_FORMAT)
+    .min_length(3)
+    .max_length(32)
+    .schema();
+
+#[api(
+    properties: {
+        comment: {
+            optional: true,
+            schema: SINGLE_LINE_COMMENT_SCHEMA,
+        },
+        network: {
+            type: Array,
+            items: {
+                schema: CIDR_SCHEMA,
+            },
+        },
+        timeframe: {
+            type: Array,
+            items: {
+                schema: TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+            },
+            optional: true,
+        },
+    },
+)]
+#[derive(Serialize,Deserialize,Default)]
+#[serde(rename_all="kebab-case")]
+/// Network Rate Limit Configuration
+pub struct RateLimitConfig {
+    #[serde(skip_serializing_if="Option::is_none")]
+    pub comment: Option<String>,
+    /// Rule applies to Source IPs within these networks
+    pub network: Vec<String>,
+    /// Maximal rate in bytes/second
+    pub rate: u64,
+    /// Bucket size for TBF in bytes
+    #[serde(skip_serializing_if="Option::is_none")]
+    pub burst: Option<u64>,
+    // fixme: expose this?
+    //    /// Bandwidth is shared across all connections
+    //    #[serde(skip_serializing_if="Option::is_none")]
+    //    pub shared: Option<bool>,
+    /// Enable the rule at specific times
+    #[serde(skip_serializing_if="Option::is_none")]
+    pub timeframe: Option<Vec<String>>,
+}
+
+#[api(
+    properties: {
+        name: {
+            schema: TRAFFIC_CONTROL_ID_SCHEMA,
+        },
+        config: {
+            type: RateLimitConfig,
+        },
+    },
+)]
+#[derive(Serialize,Deserialize)]
+#[serde(rename_all = "kebab-case")]
+///  Traffic control rule
+pub struct TrafficControlRule {
+    pub name: String,
+    #[serde(flatten)]
+    pub config: RateLimitConfig,
+}
diff --git a/pbs-config/src/lib.rs b/pbs-config/src/lib.rs
index 8ce84fec..930b5f7b 100644
--- a/pbs-config/src/lib.rs
+++ b/pbs-config/src/lib.rs
@@ -12,6 +12,7 @@ pub mod sync;
 pub mod tape_encryption_keys;
 pub mod tape_job;
 pub mod token_shadow;
+pub mod traffic_control;
 pub mod user;
 pub mod verify;
 
diff --git a/pbs-config/src/traffic_control.rs b/pbs-config/src/traffic_control.rs
new file mode 100644
index 00000000..1c04f589
--- /dev/null
+++ b/pbs-config/src/traffic_control.rs
@@ -0,0 +1,91 @@
+//! Traffic Control Settings (Network rate limits)
+use std::collections::HashMap;
+
+use anyhow::Error;
+use lazy_static::lazy_static;
+
+use proxmox_schema::{ApiType, Schema};
+
+use pbs_api_types::{TrafficControlRule, TRAFFIC_CONTROL_ID_SCHEMA};
+
+use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+
+use crate::{open_backup_lockfile, replace_backup_config, BackupLockGuard};
+
+
+lazy_static! {
+    /// Static [`SectionConfig`] to access parser/writer functions.
+    pub static ref CONFIG: SectionConfig = init();
+}
+
+fn init() -> SectionConfig {
+    let mut config = SectionConfig::new(&TRAFFIC_CONTROL_ID_SCHEMA);
+
+    let obj_schema = match TrafficControlRule::API_SCHEMA {
+        Schema::AllOf(ref allof_schema) => allof_schema,
+        _ => unreachable!(),
+    };
+    let plugin = SectionConfigPlugin::new("rule".to_string(), Some("name".to_string()), obj_schema);
+    config.register_plugin(plugin);
+
+    config
+}
+
+/// Configuration file name
+pub const TRAFFIC_CONTROL_CFG_FILENAME: &str = "/etc/proxmox-backup/traffic-control.cfg";
+/// Lock file name (used to prevent concurrent access)
+pub const TRAFFIC_CONTROL_CFG_LOCKFILE: &str = "/etc/proxmox-backup/.traffic-control.lck";
+
+/// Get exclusive lock
+pub fn lock_config() -> Result<BackupLockGuard, Error> {
+    open_backup_lockfile(TRAFFIC_CONTROL_CFG_LOCKFILE, None, true)
+}
+
+/// Read and parse the configuration file
+pub fn config() -> Result<(SectionConfigData, [u8;32]), Error> {
+
+    let content = proxmox::tools::fs::file_read_optional_string(TRAFFIC_CONTROL_CFG_FILENAME)?
+        .unwrap_or_else(|| "".to_string());
+
+    let digest = openssl::sha::sha256(content.as_bytes());
+    let data = CONFIG.parse(TRAFFIC_CONTROL_CFG_FILENAME, &content)?;
+    Ok((data, digest))
+}
+
+/// Save the configuration file
+pub fn save_config(config: &SectionConfigData) -> Result<(), Error> {
+    let raw = CONFIG.write(TRAFFIC_CONTROL_CFG_FILENAME, &config)?;
+    replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())
+}
+
+
+// shell completion helper
+pub fn complete_traffic_control_name(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
+    match config() {
+        Ok((data, _digest)) => data.sections.iter().map(|(id, _)| id.to_string()).collect(),
+        Err(_) => return vec![],
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use anyhow::{Error, bail};
+
+    #[test]
+    fn test1() -> Result<(), Error> {
+        let content = "rule: rule1
+ comment localnet at working hours
+ network 192.168.2.0/24
+ network 192.168.3.0/24
+ rate 50
+ timeframe mon..wed 8:00-16:30
+ timeframe fri 9:00-12:00
+";
+        let data = CONFIG.parse(TRAFFIC_CONTROL_CFG_FILENAME, &content)?;
+        eprintln!("GOT {:?}", data);
+
+        Ok(())
+    }
+
+}
diff --git a/src/api2/config/mod.rs b/src/api2/config/mod.rs
index 473337f5..c256ba64 100644
--- a/src/api2/config/mod.rs
+++ b/src/api2/config/mod.rs
@@ -14,6 +14,7 @@ pub mod changer;
 pub mod media_pool;
 pub mod tape_encryption_keys;
 pub mod tape_backup_job;
+pub mod traffic_control;
 
 const SUBDIRS: SubdirMap = &[
     ("access", &access::ROUTER),
@@ -26,6 +27,7 @@ const SUBDIRS: SubdirMap = &[
     ("sync", &sync::ROUTER),
     ("tape-backup-job", &tape_backup_job::ROUTER),
     ("tape-encryption-keys", &tape_encryption_keys::ROUTER),
+    ("traffic-control", &traffic_control::ROUTER),
     ("verify", &verify::ROUTER),
 ];
 
diff --git a/src/api2/config/traffic_control.rs b/src/api2/config/traffic_control.rs
new file mode 100644
index 00000000..5d5cc6d0
--- /dev/null
+++ b/src/api2/config/traffic_control.rs
@@ -0,0 +1,283 @@
+use anyhow::{bail, Error};
+use serde_json::Value;
+use ::serde::{Deserialize, Serialize};
+
+use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
+use proxmox_schema::api;
+
+use pbs_api_types::{
+    TrafficControlRule, RateLimitConfig,
+    CIDR_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, SINGLE_LINE_COMMENT_SCHEMA,
+    TRAFFIC_CONTROL_ID_SCHEMA, TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+    PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+#[api(
+    input: {
+        properties: {},
+    },
+    returns: {
+        description: "The list of configured traffic control rules (with config digest).",
+        type: Array,
+        items: { type: TrafficControlRule },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// List traffic control rules
+pub fn list_traffic_controls(
+    _param: Value,
+    _info: &ApiMethod,
+    mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<TrafficControlRule>, Error> {
+    let (config, digest) = pbs_config::traffic_control::config()?;
+
+    let list: Vec<TrafficControlRule> = config.convert_to_typed_array("rule")?;
+
+    rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into();
+
+    Ok(list)
+}
+
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            name: {
+                schema: TRAFFIC_CONTROL_ID_SCHEMA,
+            },
+            config: {
+                type: RateLimitConfig,
+                flatten: true,
+            },
+         },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Create new traffic control rule.
+pub fn create_traffic_control(
+    name: String,
+    config: RateLimitConfig,
+) -> Result<(), Error> {
+
+    let _lock = pbs_config::traffic_control::lock_config()?;
+
+    let (mut section_config, _digest) = pbs_config::traffic_control::config()?;
+
+    if section_config.sections.get(&name).is_some() {
+        bail!("traffic control rule '{}' already exists.", name);
+    }
+
+    let rule = TrafficControlRule { name: name.clone(), config };
+
+    section_config.set_data(&name, "rule", &rule)?;
+
+    pbs_config::traffic_control::save_config(&section_config)?;
+
+    Ok(())
+}
+
+#[api(
+   input: {
+        properties: {
+            name: {
+                schema: TRAFFIC_CONTROL_ID_SCHEMA,
+            },
+        },
+    },
+    returns: { type: TrafficControlRule },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+    }
+)]
+/// Read traffic control configuration data.
+pub fn read_traffic_control(
+    name: String,
+    _info: &ApiMethod,
+    mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<TrafficControlRule, Error> {
+    let (config, digest) = pbs_config::traffic_control::config()?;
+    let data: TrafficControlRule = config.lookup("rule", &name)?;
+    rpcenv["digest"] = proxmox::tools::digest_to_hex(&digest).into();
+    Ok(data)
+}
+
+#[api()]
+#[derive(Serialize, Deserialize)]
+#[allow(non_camel_case_types)]
+/// Deletable property name
+pub enum DeletableProperty {
+    /// Delete the burst property.
+    burst,
+    /// Delete the comment property.
+    comment,
+    /// Delete the timeframe property.
+    timeframe,
+}
+
+// fixme: use  TrafficControlUpdater
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            name: {
+                schema: TRAFFIC_CONTROL_ID_SCHEMA,
+            },
+            comment: {
+                schema: SINGLE_LINE_COMMENT_SCHEMA,
+                optional: true,
+            },
+            rate: {
+                type: u64,
+                description: "Rate limit for TBF in bytes/second.",
+                optional: true,
+                minimum: 1,
+            },
+            burst: {
+                type: u64,
+                description: "Size of the TBF bucket, in bytes.",
+                optional: true,
+                minimum: 1,
+            },
+            network: {
+                description: "List of networks.",
+                optional: true,
+                type: Array,
+                items: {
+                    schema: CIDR_SCHEMA,
+                },
+            },
+            timeframe: {
+                description: "List of time frames.",
+                optional: true,
+                type: Array,
+                items: {
+                    schema: TRAFFIC_CONTROL_TIMEFRAME_SCHEMA,
+                },
+            },
+            delete: {
+                description: "List of properties to delete.",
+                type: Array,
+                optional: true,
+                items: {
+                    type: DeletableProperty,
+                }
+            },
+            digest: {
+                optional: true,
+                schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Update traffic control configuration.
+pub fn update_traffic_control(
+    name: String,
+    rate: Option<u64>,
+    burst: Option<u64>,
+    comment: Option<String>,
+    network: Option<Vec<String>>,
+    timeframe: Option<Vec<String>>,
+    delete: Option<Vec<DeletableProperty>>,
+    digest: Option<String>,
+) -> Result<(), Error> {
+
+    let _lock = pbs_config::traffic_control::lock_config()?;
+
+    let (mut config, expected_digest) = pbs_config::traffic_control::config()?;
+
+    if let Some(ref digest) = digest {
+        let digest = proxmox::tools::hex_to_digest(digest)?;
+        crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+    }
+
+    let mut data: TrafficControlRule = config.lookup("rule", &name)?;
+
+    if let Some(delete) = delete {
+        for delete_prop in delete {
+            match delete_prop {
+                DeletableProperty::burst => { data.config.burst = None; },
+                DeletableProperty::comment => { data.config.comment = None; },
+                DeletableProperty::timeframe => { data.config.timeframe = None; },
+            }
+        }
+    }
+
+    if let Some(comment) = comment {
+        let comment = comment.trim().to_string();
+        if comment.is_empty() {
+            data.config.comment = None;
+        } else {
+            data.config.comment = Some(comment);
+        }
+    }
+
+    if let Some(rate) = rate { data.config.rate = rate; }
+
+    if burst.is_some() { data.config.burst = burst; }
+    
+    if let Some(network) = network { data.config.network = network; }
+    if timeframe.is_some() { data.config.timeframe = timeframe; }
+
+    config.set_data(&name, "rule", &data)?;
+
+    pbs_config::traffic_control::save_config(&config)?;
+
+    Ok(())
+}
+
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            name: {
+                schema: TRAFFIC_CONTROL_ID_SCHEMA,
+            },
+            digest: {
+                optional: true,
+                schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Remove a traffic control rule from the configuration file.
+pub fn delete_traffic_control(name: String, digest: Option<String>) -> Result<(), Error> {
+
+    let _lock = pbs_config::traffic_control::lock_config()?;
+
+    let (mut config, expected_digest) = pbs_config::traffic_control::config()?;
+
+    if let Some(ref digest) = digest {
+        let digest = proxmox::tools::hex_to_digest(digest)?;
+        crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+    }
+
+    match config.sections.get(&name) {
+        Some(_) => { config.sections.remove(&name); },
+        None => bail!("traffic control rule '{}' does not exist.", name),
+    }
+
+    pbs_config::traffic_control::save_config(&config)?;
+
+    Ok(())
+}
+
+
+const ITEM_ROUTER: Router = Router::new()
+    .get(&API_METHOD_READ_TRAFFIC_CONTROL)
+    .put(&API_METHOD_UPDATE_TRAFFIC_CONTROL)
+    .delete(&API_METHOD_DELETE_TRAFFIC_CONTROL);
+
+pub const ROUTER: Router = Router::new()
+    .get(&API_METHOD_LIST_TRAFFIC_CONTROLS)
+    .post(&API_METHOD_CREATE_TRAFFIC_CONTROL)
+    .match_all("name", &ITEM_ROUTER);
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index 92e6bb2a..26cb5a1f 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -374,6 +374,7 @@ async fn run() -> Result<(), Error> {
         .insert("user", user_commands())
         .insert("openid", openid_commands())
         .insert("remote", remote_commands())
+        .insert("traffic-control", traffic_control_commands())
         .insert("garbage-collection", garbage_collection_commands())
         .insert("acme", acme_mgmt_cli())
         .insert("cert", cert_mgmt_cli())
diff --git a/src/bin/proxmox_backup_manager/mod.rs b/src/bin/proxmox_backup_manager/mod.rs
index a3a16246..a4d224ce 100644
--- a/src/bin/proxmox_backup_manager/mod.rs
+++ b/src/bin/proxmox_backup_manager/mod.rs
@@ -26,3 +26,5 @@ mod node;
 pub use node::*;
 mod openid;
 pub use openid::*;
+mod traffic_control;
+pub use traffic_control::*;
diff --git a/src/bin/proxmox_backup_manager/traffic_control.rs b/src/bin/proxmox_backup_manager/traffic_control.rs
new file mode 100644
index 00000000..34e4a2a5
--- /dev/null
+++ b/src/bin/proxmox_backup_manager/traffic_control.rs
@@ -0,0 +1,105 @@
+use anyhow::Error;
+use serde_json::Value;
+
+use proxmox_router::{cli::*, ApiHandler, RpcEnvironment};
+use proxmox_schema::api;
+
+use pbs_api_types::TRAFFIC_CONTROL_ID_SCHEMA;
+
+use proxmox_backup::api2;
+
+
+#[api(
+    input: {
+        properties: {
+            "output-format": {
+                schema: OUTPUT_FORMAT,
+                optional: true,
+            },
+        }
+    }
+)]
+/// List configured traffic control rules.
+fn list_traffic_controls(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
+
+    let output_format = get_output_format(&param);
+
+    let info = &api2::config::traffic_control::API_METHOD_LIST_TRAFFIC_CONTROLS;
+    let mut data = match info.handler {
+        ApiHandler::Sync(handler) => (handler)(param, info, rpcenv)?,
+        _ => unreachable!(),
+    };
+
+    let options = default_table_format_options()
+        .column(ColumnConfig::new("name"))
+        .column(ColumnConfig::new("rate"))
+        .column(ColumnConfig::new("burst"))
+        .column(ColumnConfig::new("network"))
+        .column(ColumnConfig::new("timeframe"))
+        .column(ColumnConfig::new("comment"));
+
+    format_and_print_result_full(&mut data, &info.returns, &output_format, &options);
+
+    Ok(Value::Null)
+}
+
+#[api(
+    input: {
+        properties: {
+            name: {
+                schema: TRAFFIC_CONTROL_ID_SCHEMA,
+            },
+            "output-format": {
+                schema: OUTPUT_FORMAT,
+                optional: true,
+            },
+        }
+    }
+)]
+/// Show traffic control configuration
+fn show_traffic_control(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
+
+    let output_format = get_output_format(&param);
+
+    let info = &api2::config::traffic_control::API_METHOD_READ_TRAFFIC_CONTROL;
+    let mut data = match info.handler {
+        ApiHandler::Sync(handler) => (handler)(param, info, rpcenv)?,
+        _ => unreachable!(),
+    };
+
+    let options = default_table_format_options();
+    format_and_print_result_full(&mut data, &info.returns, &output_format, &options);
+
+    Ok(Value::Null)
+}
+
+pub fn traffic_control_commands() -> CommandLineInterface {
+
+    let cmd_def = CliCommandMap::new()
+        .insert("list", CliCommand::new(&API_METHOD_LIST_TRAFFIC_CONTROLS))
+        .insert(
+            "show",
+            CliCommand::new(&API_METHOD_SHOW_TRAFFIC_CONTROL)
+                .arg_param(&["name"])
+                .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+        )
+        .insert(
+            "create",
+            CliCommand::new(&api2::config::traffic_control::API_METHOD_CREATE_TRAFFIC_CONTROL)
+                .arg_param(&["name"])
+        )
+        .insert(
+            "update",
+            CliCommand::new(&api2::config::traffic_control::API_METHOD_UPDATE_TRAFFIC_CONTROL)
+                .arg_param(&["name"])
+                .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+        )
+        .insert(
+            "remove",
+            CliCommand::new(&api2::config::traffic_control::API_METHOD_DELETE_TRAFFIC_CONTROL)
+                .arg_param(&["name"])
+                .completion_cb("name", pbs_config::traffic_control::complete_traffic_control_name)
+        );
+
+    cmd_def.into()
+}
-- 
2.30.2
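
The merge order in `update_traffic_control` above (process the `delete` list first, then apply the optional values) can be illustrated with a small standalone sketch. `RuleConfig`, `Deletable`, `Update` and `apply_update` are hypothetical stand-ins written for this note, not the types from `pbs_api_types`; only the ordering mirrors the patch.

```rust
/// Hypothetical stand-in for the rule configuration (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub struct RuleConfig {
    pub rate: u64,
    pub burst: Option<u64>,
    pub comment: Option<String>,
}

/// Hypothetical counterpart of the deletable property list.
#[derive(Debug, Clone, Copy)]
pub enum Deletable {
    Burst,
    Comment,
}

/// Hypothetical bundle of the optional update parameters.
pub struct Update {
    pub rate: Option<u64>,
    pub burst: Option<u64>,
    pub comment: Option<String>,
    pub delete: Vec<Deletable>,
}

/// Applies deletions first, then explicit values, so an explicit value
/// wins when a property is both deleted and set in the same request.
pub fn apply_update(mut cfg: RuleConfig, up: Update) -> RuleConfig {
    for prop in up.delete {
        match prop {
            Deletable::Burst => cfg.burst = None,
            Deletable::Comment => cfg.comment = None,
        }
    }
    if let Some(comment) = up.comment {
        // A whitespace-only comment clears the property.
        let comment = comment.trim().to_string();
        cfg.comment = if comment.is_empty() { None } else { Some(comment) };
    }
    if let Some(rate) = up.rate {
        cfg.rate = rate;
    }
    if up.burst.is_some() {
        cfg.burst = up.burst;
    }
    cfg
}
```

With this ordering, a request that lists `burst` under `delete` and also passes a new `burst` value ends up with the new value, since the assignment runs after the deletion.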






* [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (10 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
                   ` (3 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

---
 proxmox-http/src/client/rate_limited_stream.rs | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index ea99383..865a426 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -24,6 +24,12 @@ pub struct RateLimitedStream<S> {
     stream: S,
 }
 
+impl RateLimitedStream<tokio::net::TcpStream> {
+    pub fn peer_addr(&self) -> std::io::Result<std::net::SocketAddr> {
+        self.stream.peer_addr()
+    }
+}
+
 impl <S> RateLimitedStream<S> {
 
     /// Creates a new instance with reads and writes limited to the same `rate`.
-- 
2.30.2






* [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (11 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
                   ` (2 subsequent siblings)
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

---
 proxmox-http/src/client/rate_limiter.rs | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
index 677dfb1..e669410 100644
--- a/proxmox-http/src/client/rate_limiter.rs
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -34,6 +34,12 @@ impl RateLimiter {
         }
     }
 
+    /// Update rate and bucket size
+    pub fn update_rate(&mut self, rate: u64, bucket_size: u64) {
+        self.rate = rate;
+        self.bucket_size = bucket_size;
+    }
+
     /// Returns the average rate (since `start_time`)
     pub fn average_rate(&self, current_time: Instant) -> f64 {
         let time_diff = current_time.saturating_duration_since(self.start_time).as_secs_f64();
-- 
2.30.2
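
To illustrate why changing `rate` and `bucket_size` on a live limiter is enough to retune an existing connection, here is a minimal token bucket sketch. It is not the `RateLimiter` from `proxmox-http`: the `TokenBucket` type, its `tokens` field and the `register_traffic` signature are assumptions made for this example, and time is passed in explicitly so the behaviour is deterministic.

```rust
use std::time::{Duration, Instant};

/// Simplified token bucket (illustration only, not the proxmox-http type).
pub struct TokenBucket {
    rate: u64,        // refill rate in bytes per second
    bucket_size: u64, // maximum number of tokens (burst size)
    tokens: f64,      // available tokens; negative means "in debt"
    last: Instant,    // time of the last refill
}

impl TokenBucket {
    pub fn new(rate: u64, bucket_size: u64, now: Instant) -> Self {
        Self { rate, bucket_size, tokens: bucket_size as f64, last: now }
    }

    /// Update rate and bucket size; accumulated tokens or debt are kept.
    pub fn update_rate(&mut self, rate: u64, bucket_size: u64) {
        self.rate = rate;
        self.bucket_size = bucket_size;
    }

    /// Accounts for `len` bytes and returns how long the caller should wait.
    pub fn register_traffic(&mut self, now: Instant, len: u64) -> Duration {
        // Refill according to elapsed time, capped at the bucket size.
        let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
        self.last = now;
        self.tokens = (self.tokens + elapsed * self.rate as f64).min(self.bucket_size as f64);

        self.tokens -= len as f64;
        if self.tokens >= 0.0 {
            Duration::ZERO
        } else {
            // Guard against a zero rate (assumed to be excluded by the schema).
            Duration::from_secs_f64(-self.tokens / self.rate.max(1) as f64)
        }
    }
}
```

Because the delay is computed from the current `rate` on every call, a caller holding the limiter behind a shared lock sees the new values on its next read or write without reconnecting.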






* [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (12 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 pbs-config/src/lib.rs             |  2 +-
 pbs-config/src/memcom.rs          | 14 ++++++++++++++
 pbs-config/src/traffic_control.rs | 11 +++++++++--
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/pbs-config/src/lib.rs b/pbs-config/src/lib.rs
index 930b5f7b..bc3b19f0 100644
--- a/pbs-config/src/lib.rs
+++ b/pbs-config/src/lib.rs
@@ -16,7 +16,7 @@ pub mod traffic_control;
 pub mod user;
 pub mod verify;
 
-pub(crate) mod memcom;
+pub mod memcom;
 
 use anyhow::{format_err, Error};
 
diff --git a/pbs-config/src/memcom.rs b/pbs-config/src/memcom.rs
index 4ab07ec9..7b82798b 100644
--- a/pbs-config/src/memcom.rs
+++ b/pbs-config/src/memcom.rs
@@ -23,6 +23,8 @@ pub struct Memcom {
 struct Head {
     // User (user.cfg) cache generation/version.
     user_cache_generation: AtomicUsize,
+    // Traffic control (traffic-control.cfg) generation/version.
+    traffic_control_generation: AtomicUsize,
 }
 
 static INSTANCE: OnceCell<Arc<Memcom>> = OnceCell::new();
@@ -81,4 +83,16 @@ impl Memcom {
             .user_cache_generation
             .fetch_add(1, Ordering::AcqRel);
     }
+
+    /// Returns the traffic control generation number.
+    pub fn traffic_control_generation(&self) -> usize {
+        self.head().traffic_control_generation.load(Ordering::Acquire)
+    }
+
+    /// Increase the traffic control generation number.
+    pub fn increase_traffic_control_generation(&self) {
+        self.head()
+            .traffic_control_generation
+            .fetch_add(1, Ordering::AcqRel);
+    }
 }
diff --git a/pbs-config/src/traffic_control.rs b/pbs-config/src/traffic_control.rs
index 1c04f589..816bc7a2 100644
--- a/pbs-config/src/traffic_control.rs
+++ b/pbs-config/src/traffic_control.rs
@@ -10,9 +10,9 @@ use pbs_api_types::{TrafficControlRule, TRAFFIC_CONTROL_ID_SCHEMA};
 
 use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
 
+use crate::memcom::Memcom;
 use crate::{open_backup_lockfile, replace_backup_config, BackupLockGuard};
 
-
 lazy_static! {
     /// Static [`SectionConfig`] to access parser/writer functions.
     pub static ref CONFIG: SectionConfig = init();
@@ -55,7 +55,14 @@ pub fn config() -> Result<(SectionConfigData, [u8;32]), Error> {
 /// Save the configuration file
 pub fn save_config(config: &SectionConfigData) -> Result<(), Error> {
     let raw = CONFIG.write(TRAFFIC_CONTROL_CFG_FILENAME, &config)?;
-    replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())
+    replace_backup_config(TRAFFIC_CONTROL_CFG_FILENAME, raw.as_bytes())?;
+
+    // increase traffic control generation
+    // We use this in TrafficControlCache
+    let memcom = Memcom::new()?;
+    memcom.increase_traffic_control_generation();
+
+    Ok(())
 }
 
 
-- 
2.30.2
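
The generation counter pattern used here can be sketched in isolation. The example below is in-process only, whereas `Memcom` shares its counters between processes; `GenerationCounter` and `ReloadGate` are hypothetical names for this note. The 60 second fallback mirrors the check in `TrafficControlCache::reload` from patch 8/9.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Writer side: bumped whenever the configuration is saved.
pub struct GenerationCounter(AtomicUsize);

impl GenerationCounter {
    pub const fn new() -> Self {
        Self(AtomicUsize::new(0))
    }

    pub fn current(&self) -> usize {
        self.0.load(Ordering::Acquire)
    }

    pub fn increase(&self) {
        self.0.fetch_add(1, Ordering::AcqRel);
    }
}

/// Reader side: decides whether cached data must be reloaded.
pub struct ReloadGate {
    last_generation: usize,
    last_update: i64, // epoch seconds; 0 means "never loaded"
}

impl ReloadGate {
    pub fn new() -> Self {
        Self { last_generation: 0, last_update: 0 }
    }

    /// Returns true if the caller should reload at time `now`.
    pub fn should_reload(&mut self, counter: &GenerationCounter, now: i64) -> bool {
        let current = counter.current();
        if self.last_update != 0
            && current == self.last_generation
            && (now - self.last_update) < 60
        {
            return false; // cache is current and recent enough
        }
        self.last_generation = current;
        self.last_update = now;
        true
    }
}
```

Comparing a single integer on every lookup is cheap, so readers only parse the configuration file after a save or once the periodic timeout has expired.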






* [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (13 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 Cargo.toml                    |   1 +
 src/cached_traffic_control.rs | 240 ++++++++++++++++++++++++++++++++++
 src/lib.rs                    |   3 +
 3 files changed, 244 insertions(+)
 create mode 100644 src/cached_traffic_control.rs

diff --git a/Cargo.toml b/Cargo.toml
index 0f163d65..ac0983b9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,7 @@ apt-pkg-native = "0.3.2"
 base64 = "0.12"
 bitflags = "1.2.1"
 bytes = "1.0"
+cidr = "0.2.1"
 crc32fast = "1"
 endian_trait = { version = "0.6", features = ["arrays"] }
 env_logger = "0.7"
diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs
new file mode 100644
index 00000000..5a7f46da
--- /dev/null
+++ b/src/cached_traffic_control.rs
@@ -0,0 +1,240 @@
+//! Cached traffic control configuration
+use std::sync::{Arc, Mutex};
+use std::collections::HashMap;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+use anyhow::Error;
+use cidr::IpInet;
+
+use proxmox_http::client::RateLimiter;
+use proxmox_time::epoch_i64;
+
+use proxmox_systemd::daily_duration::parse_daily_duration;
+
+use pbs_api_types::TrafficControlRule;
+
+use pbs_config::memcom::Memcom;
+
+pub struct TrafficControlCache {
+    last_update: i64,
+    last_traffic_control_generation: usize,
+    rules: Vec<(TrafficControlRule, Vec<IpInet>)>,
+    limiter_map: HashMap<String, (Arc<Mutex<RateLimiter>>, Arc<Mutex<RateLimiter>>)>,
+}
+
+fn timeframe_match(
+    duration_list: &[String],
+    now: i64,
+) -> Result<bool, Error> {
+
+    for duration_str in duration_list.iter() {
+        let duration = parse_daily_duration(duration_str)?;
+        if duration.time_match(now, false)? {
+            return Ok(true);
+        }
+    }
+
+    Ok(false)
+}
+
+fn network_match_len(
+    networks: &[IpInet],
+    ip: &IpAddr,
+) -> Option<u8> {
+
+    let mut match_len = None;
+
+    for cidr in networks.iter() {
+        if cidr.contains(ip) {
+            let network_length = cidr.network_length();
+            match match_len {
+                Some(len) => {
+                    if network_length > len {
+                        match_len = Some(network_length);
+                    }
+                }
+                None => match_len = Some(network_length),
+            }
+        }
+    }
+    match_len
+}
+
+fn cannonical_ip(ip: IpAddr) -> IpAddr {
+    // TODO: use std::net::IpAddr::to_canonical once stable
+    match ip {
+        IpAddr::V4(addr) => IpAddr::V4(addr),
+        IpAddr::V6(addr) => {
+            match addr.octets() {
+                [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
+                    IpAddr::V4(Ipv4Addr::new(a, b, c, d))
+                }
+                _ => IpAddr::V6(addr),
+            }
+        }
+    }
+}
+
+impl TrafficControlCache {
+
+    pub fn new() -> Self {
+        Self {
+            rules: Vec::new(),
+            limiter_map: HashMap::new(),
+            last_traffic_control_generation: 0,
+            last_update: 0,
+        }
+    }
+
+    pub fn reload(&mut self) {
+        let now = epoch_i64();
+
+        let memcom = match Memcom::new() {
+            Ok(memcom) => memcom,
+            Err(err) => {
+                log::error!("TrafficControlCache::reload failed in Memcom::new: {}", err);
+                return;
+            }
+        };
+        
+        let traffic_control_generation = memcom.traffic_control_generation();
+
+        if (self.last_update != 0) &&
+            (traffic_control_generation == self.last_traffic_control_generation) &&
+            ((now - self.last_update) < 60) { return; }
+
+        log::debug!("reload traffic control rules");
+
+        self.last_traffic_control_generation = traffic_control_generation;
+        self.last_update = now;
+
+        match self.reload_impl() {
+            Ok(()) => (),
+            Err(err) => {
+                log::error!("TrafficControlCache::reload failed -> {}", err);
+            }
+        }
+    }
+    
+    fn reload_impl(&mut self) -> Result<(), Error> {
+        let (config, _) = pbs_config::traffic_control::config()?;
+
+        self.limiter_map.retain(|key, _value| config.sections.contains_key(key));
+
+        let rules: Vec<TrafficControlRule> =
+            config.convert_to_typed_array("rule")?;
+
+        let now = proxmox_time::epoch_i64();
+
+        let mut active_rules = Vec::new();
+
+        for rule in rules {
+            if let Some(ref timeframe) = rule.config.timeframe {
+                if !timeframe_match(timeframe, now)? {
+                    self.limiter_map.remove(&rule.name);
+                    continue;
+                }
+            }
+
+            let rate = rule.config.rate;
+            let burst = rule.config.burst.unwrap_or(rate);
+
+            if let Some(limiter) = self.limiter_map.get(&rule.name) {
+                limiter.0.lock().unwrap().update_rate(rate, burst);
+                limiter.1.lock().unwrap().update_rate(rate, burst);
+            } else {
+
+                let read_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+                let write_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+
+                self.limiter_map.insert(
+                    rule.name.clone(),
+                    (read_limiter, write_limiter),
+                );
+            }
+
+            let mut networks = Vec::new();
+
+            for network in rule.config.network.iter() {
+                let cidr = match network.parse() {
+                    Ok(cidr) => cidr,
+                    Err(err) => {
+                        log::error!("unable to parse network '{}' - {}", network, err);
+                        continue;
+                    }
+                };
+                networks.push(cidr);
+            }
+            
+            active_rules.push((rule, networks));
+        }
+
+        self.rules = active_rules;
+
+        Ok(())
+    }
+
+    pub fn lookup_rate_limiter(
+        &self,
+        peer: Option<SocketAddr>,
+    ) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+
+        let peer = match peer {
+            None => return (None, None),
+            Some(peer) => peer,
+        };
+
+        let peer_ip = cannonical_ip(peer.ip());
+
+        log::debug!("lookup_rate_limiter {} {:?}", peer_ip.is_ipv4(),  peer_ip);
+
+        let mut last_rule_match = None;
+
+        for (rule, networks) in self.rules.iter() {
+            if let Some(match_len) = network_match_len(networks, &peer_ip) {
+                match last_rule_match {
+                    None => last_rule_match = Some((rule, match_len)),
+                    Some((_, last_len)) => {
+                        if match_len > last_len {
+                            last_rule_match = Some((rule, match_len));
+                        }
+                    }
+                }
+            }
+        }
+       
+        match last_rule_match {
+            Some((rule, _)) => {
+                match self.limiter_map.get(&rule.name) {
+                    Some((read_limiter, write_limiter)) => {
+                        (Some(Arc::clone(read_limiter)), Some(Arc::clone(write_limiter)))
+                    }
+                    None => (None, None), // should never happen
+                }
+            }
+            None => (None, None),
+        }
+    }
+}
+
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_network_match() -> Result<(), Error> {
+
+        let networks = ["192.168.2.1/24", "127.0.0.0/8"];
+        let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();
+
+        assert_eq!(network_match_len(&networks, &"192.168.2.1".parse()?), Some(24));
+        assert_eq!(network_match_len(&networks, &"192.168.2.254".parse()?), Some(24));
+        assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None);
+        assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8));
+        assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None);
+
+        Ok(())
+        
+    }    
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5f6d5e7e..8f5ed245 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -33,6 +33,9 @@ pub mod client_helpers;
 
 pub mod rrd_cache;
 
+mod cached_traffic_control;
+pub use cached_traffic_control::TrafficControlCache;
+
 /// Get the server's certificate info (from `proxy.pem`).
 pub fn cert_info() -> Result<CertInfo, anyhow::Error> {
     CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem")))
-- 
2.30.2
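
The rule selection in `lookup_rate_limiter` is a longest-prefix match: among all rules with a network containing the peer address, the one with the most specific network wins, and the first rule wins ties. The sketch below shows the idea using only the standard library and IPv4. `Net4` and `best_rule` are hypothetical helpers for this note; the patch itself relies on `cidr::IpInet`.

```rust
use std::net::Ipv4Addr;

/// Minimal IPv4 network (illustration only; the patch uses `cidr::IpInet`).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Net4 {
    addr: u32,
    prefix: u8,
}

impl Net4 {
    /// Parses "a.b.c.d/len"; returns None on malformed input.
    pub fn parse(s: &str) -> Option<Self> {
        let (addr, prefix) = s.split_once('/')?;
        let addr: Ipv4Addr = addr.parse().ok()?;
        let prefix: u8 = prefix.parse().ok()?;
        if prefix > 32 {
            return None;
        }
        Some(Self { addr: u32::from(addr), prefix })
    }

    fn mask(&self) -> u32 {
        // A shift by 32 would overflow, so handle /0 separately.
        if self.prefix == 0 { 0 } else { u32::MAX << (32 - self.prefix as u32) }
    }

    pub fn contains(&self, ip: Ipv4Addr) -> bool {
        (u32::from(ip) & self.mask()) == (self.addr & self.mask())
    }
}

/// Longest prefix length among the networks containing `ip`.
pub fn network_match_len(networks: &[Net4], ip: Ipv4Addr) -> Option<u8> {
    networks.iter().filter(|n| n.contains(ip)).map(|n| n.prefix).max()
}

/// Name of the rule with the most specific matching network.
pub fn best_rule<'a>(rules: &'a [(&'a str, Vec<Net4>)], ip: Ipv4Addr) -> Option<&'a str> {
    let mut best: Option<(&str, u8)> = None;
    for (name, nets) in rules {
        if let Some(len) = network_match_len(nets, ip) {
            // Strictly greater: the first rule wins on equal prefix length.
            if best.map_or(true, |(_, last_len)| len > last_len) {
                best = Some((*name, len));
            }
        }
    }
    best.map(|(name, _)| name)
}
```

A catch-all rule for `0.0.0.0/0` therefore acts as a default that any more specific rule overrides.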






* [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control
  2021-11-09  6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
                   ` (14 preceding siblings ...)
  2021-11-09  6:52 ` [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups Dietmar Maurer
@ 2021-11-09  6:52 ` Dietmar Maurer
  15 siblings, 0 replies; 17+ messages in thread
From: Dietmar Maurer @ 2021-11-09  6:52 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 src/bin/proxmox-backup-proxy.rs | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 1589a57d..01308463 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -21,6 +21,7 @@ use proxmox::sys::linux::socket::set_tcp_keepalive;
 use proxmox::tools::fs::CreateOptions;
 use proxmox_lang::try_block;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
+use proxmox_http::client::{RateLimiter, RateLimitedStream};
 
 use pbs_tools::{task_log, task_warn};
 use pbs_datastore::DataStore;
@@ -70,6 +71,7 @@ use proxmox_backup::api2::pull::do_sync_job;
 use proxmox_backup::api2::tape::backup::do_tape_backup_job;
 use proxmox_backup::server::do_verification_job;
 use proxmox_backup::server::do_prune_job;
+use proxmox_backup::TrafficControlCache;
 
 fn main() -> Result<(), Error> {
     proxmox_backup::tools::setup_safe_path_env();
@@ -351,7 +353,7 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
 }
 
 type ClientStreamResult =
-    Result<std::pin::Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>;
+    Result<std::pin::Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::TcpStream>>>>, Error>;
 const MAX_PENDING_ACCEPTS: usize = 1024;
 
 fn accept_connections(
@@ -387,6 +389,9 @@ async fn accept_connection(
         sock.set_nodelay(true).unwrap();
         let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
 
+        let peer = sock.peer_addr().ok();
+        let sock = RateLimitedStream::with_limiter_update_cb(sock, move || lookup_rate_limiter(peer));
+
         let ssl = { // limit acceptor_guard scope
             // Acceptor can be reloaded using the command socket "reload-certificate" command
             let acceptor_guard = acceptor.lock().unwrap();
@@ -1075,3 +1080,21 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
         }
     }
 }
+
+// Rate Limiter lookup
+
+// Test with:
+// proxmox-backup-client restore vm/201/2021-10-22T09:55:56Z drive-scsi0.img img1.img --repository localhost:store2
+
+lazy_static::lazy_static!{
+    static ref TRAFFIC_CONTROL_CACHE: Arc<Mutex<TrafficControlCache>> =
+        Arc::new(Mutex::new(TrafficControlCache::new()));
+}
+
+fn lookup_rate_limiter(
+    peer: Option<std::net::SocketAddr>,
+) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+    let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();
+    cache.reload();
+    cache.lookup_rate_limiter(peer)
+}
-- 
2.30.2
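
The callback handed to `RateLimitedStream::with_limiter_update_cb` lets an existing connection pick up configuration changes. A simplified model of that idea is sketched below. `LimitedConn`, `SharedRate` and `refresh` are hypothetical names for this note, and the real stream is assumed (not shown in this patch) to invoke its callback periodically on its own.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a shared limiter: just the configured rate.
pub type SharedRate = Arc<Mutex<u64>>;

/// Connection that re-resolves its limiter through a lookup callback.
pub struct LimitedConn<F: Fn() -> Option<SharedRate>> {
    lookup: F,
    current: Option<SharedRate>,
}

impl<F: Fn() -> Option<SharedRate>> LimitedConn<F> {
    pub fn new(lookup: F) -> Self {
        let current = lookup();
        Self { lookup, current }
    }

    /// Re-runs the lookup; rules may have been added or removed.
    pub fn refresh(&mut self) {
        self.current = (self.lookup)();
    }

    /// Rate currently in effect, or None if the connection is unlimited.
    pub fn rate(&self) -> Option<u64> {
        self.current.as_ref().map(|r| *r.lock().unwrap())
    }
}
```

Two kinds of change are covered this way: a value updated in place on a shared limiter is visible immediately, while a rule that starts or stops matching only takes effect on the next lookup.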





