From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 0AF597D548 for ; Tue, 9 Nov 2021 07:53:41 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id ADAD02E15C for ; Tue, 9 Nov 2021 07:53:10 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id B10112DF77 for ; Tue, 9 Nov 2021 07:53:01 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 85CA246064; Tue, 9 Nov 2021 07:52:57 +0100 (CET) From: Dietmar Maurer To: pbs-devel@lists.proxmox.com Date: Tue, 9 Nov 2021 07:52:52 +0100 Message-Id: <20211109065253.980304-16-dietmar@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20211109065253.980304-1-dietmar@proxmox.com> References: <20211109065253.980304-1-dietmar@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.502 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [lib.rs] Subject: [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 09 Nov 2021 06:53:41 -0000 Signed-off-by: Dietmar Maurer --- Cargo.toml | 1 + src/cached_traffic_control.rs | 240 ++++++++++++++++++++++++++++++++++ src/lib.rs | 3 + 3 files changed, 244 insertions(+) create mode 100644 src/cached_traffic_control.rs diff --git a/Cargo.toml b/Cargo.toml index 0f163d65..ac0983b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ apt-pkg-native = "0.3.2" base64 = "0.12" bitflags = "1.2.1" bytes = "1.0" +cidr = "0.2.1" crc32fast = "1" endian_trait = { version = "0.6", features = ["arrays"] } env_logger = "0.7" diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs new file mode 100644 index 00000000..5a7f46da --- /dev/null +++ b/src/cached_traffic_control.rs @@ -0,0 +1,240 @@ +//! Cached traffic control configuration +use std::sync::{Arc, Mutex}; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use anyhow::Error; +use cidr::IpInet; + +use proxmox_http::client::RateLimiter; +use proxmox_time::epoch_i64; + +use proxmox_systemd::daily_duration::parse_daily_duration; + +use pbs_api_types::TrafficControlRule; + +use pbs_config::memcom::Memcom; + +pub struct TrafficControlCache { + last_update: i64, + last_traffic_control_generation: usize, + rules: Vec<(TrafficControlRule, Vec)>, + limiter_map: HashMap>, Arc>)>, +} + +fn timeframe_match( + duration_list: &[String], + now: i64, +) -> Result { + + for duration_str in duration_list.iter() { + let duration = parse_daily_duration(duration_str)?; + if duration.time_match(now, false)? { + return Ok(true); + } + } + + Ok(false) +} + +fn network_match_len( + networks: &[IpInet], + ip: &IpAddr, +) -> Option { + + let mut match_len = None; + + for cidr in networks.iter() { + if cidr.contains(ip) { + let network_length = cidr.network_length(); + match match_len { + Some(len) => { + if network_length > len { + match_len = Some(network_length); + } + } + None => match_len = Some(network_length), + } + } + } + match_len +} + +fn cannonical_ip(ip: IpAddr) -> IpAddr { + // TODO: use std::net::IpAddr::to_cananical once stable + match ip { + IpAddr::V4(addr) => IpAddr::V4(addr), + IpAddr::V6(addr) => { + match addr.octets() { + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { + IpAddr::V4(Ipv4Addr::new(a, b, c, d)) + } + _ => IpAddr::V6(addr), + } + } + } +} + +impl TrafficControlCache { + + pub fn new() -> Self { + Self { + rules: Vec::new(), + limiter_map: HashMap::new(), + last_traffic_control_generation: 0, + last_update: 0, + } + } + + pub fn reload(&mut self) { + let now = epoch_i64(); + + let memcom = match Memcom::new() { + Ok(memcom) => memcom, + Err(err) => { + log::error!("TrafficControlCache::reload failed in Memcom::new: {}", err); + return; + } + }; + + let traffic_control_generation = memcom.traffic_control_generation(); + + if (self.last_update != 0) && + (traffic_control_generation == self.last_traffic_control_generation) && + ((now - self.last_update) < 60) { return; } + + log::debug!("reload traffic control rules"); + + self.last_traffic_control_generation = traffic_control_generation; + self.last_update = now; + + match self.reload_impl() { + Ok(()) => (), + Err(err) => { + log::error!("TrafficControlCache::reload failed -> {}", err); + } + } + } + + fn reload_impl(&mut self) -> Result<(), Error> { + let (config, _) = pbs_config::traffic_control::config()?; + + self.limiter_map.retain(|key, _value| config.sections.contains_key(key)); + + let rules: Vec = + config.convert_to_typed_array("rule")?; + + let now = proxmox_time::epoch_i64(); + + let mut active_rules = Vec::new(); + + for rule in rules { + if let Some(ref timeframe) = rule.config.timeframe { + if timeframe_match(timeframe, now)? { + self.limiter_map.remove(&rule.name); + continue; + } + } + + let rate = rule.config.rate; + let burst = rule.config.burst.unwrap_or(rate); + + if let Some(limiter) = self.limiter_map.get(&rule.name) { + limiter.0.lock().unwrap().update_rate(rate, burst); + limiter.1.lock().unwrap().update_rate(rate, burst); + } else { + + let read_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst))); + let write_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst))); + + self.limiter_map.insert( + rule.name.clone(), + (read_limiter, write_limiter), + ); + } + + let mut networks = Vec::new(); + + for network in rule.config.network.iter() { + let cidr = match network.parse() { + Ok(cidr) => cidr, + Err(err) => { + log::error!("unable to parse network '{}' - {}", network, err); + continue; + } + }; + networks.push(cidr); + } + + active_rules.push((rule, networks)); + } + + self.rules = active_rules; + + Ok(()) + } + + pub fn lookup_rate_limiter( + &self, + peer: Option, + ) -> (Option>>, Option>>) { + + let peer = match peer { + None => return (None, None), + Some(peer) => peer, + }; + + let peer_ip = cannonical_ip(peer.ip()); + + log::debug!("lookup_rate_limiter {} {:?}", peer_ip.is_ipv4(), peer_ip); + + let mut last_rule_match = None; + + for (rule, networks) in self.rules.iter() { + if let Some(match_len) = network_match_len(networks, &peer_ip) { + match last_rule_match { + None => last_rule_match = Some((rule, match_len)), + Some((_, last_len)) => { + if match_len > last_len { + last_rule_match = Some((rule, match_len)); + } + } + } + } + } + + match last_rule_match { + Some((rule, _)) => { + match self.limiter_map.get(&rule.name) { + Some((read_limiter, write_limiter)) => { + (Some(Arc::clone(read_limiter)), Some(Arc::clone(write_limiter))) + } + None => (None, None), // should never happen + } + } + None => (None, None), + } + } +} + + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn testnetwork_match() -> Result<(), Error> { + + let networks = ["192.168.2.1/24", "127.0.0.0/8"]; + let networks: Vec = networks.iter().map(|n| n.parse().unwrap()).collect(); + + assert_eq!(network_match_len(&networks, &"192.168.2.1".parse()?), Some(24)); + assert_eq!(network_match_len(&networks, &"192.168.2.254".parse()?), Some(24)); + assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None); + assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8)); + assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None); + + Ok(()) + + } +} diff --git a/src/lib.rs b/src/lib.rs index 5f6d5e7e..8f5ed245 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,9 @@ pub mod client_helpers; pub mod rrd_cache; +mod cached_traffic_control; +pub use cached_traffic_control::TrafficControlCache; + /// Get the server's certificate info (from `proxy.pem`). pub fn cert_info() -> Result { CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem"))) -- 2.30.2