* [pbs-devel] [patch proxmox-backup 1/2] proxmox-rrd: use a journal to reduce amount of bytes written
@ 2021-10-08  8:04 Dietmar Maurer
  2021-10-08  8:04 ` [pbs-devel] [patch proxmox-backup 2/2] RRD_CACHE: use a OnceCell instead of lazy_static Dietmar Maurer
  0 siblings, 1 reply; 2+ messages in thread
From: Dietmar Maurer @ 2021-10-08  8:04 UTC
  To: pbs-devel

Apply the journal every 30 minutes (see apply_interval in src/lib.rs).
---
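Note: each journal entry is one text line of the form time:value:dst:rel_path,
as produced by append_journal_entry() and read back by parse_journal_line()
below. A standalone sketch of that round trip (sample values are made up):

    // Sketch only - mirrors the journal line format from this patch,
    // with made-up sample values.
    fn format_entry(time: f64, value: f64, dst: u8, rel_path: &str) -> String {
        // dst: 0 = Gauge, 1 = Derive
        format!("{}:{}:{}:{}\n", time, value, dst, rel_path)
    }

    fn parse_entry(line: &str) -> Option<(f64, f64, u8, String)> {
        let parts: Vec<&str> = line.trim().splitn(4, ':').collect();
        if parts.len() != 4 {
            return None;
        }
        Some((
            parts[0].parse().ok()?,   // time (epoch as f64)
            parts[1].parse().ok()?,   // value
            parts[2].parse().ok()?,   // data source type
            parts[3].to_string(),     // relative RRD file path
        ))
    }

    fn main() {
        let line = format_entry(1633680240.0, 0.25, 0, "host/cpu");
        assert_eq!(
            parse_entry(&line),
            Some((1633680240.0, 0.25, 0, "host/cpu".to_string()))
        );
    }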
 proxmox-rrd/Cargo.toml          |   1 +
 proxmox-rrd/src/cache.rs        | 223 ++++++++++++++++++++++++++++----
 proxmox-rrd/src/lib.rs          |   6 +-
 proxmox-rrd/src/rrd.rs          |  30 +++++
 src/bin/proxmox-backup-api.rs   |   2 +-
 src/bin/proxmox-backup-proxy.rs |  54 ++++----
 src/lib.rs                      |   5 +-
 7 files changed, 261 insertions(+), 60 deletions(-)
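
Rough usage sketch of the changed RRDCache API (the constructor now takes an
apply interval in seconds and returns a Result, and update_value() no longer
takes a save flag). Paths and CreateOptions here are placeholders - the real
callers in src/lib.rs and the daemons set owner/group and go through the
lazy_static RRD_CACHE:

    // Sketch only, minimal error handling.
    use anyhow::Error;
    use proxmox::tools::fs::CreateOptions;
    use proxmox_rrd::{RRDCache, DST};

    fn example() -> Result<(), Error> {
        // seconds between journal applies (30 minutes, as in src/lib.rs)
        let apply_interval = 30.0 * 60.0;

        let cache = RRDCache::new(
            "/var/lib/proxmox-backup/rrdb",
            Some(CreateOptions::new()),
            Some(CreateOptions::new()),
            apply_interval,
        )?;

        // On startup, replay any journal entries left over from a previous
        // run (proxmox-backup-api does this below).
        cache.apply_journal()?;

        // Each update is appended to the journal; the in-memory RRDs are
        // only written out (and the journal truncated) once apply_interval
        // seconds have passed since the last flush.
        cache.update_value("host/cpu", 0.25, DST::Gauge)?;

        Ok(())
    }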

diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml
index 19db5bf6..1c75b2e1 100644
--- a/proxmox-rrd/Cargo.toml
+++ b/proxmox-rrd/Cargo.toml
@@ -9,6 +9,7 @@ description = "Simple RRD database implementation."
 anyhow = "1.0"
 bitflags = "1.2.1"
 log = "0.4"
+nix = "0.19.1"
 
 proxmox = { version = "0.13.5", features = ["api-macro"] }
 
diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs
index 1084a037..57cbb394 100644
--- a/proxmox-rrd/src/cache.rs
+++ b/proxmox-rrd/src/cache.rs
@@ -1,24 +1,46 @@
+use std::fs::File;
 use std::path::{Path, PathBuf};
 use std::collections::HashMap;
-use std::sync::{RwLock};
+use std::sync::RwLock;
+use std::io::Write;
+use std::io::{BufRead, BufReader};
+use std::os::unix::io::AsRawFd;
 
-use anyhow::{format_err, Error};
+use anyhow::{format_err, bail, Error};
+use nix::fcntl::OFlag;
 
-use proxmox::tools::fs::{create_path, CreateOptions};
+use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
 
 use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};
 
 use crate::{DST, rrd::RRD};
 
+const RRD_JOURNAL_NAME: &str = "rrd.journal";
+
 /// RRD cache - keep RRD data in RAM, but write updates to disk
 ///
 /// This cache is designed to run as single instance (no concurrent
 /// access from other processes).
 pub struct RRDCache {
+    apply_interval: f64,
     basedir: PathBuf,
     file_options: CreateOptions,
     dir_options: CreateOptions,
-    cache: RwLock<HashMap<String, RRD>>,
+    state: RwLock<RRDCacheState>,
+}
+
+// shared state behind RwLock
+struct RRDCacheState {
+    rrd_map: HashMap<String, RRD>,
+    journal: File,
+    last_journal_flush: f64,
+}
+
+struct JournalEntry {
+    time: f64,
+    value: f64,
+    dst: DST,
+    rel_path: String,
 }
 
 impl RRDCache {
@@ -28,21 +50,166 @@ impl RRDCache {
         basedir: P,
         file_options: Option<CreateOptions>,
         dir_options: Option<CreateOptions>,
-    ) -> Self {
+        apply_interval: f64,
+    ) -> Result<Self, Error> {
         let basedir = basedir.as_ref().to_owned();
-        Self {
+
+        let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
+        let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
+
+        create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
+            .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+
+        let mut journal_path = basedir.clone();
+        journal_path.push(RRD_JOURNAL_NAME);
+
+        let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
+        let journal = atomic_open_or_create_file(&journal_path, flags,  &[], file_options.clone())?;
+
+        let state = RRDCacheState {
+            journal,
+            rrd_map: HashMap::new(),
+            last_journal_flush: 0.0,
+        };
+
+        Ok(Self {
             basedir,
-            file_options: file_options.unwrap_or_else(|| CreateOptions::new()),
-            dir_options: dir_options.unwrap_or_else(|| CreateOptions::new()),
-            cache: RwLock::new(HashMap::new()),
+            file_options,
+            dir_options,
+            apply_interval,
+            state: RwLock::new(state),
+        })
+    }
+
+    fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
+
+        let line = line.trim();
+
+        let parts: Vec<&str> = line.splitn(4, ':').collect();
+        if parts.len() != 4 {
+            bail!("wrong numper of components");
         }
+
+        let time: f64 = parts[0].parse()
+            .map_err(|_| format_err!("unable to parse time"))?;
+        let value: f64 = parts[1].parse()
+            .map_err(|_| format_err!("unable to parse value"))?;
+        let dst: u8 = parts[2].parse()
+            .map_err(|_| format_err!("unable to parse data source type"))?;
+
+        let dst = match dst {
+            0 => DST::Gauge,
+            1 => DST::Derive,
+            _ => bail!("got strange value for data source type '{}'", dst),
+        };
+
+        let rel_path = parts[3].to_string();
+
+        Ok(JournalEntry { time, value, dst, rel_path })
+    }
+
+    fn append_journal_entry(
+        state: &mut RRDCacheState,
+        time: f64,
+        value: f64,
+        dst: DST,
+        rel_path: &str,
+    ) -> Result<(), Error> {
+        let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
+        state.journal.write_all(journal_entry.as_bytes())?;
+        Ok(())
     }
 
-    /// Create rrdd stat dir with correct permission
-    pub fn create_rrdb_dir(&self) -> Result<(), Error> {
+    pub fn apply_journal(&self) -> Result<(), Error> {
+        let mut state = self.state.write().unwrap(); // block writers
+        self.apply_journal_locked(&mut state)
+    }
 
-        create_path(&self.basedir, Some(self.dir_options.clone()), Some(self.dir_options.clone()))
-            .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+    fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {
+
+        log::info!("applying rrd journal");
+
+        state.last_journal_flush = proxmox::tools::time::epoch_f64();
+
+        let mut journal_path = self.basedir.clone();
+        journal_path.push(RRD_JOURNAL_NAME);
+
+        let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
+        let journal = atomic_open_or_create_file(&journal_path, flags,  &[], self.file_options.clone())?;
+        let mut journal = BufReader::new(journal);
+
+        let mut last_update_map = HashMap::new();
+
+        let mut get_last_update = |rel_path: &str, rrd: &RRD| {
+            if let Some(time) = last_update_map.get(rel_path) {
+                return *time;
+            }
+            let last_update =  rrd.last_update();
+            last_update_map.insert(rel_path.to_string(), last_update);
+            last_update
+        };
+
+        let mut linenr = 0;
+        loop {
+            linenr += 1;
+            let mut line = String::new();
+            let len = journal.read_line(&mut line)?;
+            if len == 0 { break; }
+
+            let entry = match Self::parse_journal_line(&line) {
+                Ok(entry) => entry,
+                Err(err) => {
+                    log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
+                    continue; // skip unparsable lines
+                }
+            };
+
+            if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
+                if entry.time > get_last_update(&entry.rel_path, &rrd) {
+                    rrd.update(entry.time, entry.value);
+                }
+            } else {
+                let mut path = self.basedir.clone();
+                path.push(&entry.rel_path);
+                create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+
+                let mut rrd = match RRD::load(&path) {
+                    Ok(rrd) => rrd,
+                    Err(err) => {
+                        if err.kind() != std::io::ErrorKind::NotFound {
+                            log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
+                        }
+                        RRD::new(entry.dst)
+                    },
+                };
+                if entry.time > get_last_update(&entry.rel_path, &rrd) {
+                    rrd.update(entry.time, entry.value);
+                }
+                state.rrd_map.insert(entry.rel_path.clone(), rrd);
+            }
+        }
+
+        // save all RRDs
+
+        let mut errors = 0;
+        for (rel_path, rrd) in state.rrd_map.iter() {
+            let mut path = self.basedir.clone();
+            path.push(&rel_path);
+            if let Err(err) = rrd.save(&path, self.file_options.clone()) {
+                errors += 1;
+                log::error!("unable to save {:?}: {}", path, err);
+            }
+        }
+
+        // if everything went ok, commit the journal
+
+        if errors == 0 {
+            nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
+                .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
+            log::info!("rrd journal successfully committed");
+        } else {
+            log::error!("errors during rrd flush - unable to commit rrd journal");
+        }
 
         Ok(())
     }
@@ -53,21 +220,26 @@ impl RRDCache {
         rel_path: &str,
         value: f64,
         dst: DST,
-        save: bool,
     ) -> Result<(), Error> {
 
-        let mut path = self.basedir.clone();
-        path.push(rel_path);
-
-        create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.file_options.clone()))?;
+        let mut state = self.state.write().unwrap(); // block other writers
 
-        let mut map = self.cache.write().unwrap();
         let now = proxmox::tools::time::epoch_f64();
 
-        if let Some(rrd) = map.get_mut(rel_path) {
+        if (now - state.last_journal_flush) > self.apply_interval {
+            if let Err(err) = self.apply_journal_locked(&mut state) {
+                log::error!("apply journal failed: {}", err);
+            }
+        }
+
+        Self::append_journal_entry(&mut state, now, value, dst, rel_path)?;
+
+        if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
             rrd.update(now, value);
-            if save { rrd.save(&path, self.file_options.clone())?; }
         } else {
+            let mut path = self.basedir.clone();
+            path.push(rel_path);
+            create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
             let mut rrd = match RRD::load(&path) {
                 Ok(rrd) => rrd,
                 Err(err) => {
@@ -78,10 +250,7 @@ impl RRDCache {
                 },
             };
             rrd.update(now, value);
-            if save {
-                rrd.save(&path, self.file_options.clone())?;
-            }
-            map.insert(rel_path.into(), rrd);
+            state.rrd_map.insert(rel_path.into(), rrd);
         }
 
         Ok(())
@@ -97,9 +266,9 @@ impl RRDCache {
         mode: RRDMode,
     ) -> Option<(u64, u64, Vec<Option<f64>>)> {
 
-        let map = self.cache.read().unwrap();
+        let state = self.state.read().unwrap();
 
-        match map.get(&format!("{}/{}", base, name)) {
+        match state.rrd_map.get(&format!("{}/{}", base, name)) {
             Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)),
             None => None,
         }
diff --git a/proxmox-rrd/src/lib.rs b/proxmox-rrd/src/lib.rs
index 303cd55d..d83e6ffc 100644
--- a/proxmox-rrd/src/lib.rs
+++ b/proxmox-rrd/src/lib.rs
@@ -13,9 +13,11 @@ mod cache;
 pub use cache::*;
 
 /// RRD data source type
+#[repr(u8)]
+#[derive(Copy, Clone)]
 pub enum DST {
     /// Gauge values are stored unmodified.
-    Gauge,
+    Gauge = 0,
     /// Stores the difference to the previous value.
-    Derive,
+    Derive = 1,
 }
diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs
index f4c08909..026498ed 100644
--- a/proxmox-rrd/src/rrd.rs
+++ b/proxmox-rrd/src/rrd.rs
@@ -336,6 +336,36 @@ impl RRD {
         replace_file(filename, rrd_slice, options)
     }
 
+    pub fn last_update(&self) -> f64 {
+
+        let mut last_update = 0.0;
+
+        {
+            let mut check_last_update = |rra: &RRA| {
+                if rra.last_update > last_update {
+                    last_update = rra.last_update;
+                }
+            };
+
+            check_last_update(&self.hour_avg);
+            check_last_update(&self.hour_max);
+
+            check_last_update(&self.day_avg);
+            check_last_update(&self.day_max);
+
+            check_last_update(&self.week_avg);
+            check_last_update(&self.week_max);
+
+            check_last_update(&self.month_avg);
+            check_last_update(&self.month_max);
+
+            check_last_update(&self.year_avg);
+            check_last_update(&self.year_max);
+        }
+
+        last_update
+    }
+
     /// Update the value (in memory)
     ///
     /// Note: This does not call [Self::save].
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index da9ae5dd..b7c4e689 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -75,7 +75,7 @@ async fn run() -> Result<(), Error> {
 
     proxmox_backup::server::create_run_dir()?;
 
-    RRD_CACHE.create_rrdb_dir()?;
+    RRD_CACHE.apply_journal()?;
 
     proxmox_backup::server::jobstate::create_jobstate_dir()?;
     proxmox_backup::tape::create_tape_status_dir()?;
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 4c879483..ce939400 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -889,14 +889,10 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
 
 async fn run_stat_generator() {
 
-    let mut count = 0;
     loop {
-        count += 1;
-        let save = if count >= 6 { count = 0; true } else { false };
-
         let delay_target = Instant::now() +  Duration::from_secs(10);
 
-        generate_host_stats(save).await;
+        generate_host_stats().await;
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
 
@@ -904,19 +900,19 @@ async fn run_stat_generator() {
 
 }
 
-fn rrd_update_gauge(name: &str, value: f64, save: bool) {
-    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge, save) {
+fn rrd_update_gauge(name: &str, value: f64) {
+    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge) {
         eprintln!("rrd::update_value '{}' failed - {}", name, err);
     }
 }
 
-fn rrd_update_derive(name: &str, value: f64, save: bool) {
-    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive, save) {
+fn rrd_update_derive(name: &str, value: f64) {
+    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive) {
         eprintln!("rrd::update_value '{}' failed - {}", name, err);
     }
 }
 
-async fn generate_host_stats(save: bool) {
+async fn generate_host_stats() {
     use proxmox::sys::linux::procfs::{
         read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
 
@@ -924,8 +920,8 @@ async fn generate_host_stats(save: bool) {
 
         match read_proc_stat() {
             Ok(stat) => {
-                rrd_update_gauge("host/cpu", stat.cpu, save);
-                rrd_update_gauge("host/iowait", stat.iowait_percent, save);
+                rrd_update_gauge("host/cpu", stat.cpu);
+                rrd_update_gauge("host/iowait", stat.iowait_percent);
             }
             Err(err) => {
                 eprintln!("read_proc_stat failed - {}", err);
@@ -934,10 +930,10 @@ async fn generate_host_stats(save: bool) {
 
         match read_meminfo() {
             Ok(meminfo) => {
-                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64, save);
-                rrd_update_gauge("host/memused", meminfo.memused as f64, save);
-                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64, save);
-                rrd_update_gauge("host/swapused", meminfo.swapused as f64, save);
+                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+                rrd_update_gauge("host/memused", meminfo.memused as f64);
+                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+                rrd_update_gauge("host/swapused", meminfo.swapused as f64);
             }
             Err(err) => {
                 eprintln!("read_meminfo failed - {}", err);
@@ -954,8 +950,8 @@ async fn generate_host_stats(save: bool) {
                     netin += item.receive;
                     netout += item.send;
                 }
-                rrd_update_derive("host/netin", netin as f64, save);
-                rrd_update_derive("host/netout", netout as f64, save);
+                rrd_update_derive("host/netin", netin as f64);
+                rrd_update_derive("host/netout", netout as f64);
             }
             Err(err) => {
                 eprintln!("read_prox_net_dev failed - {}", err);
@@ -964,7 +960,7 @@ async fn generate_host_stats(save: bool) {
 
         match read_loadavg() {
             Ok(loadavg) => {
-                rrd_update_gauge("host/loadavg", loadavg.0 as f64, save);
+                rrd_update_gauge("host/loadavg", loadavg.0 as f64);
             }
             Err(err) => {
                 eprintln!("read_loadavg failed - {}", err);
@@ -973,7 +969,7 @@ async fn generate_host_stats(save: bool) {
 
         let disk_manager = DiskManage::new();
 
-        gather_disk_stats(disk_manager.clone(), Path::new("/"), "host", save);
+        gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
 
         match pbs_config::datastore::config() {
             Ok((config, _)) => {
@@ -984,7 +980,7 @@ async fn generate_host_stats(save: bool) {
 
                     let rrd_prefix = format!("datastore/{}", config.name);
                     let path = std::path::Path::new(&config.path);
-                    gather_disk_stats(disk_manager.clone(), path, &rrd_prefix, save);
+                    gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
                 }
             }
             Err(err) => {
@@ -1025,14 +1021,14 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     next <= now
 }
 
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str, save: bool) {
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
 
     match proxmox_backup::tools::disks::disk_usage(path) {
         Ok(status) => {
             let rrd_key = format!("{}/total", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.total as f64, save);
+            rrd_update_gauge(&rrd_key, status.total as f64);
             let rrd_key = format!("{}/used", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.used as f64, save);
+            rrd_update_gauge(&rrd_key, status.used as f64);
         }
         Err(err) => {
             eprintln!("read disk_usage on {:?} failed - {}", path, err);
@@ -1064,17 +1060,17 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
             }
             if let Some(stat) = device_stat {
                 let rrd_key = format!("{}/read_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.read_ios as f64, save);
+                rrd_update_derive(&rrd_key, stat.read_ios as f64);
                 let rrd_key = format!("{}/read_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64, save);
+                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
 
                 let rrd_key = format!("{}/write_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.write_ios as f64, save);
+                rrd_update_derive(&rrd_key, stat.write_ios as f64);
                 let rrd_key = format!("{}/write_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64, save);
+                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
 
                 let rrd_key = format!("{}/io_ticks", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0, save);
+                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
             }
         }
         Err(err) => {
diff --git a/src/lib.rs b/src/lib.rs
index 98b6b987..5d2b4590 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -51,10 +51,13 @@ lazy_static::lazy_static!{
             .owner(backup_user.uid)
             .group(backup_user.gid);
 
+        let apply_interval = 30.0*60.0; // 30 minutes
+
         RRDCache::new(
             "/var/lib/proxmox-backup/rrdb",
             Some(file_options),
             Some(dir_options),
-        )
+            apply_interval,
+        ).unwrap()
     };
 }
-- 
2.30.2




