From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 505E475440 for ; Wed, 13 Oct 2021 10:25:41 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 389C5E89C for ; Wed, 13 Oct 2021 10:25:11 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id D7CE2E5B7 for ; Wed, 13 Oct 2021 10:25:00 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id A646A4451C; Wed, 13 Oct 2021 10:25:00 +0200 (CEST) From: Dietmar Maurer To: pbs-devel@lists.proxmox.com Date: Wed, 13 Oct 2021 10:24:41 +0200 Message-Id: <20211013082452.619406-5-dietmar@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20211013082452.619406-1-dietmar@proxmox.com> References: <20211013082452.619406-1-dietmar@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.536 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[cache.rs, raa.data, lib.rs, status.rs, rrd.rs, self.data, item.cf, proxmox-backup-proxy.rs, ietf.org, self.cf] Subject: [pbs-devel] [PATCH proxmox-backup 04/15] proxmox-rrd: implement new CBOR based format X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 13 Oct 2021 08:25:41 -0000 Storing much more data points now got get better graphs. --- proxmox-rrd-api-types/src/lib.rs | 23 +- proxmox-rrd/Cargo.toml | 4 + proxmox-rrd/src/cache.rs | 51 ++- proxmox-rrd/src/lib.rs | 19 +- proxmox-rrd/src/rrd.rs | 534 +++++++++++++++---------------- proxmox-rrd/src/rrd_v1.rs | 296 +++++++++++++++++ src/api2/node/rrd.rs | 39 ++- src/api2/status.rs | 4 +- src/bin/proxmox-backup-proxy.rs | 2 +- 9 files changed, 641 insertions(+), 331 deletions(-) create mode 100644 proxmox-rrd/src/rrd_v1.rs diff --git a/proxmox-rrd-api-types/src/lib.rs b/proxmox-rrd-api-types/src/lib.rs index b5e62e73..32601477 100644 --- a/proxmox-rrd-api-types/src/lib.rs +++ b/proxmox-rrd-api-types/src/lib.rs @@ -14,19 +14,20 @@ pub enum RRDMode { } #[api()] -#[repr(u64)] #[derive(Copy, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] /// RRD time frame resolution pub enum RRDTimeFrameResolution { - /// 1 min => last 70 minutes - Hour = 60, - /// 30 min => last 35 hours - Day = 60*30, - /// 3 hours => about 8 days - Week = 60*180, - /// 12 hours => last 35 days - Month = 60*720, - /// 1 week => last 490 days - Year = 60*10080, + /// Hour + Hour, + /// Day + Day, + /// Week + Week, + /// Month + Month, + /// Year + Year, + /// Decade (10 years) + Decade, } diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml index c66344ac..b3dd02c3 100644 --- a/proxmox-rrd/Cargo.toml +++ b/proxmox-rrd/Cargo.toml @@ -10,8 +10,12 @@ anyhow = "1.0" bitflags = "1.2.1" log = "0.4" nix = "0.19.1" +serde = { version = "1.0", 
features = ["derive"] } +serde_json = "1.0" +serde_cbor = "0.11.1" proxmox = { version = "0.14.0" } proxmox-time = "1" +proxmox-schema = { version = "1", features = [ "api-macro" ] } proxmox-rrd-api-types = { path = "../proxmox-rrd-api-types" } diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs index 7c56e047..f14837fc 100644 --- a/proxmox-rrd/src/cache.rs +++ b/proxmox-rrd/src/cache.rs @@ -13,7 +13,7 @@ use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions} use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution}; -use crate::{DST, rrd::RRD}; +use crate::rrd::{DST, CF, RRD, RRA}; const RRD_JOURNAL_NAME: &str = "rrd.journal"; @@ -81,6 +81,29 @@ impl RRDCache { }) } + fn create_default_rrd(dst: DST) -> RRD { + + let mut rra_list = Vec::new(); + + // 1min * 1440 => 1day + rra_list.push(RRA::new(CF::Average, 60, 1440)); + rra_list.push(RRA::new(CF::Maximum, 60, 1440)); + + // 30min * 1440 => 30days = 1month + rra_list.push(RRA::new(CF::Average, 30*60, 1440)); + rra_list.push(RRA::new(CF::Maximum, 30*60, 1440)); + + // 6h * 1440 => 360days = 1year + rra_list.push(RRA::new(CF::Average, 6*3600, 1440)); + rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440)); + + // 1week * 570 => 10years + rra_list.push(RRA::new(CF::Average, 7*86400, 570)); + rra_list.push(RRA::new(CF::Maximum, 7*86400, 570)); + + RRD::new(dst, rra_list) + } + fn parse_journal_line(line: &str) -> Result { let line = line.trim(); @@ -179,7 +202,7 @@ impl RRDCache { if err.kind() != std::io::ErrorKind::NotFound { log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err); } - RRD::new(entry.dst) + Self::create_default_rrd(entry.dst) }, }; if entry.time > get_last_update(&entry.rel_path, &rrd) { @@ -246,7 +269,7 @@ impl RRDCache { if err.kind() != std::io::ErrorKind::NotFound { log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err); } - RRD::new(dst) + Self::create_default_rrd(dst) }, }; rrd.update(now, value); @@ -264,13 
+287,29 @@ impl RRDCache { now: f64, timeframe: RRDTimeFrameResolution, mode: RRDMode, - ) -> Option<(u64, u64, Vec>)> { + ) -> Result>)>, Error> { let state = self.state.read().unwrap(); + let cf = match mode { + RRDMode::Max => CF::Maximum, + RRDMode::Average => CF::Average, + }; + + let now = now as u64; + + let (start, resolution) = match timeframe { + RRDTimeFrameResolution::Hour => (now - 3600, 60), + RRDTimeFrameResolution::Day => (now - 3600*24, 60), + RRDTimeFrameResolution::Week => (now - 3600*24*7, 30*60), + RRDTimeFrameResolution::Month => (now - 3600*24*30, 30*60), + RRDTimeFrameResolution::Year => (now - 3600*24*365, 6*60*60), + RRDTimeFrameResolution::Decade => (now - 10*3600*24*366, 7*86400), + }; + match state.rrd_map.get(&format!("{}/{}", base, name)) { - Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)), - None => None, + Some(rrd) => Ok(Some(rrd.extract_data(start, now, cf, resolution)?)), + None => Ok(None), } } } diff --git a/proxmox-rrd/src/lib.rs b/proxmox-rrd/src/lib.rs index d83e6ffc..2038170d 100644 --- a/proxmox-rrd/src/lib.rs +++ b/proxmox-rrd/src/lib.rs @@ -1,23 +1,14 @@ -//! # Simple Round Robin Database files with fixed format +//! # Round Robin Database files //! //! ## Features //! //! * One file stores a single data source -//! * Small/constant file size (6008 bytes) -//! * Stores avarage and maximum values -//! * Stores data for different time resolution ([RRDTimeFrameResolution](proxmox_rrd_api_types::RRDTimeFrameResolution)) +//! * Stores data for different time resolution +//! * Simple cache implementation with journal support + +mod rrd_v1; pub mod rrd; mod cache; pub use cache::*; - -/// RRD data source tyoe -#[repr(u8)] -#[derive(Copy, Clone)] -pub enum DST { - /// Gauge values are stored unmodified. - Gauge = 0, - /// Stores the difference to the previous value. 
- Derive = 1, -} diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs index 026498ed..82fa5a3a 100644 --- a/proxmox-rrd/src/rrd.rs +++ b/proxmox-rrd/src/rrd.rs @@ -1,82 +1,175 @@ -//! # Round Robin Database file format +//! # Proxmox RRD format version 2 +//! +//! The new format uses +//! [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) as storage +//! format. This way we can use the serde serialization framework, +//! which make our code more flexible, much nicer and type safe. +//! +//! ## Features +//! +//! * Well defined data format [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) +//! * Plattform independent (big endian f64, hopefully a standard format?) +//! * Arbitrary number of RRAs (dynamically changeable) -use std::io::Read; use std::path::Path; use anyhow::{bail, Error}; -use bitflags::bitflags; -use proxmox::tools::{fs::replace_file, fs::CreateOptions}; +use serde::{Serialize, Deserialize}; + +use proxmox::tools::fs::{replace_file, CreateOptions}; +use proxmox_schema::api; + +use crate::rrd_v1; + +/// Proxmox RRD v2 file magic number +// openssl::sha::sha256(b"Proxmox Round Robin Database file v2.0")[0..8]; +pub const PROXMOX_RRD_MAGIC_2_0: [u8; 8] = [224, 200, 228, 27, 239, 112, 122, 159]; + +#[api()] +#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)] +#[serde(rename_all = "kebab-case")] +/// RRD data source type +pub enum DST { + /// Gauge values are stored unmodified. + Gauge, + /// Stores the difference to the previous value. 
+ Derive, + /// Stores the difference to the previous value (like Derive), but + /// detect counter overflow (and ignores that value) + Counter, +} + +#[api()] +#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)] +#[serde(rename_all = "kebab-case")] +/// Consolidation function +pub enum CF { + /// Average + Average, + /// Maximum + Maximum, + /// Minimum + Minimum, +} + +#[derive(Serialize, Deserialize)] +pub struct DataSource { + /// Data source type + pub dst: DST, + /// Last update time (epoch) + pub last_update: f64, + /// Stores the last value, used to compute differential value for + /// derive/counters + pub counter_value: f64, +} -use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution}; +impl DataSource { -/// The number of data entries per RRA -pub const RRD_DATA_ENTRIES: usize = 70; + pub fn new(dst: DST) -> Self { + Self { + dst, + last_update: 0.0, + counter_value: f64::NAN, + } + } -/// Proxmox RRD file magic number -// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8]; -pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186]; + fn compute_new_value(&mut self, time: f64, mut value: f64) -> Result { + if time <= self.last_update { + bail!("time in past ({} < {})", time, self.last_update); + } -use crate::DST; + if value.is_nan() { + bail!("new value is NAN"); + } -bitflags!{ - /// Flags to specify the data soure type and consolidation function - pub struct RRAFlags: u64 { - // Data Source Types - const DST_GAUGE = 1; - const DST_DERIVE = 2; - const DST_COUNTER = 4; - const DST_MASK = 255; // first 8 bits + // derive counter value + let is_counter = self.dst == DST::Counter; + + if is_counter || self.dst == DST::Derive { + let time_diff = time - self.last_update; + + let diff = if self.counter_value.is_nan() { + 0.0 + } else if is_counter && value < 0.0 { + bail!("got negative value for counter"); + } else if is_counter && value < self.counter_value { + // Note: We do not try automatic 
overflow corrections, but + // we update counter_value anyways, so that we can compute the diff + // next time. + self.counter_value = value; + bail!("conter overflow/reset detected"); + } else { + value - self.counter_value + }; + self.counter_value = value; + value = diff/time_diff; + } - // Consolidation Functions - const CF_AVERAGE = 1 << 8; - const CF_MAX = 2 << 8; - const CF_MASK = 255 << 8; + Ok(value) } + + } -/// Round Robin Archive with [RRD_DATA_ENTRIES] data slots. -/// -/// This data structure is used inside [RRD] and directly written to the -/// RRD files. -#[repr(C)] +#[derive(Serialize, Deserialize)] pub struct RRA { - /// Defined the data soure type and consolidation function - pub flags: RRAFlags, - /// Resulution (seconds) from [RRDTimeFrameResolution] pub resolution: u64, - /// Last update time (epoch) - pub last_update: f64, + pub cf: CF, /// Count values computed inside this update interval pub last_count: u64, - /// Stores the last value, used to compute differential value for derive/counters - pub counter_value: f64, - /// Data slots - pub data: [f64; RRD_DATA_ENTRIES], + /// The actual data + pub data: Vec, } impl RRA { - fn new(flags: RRAFlags, resolution: u64) -> Self { + + pub fn new(cf: CF, resolution: u64, points: usize) -> Self { Self { - flags, resolution, - last_update: 0.0, + cf, + resolution, last_count: 0, - counter_value: f64::NAN, - data: [f64::NAN; RRD_DATA_ENTRIES], + data: vec![f64::NAN; points], + } + } + + // directly overwrite data slots + // the caller need to set last_update value on the DataSource manually. 
+ pub(crate) fn insert_data( + &mut self, + start: u64, + resolution: u64, + data: Vec>, + ) -> Result<(), Error> { + if resolution != self.resolution { + bail!("inser_data failed: got wrong resolution"); + } + let num_entries = self.data.len() as u64; + let mut index = ((start/self.resolution) % num_entries) as usize; + + for i in 0..data.len() { + if let Some(v) = data[i] { + self.data[index] = v; + } + index += 1; + if index >= self.data.len() { index = 0; } } + Ok(()) } - fn delete_old(&mut self, time: f64) { + fn delete_old_slots(&mut self, time: f64, last_update: f64) { let epoch = time as u64; - let last_update = self.last_update as u64; + let last_update = last_update as u64; let reso = self.resolution; + let num_entries = self.data.len() as u64; - let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso; + let min_time = epoch - num_entries*reso; let min_time = (min_time/reso + 1)*reso; - let mut t = last_update.saturating_sub((RRD_DATA_ENTRIES as u64)*reso); - let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; - for _ in 0..RRD_DATA_ENTRIES { - t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + let mut t = last_update.saturating_sub(num_entries*reso); + let mut index = ((t/reso) % num_entries) as usize; + for _ in 0..num_entries { + t += reso; + index = (index + 1) % (num_entries as usize); if t < min_time { self.data[index] = f64::NAN; } else { @@ -85,13 +178,14 @@ impl RRA { } } - fn compute_new_value(&mut self, time: f64, value: f64) { + fn compute_new_value(&mut self, time: f64, last_update: f64, value: f64) { let epoch = time as u64; - let last_update = self.last_update as u64; + let last_update = last_update as u64; let reso = self.resolution; + let num_entries = self.data.len() as u64; - let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize; - let last_index = ((last_update/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + let index = ((epoch/reso) % num_entries) as usize; + let last_index = ((last_update/reso) % num_entries) as 
usize; if (epoch - (last_update as u64)) > reso || index != last_index { self.last_count = 0; @@ -112,258 +206,111 @@ impl RRA { self.data[index] = value; self.last_count = 1; } else { - let new_value = if self.flags.contains(RRAFlags::CF_MAX) { - if last_value > value { last_value } else { value } - } else if self.flags.contains(RRAFlags::CF_AVERAGE) { - (last_value*(self.last_count as f64))/(new_count as f64) - + value/(new_count as f64) - } else { - log::error!("rrdb update failed - unknown CF"); - return; + let new_value = match self.cf { + CF::Maximum => if last_value > value { last_value } else { value }, + CF::Minimum => if last_value < value { last_value } else { value }, + CF::Average => { + (last_value*(self.last_count as f64))/(new_count as f64) + + value/(new_count as f64) + } }; self.data[index] = new_value; self.last_count = new_count; } - self.last_update = time; } - // Note: This may update the state even in case of errors (see counter overflow) - fn update(&mut self, time: f64, mut value: f64) -> Result<(), Error> { - - if time <= self.last_update { - bail!("time in past ({} < {})", time, self.last_update); - } - - if value.is_nan() { - bail!("new value is NAN"); - } - - // derive counter value - if self.flags.intersects(RRAFlags::DST_DERIVE | RRAFlags::DST_COUNTER) { - let time_diff = time - self.last_update; - let is_counter = self.flags.contains(RRAFlags::DST_COUNTER); - - let diff = if self.counter_value.is_nan() { - 0.0 - } else if is_counter && value < 0.0 { - bail!("got negative value for counter"); - } else if is_counter && value < self.counter_value { - // Note: We do not try automatic overflow corrections, but - // we update counter_value anyways, so that we can compute the diff - // next time. 
- self.counter_value = value; - bail!("conter overflow/reset detected"); - } else { - value - self.counter_value - }; - self.counter_value = value; - value = diff/time_diff; - } - - self.delete_old(time); - self.compute_new_value(time, value); - - Ok(()) - } -} - -/// Round Robin Database file format with fixed number of [RRA]s -#[repr(C)] -// Note: Avoid alignment problems by using 8byte types only -pub struct RRD { - /// The magic number to identify the file type - pub magic: [u8; 8], - /// Hourly data (average values) - pub hour_avg: RRA, - /// Hourly data (maximum values) - pub hour_max: RRA, - /// Dayly data (average values) - pub day_avg: RRA, - /// Dayly data (maximum values) - pub day_max: RRA, - /// Weekly data (average values) - pub week_avg: RRA, - /// Weekly data (maximum values) - pub week_max: RRA, - /// Monthly data (average values) - pub month_avg: RRA, - /// Monthly data (maximum values) - pub month_max: RRA, - /// Yearly data (average values) - pub year_avg: RRA, - /// Yearly data (maximum values) - pub year_max: RRA, -} - -impl RRD { - - /// Create a new empty instance - pub fn new(dst: DST) -> Self { - let flags = match dst { - DST::Gauge => RRAFlags::DST_GAUGE, - DST::Derive => RRAFlags::DST_DERIVE, - }; - - Self { - magic: PROXMOX_RRD_MAGIC_1_0, - hour_avg: RRA::new( - flags | RRAFlags::CF_AVERAGE, - RRDTimeFrameResolution::Hour as u64, - ), - hour_max: RRA::new( - flags | RRAFlags::CF_MAX, - RRDTimeFrameResolution::Hour as u64, - ), - day_avg: RRA::new( - flags | RRAFlags::CF_AVERAGE, - RRDTimeFrameResolution::Day as u64, - ), - day_max: RRA::new( - flags | RRAFlags::CF_MAX, - RRDTimeFrameResolution::Day as u64, - ), - week_avg: RRA::new( - flags | RRAFlags::CF_AVERAGE, - RRDTimeFrameResolution::Week as u64, - ), - week_max: RRA::new( - flags | RRAFlags::CF_MAX, - RRDTimeFrameResolution::Week as u64, - ), - month_avg: RRA::new( - flags | RRAFlags::CF_AVERAGE, - RRDTimeFrameResolution::Month as u64, - ), - month_max: RRA::new( - flags | 
RRAFlags::CF_MAX, - RRDTimeFrameResolution::Month as u64, - ), - year_avg: RRA::new( - flags | RRAFlags::CF_AVERAGE, - RRDTimeFrameResolution::Year as u64, - ), - year_max: RRA::new( - flags | RRAFlags::CF_MAX, - RRDTimeFrameResolution::Year as u64, - ), - } - } - - /// Extract data from the archive - pub fn extract_data( + fn extract_data( &self, - time: f64, - timeframe: RRDTimeFrameResolution, - mode: RRDMode, + start: u64, + end: u64, + last_update: f64, ) -> (u64, u64, Vec>) { - - let epoch = time as u64; - let reso = timeframe as u64; - - let end = reso*(epoch/reso + 1); - let start = end - reso*(RRD_DATA_ENTRIES as u64); + let last_update = last_update as u64; + let reso = self.resolution; + let num_entries = self.data.len() as u64; let mut list = Vec::new(); - let raa = match (mode, timeframe) { - (RRDMode::Average, RRDTimeFrameResolution::Hour) => &self.hour_avg, - (RRDMode::Max, RRDTimeFrameResolution::Hour) => &self.hour_max, - (RRDMode::Average, RRDTimeFrameResolution::Day) => &self.day_avg, - (RRDMode::Max, RRDTimeFrameResolution::Day) => &self.day_max, - (RRDMode::Average, RRDTimeFrameResolution::Week) => &self.week_avg, - (RRDMode::Max, RRDTimeFrameResolution::Week) => &self.week_max, - (RRDMode::Average, RRDTimeFrameResolution::Month) => &self.month_avg, - (RRDMode::Max, RRDTimeFrameResolution::Month) => &self.month_max, - (RRDMode::Average, RRDTimeFrameResolution::Year) => &self.year_avg, - (RRDMode::Max, RRDTimeFrameResolution::Year) => &self.year_max, - }; - - let rrd_end = reso*((raa.last_update as u64)/reso); - let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64); + let rrd_end = reso*(last_update/reso); + let rrd_start = rrd_end.saturating_sub(reso*num_entries); let mut t = start; - let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; - for _ in 0..RRD_DATA_ENTRIES { + let mut index = ((t/reso) % num_entries) as usize; + for _ in 0..num_entries { + if t > end { break; }; if t < rrd_start || t > rrd_end { list.push(None); } else 
{ - let value = raa.data[index]; + let value = self.data[index]; if value.is_nan() { list.push(None); } else { list.push(Some(value)); } } - t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + t += reso; index = (index + 1) % (num_entries as usize); } (start, reso, list) } +} - /// Create instance from raw data, testing data len and magic number - pub fn from_raw(mut raw: &[u8]) -> Result { - let expected_len = std::mem::size_of::(); - if raw.len() != expected_len { - let msg = format!("wrong data size ({} != {})", raw.len(), expected_len); - return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); - } +#[derive(Serialize, Deserialize)] +pub struct RRD { + pub source: DataSource, + pub rra_list: Vec, +} - let mut rrd: RRD = unsafe { std::mem::zeroed() }; - unsafe { - let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len); - raw.read_exact(rrd_slice)?; - } +impl RRD { - if rrd.magic != PROXMOX_RRD_MAGIC_1_0 { - let msg = "wrong magic number".to_string(); - return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + pub fn new(dst: DST, rra_list: Vec) -> RRD { + + let source = DataSource::new(dst); + + RRD { + source, + rra_list, } - Ok(rrd) } /// Load data from a file pub fn load(path: &Path) -> Result { let raw = std::fs::read(path)?; - Self::from_raw(&raw) + if raw.len() < 8 { + let msg = format!("not an rrd file - file is too small ({})", raw.len()); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + + if raw[0..8] == rrd_v1::PROXMOX_RRD_MAGIC_1_0 { + let v1 = rrd_v1::RRDv1::from_raw(&raw)?; + v1.to_rrd_v2() + .map_err(|err| { + let msg = format!("unable to convert from old V1 format - {}", err); + std::io::Error::new(std::io::ErrorKind::Other, msg) + }) + } else if raw[0..8] == PROXMOX_RRD_MAGIC_2_0 { + serde_cbor::from_slice(&raw[8..]) + .map_err(|err| { + let msg = format!("unable to decode RRD file - {}", err); + std::io::Error::new(std::io::ErrorKind::Other, msg) + }) + } else { + let 
msg = format!("not an rrd file - unknown magic number"); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } } /// Store data into a file (atomic replace file) pub fn save(&self, filename: &Path, options: CreateOptions) -> Result<(), Error> { - let rrd_slice = unsafe { - std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::()) - }; - replace_file(filename, rrd_slice, options) + let mut data: Vec = Vec::new(); + data.extend(&PROXMOX_RRD_MAGIC_2_0); + serde_cbor::to_writer(&mut data, self)?; + replace_file(filename, &data, options) } pub fn last_update(&self) -> f64 { - - let mut last_update = 0.0; - - { - let mut check_last_update = |rra: &RRA| { - if rra.last_update > last_update { - last_update = rra.last_update; - } - }; - - check_last_update(&self.hour_avg); - check_last_update(&self.hour_max); - - check_last_update(&self.day_avg); - check_last_update(&self.day_max); - - check_last_update(&self.week_avg); - check_last_update(&self.week_max); - - check_last_update(&self.month_avg); - check_last_update(&self.month_max); - - check_last_update(&self.year_avg); - check_last_update(&self.year_max); - } - - last_update + self.source.last_update } /// Update the value (in memory) @@ -371,32 +318,53 @@ impl RRD { /// Note: This does not call [Self::save]. 
pub fn update(&mut self, time: f64, value: f64) { - let mut log_error = true; - - let mut update_rra = |rra: &mut RRA| { - if let Err(err) = rra.update(time, value) { - if log_error { - log::error!("rrd update failed: {}", err); - // we only log the first error, because it is very - // likely other calls produce the same error - log_error = false; - } + let value = match self.source.compute_new_value(time, value) { + Ok(value) => value, + Err(err) => { + log::error!("rrd update failed: {}", err); + return; } }; - update_rra(&mut self.hour_avg); - update_rra(&mut self.hour_max); - - update_rra(&mut self.day_avg); - update_rra(&mut self.day_max); + let last_update = self.source.last_update; + self.source.last_update = time; - update_rra(&mut self.week_avg); - update_rra(&mut self.week_max); + for rra in self.rra_list.iter_mut() { + rra.delete_old_slots(time, last_update); + rra.compute_new_value(time, last_update, value); + } + } - update_rra(&mut self.month_avg); - update_rra(&mut self.month_max); + /// Extract data from the archive + /// + /// This selects the RRA with specified [CF] and (minimum) + /// resolution, and extract data from `start` to `end`. 
+ pub fn extract_data( + &self, + start: u64, + end: u64, + cf: CF, + resolution: u64, + ) -> Result<(u64, u64, Vec>), Error> { + + let mut rra: Option<&RRA> = None; + for item in self.rra_list.iter() { + if item.cf != cf { continue; } + if item.resolution > resolution { continue; } + + if let Some(current) = rra { + if item.resolution > current.resolution { + rra = Some(item); + } + } else { + rra = Some(item); + } + } - update_rra(&mut self.year_avg); - update_rra(&mut self.year_max); + match rra { + Some(rra) => Ok(rra.extract_data(start, end, self.source.last_update)), + None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution), + } } + } diff --git a/proxmox-rrd/src/rrd_v1.rs b/proxmox-rrd/src/rrd_v1.rs new file mode 100644 index 00000000..919896f0 --- /dev/null +++ b/proxmox-rrd/src/rrd_v1.rs @@ -0,0 +1,296 @@ +use std::io::Read; + +use anyhow::Error; +use bitflags::bitflags; + +/// The number of data entries per RRA +pub const RRD_DATA_ENTRIES: usize = 70; + +/// Proxmox RRD file magic number +// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8]; +pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186]; + +use crate::rrd::{RRD, RRA, CF, DST, DataSource}; + +bitflags!{ + /// Flags to specify the data soure type and consolidation function + pub struct RRAFlags: u64 { + // Data Source Types + const DST_GAUGE = 1; + const DST_DERIVE = 2; + const DST_COUNTER = 4; + const DST_MASK = 255; // first 8 bits + + // Consolidation Functions + const CF_AVERAGE = 1 << 8; + const CF_MAX = 2 << 8; + const CF_MASK = 255 << 8; + } +} + +/// Round Robin Archive with [RRD_DATA_ENTRIES] data slots. +/// +/// This data structure is used inside [RRD] and directly written to the +/// RRD files. 
+#[repr(C)] +pub struct RRAv1 { + /// Defined the data soure type and consolidation function + pub flags: RRAFlags, + /// Resulution (seconds) from [RRDTimeFrameResolution] + pub resolution: u64, + /// Last update time (epoch) + pub last_update: f64, + /// Count values computed inside this update interval + pub last_count: u64, + /// Stores the last value, used to compute differential value for derive/counters + pub counter_value: f64, + /// Data slots + pub data: [f64; RRD_DATA_ENTRIES], +} + +impl RRAv1 { + + fn extract_data( + &self, + ) -> (u64, u64, Vec>) { + let reso = self.resolution; + + let mut list = Vec::new(); + + let rra_end = reso*((self.last_update as u64)/reso); + let rra_start = rra_end - reso*(RRD_DATA_ENTRIES as u64); + + let mut t = rra_start; + let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize; + for _ in 0..RRD_DATA_ENTRIES { + let value = self.data[index]; + if value.is_nan() { + list.push(None); + } else { + list.push(Some(value)); + } + + t += reso; index = (index + 1) % RRD_DATA_ENTRIES; + } + + (rra_start, reso, list) + } +} + +/// Round Robin Database file format with fixed number of [RRA]s +#[repr(C)] +// Note: Avoid alignment problems by using 8byte types only +pub struct RRDv1 { + /// The magic number to identify the file type + pub magic: [u8; 8], + /// Hourly data (average values) + pub hour_avg: RRAv1, + /// Hourly data (maximum values) + pub hour_max: RRAv1, + /// Dayly data (average values) + pub day_avg: RRAv1, + /// Dayly data (maximum values) + pub day_max: RRAv1, + /// Weekly data (average values) + pub week_avg: RRAv1, + /// Weekly data (maximum values) + pub week_max: RRAv1, + /// Monthly data (average values) + pub month_avg: RRAv1, + /// Monthly data (maximum values) + pub month_max: RRAv1, + /// Yearly data (average values) + pub year_avg: RRAv1, + /// Yearly data (maximum values) + pub year_max: RRAv1, +} + +impl RRDv1 { + + pub fn from_raw(mut raw: &[u8]) -> Result { + + let expected_len = 
std::mem::size_of::(); + + if raw.len() != expected_len { + let msg = format!("wrong data size ({} != {})", raw.len(), expected_len); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + + let mut rrd: RRDv1 = unsafe { std::mem::zeroed() }; + unsafe { + let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len); + raw.read_exact(rrd_slice)?; + } + + if rrd.magic != PROXMOX_RRD_MAGIC_1_0 { + let msg = "wrong magic number".to_string(); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + + Ok(rrd) + } + + pub fn to_rrd_v2(&self) -> Result { + + let mut rra_list = Vec::new(); + + // old format v1: + // + // hour 1 min, 70 points + // day 30 min, 70 points + // week 3 hours, 70 points + // month 12 hours, 70 points + // year 1 week, 70 points + // + // new default for RRD v2: + // + // day 1 min, 1440 points + // month 30 min, 1440 points + // year 365 min (6h), 1440 points + // decade 1 week, 570 points + + // Linear extrapolation + fn extrapolate_data(start: u64, reso: u64, factor: u64, data: Vec>) -> (u64, u64, Vec>) { + + let mut new = Vec::new(); + + for i in 0..data.len() { + let mut next = i + 1; + if next >= data.len() { next = 0 }; + let v = data[i]; + let v1 = data[next]; + match (v, v1) { + (Some(v), Some(v1)) => { + let diff = (v1 - v)/(factor as f64); + for j in 0..factor { + new.push(Some(v + diff*(j as f64))); + } + } + (Some(v), None) => { + new.push(Some(v)); + for _ in 0..factor-1 { + new.push(None); + } + } + (None, Some(v1)) => { + for _ in 0..factor-1 { + new.push(None); + } + new.push(Some(v1)); + } + (None, None) => { + for _ in 0..factor { + new.push(None); + } + } + } + } + + (start, reso/factor, new) + } + + // Try to convert to new, higher capacity format + + // compute daily average (merge old self.day_avg and self.hour_avg + let mut day_avg = RRA::new(CF::Average, 60, 1440); + + let (start, reso, data) = self.day_avg.extract_data(); + let (start, reso, data) = 
extrapolate_data(start, reso, 30, data); + day_avg.insert_data(start, reso, data)?; + + let (start, reso, data) = self.hour_avg.extract_data(); + day_avg.insert_data(start, reso, data)?; + + // compute daily maximum (merge old self.day_max and self.hour_max + let mut day_max = RRA::new(CF::Maximum, 60, 1440); + + let (start, reso, data) = self.day_max.extract_data(); + let (start, reso, data) = extrapolate_data(start, reso, 30, data); + day_max.insert_data(start, reso, data)?; + + let (start, reso, data) = self.hour_max.extract_data(); + day_max.insert_data(start, reso, data)?; + + // compute montly average (merge old self.month_avg, + // self.week_avg and self.day_avg) + let mut month_avg = RRA::new(CF::Average, 30*60, 1440); + + let (start, reso, data) = self.month_avg.extract_data(); + let (start, reso, data) = extrapolate_data(start, reso, 24, data); + month_avg.insert_data(start, reso, data)?; + + let (start, reso, data) = self.week_avg.extract_data(); + let (start, reso, data) = extrapolate_data(start, reso, 6, data); + month_avg.insert_data(start, reso, data)?; + + let (start, reso, data) = self.day_avg.extract_data(); + month_avg.insert_data(start, reso, data)?; + + // compute montly maximum (merge old self.month_max, + // self.week_max and self.day_max) + let mut month_max = RRA::new(CF::Maximum, 30*60, 1440); + + let (start, reso, data) = self.month_max.extract_data(); + let (start, reso, data) = extrapolate_data(start, reso, 24, data); + month_max.insert_data(start, reso, data)?; + + let (start, reso, data) = self.week_max.extract_data(); + let (start, reso, data) = extrapolate_data(start, reso, 6, data); + month_max.insert_data(start, reso, data)?; + + let (start, reso, data) = self.day_max.extract_data(); + month_max.insert_data(start, reso, data)?; + + // compute yearly average (merge old self.year_avg) + let mut year_avg = RRA::new(CF::Average, 6*3600, 1440); + + let (start, reso, data) = self.year_avg.extract_data(); + let (start, reso, data) = 
extrapolate_data(start, reso, 28, data); + year_avg.insert_data(start, reso, data)?; + + // compute yearly maximum (merge old self.year_max) + let mut year_max = RRA::new(CF::Maximum, 6*3600, 1440); + + let (start, reso, data) = self.year_max.extract_data(); + let (start, reso, data) = extrapolate_data(start, reso, 28, data); + year_max.insert_data(start, reso, data)?; + + // compute decade average (merge old self.year_avg) + let mut decade_avg = RRA::new(CF::Average, 7*86400, 570); + let (start, reso, data) = self.year_avg.extract_data(); + decade_avg.insert_data(start, reso, data)?; + + // compute decade maximum (merge old self.year_max) + let mut decade_max = RRA::new(CF::Maximum, 7*86400, 570); + let (start, reso, data) = self.year_max.extract_data(); + decade_max.insert_data(start, reso, data)?; + + rra_list.push(day_avg); + rra_list.push(day_max); + rra_list.push(month_avg); + rra_list.push(month_max); + rra_list.push(year_avg); + rra_list.push(year_max); + rra_list.push(decade_avg); + rra_list.push(decade_max); + + // use values from hour_avg for source (all RRAv1 must have the same config) + let dst = if self.hour_avg.flags.contains(RRAFlags::DST_COUNTER) { + DST::Counter + } else if self.hour_avg.flags.contains(RRAFlags::DST_DERIVE) { + DST::Derive + } else { + DST::Gauge + }; + + let source = DataSource { + dst, + counter_value: f64::NAN, + last_update: self.hour_avg.last_update, // IMPORTANT! 
+ }; + Ok(RRD { + source, + rra_list, + }) + } +} diff --git a/src/api2/node/rrd.rs b/src/api2/node/rrd.rs index b53076d7..4fe18f46 100644 --- a/src/api2/node/rrd.rs +++ b/src/api2/node/rrd.rs @@ -1,5 +1,6 @@ -use anyhow::Error; +use anyhow::{bail, Error}; use serde_json::{Value, json}; +use std::collections::BTreeMap; use proxmox_router::{Permission, Router}; use proxmox_schema::api; @@ -8,8 +9,6 @@ use pbs_api_types::{ NODE_SCHEMA, RRDMode, RRDTimeFrameResolution, PRIV_SYS_AUDIT, }; -use proxmox_rrd::rrd::RRD_DATA_ENTRIES; - use crate::get_rrd_cache; pub fn create_value_from_rrd( @@ -19,32 +18,44 @@ pub fn create_value_from_rrd( cf: RRDMode, ) -> Result { - let mut result = Vec::new(); + let mut result: Vec = Vec::new(); let now = proxmox_time::epoch_f64(); let rrd_cache = get_rrd_cache()?; + let mut timemap = BTreeMap::new(); + + let mut last_resolution = None; + for name in list { - let (start, reso, list) = match rrd_cache.extract_cached_data(basedir, name, now, timeframe, cf) { + let (start, reso, data) = match rrd_cache.extract_cached_data(basedir, name, now, timeframe, cf)? 
{ Some(result) => result, None => continue, }; + if let Some(expected_resolution) = last_resolution { + if reso != expected_resolution { + bail!("got unexpected RRD resolution ({} != {})", reso, expected_resolution); + } + } else { + last_resolution = Some(reso); + } + let mut t = start; - for index in 0..RRD_DATA_ENTRIES { - if result.len() <= index { - if let Some(value) = list[index] { - result.push(json!({ "time": t, *name: value })); - } else { - result.push(json!({ "time": t })); - } - } else if let Some(value) = list[index] { - result[index][name] = value.into(); + + for index in 0..data.len() { + let entry = timemap.entry(t).or_insert(json!({ "time": t })); + if let Some(value) = data[index] { + entry[*name] = value.into(); } t += reso; } } + for item in timemap.values() { + result.push(item.clone()); + } + Ok(result.into()) } diff --git a/src/api2/status.rs b/src/api2/status.rs index 40a9e5b4..2476fe97 100644 --- a/src/api2/status.rs +++ b/src/api2/status.rs @@ -134,8 +134,8 @@ pub fn datastore_status( RRDMode::Average, ); - let total_res = get_rrd("total"); - let used_res = get_rrd("used"); + let total_res = get_rrd("total")?; + let used_res = get_rrd("used")?; if let (Some((start, reso, total_list)), Some((_, _, used_list))) = (total_res, used_res) { let mut usage_list: Vec = Vec::new(); diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 0f0f6f59..2abc3eaa 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -24,7 +24,7 @@ use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation}; use pbs_tools::{task_log, task_warn}; use pbs_datastore::DataStore; -use proxmox_rrd::DST; +use proxmox_rrd::rrd::DST; use proxmox_rest_server::{ rotate_task_log_archive, extract_cookie , AuthError, ApiConfig, RestServer, RestEnvironment, -- 2.30.2