From: Dietmar Maurer <dietmar@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups
Date: Tue, 9 Nov 2021 07:52:52 +0100 [thread overview]
Message-ID: <20211109065253.980304-16-dietmar@proxmox.com> (raw)
In-Reply-To: <20211109065253.980304-1-dietmar@proxmox.com>
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
Cargo.toml | 1 +
src/cached_traffic_control.rs | 240 ++++++++++++++++++++++++++++++++++
src/lib.rs | 3 +
3 files changed, 244 insertions(+)
create mode 100644 src/cached_traffic_control.rs
diff --git a/Cargo.toml b/Cargo.toml
index 0f163d65..ac0983b9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,7 @@ apt-pkg-native = "0.3.2"
base64 = "0.12"
bitflags = "1.2.1"
bytes = "1.0"
+cidr = "0.2.1"
crc32fast = "1"
endian_trait = { version = "0.6", features = ["arrays"] }
env_logger = "0.7"
diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs
new file mode 100644
index 00000000..5a7f46da
--- /dev/null
+++ b/src/cached_traffic_control.rs
@@ -0,0 +1,240 @@
+//! Cached traffic control configuration
+use std::sync::{Arc, Mutex};
+use std::collections::HashMap;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+use anyhow::Error;
+use cidr::IpInet;
+
+use proxmox_http::client::RateLimiter;
+use proxmox_time::epoch_i64;
+
+use proxmox_systemd::daily_duration::parse_daily_duration;
+
+use pbs_api_types::TrafficControlRule;
+
+use pbs_config::memcom::Memcom;
+
+pub struct TrafficControlCache {
+ last_update: i64,
+ last_traffic_control_generation: usize,
+ rules: Vec<(TrafficControlRule, Vec<IpInet>)>,
+ limiter_map: HashMap<String, (Arc<Mutex<RateLimiter>>, Arc<Mutex<RateLimiter>>)>,
+}
+
+fn timeframe_match(
+ duration_list: &[String],
+ now: i64,
+) -> Result<bool, Error> {
+
+ for duration_str in duration_list.iter() {
+ let duration = parse_daily_duration(duration_str)?;
+ if duration.time_match(now, false)? {
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
+}
+
+fn network_match_len(
+ networks: &[IpInet],
+ ip: &IpAddr,
+) -> Option<u8> {
+
+ let mut match_len = None;
+
+ for cidr in networks.iter() {
+ if cidr.contains(ip) {
+ let network_length = cidr.network_length();
+ match match_len {
+ Some(len) => {
+ if network_length > len {
+ match_len = Some(network_length);
+ }
+ }
+ None => match_len = Some(network_length),
+ }
+ }
+ }
+ match_len
+}
+
+fn cannonical_ip(ip: IpAddr) -> IpAddr {
+ // TODO: use std::net::IpAddr::to_cananical once stable
+ match ip {
+ IpAddr::V4(addr) => IpAddr::V4(addr),
+ IpAddr::V6(addr) => {
+ match addr.octets() {
+ [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
+ IpAddr::V4(Ipv4Addr::new(a, b, c, d))
+ }
+ _ => IpAddr::V6(addr),
+ }
+ }
+ }
+}
+
+impl TrafficControlCache {
+
+ pub fn new() -> Self {
+ Self {
+ rules: Vec::new(),
+ limiter_map: HashMap::new(),
+ last_traffic_control_generation: 0,
+ last_update: 0,
+ }
+ }
+
+ pub fn reload(&mut self) {
+ let now = epoch_i64();
+
+ let memcom = match Memcom::new() {
+ Ok(memcom) => memcom,
+ Err(err) => {
+ log::error!("TrafficControlCache::reload failed in Memcom::new: {}", err);
+ return;
+ }
+ };
+
+ let traffic_control_generation = memcom.traffic_control_generation();
+
+ if (self.last_update != 0) &&
+ (traffic_control_generation == self.last_traffic_control_generation) &&
+ ((now - self.last_update) < 60) { return; }
+
+ log::debug!("reload traffic control rules");
+
+ self.last_traffic_control_generation = traffic_control_generation;
+ self.last_update = now;
+
+ match self.reload_impl() {
+ Ok(()) => (),
+ Err(err) => {
+ log::error!("TrafficControlCache::reload failed -> {}", err);
+ }
+ }
+ }
+
+ fn reload_impl(&mut self) -> Result<(), Error> {
+ let (config, _) = pbs_config::traffic_control::config()?;
+
+ self.limiter_map.retain(|key, _value| config.sections.contains_key(key));
+
+ let rules: Vec<TrafficControlRule> =
+ config.convert_to_typed_array("rule")?;
+
+ let now = proxmox_time::epoch_i64();
+
+ let mut active_rules = Vec::new();
+
+ for rule in rules {
+ if let Some(ref timeframe) = rule.config.timeframe {
+ if timeframe_match(timeframe, now)? {
+ self.limiter_map.remove(&rule.name);
+ continue;
+ }
+ }
+
+ let rate = rule.config.rate;
+ let burst = rule.config.burst.unwrap_or(rate);
+
+ if let Some(limiter) = self.limiter_map.get(&rule.name) {
+ limiter.0.lock().unwrap().update_rate(rate, burst);
+ limiter.1.lock().unwrap().update_rate(rate, burst);
+ } else {
+
+ let read_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+ let write_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+
+ self.limiter_map.insert(
+ rule.name.clone(),
+ (read_limiter, write_limiter),
+ );
+ }
+
+ let mut networks = Vec::new();
+
+ for network in rule.config.network.iter() {
+ let cidr = match network.parse() {
+ Ok(cidr) => cidr,
+ Err(err) => {
+ log::error!("unable to parse network '{}' - {}", network, err);
+ continue;
+ }
+ };
+ networks.push(cidr);
+ }
+
+ active_rules.push((rule, networks));
+ }
+
+ self.rules = active_rules;
+
+ Ok(())
+ }
+
+ pub fn lookup_rate_limiter(
+ &self,
+ peer: Option<SocketAddr>,
+ ) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+
+ let peer = match peer {
+ None => return (None, None),
+ Some(peer) => peer,
+ };
+
+ let peer_ip = cannonical_ip(peer.ip());
+
+ log::debug!("lookup_rate_limiter {} {:?}", peer_ip.is_ipv4(), peer_ip);
+
+ let mut last_rule_match = None;
+
+ for (rule, networks) in self.rules.iter() {
+ if let Some(match_len) = network_match_len(networks, &peer_ip) {
+ match last_rule_match {
+ None => last_rule_match = Some((rule, match_len)),
+ Some((_, last_len)) => {
+ if match_len > last_len {
+ last_rule_match = Some((rule, match_len));
+ }
+ }
+ }
+ }
+ }
+
+ match last_rule_match {
+ Some((rule, _)) => {
+ match self.limiter_map.get(&rule.name) {
+ Some((read_limiter, write_limiter)) => {
+ (Some(Arc::clone(read_limiter)), Some(Arc::clone(write_limiter)))
+ }
+ None => (None, None), // should never happen
+ }
+ }
+ None => (None, None),
+ }
+ }
+}
+
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn testnetwork_match() -> Result<(), Error> {
+
+ let networks = ["192.168.2.1/24", "127.0.0.0/8"];
+ let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();
+
+ assert_eq!(network_match_len(&networks, &"192.168.2.1".parse()?), Some(24));
+ assert_eq!(network_match_len(&networks, &"192.168.2.254".parse()?), Some(24));
+ assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None);
+ assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8));
+ assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None);
+
+ Ok(())
+
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5f6d5e7e..8f5ed245 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -33,6 +33,9 @@ pub mod client_helpers;
pub mod rrd_cache;
+mod cached_traffic_control;
+pub use cached_traffic_control::TrafficControlCache;
+
/// Get the server's certificate info (from `proxy.pem`).
pub fn cert_info() -> Result<CertInfo, anyhow::Error> {
CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem")))
--
2.30.2
next prev parent reply other threads:[~2021-11-09 6:53 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
2021-11-09 6:52 ` Dietmar Maurer [this message]
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20211109065253.980304-16-dietmar@proxmox.com \
--to=dietmar@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.