* [pbs-devel] [patch proxmox-backup 1/2] proxmox-rrd: use a journal to reduce amount of bytes written
@ 2021-10-08 8:04 Dietmar Maurer
2021-10-08 8:04 ` [pbs-devel] [patch proxmox-backup 2/2] RRD_CACHE: use a OnceCell instead of lazy_static Dietmar Maurer
0 siblings, 1 reply; 2+ messages in thread
From: Dietmar Maurer @ 2021-10-08 8:04 UTC (permalink / raw)
To: pbs-devel
Apply the journal every 30 seconds.
---
proxmox-rrd/Cargo.toml | 1 +
proxmox-rrd/src/cache.rs | 223 ++++++++++++++++++++++++++++----
proxmox-rrd/src/lib.rs | 6 +-
proxmox-rrd/src/rrd.rs | 30 +++++
src/bin/proxmox-backup-api.rs | 2 +-
src/bin/proxmox-backup-proxy.rs | 54 ++++----
src/lib.rs | 5 +-
7 files changed, 261 insertions(+), 60 deletions(-)
diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml
index 19db5bf6..1c75b2e1 100644
--- a/proxmox-rrd/Cargo.toml
+++ b/proxmox-rrd/Cargo.toml
@@ -9,6 +9,7 @@ description = "Simple RRD database implementation."
anyhow = "1.0"
bitflags = "1.2.1"
log = "0.4"
+nix = "0.19.1"
proxmox = { version = "0.13.5", features = ["api-macro"] }
diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs
index 1084a037..57cbb394 100644
--- a/proxmox-rrd/src/cache.rs
+++ b/proxmox-rrd/src/cache.rs
@@ -1,24 +1,46 @@
+use std::fs::File;
use std::path::{Path, PathBuf};
use std::collections::HashMap;
-use std::sync::{RwLock};
+use std::sync::RwLock;
+use std::io::Write;
+use std::io::{BufRead, BufReader};
+use std::os::unix::io::AsRawFd;
-use anyhow::{format_err, Error};
+use anyhow::{format_err, bail, Error};
+use nix::fcntl::OFlag;
-use proxmox::tools::fs::{create_path, CreateOptions};
+use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};
use crate::{DST, rrd::RRD};
+const RRD_JOURNAL_NAME: &str = "rrd.journal";
+
/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as single instance (no concurrent
/// access from other processes).
pub struct RRDCache {
+ apply_interval: f64,
basedir: PathBuf,
file_options: CreateOptions,
dir_options: CreateOptions,
- cache: RwLock<HashMap<String, RRD>>,
+ state: RwLock<RRDCacheState>,
+}
+
+// shared state behind RwLock
+struct RRDCacheState {
+ rrd_map: HashMap<String, RRD>,
+ journal: File,
+ last_journal_flush: f64,
+}
+
+struct JournalEntry {
+ time: f64,
+ value: f64,
+ dst: DST,
+ rel_path: String,
}
impl RRDCache {
@@ -28,21 +50,166 @@ impl RRDCache {
basedir: P,
file_options: Option<CreateOptions>,
dir_options: Option<CreateOptions>,
- ) -> Self {
+ apply_interval: f64,
+ ) -> Result<Self, Error> {
let basedir = basedir.as_ref().to_owned();
- Self {
+
+ let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
+ let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
+
+ create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
+ .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+
+ let mut journal_path = basedir.clone();
+ journal_path.push(RRD_JOURNAL_NAME);
+
+ let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
+ let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?;
+
+ let state = RRDCacheState {
+ journal,
+ rrd_map: HashMap::new(),
+ last_journal_flush: 0.0,
+ };
+
+ Ok(Self {
basedir,
- file_options: file_options.unwrap_or_else(|| CreateOptions::new()),
- dir_options: dir_options.unwrap_or_else(|| CreateOptions::new()),
- cache: RwLock::new(HashMap::new()),
+ file_options,
+ dir_options,
+ apply_interval,
+ state: RwLock::new(state),
+ })
+ }
+
+ fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
+
+ let line = line.trim();
+
+ let parts: Vec<&str> = line.splitn(4, ':').collect();
+ if parts.len() != 4 {
+ bail!("wrong numper of components");
}
+
+ let time: f64 = parts[0].parse()
+ .map_err(|_| format_err!("unable to parse time"))?;
+ let value: f64 = parts[1].parse()
+ .map_err(|_| format_err!("unable to parse value"))?;
+ let dst: u8 = parts[2].parse()
+ .map_err(|_| format_err!("unable to parse data source type"))?;
+
+ let dst = match dst {
+ 0 => DST::Gauge,
+ 1 => DST::Derive,
+ _ => bail!("got strange value for data source type '{}'", dst),
+ };
+
+ let rel_path = parts[3].to_string();
+
+ Ok(JournalEntry { time, value, dst, rel_path })
+ }
+
+ fn append_journal_entry(
+ state: &mut RRDCacheState,
+ time: f64,
+ value: f64,
+ dst: DST,
+ rel_path: &str,
+ ) -> Result<(), Error> {
+ let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
+ state.journal.write_all(journal_entry.as_bytes())?;
+ Ok(())
}
- /// Create rrdd stat dir with correct permission
- pub fn create_rrdb_dir(&self) -> Result<(), Error> {
+ pub fn apply_journal(&self) -> Result<(), Error> {
+ let mut state = self.state.write().unwrap(); // block writers
+ self.apply_journal_locked(&mut state)
+ }
- create_path(&self.basedir, Some(self.dir_options.clone()), Some(self.dir_options.clone()))
- .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+ fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {
+
+ log::info!("applying rrd journal");
+
+ state.last_journal_flush = proxmox::tools::time::epoch_f64();
+
+ let mut journal_path = self.basedir.clone();
+ journal_path.push(RRD_JOURNAL_NAME);
+
+ let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
+ let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
+ let mut journal = BufReader::new(journal);
+
+ let mut last_update_map = HashMap::new();
+
+ let mut get_last_update = |rel_path: &str, rrd: &RRD| {
+ if let Some(time) = last_update_map.get(rel_path) {
+ return *time;
+ }
+ let last_update = rrd.last_update();
+ last_update_map.insert(rel_path.to_string(), last_update);
+ last_update
+ };
+
+ let mut linenr = 0;
+ loop {
+ linenr += 1;
+ let mut line = String::new();
+ let len = journal.read_line(&mut line)?;
+ if len == 0 { break; }
+
+ let entry = match Self::parse_journal_line(&line) {
+ Ok(entry) => entry,
+ Err(err) => {
+ log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
+ continue; // skip unparsable lines
+ }
+ };
+
+ if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
+ if entry.time > get_last_update(&entry.rel_path, &rrd) {
+ rrd.update(entry.time, entry.value);
+ }
+ } else {
+ let mut path = self.basedir.clone();
+ path.push(&entry.rel_path);
+ create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+
+ let mut rrd = match RRD::load(&path) {
+ Ok(rrd) => rrd,
+ Err(err) => {
+ if err.kind() != std::io::ErrorKind::NotFound {
+ log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
+ }
+ RRD::new(entry.dst)
+ },
+ };
+ if entry.time > get_last_update(&entry.rel_path, &rrd) {
+ rrd.update(entry.time, entry.value);
+ }
+ state.rrd_map.insert(entry.rel_path.clone(), rrd);
+ }
+ }
+
+ // save all RRDs
+
+ let mut errors = 0;
+ for (rel_path, rrd) in state.rrd_map.iter() {
+ let mut path = self.basedir.clone();
+ path.push(&rel_path);
+ if let Err(err) = rrd.save(&path, self.file_options.clone()) {
+ errors += 1;
+ log::error!("unable to save {:?}: {}", path, err);
+ }
+ }
+
+ // if everything went ok, commit the journal
+
+ if errors == 0 {
+ nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
+ .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
+ log::info!("rrd journal successfully committed");
+ } else {
+ log::error!("errors during rrd flush - unable to commit rrd journal");
+ }
Ok(())
}
@@ -53,21 +220,26 @@ impl RRDCache {
rel_path: &str,
value: f64,
dst: DST,
- save: bool,
) -> Result<(), Error> {
- let mut path = self.basedir.clone();
- path.push(rel_path);
-
- create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.file_options.clone()))?;
+ let mut state = self.state.write().unwrap(); // block other writers
- let mut map = self.cache.write().unwrap();
let now = proxmox::tools::time::epoch_f64();
- if let Some(rrd) = map.get_mut(rel_path) {
+ if (now - state.last_journal_flush) > self.apply_interval {
+ if let Err(err) = self.apply_journal_locked(&mut state) {
+ log::error!("apply journal failed: {}", err);
+ }
+ }
+
+ Self::append_journal_entry(&mut state, now, value, dst, rel_path)?;
+
+ if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
rrd.update(now, value);
- if save { rrd.save(&path, self.file_options.clone())?; }
} else {
+ let mut path = self.basedir.clone();
+ path.push(rel_path);
+ create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
let mut rrd = match RRD::load(&path) {
Ok(rrd) => rrd,
Err(err) => {
@@ -78,10 +250,7 @@ impl RRDCache {
},
};
rrd.update(now, value);
- if save {
- rrd.save(&path, self.file_options.clone())?;
- }
- map.insert(rel_path.into(), rrd);
+ state.rrd_map.insert(rel_path.into(), rrd);
}
Ok(())
@@ -97,9 +266,9 @@ impl RRDCache {
mode: RRDMode,
) -> Option<(u64, u64, Vec<Option<f64>>)> {
- let map = self.cache.read().unwrap();
+ let state = self.state.read().unwrap();
- match map.get(&format!("{}/{}", base, name)) {
+ match state.rrd_map.get(&format!("{}/{}", base, name)) {
Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)),
None => None,
}
diff --git a/proxmox-rrd/src/lib.rs b/proxmox-rrd/src/lib.rs
index 303cd55d..d83e6ffc 100644
--- a/proxmox-rrd/src/lib.rs
+++ b/proxmox-rrd/src/lib.rs
@@ -13,9 +13,11 @@ mod cache;
pub use cache::*;
/// RRD data source tyoe
+#[repr(u8)]
+#[derive(Copy, Clone)]
pub enum DST {
/// Gauge values are stored unmodified.
- Gauge,
+ Gauge = 0,
/// Stores the difference to the previous value.
- Derive,
+ Derive = 1,
}
diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs
index f4c08909..026498ed 100644
--- a/proxmox-rrd/src/rrd.rs
+++ b/proxmox-rrd/src/rrd.rs
@@ -336,6 +336,36 @@ impl RRD {
replace_file(filename, rrd_slice, options)
}
+ pub fn last_update(&self) -> f64 {
+
+ let mut last_update = 0.0;
+
+ {
+ let mut check_last_update = |rra: &RRA| {
+ if rra.last_update > last_update {
+ last_update = rra.last_update;
+ }
+ };
+
+ check_last_update(&self.hour_avg);
+ check_last_update(&self.hour_max);
+
+ check_last_update(&self.day_avg);
+ check_last_update(&self.day_max);
+
+ check_last_update(&self.week_avg);
+ check_last_update(&self.week_max);
+
+ check_last_update(&self.month_avg);
+ check_last_update(&self.month_max);
+
+ check_last_update(&self.year_avg);
+ check_last_update(&self.year_max);
+ }
+
+ last_update
+ }
+
/// Update the value (in memory)
///
/// Note: This does not call [Self::save].
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index da9ae5dd..b7c4e689 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -75,7 +75,7 @@ async fn run() -> Result<(), Error> {
proxmox_backup::server::create_run_dir()?;
- RRD_CACHE.create_rrdb_dir()?;
+ RRD_CACHE.apply_journal()?;
proxmox_backup::server::jobstate::create_jobstate_dir()?;
proxmox_backup::tape::create_tape_status_dir()?;
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 4c879483..ce939400 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -889,14 +889,10 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
async fn run_stat_generator() {
- let mut count = 0;
loop {
- count += 1;
- let save = if count >= 6 { count = 0; true } else { false };
-
let delay_target = Instant::now() + Duration::from_secs(10);
- generate_host_stats(save).await;
+ generate_host_stats().await;
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
@@ -904,19 +900,19 @@ async fn run_stat_generator() {
}
-fn rrd_update_gauge(name: &str, value: f64, save: bool) {
- if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge, save) {
+fn rrd_update_gauge(name: &str, value: f64) {
+ if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge) {
eprintln!("rrd::update_value '{}' failed - {}", name, err);
}
}
-fn rrd_update_derive(name: &str, value: f64, save: bool) {
- if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive, save) {
+fn rrd_update_derive(name: &str, value: f64) {
+ if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive) {
eprintln!("rrd::update_value '{}' failed - {}", name, err);
}
}
-async fn generate_host_stats(save: bool) {
+async fn generate_host_stats() {
use proxmox::sys::linux::procfs::{
read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
@@ -924,8 +920,8 @@ async fn generate_host_stats(save: bool) {
match read_proc_stat() {
Ok(stat) => {
- rrd_update_gauge("host/cpu", stat.cpu, save);
- rrd_update_gauge("host/iowait", stat.iowait_percent, save);
+ rrd_update_gauge("host/cpu", stat.cpu);
+ rrd_update_gauge("host/iowait", stat.iowait_percent);
}
Err(err) => {
eprintln!("read_proc_stat failed - {}", err);
@@ -934,10 +930,10 @@ async fn generate_host_stats(save: bool) {
match read_meminfo() {
Ok(meminfo) => {
- rrd_update_gauge("host/memtotal", meminfo.memtotal as f64, save);
- rrd_update_gauge("host/memused", meminfo.memused as f64, save);
- rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64, save);
- rrd_update_gauge("host/swapused", meminfo.swapused as f64, save);
+ rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+ rrd_update_gauge("host/memused", meminfo.memused as f64);
+ rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+ rrd_update_gauge("host/swapused", meminfo.swapused as f64);
}
Err(err) => {
eprintln!("read_meminfo failed - {}", err);
@@ -954,8 +950,8 @@ async fn generate_host_stats(save: bool) {
netin += item.receive;
netout += item.send;
}
- rrd_update_derive("host/netin", netin as f64, save);
- rrd_update_derive("host/netout", netout as f64, save);
+ rrd_update_derive("host/netin", netin as f64);
+ rrd_update_derive("host/netout", netout as f64);
}
Err(err) => {
eprintln!("read_prox_net_dev failed - {}", err);
@@ -964,7 +960,7 @@ async fn generate_host_stats(save: bool) {
match read_loadavg() {
Ok(loadavg) => {
- rrd_update_gauge("host/loadavg", loadavg.0 as f64, save);
+ rrd_update_gauge("host/loadavg", loadavg.0 as f64);
}
Err(err) => {
eprintln!("read_loadavg failed - {}", err);
@@ -973,7 +969,7 @@ async fn generate_host_stats(save: bool) {
let disk_manager = DiskManage::new();
- gather_disk_stats(disk_manager.clone(), Path::new("/"), "host", save);
+ gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
match pbs_config::datastore::config() {
Ok((config, _)) => {
@@ -984,7 +980,7 @@ async fn generate_host_stats(save: bool) {
let rrd_prefix = format!("datastore/{}", config.name);
let path = std::path::Path::new(&config.path);
- gather_disk_stats(disk_manager.clone(), path, &rrd_prefix, save);
+ gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
}
}
Err(err) => {
@@ -1025,14 +1021,14 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
next <= now
}
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str, save: bool) {
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
match proxmox_backup::tools::disks::disk_usage(path) {
Ok(status) => {
let rrd_key = format!("{}/total", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.total as f64, save);
+ rrd_update_gauge(&rrd_key, status.total as f64);
let rrd_key = format!("{}/used", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.used as f64, save);
+ rrd_update_gauge(&rrd_key, status.used as f64);
}
Err(err) => {
eprintln!("read disk_usage on {:?} failed - {}", path, err);
@@ -1064,17 +1060,17 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
}
if let Some(stat) = device_stat {
let rrd_key = format!("{}/read_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.read_ios as f64, save);
+ rrd_update_derive(&rrd_key, stat.read_ios as f64);
let rrd_key = format!("{}/read_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64, save);
+ rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
let rrd_key = format!("{}/write_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.write_ios as f64, save);
+ rrd_update_derive(&rrd_key, stat.write_ios as f64);
let rrd_key = format!("{}/write_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64, save);
+ rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
let rrd_key = format!("{}/io_ticks", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0, save);
+ rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
}
}
Err(err) => {
diff --git a/src/lib.rs b/src/lib.rs
index 98b6b987..5d2b4590 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -51,10 +51,13 @@ lazy_static::lazy_static!{
.owner(backup_user.uid)
.group(backup_user.gid);
+ let apply_interval = 30.0*60.0; // 30 minutes
+
RRDCache::new(
"/var/lib/proxmox-backup/rrdb",
Some(file_options),
Some(dir_options),
- )
+ apply_interval,
+ ).unwrap()
};
}
--
2.30.2
^ permalink raw reply [flat|nested] 2+ messages in thread
* [pbs-devel] [patch proxmox-backup 2/2] RRD_CACHE: use a OnceCell instead of lazy_static
2021-10-08 8:04 [pbs-devel] [patch proxmox-backup 1/2] proxmox-rrd: use a journal to reduce amount of bytes written Dietmar Maurer
@ 2021-10-08 8:04 ` Dietmar Maurer
0 siblings, 0 replies; 2+ messages in thread
From: Dietmar Maurer @ 2021-10-08 8:04 UTC (permalink / raw)
To: pbs-devel
And initialize only with proxmox-backup-proxy. Other binaries dont need it.
---
src/api2/node/rrd.rs | 6 ++--
src/api2/status.rs | 7 ++--
src/bin/proxmox-backup-api.rs | 4 ---
src/bin/proxmox-backup-proxy.rs | 17 +++++++---
src/lib.rs | 59 +++++++++++++++++++++------------
5 files changed, 59 insertions(+), 34 deletions(-)
diff --git a/src/api2/node/rrd.rs b/src/api2/node/rrd.rs
index 00555030..49d2f570 100644
--- a/src/api2/node/rrd.rs
+++ b/src/api2/node/rrd.rs
@@ -9,7 +9,7 @@ use pbs_api_types::{
use proxmox_rrd::rrd::RRD_DATA_ENTRIES;
-use crate::RRD_CACHE;
+use crate::get_rrd_cache;
pub fn create_value_from_rrd(
basedir: &str,
@@ -21,8 +21,10 @@ pub fn create_value_from_rrd(
let mut result = Vec::new();
let now = proxmox::tools::time::epoch_f64();
+ let rrd_cache = get_rrd_cache()?;
+
for name in list {
- let (start, reso, list) = match RRD_CACHE.extract_cached_data(basedir, name, now, timeframe, cf) {
+ let (start, reso, list) = match rrd_cache.extract_cached_data(basedir, name, now, timeframe, cf) {
Some(result) => result,
None => continue,
};
diff --git a/src/api2/status.rs b/src/api2/status.rs
index 548e5319..2cef1e9b 100644
--- a/src/api2/status.rs
+++ b/src/api2/status.rs
@@ -23,7 +23,7 @@ use pbs_datastore::DataStore;
use pbs_config::CachedUserInfo;
use crate::tools::statistics::{linear_regression};
-use crate::RRD_CACHE;
+use crate::get_rrd_cache;
#[api(
returns: {
@@ -91,6 +91,8 @@ pub fn datastore_status(
let mut list = Vec::new();
+ let rrd_cache = get_rrd_cache()?;
+
for (store, (_, _)) in &config.sections {
let user_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
let allowed = (user_privs & (PRIV_DATASTORE_AUDIT| PRIV_DATASTORE_BACKUP)) != 0;
@@ -124,7 +126,8 @@ pub fn datastore_status(
let rrd_dir = format!("datastore/{}", store);
let now = proxmox::tools::time::epoch_f64();
- let get_rrd = |what: &str| RRD_CACHE.extract_cached_data(
+
+ let get_rrd = |what: &str| rrd_cache.extract_cached_data(
&rrd_dir,
what,
now,
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index b7c4e689..b17b3436 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -17,7 +17,6 @@ use proxmox_rest_server::{daemon, AuthError, ApiConfig, RestServer, RestEnvironm
use proxmox_backup::server::auth::check_pbs_auth;
use proxmox_backup::auth_helpers::*;
-use proxmox_backup::RRD_CACHE;
use proxmox_backup::config;
fn main() {
@@ -74,9 +73,6 @@ async fn run() -> Result<(), Error> {
config::update_self_signed_cert(false)?;
proxmox_backup::server::create_run_dir()?;
-
- RRD_CACHE.apply_journal()?;
-
proxmox_backup::server::jobstate::create_jobstate_dir()?;
proxmox_backup::tape::create_tape_status_dir()?;
proxmox_backup::tape::create_drive_state_dir()?;
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index ce939400..ef227556 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -32,7 +32,7 @@ use proxmox_rest_server::{
};
use proxmox_backup::{
- RRD_CACHE,
+ get_rrd_cache, initialize_rrd_cache,
server::{
auth::check_pbs_auth,
jobstate::{
@@ -208,6 +208,9 @@ async fn run() -> Result<(), Error> {
let _ = public_auth_key(); // load with lazy_static
let _ = csrf_secret(); // load with lazy_static
+ let rrd_cache = initialize_rrd_cache()?;
+ rrd_cache.apply_journal()?;
+
let mut config = ApiConfig::new(
pbs_buildcfg::JS_DIR,
&proxmox_backup::api2::ROUTER,
@@ -901,14 +904,18 @@ async fn run_stat_generator() {
}
fn rrd_update_gauge(name: &str, value: f64) {
- if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge) {
- eprintln!("rrd::update_value '{}' failed - {}", name, err);
+ if let Ok(rrd_cache) = get_rrd_cache() {
+ if let Err(err) = rrd_cache.update_value(name, value, DST::Gauge) {
+ eprintln!("rrd::update_value '{}' failed - {}", name, err);
+ }
}
}
fn rrd_update_derive(name: &str, value: f64) {
- if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive) {
- eprintln!("rrd::update_value '{}' failed - {}", name, err);
+ if let Ok(rrd_cache) = get_rrd_cache() {
+ if let Err(err) = rrd_cache.update_value(name, value, DST::Derive) {
+ eprintln!("rrd::update_value '{}' failed - {}", name, err);
+ }
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 5d2b4590..a1ac23bf 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,6 +5,9 @@
use std::path::PathBuf;
+use once_cell::sync::OnceCell;
+use anyhow::{format_err, Error};
+
use proxmox::tools::fs::CreateOptions;
use pbs_buildcfg::configdir;
@@ -39,25 +42,39 @@ pub fn cert_info() -> Result<CertInfo, anyhow::Error> {
CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem")))
}
-lazy_static::lazy_static!{
- /// Proxmox Backup Server RRD cache instance
- pub static ref RRD_CACHE: RRDCache = {
- let backup_user = pbs_config::backup_user().unwrap();
- let file_options = CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid);
-
- let dir_options = CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid);
-
- let apply_interval = 30.0*60.0; // 30 minutes
-
- RRDCache::new(
- "/var/lib/proxmox-backup/rrdb",
- Some(file_options),
- Some(dir_options),
- apply_interval,
- ).unwrap()
- };
+pub static RRD_CACHE: OnceCell<RRDCache> = OnceCell::new();
+
+/// Get the RRD cache instance
+pub fn get_rrd_cache() -> Result<&'static RRDCache, Error> {
+ RRD_CACHE.get().ok_or_else(|| format_err!("RRD cache not initialized!"))
+}
+
+/// Initialize the RRD cache instance
+///
+/// Note: Only a single process must do this (proxmox-backup-proxy)
+pub fn initialize_rrd_cache() -> Result<&'static RRDCache, Error> {
+
+ let backup_user = pbs_config::backup_user()?;
+
+ let file_options = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ let dir_options = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ let apply_interval = 30.0*60.0; // 30 minutes
+
+ let cache = RRDCache::new(
+ "/var/lib/proxmox-backup/rrdb",
+ Some(file_options),
+ Some(dir_options),
+ apply_interval,
+ )?;
+
+ RRD_CACHE.set(cache)
+ .map_err(|_| format_err!("RRD cache already initialized!"))?;
+
+ Ok(RRD_CACHE.get().unwrap())
}
--
2.30.2
^ permalink raw reply [flat|nested] 2+ messages in thread
end of thread, other threads:[~2021-10-08 8:05 UTC | newest]
Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-10-08 8:04 [pbs-devel] [patch proxmox-backup 1/2] proxmox-rrd: use a journal to reduce amount of bytes written Dietmar Maurer
2021-10-08 8:04 ` [pbs-devel] [patch proxmox-backup 2/2] RRD_CACHE: use a OnceCell instead of lazy_static Dietmar Maurer
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox