From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 33A3F739E8 for ; Fri, 8 Oct 2021 10:05:00 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 28A49236C3 for ; Fri, 8 Oct 2021 10:05:00 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 53DDE236BA for ; Fri, 8 Oct 2021 10:04:58 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 105AB4598D; Fri, 8 Oct 2021 10:04:58 +0200 (CEST) From: Dietmar Maurer To: pbs-devel@lists.proxmox.com Date: Fri, 8 Oct 2021 10:04:53 +0200 Message-Id: <20211008080454.1844879-1-dietmar@proxmox.com> X-Mailer: git-send-email 2.30.2 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.559 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[proxmox-backup-proxy.rs, status.total, proxmox-backup-api.rs, lib.rs, cache.rs, rrd.rs] Subject: [pbs-devel] [patch proxmox-backup 1/2] proxmox-rrd: use a journal to reduce amount of bytes written X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 08 Oct 2021 08:05:00 -0000 Apply the journal every 30 minutes. --- proxmox-rrd/Cargo.toml | 1 + proxmox-rrd/src/cache.rs | 223 ++++++++++++++++++++++++++++---- proxmox-rrd/src/lib.rs | 6 +- proxmox-rrd/src/rrd.rs | 30 +++++ src/bin/proxmox-backup-api.rs | 2 +- src/bin/proxmox-backup-proxy.rs | 54 ++++---- src/lib.rs | 5 +- 7 files changed, 261 insertions(+), 60 deletions(-) diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml index 19db5bf6..1c75b2e1 100644 --- a/proxmox-rrd/Cargo.toml +++ b/proxmox-rrd/Cargo.toml @@ -9,6 +9,7 @@ description = "Simple RRD database implementation." 
anyhow = "1.0" bitflags = "1.2.1" log = "0.4" +nix = "0.19.1" proxmox = { version = "0.13.5", features = ["api-macro"] } diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs index 1084a037..57cbb394 100644 --- a/proxmox-rrd/src/cache.rs +++ b/proxmox-rrd/src/cache.rs @@ -1,24 +1,46 @@ +use std::fs::File; use std::path::{Path, PathBuf}; use std::collections::HashMap; -use std::sync::{RwLock}; +use std::sync::RwLock; +use std::io::Write; +use std::io::{BufRead, BufReader}; +use std::os::unix::io::AsRawFd; -use anyhow::{format_err, Error}; +use anyhow::{format_err, bail, Error}; +use nix::fcntl::OFlag; -use proxmox::tools::fs::{create_path, CreateOptions}; +use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions}; use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution}; use crate::{DST, rrd::RRD}; +const RRD_JOURNAL_NAME: &str = "rrd.journal"; + /// RRD cache - keep RRD data in RAM, but write updates to disk /// /// This cache is designed to run as single instance (no concurrent /// access from other processes). 
pub struct RRDCache { + apply_interval: f64, basedir: PathBuf, file_options: CreateOptions, dir_options: CreateOptions, - cache: RwLock>, + state: RwLock, +} + +// shared state behind RwLock +struct RRDCacheState { + rrd_map: HashMap, + journal: File, + last_journal_flush: f64, +} + +struct JournalEntry { + time: f64, + value: f64, + dst: DST, + rel_path: String, } impl RRDCache { @@ -28,21 +50,166 @@ impl RRDCache { basedir: P, file_options: Option, dir_options: Option, - ) -> Self { + apply_interval: f64, + ) -> Result { let basedir = basedir.as_ref().to_owned(); - Self { + + let file_options = file_options.unwrap_or_else(|| CreateOptions::new()); + let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new()); + + create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone())) + .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; + + let mut journal_path = basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND; + let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?; + + let state = RRDCacheState { + journal, + rrd_map: HashMap::new(), + last_journal_flush: 0.0, + }; + + Ok(Self { basedir, - file_options: file_options.unwrap_or_else(|| CreateOptions::new()), - dir_options: dir_options.unwrap_or_else(|| CreateOptions::new()), - cache: RwLock::new(HashMap::new()), + file_options, + dir_options, + apply_interval, + state: RwLock::new(state), + }) + } + + fn parse_journal_line(line: &str) -> Result { + + let line = line.trim(); + + let parts: Vec<&str> = line.splitn(4, ':').collect(); + if parts.len() != 4 { + bail!("wrong numper of components"); } + + let time: f64 = parts[0].parse() + .map_err(|_| format_err!("unable to parse time"))?; + let value: f64 = parts[1].parse() + .map_err(|_| format_err!("unable to parse value"))?; + let dst: u8 = parts[2].parse() + .map_err(|_| format_err!("unable to parse data source 
type"))?; + + let dst = match dst { + 0 => DST::Gauge, + 1 => DST::Derive, + _ => bail!("got strange value for data source type '{}'", dst), + }; + + let rel_path = parts[3].to_string(); + + Ok(JournalEntry { time, value, dst, rel_path }) + } + + fn append_journal_entry( + state: &mut RRDCacheState, + time: f64, + value: f64, + dst: DST, + rel_path: &str, + ) -> Result<(), Error> { + let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path); + state.journal.write_all(journal_entry.as_bytes())?; + Ok(()) } - /// Create rrdd stat dir with correct permission - pub fn create_rrdb_dir(&self) -> Result<(), Error> { + pub fn apply_journal(&self) -> Result<(), Error> { + let mut state = self.state.write().unwrap(); // block writers + self.apply_journal_locked(&mut state) + } - create_path(&self.basedir, Some(self.dir_options.clone()), Some(self.dir_options.clone())) - .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; + fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> { + + log::info!("applying rrd journal"); + + state.last_journal_flush = proxmox::tools::time::epoch_f64(); + + let mut journal_path = self.basedir.clone(); + journal_path.push(RRD_JOURNAL_NAME); + + let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY; + let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?; + let mut journal = BufReader::new(journal); + + let mut last_update_map = HashMap::new(); + + let mut get_last_update = |rel_path: &str, rrd: &RRD| { + if let Some(time) = last_update_map.get(rel_path) { + return *time; + } + let last_update = rrd.last_update(); + last_update_map.insert(rel_path.to_string(), last_update); + last_update + }; + + let mut linenr = 0; + loop { + linenr += 1; + let mut line = String::new(); + let len = journal.read_line(&mut line)?; + if len == 0 { break; } + + let entry = match Self::parse_journal_line(&line) { + Ok(entry) => entry, + Err(err) => { + 
log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err); + continue; // skip unparsable lines + } + }; + + if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) { + if entry.time > get_last_update(&entry.rel_path, &rrd) { + rrd.update(entry.time, entry.value); + } + } else { + let mut path = self.basedir.clone(); + path.push(&entry.rel_path); + create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?; + + let mut rrd = match RRD::load(&path) { + Ok(rrd) => rrd, + Err(err) => { + if err.kind() != std::io::ErrorKind::NotFound { + log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err); + } + RRD::new(entry.dst) + }, + }; + if entry.time > get_last_update(&entry.rel_path, &rrd) { + rrd.update(entry.time, entry.value); + } + state.rrd_map.insert(entry.rel_path.clone(), rrd); + } + } + + // save all RRDs + + let mut errors = 0; + for (rel_path, rrd) in state.rrd_map.iter() { + let mut path = self.basedir.clone(); + path.push(&rel_path); + if let Err(err) = rrd.save(&path, self.file_options.clone()) { + errors += 1; + log::error!("unable to save {:?}: {}", path, err); + } + } + + // if everything went ok, commit the journal + + if errors == 0 { + nix::unistd::ftruncate(state.journal.as_raw_fd(), 0) + .map_err(|err| format_err!("unable to truncate journal - {}", err))?; + log::info!("rrd journal successfully committed"); + } else { + log::error!("errors during rrd flush - unable to commit rrd journal"); + } Ok(()) } @@ -53,21 +220,26 @@ impl RRDCache { rel_path: &str, value: f64, dst: DST, - save: bool, ) -> Result<(), Error> { - let mut path = self.basedir.clone(); - path.push(rel_path); - - create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.file_options.clone()))?; + let mut state = self.state.write().unwrap(); // block other writers - let mut map = self.cache.write().unwrap(); let now = proxmox::tools::time::epoch_f64(); - if let Some(rrd) = 
map.get_mut(rel_path) { + if (now - state.last_journal_flush) > self.apply_interval { + if let Err(err) = self.apply_journal_locked(&mut state) { + log::error!("apply journal failed: {}", err); + } + } + + Self::append_journal_entry(&mut state, now, value, dst, rel_path)?; + + if let Some(rrd) = state.rrd_map.get_mut(rel_path) { rrd.update(now, value); - if save { rrd.save(&path, self.file_options.clone())?; } } else { + let mut path = self.basedir.clone(); + path.push(rel_path); + create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?; let mut rrd = match RRD::load(&path) { Ok(rrd) => rrd, Err(err) => { @@ -78,10 +250,7 @@ impl RRDCache { }, }; rrd.update(now, value); - if save { - rrd.save(&path, self.file_options.clone())?; - } - map.insert(rel_path.into(), rrd); + state.rrd_map.insert(rel_path.into(), rrd); } Ok(()) @@ -97,9 +266,9 @@ impl RRDCache { mode: RRDMode, ) -> Option<(u64, u64, Vec>)> { - let map = self.cache.read().unwrap(); + let state = self.state.read().unwrap(); - match map.get(&format!("{}/{}", base, name)) { + match state.rrd_map.get(&format!("{}/{}", base, name)) { Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)), None => None, } diff --git a/proxmox-rrd/src/lib.rs b/proxmox-rrd/src/lib.rs index 303cd55d..d83e6ffc 100644 --- a/proxmox-rrd/src/lib.rs +++ b/proxmox-rrd/src/lib.rs @@ -13,9 +13,11 @@ mod cache; pub use cache::*; /// RRD data source tyoe +#[repr(u8)] +#[derive(Copy, Clone)] pub enum DST { /// Gauge values are stored unmodified. - Gauge, + Gauge = 0, /// Stores the difference to the previous value. 
- Derive, + Derive = 1, } diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs index f4c08909..026498ed 100644 --- a/proxmox-rrd/src/rrd.rs +++ b/proxmox-rrd/src/rrd.rs @@ -336,6 +336,36 @@ impl RRD { replace_file(filename, rrd_slice, options) } + pub fn last_update(&self) -> f64 { + + let mut last_update = 0.0; + + { + let mut check_last_update = |rra: &RRA| { + if rra.last_update > last_update { + last_update = rra.last_update; + } + }; + + check_last_update(&self.hour_avg); + check_last_update(&self.hour_max); + + check_last_update(&self.day_avg); + check_last_update(&self.day_max); + + check_last_update(&self.week_avg); + check_last_update(&self.week_max); + + check_last_update(&self.month_avg); + check_last_update(&self.month_max); + + check_last_update(&self.year_avg); + check_last_update(&self.year_max); + } + + last_update + } + /// Update the value (in memory) /// /// Note: This does not call [Self::save]. diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs index da9ae5dd..b7c4e689 100644 --- a/src/bin/proxmox-backup-api.rs +++ b/src/bin/proxmox-backup-api.rs @@ -75,7 +75,7 @@ async fn run() -> Result<(), Error> { proxmox_backup::server::create_run_dir()?; - RRD_CACHE.create_rrdb_dir()?; + RRD_CACHE.apply_journal()?; proxmox_backup::server::jobstate::create_jobstate_dir()?; proxmox_backup::tape::create_tape_status_dir()?; diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 4c879483..ce939400 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -889,14 +889,10 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> { async fn run_stat_generator() { - let mut count = 0; loop { - count += 1; - let save = if count >= 6 { count = 0; true } else { false }; - let delay_target = Instant::now() + Duration::from_secs(10); - generate_host_stats(save).await; + generate_host_stats().await; tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await; 
@@ -904,19 +900,19 @@ async fn run_stat_generator() { } -fn rrd_update_gauge(name: &str, value: f64, save: bool) { - if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge, save) { +fn rrd_update_gauge(name: &str, value: f64) { + if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge) { eprintln!("rrd::update_value '{}' failed - {}", name, err); } } -fn rrd_update_derive(name: &str, value: f64, save: bool) { - if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive, save) { +fn rrd_update_derive(name: &str, value: f64) { + if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive) { eprintln!("rrd::update_value '{}' failed - {}", name, err); } } -async fn generate_host_stats(save: bool) { +async fn generate_host_stats() { use proxmox::sys::linux::procfs::{ read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg}; @@ -924,8 +920,8 @@ async fn generate_host_stats(save: bool) { match read_proc_stat() { Ok(stat) => { - rrd_update_gauge("host/cpu", stat.cpu, save); - rrd_update_gauge("host/iowait", stat.iowait_percent, save); + rrd_update_gauge("host/cpu", stat.cpu); + rrd_update_gauge("host/iowait", stat.iowait_percent); } Err(err) => { eprintln!("read_proc_stat failed - {}", err); @@ -934,10 +930,10 @@ async fn generate_host_stats(save: bool) { match read_meminfo() { Ok(meminfo) => { - rrd_update_gauge("host/memtotal", meminfo.memtotal as f64, save); - rrd_update_gauge("host/memused", meminfo.memused as f64, save); - rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64, save); - rrd_update_gauge("host/swapused", meminfo.swapused as f64, save); + rrd_update_gauge("host/memtotal", meminfo.memtotal as f64); + rrd_update_gauge("host/memused", meminfo.memused as f64); + rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64); + rrd_update_gauge("host/swapused", meminfo.swapused as f64); } Err(err) => { eprintln!("read_meminfo failed - {}", err); @@ -954,8 +950,8 @@ async fn generate_host_stats(save: bool) { netin += 
item.receive; netout += item.send; } - rrd_update_derive("host/netin", netin as f64, save); - rrd_update_derive("host/netout", netout as f64, save); + rrd_update_derive("host/netin", netin as f64); + rrd_update_derive("host/netout", netout as f64); } Err(err) => { eprintln!("read_prox_net_dev failed - {}", err); @@ -964,7 +960,7 @@ async fn generate_host_stats(save: bool) { match read_loadavg() { Ok(loadavg) => { - rrd_update_gauge("host/loadavg", loadavg.0 as f64, save); + rrd_update_gauge("host/loadavg", loadavg.0 as f64); } Err(err) => { eprintln!("read_loadavg failed - {}", err); @@ -973,7 +969,7 @@ async fn generate_host_stats(save: bool) { let disk_manager = DiskManage::new(); - gather_disk_stats(disk_manager.clone(), Path::new("/"), "host", save); + gather_disk_stats(disk_manager.clone(), Path::new("/"), "host"); match pbs_config::datastore::config() { Ok((config, _)) => { @@ -984,7 +980,7 @@ async fn generate_host_stats(save: bool) { let rrd_prefix = format!("datastore/{}", config.name); let path = std::path::Path::new(&config.path); - gather_disk_stats(disk_manager.clone(), path, &rrd_prefix, save); + gather_disk_stats(disk_manager.clone(), path, &rrd_prefix); } } Err(err) => { @@ -1025,14 +1021,14 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool { next <= now } -fn gather_disk_stats(disk_manager: Arc, path: &Path, rrd_prefix: &str, save: bool) { +fn gather_disk_stats(disk_manager: Arc, path: &Path, rrd_prefix: &str) { match proxmox_backup::tools::disks::disk_usage(path) { Ok(status) => { let rrd_key = format!("{}/total", rrd_prefix); - rrd_update_gauge(&rrd_key, status.total as f64, save); + rrd_update_gauge(&rrd_key, status.total as f64); let rrd_key = format!("{}/used", rrd_prefix); - rrd_update_gauge(&rrd_key, status.used as f64, save); + rrd_update_gauge(&rrd_key, status.used as f64); } Err(err) => { eprintln!("read disk_usage on {:?} failed - {}", path, err); @@ -1064,17 +1060,17 @@ fn gather_disk_stats(disk_manager: Arc, 
path: &Path, rrd_prefix: &st } if let Some(stat) = device_stat { let rrd_key = format!("{}/read_ios", rrd_prefix); - rrd_update_derive(&rrd_key, stat.read_ios as f64, save); + rrd_update_derive(&rrd_key, stat.read_ios as f64); let rrd_key = format!("{}/read_bytes", rrd_prefix); - rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64, save); + rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64); let rrd_key = format!("{}/write_ios", rrd_prefix); - rrd_update_derive(&rrd_key, stat.write_ios as f64, save); + rrd_update_derive(&rrd_key, stat.write_ios as f64); let rrd_key = format!("{}/write_bytes", rrd_prefix); - rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64, save); + rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64); let rrd_key = format!("{}/io_ticks", rrd_prefix); - rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0, save); + rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0); } } Err(err) => { diff --git a/src/lib.rs b/src/lib.rs index 98b6b987..5d2b4590 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,10 +51,13 @@ lazy_static::lazy_static!{ .owner(backup_user.uid) .group(backup_user.gid); + let apply_interval = 30.0*60.0; // 30 minutes + RRDCache::new( "/var/lib/proxmox-backup/rrdb", Some(file_options), Some(dir_options), - ) + apply_interval, + ).unwrap() }; } -- 2.30.2