From: Lukas Wagner <l.wagner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v2 01/13] proxy: server: move rrd stat/metric server to separate module
Date: Tue, 15 Oct 2024 10:46:24 +0200 [thread overview]
Message-ID: <20241015084636.57106-2-l.wagner@proxmox.com> (raw)
In-Reply-To: <20241015084636.57106-1-l.wagner@proxmox.com>
With the upcoming pull-metric system/metric caching, these
things should go into a separate module.
No functional changes intended.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 419 +--------------------------
src/server/metric_collection/mod.rs | 426 ++++++++++++++++++++++++++++
src/server/mod.rs | 2 +
3 files changed, 433 insertions(+), 414 deletions(-)
create mode 100644 src/server/metric_collection/mod.rs
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 041f3aff..859f5b0f 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -17,10 +17,8 @@ use serde_json::{json, Value};
use proxmox_lang::try_block;
use proxmox_log::init_logger;
-use proxmox_metrics::MetricsData;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
-use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
-use proxmox_sys::linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat};
+use proxmox_sys::fs::CreateOptions;
use proxmox_sys::logrotate::LogRotate;
use pbs_datastore::DataStore;
@@ -30,15 +28,11 @@ use proxmox_rest_server::{
RestEnvironment, RestServer, WorkerTask,
};
-use proxmox_backup::rrd_cache::{
- initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge,
-};
use proxmox_backup::{
server::{
auth::check_pbs_auth,
jobstate::{self, Job},
},
- tools::disks::BlockDevStat,
traffic_control_cache::{SharedRateLimit, TRAFFIC_CONTROL_CACHE},
};
@@ -51,11 +45,8 @@ use pbs_api_types::{
};
use proxmox_backup::auth_helpers::*;
-use proxmox_backup::server;
-use proxmox_backup::tools::{
- disks::{zfs_dataset_stats, DiskManage},
- PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
-};
+use proxmox_backup::server::{self, metric_collection};
+use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
use proxmox_backup::api2::pull::do_sync_job;
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
@@ -186,9 +177,7 @@ async fn run() -> Result<(), Error> {
proxmox_backup::auth_helpers::setup_auth_context(false);
proxmox_backup::server::notifications::init()?;
-
- let rrd_cache = initialize_rrd_cache()?;
- rrd_cache.apply_journal()?;
+ metric_collection::init()?;
let mut indexpath = PathBuf::from(pbs_buildcfg::JS_DIR);
indexpath.push("index.hbs");
@@ -356,7 +345,7 @@ async fn run() -> Result<(), Error> {
});
start_task_scheduler();
- start_stat_generator();
+ metric_collection::start_collection_task();
start_traffic_control_updater();
server.await?;
@@ -389,14 +378,6 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
acceptor.build()
}
-fn start_stat_generator() {
- tokio::spawn(async {
- let abort_future = pin!(proxmox_daemon::shutdown_future());
- let future = pin!(run_stat_generator());
- futures::future::select(future, abort_future).await;
- });
-}
-
fn start_task_scheduler() {
tokio::spawn(async {
let abort_future = pin!(proxmox_daemon::shutdown_future());
@@ -867,349 +848,6 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
}
}
-async fn run_stat_generator() {
- loop {
- let delay_target = Instant::now() + Duration::from_secs(10);
-
- let stats_future = tokio::task::spawn_blocking(|| {
- let hoststats = collect_host_stats_sync();
- let (hostdisk, datastores) = collect_disk_stats_sync();
- Arc::new((hoststats, hostdisk, datastores))
- });
- let stats = match stats_future.await {
- Ok(res) => res,
- Err(err) => {
- log::error!("collecting host stats panicked: {err}");
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
- continue;
- }
- };
-
- let rrd_future = tokio::task::spawn_blocking({
- let stats = Arc::clone(&stats);
- move || {
- rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
- rrd_sync_journal();
- }
- });
-
- let metrics_future = send_data_to_metric_servers(stats);
-
- let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
- if let Err(err) = rrd_res {
- log::error!("rrd update panicked: {err}");
- }
- if let Err(err) = metrics_res {
- log::error!("error during metrics sending: {err}");
- }
-
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
- }
-}
-
-async fn send_data_to_metric_servers(
- stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
-) -> Result<(), Error> {
- let (config, _digest) = pbs_config::metrics::config()?;
- let channel_list = get_metric_server_connections(config)?;
-
- if channel_list.is_empty() {
- return Ok(());
- }
-
- let ctime = proxmox_time::epoch_i64();
- let nodename = proxmox_sys::nodename();
-
- let mut values = Vec::new();
-
- let mut cpuvalue = match &stats.0.proc {
- Some(stat) => serde_json::to_value(stat)?,
- None => json!({}),
- };
-
- if let Some(loadavg) = &stats.0.load {
- cpuvalue["avg1"] = Value::from(loadavg.0);
- cpuvalue["avg5"] = Value::from(loadavg.1);
- cpuvalue["avg15"] = Value::from(loadavg.2);
- }
-
- values.push(Arc::new(
- MetricsData::new("cpustat", ctime, cpuvalue)?
- .tag("object", "host")
- .tag("host", nodename),
- ));
-
- if let Some(stat) = &stats.0.meminfo {
- values.push(Arc::new(
- MetricsData::new("memory", ctime, stat)?
- .tag("object", "host")
- .tag("host", nodename),
- ));
- }
-
- if let Some(netdev) = &stats.0.net {
- for item in netdev {
- values.push(Arc::new(
- MetricsData::new("nics", ctime, item)?
- .tag("object", "host")
- .tag("host", nodename)
- .tag("instance", item.device.clone()),
- ));
- }
- }
-
- values.push(Arc::new(
- MetricsData::new("blockstat", ctime, stats.1.to_value())?
- .tag("object", "host")
- .tag("host", nodename),
- ));
-
- for datastore in stats.2.iter() {
- values.push(Arc::new(
- MetricsData::new("blockstat", ctime, datastore.to_value())?
- .tag("object", "host")
- .tag("host", nodename)
- .tag("datastore", datastore.name.clone()),
- ));
- }
-
- // we must have a concrete functions, because the inferred lifetime from a
- // closure is not general enough for the tokio::spawn call we are in here...
- fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
- &item.0
- }
-
- let results =
- proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
- for (res, name) in results
- .into_iter()
- .zip(channel_list.iter().map(|(_, name)| name))
- {
- if let Err(err) = res {
- log::error!("error sending into channel of {name}: {err}");
- }
- }
-
- futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
- if let Err(err) = channel.join().await {
- log::error!("error sending to metric server {name}: {err}");
- }
- }))
- .await;
-
- Ok(())
-}
-
-/// Get the metric server connections from a config
-pub fn get_metric_server_connections(
- metric_config: proxmox_section_config::SectionConfigData,
-) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
- let mut res = Vec::new();
-
- for config in
- metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
- {
- if !config.enable {
- continue;
- }
- let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
- res.push((future, config.name));
- }
-
- for config in
- metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
- {
- if !config.enable {
- continue;
- }
- let future = proxmox_metrics::influxdb_http(
- &config.url,
- config.organization.as_deref().unwrap_or("proxmox"),
- config.bucket.as_deref().unwrap_or("proxmox"),
- config.token.as_deref(),
- config.verify_tls.unwrap_or(true),
- config.max_body_size.unwrap_or(25_000_000),
- )?;
- res.push((future, config.name));
- }
- Ok(res)
-}
-
-struct HostStats {
- proc: Option<ProcFsStat>,
- meminfo: Option<ProcFsMemInfo>,
- net: Option<Vec<ProcFsNetDev>>,
- load: Option<Loadavg>,
-}
-
-struct DiskStat {
- name: String,
- usage: Option<FileSystemInformation>,
- dev: Option<BlockDevStat>,
-}
-
-impl DiskStat {
- fn to_value(&self) -> Value {
- let mut value = json!({});
- if let Some(usage) = &self.usage {
- value["total"] = Value::from(usage.total);
- value["used"] = Value::from(usage.used);
- value["avail"] = Value::from(usage.available);
- }
-
- if let Some(dev) = &self.dev {
- value["read_ios"] = Value::from(dev.read_ios);
- value["read_bytes"] = Value::from(dev.read_sectors * 512);
- value["write_ios"] = Value::from(dev.write_ios);
- value["write_bytes"] = Value::from(dev.write_sectors * 512);
- value["io_ticks"] = Value::from(dev.io_ticks / 1000);
- }
- value
- }
-}
-
-fn collect_host_stats_sync() -> HostStats {
- use proxmox_sys::linux::procfs::{
- read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
- };
-
- let proc = match read_proc_stat() {
- Ok(stat) => Some(stat),
- Err(err) => {
- eprintln!("read_proc_stat failed - {err}");
- None
- }
- };
-
- let meminfo = match read_meminfo() {
- Ok(stat) => Some(stat),
- Err(err) => {
- eprintln!("read_meminfo failed - {err}");
- None
- }
- };
-
- let net = match read_proc_net_dev() {
- Ok(netdev) => Some(netdev),
- Err(err) => {
- eprintln!("read_prox_net_dev failed - {err}");
- None
- }
- };
-
- let load = match read_loadavg() {
- Ok(loadavg) => Some(loadavg),
- Err(err) => {
- eprintln!("read_loadavg failed - {err}");
- None
- }
- };
-
- HostStats {
- proc,
- meminfo,
- net,
- load,
- }
-}
-
-fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
- let disk_manager = DiskManage::new();
-
- let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
-
- let mut datastores = Vec::new();
- match pbs_config::datastore::config() {
- Ok((config, _)) => {
- let datastore_list: Vec<DataStoreConfig> = config
- .convert_to_typed_array("datastore")
- .unwrap_or_default();
-
- for config in datastore_list {
- if config
- .get_maintenance_mode()
- .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
- {
- continue;
- }
- let path = std::path::Path::new(&config.path);
- datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
- }
- }
- Err(err) => {
- eprintln!("read datastore config failed - {err}");
- }
- }
-
- (root, datastores)
-}
-
-fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
- if let Some(stat) = &host.proc {
- rrd_update_gauge("host/cpu", stat.cpu);
- rrd_update_gauge("host/iowait", stat.iowait_percent);
- }
-
- if let Some(meminfo) = &host.meminfo {
- rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
- rrd_update_gauge("host/memused", meminfo.memused as f64);
- rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
- rrd_update_gauge("host/swapused", meminfo.swapused as f64);
- }
-
- if let Some(netdev) = &host.net {
- use pbs_config::network::is_physical_nic;
- let mut netin = 0;
- let mut netout = 0;
- for item in netdev {
- if !is_physical_nic(&item.device) {
- continue;
- }
- netin += item.receive;
- netout += item.send;
- }
- rrd_update_derive("host/netin", netin as f64);
- rrd_update_derive("host/netout", netout as f64);
- }
-
- if let Some(loadavg) = &host.load {
- rrd_update_gauge("host/loadavg", loadavg.0);
- }
-
- rrd_update_disk_stat(hostdisk, "host");
-
- for stat in datastores {
- let rrd_prefix = format!("datastore/{}", stat.name);
- rrd_update_disk_stat(stat, &rrd_prefix);
- }
-}
-
-fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
- if let Some(status) = &disk.usage {
- let rrd_key = format!("{}/total", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.total as f64);
- let rrd_key = format!("{}/used", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.used as f64);
- let rrd_key = format!("{}/available", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.available as f64);
- }
-
- if let Some(stat) = &disk.dev {
- let rrd_key = format!("{}/read_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.read_ios as f64);
- let rrd_key = format!("{}/read_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
-
- let rrd_key = format!("{}/write_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.write_ios as f64);
- let rrd_key = format!("{}/write_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
-
- let rrd_key = format!("{}/io_ticks", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
- }
-}
-
fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
let event: CalendarEvent = match event_str.parse() {
Ok(event) => event,
@@ -1240,53 +878,6 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
next <= now
}
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
- let usage = match proxmox_sys::fs::fs_info(path) {
- Ok(status) => Some(status),
- Err(err) => {
- eprintln!("read fs info on {path:?} failed - {err}");
- None
- }
- };
-
- let dev = match disk_manager.find_mounted_device(path) {
- Ok(None) => None,
- Ok(Some((fs_type, device, source))) => {
- let mut device_stat = None;
- match (fs_type.as_str(), source) {
- ("zfs", Some(source)) => match source.into_string() {
- Ok(dataset) => match zfs_dataset_stats(&dataset) {
- Ok(stat) => device_stat = Some(stat),
- Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
- },
- Err(source) => {
- eprintln!("zfs_pool_stats({source:?}) failed - invalid characters")
- }
- },
- _ => {
- if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
- match disk.read_stat() {
- Ok(stat) => device_stat = stat,
- Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
- }
- }
- }
- }
- device_stat
- }
- Err(err) => {
- eprintln!("find_mounted_device failed - {err}");
- None
- }
- };
-
- DiskStat {
- name: name.to_string(),
- usage,
- dev,
- }
-}
-
// Rate Limiter lookup
async fn run_traffic_control_updater() {
loop {
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
new file mode 100644
index 00000000..e220f51a
--- /dev/null
+++ b/src/server/metric_collection/mod.rs
@@ -0,0 +1,426 @@
+use std::{
+ path::Path,
+ pin::pin,
+ sync::Arc,
+ time::{Duration, Instant},
+};
+
+use anyhow::Error;
+use pbs_api_types::{DataStoreConfig, Operation};
+use serde_json::{json, Value};
+use tokio::join;
+
+use proxmox_metrics::MetricsData;
+use proxmox_sys::{
+ fs::FileSystemInformation,
+ linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
+};
+
+use crate::{
+ rrd_cache::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge},
+ tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage},
+};
+
+pub fn init() -> Result<(), Error> {
+ let rrd_cache = initialize_rrd_cache()?;
+ rrd_cache.apply_journal()?;
+ Ok(())
+}
+
+pub fn start_collection_task() {
+ tokio::spawn(async {
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ let future = pin!(run_stat_generator());
+ futures::future::select(future, abort_future).await;
+ });
+}
+
+async fn run_stat_generator() {
+ loop {
+ let delay_target = Instant::now() + Duration::from_secs(10);
+
+ let stats_future = tokio::task::spawn_blocking(|| {
+ let hoststats = collect_host_stats_sync();
+ let (hostdisk, datastores) = collect_disk_stats_sync();
+ Arc::new((hoststats, hostdisk, datastores))
+ });
+ let stats = match stats_future.await {
+ Ok(res) => res,
+ Err(err) => {
+ log::error!("collecting host stats panicked: {err}");
+ tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+ continue;
+ }
+ };
+
+ let rrd_future = tokio::task::spawn_blocking({
+ let stats = Arc::clone(&stats);
+ move || {
+ rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
+ rrd_sync_journal();
+ }
+ });
+
+ let metrics_future = send_data_to_metric_servers(stats);
+
+ let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+ if let Err(err) = rrd_res {
+ log::error!("rrd update panicked: {err}");
+ }
+ if let Err(err) = metrics_res {
+ log::error!("error during metrics sending: {err}");
+ }
+
+ tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+ }
+}
+
+async fn send_data_to_metric_servers(
+ stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
+) -> Result<(), Error> {
+ let (config, _digest) = pbs_config::metrics::config()?;
+ let channel_list = get_metric_server_connections(config)?;
+
+ if channel_list.is_empty() {
+ return Ok(());
+ }
+
+ let ctime = proxmox_time::epoch_i64();
+ let nodename = proxmox_sys::nodename();
+
+ let mut values = Vec::new();
+
+ let mut cpuvalue = match &stats.0.proc {
+ Some(stat) => serde_json::to_value(stat)?,
+ None => json!({}),
+ };
+
+ if let Some(loadavg) = &stats.0.load {
+ cpuvalue["avg1"] = Value::from(loadavg.0);
+ cpuvalue["avg5"] = Value::from(loadavg.1);
+ cpuvalue["avg15"] = Value::from(loadavg.2);
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("cpustat", ctime, cpuvalue)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ if let Some(stat) = &stats.0.meminfo {
+ values.push(Arc::new(
+ MetricsData::new("memory", ctime, stat)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+ }
+
+ if let Some(netdev) = &stats.0.net {
+ for item in netdev {
+ values.push(Arc::new(
+ MetricsData::new("nics", ctime, item)?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("instance", item.device.clone()),
+ ));
+ }
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, stats.1.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ for datastore in stats.2.iter() {
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, datastore.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("datastore", datastore.name.clone()),
+ ));
+ }
+
+ // we must have a concrete functions, because the inferred lifetime from a
+ // closure is not general enough for the tokio::spawn call we are in here...
+ fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
+ &item.0
+ }
+
+ let results =
+ proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
+ for (res, name) in results
+ .into_iter()
+ .zip(channel_list.iter().map(|(_, name)| name))
+ {
+ if let Err(err) = res {
+ log::error!("error sending into channel of {name}: {err}");
+ }
+ }
+
+ futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
+ if let Err(err) = channel.join().await {
+ log::error!("error sending to metric server {name}: {err}");
+ }
+ }))
+ .await;
+
+ Ok(())
+}
+
+/// Get the metric server connections from a config
+fn get_metric_server_connections(
+ metric_config: proxmox_section_config::SectionConfigData,
+) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
+ let mut res = Vec::new();
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+ {
+ if !config.enable {
+ continue;
+ }
+ let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
+ res.push((future, config.name));
+ }
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+ {
+ if !config.enable {
+ continue;
+ }
+ let future = proxmox_metrics::influxdb_http(
+ &config.url,
+ config.organization.as_deref().unwrap_or("proxmox"),
+ config.bucket.as_deref().unwrap_or("proxmox"),
+ config.token.as_deref(),
+ config.verify_tls.unwrap_or(true),
+ config.max_body_size.unwrap_or(25_000_000),
+ )?;
+ res.push((future, config.name));
+ }
+ Ok(res)
+}
+
+struct HostStats {
+ proc: Option<ProcFsStat>,
+ meminfo: Option<ProcFsMemInfo>,
+ net: Option<Vec<ProcFsNetDev>>,
+ load: Option<Loadavg>,
+}
+
+struct DiskStat {
+ name: String,
+ usage: Option<FileSystemInformation>,
+ dev: Option<BlockDevStat>,
+}
+
+impl DiskStat {
+ fn to_value(&self) -> Value {
+ let mut value = json!({});
+ if let Some(usage) = &self.usage {
+ value["total"] = Value::from(usage.total);
+ value["used"] = Value::from(usage.used);
+ value["avail"] = Value::from(usage.available);
+ }
+
+ if let Some(dev) = &self.dev {
+ value["read_ios"] = Value::from(dev.read_ios);
+ value["read_bytes"] = Value::from(dev.read_sectors * 512);
+ value["write_ios"] = Value::from(dev.write_ios);
+ value["write_bytes"] = Value::from(dev.write_sectors * 512);
+ value["io_ticks"] = Value::from(dev.io_ticks / 1000);
+ }
+ value
+ }
+}
+
+fn collect_host_stats_sync() -> HostStats {
+ use proxmox_sys::linux::procfs::{
+ read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
+ };
+
+ let proc = match read_proc_stat() {
+ Ok(stat) => Some(stat),
+ Err(err) => {
+ eprintln!("read_proc_stat failed - {err}");
+ None
+ }
+ };
+
+ let meminfo = match read_meminfo() {
+ Ok(stat) => Some(stat),
+ Err(err) => {
+ eprintln!("read_meminfo failed - {err}");
+ None
+ }
+ };
+
+ let net = match read_proc_net_dev() {
+ Ok(netdev) => Some(netdev),
+ Err(err) => {
+ eprintln!("read_prox_net_dev failed - {err}");
+ None
+ }
+ };
+
+ let load = match read_loadavg() {
+ Ok(loadavg) => Some(loadavg),
+ Err(err) => {
+ eprintln!("read_loadavg failed - {err}");
+ None
+ }
+ };
+
+ HostStats {
+ proc,
+ meminfo,
+ net,
+ load,
+ }
+}
+
+fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
+ let disk_manager = DiskManage::new();
+
+ let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+
+ let mut datastores = Vec::new();
+ match pbs_config::datastore::config() {
+ Ok((config, _)) => {
+ let datastore_list: Vec<DataStoreConfig> = config
+ .convert_to_typed_array("datastore")
+ .unwrap_or_default();
+
+ for config in datastore_list {
+ if config
+ .get_maintenance_mode()
+ .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
+ {
+ continue;
+ }
+ let path = std::path::Path::new(&config.path);
+ datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
+ }
+ }
+ Err(err) => {
+ eprintln!("read datastore config failed - {err}");
+ }
+ }
+
+ (root, datastores)
+}
+
+fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
+ if let Some(stat) = &host.proc {
+ rrd_update_gauge("host/cpu", stat.cpu);
+ rrd_update_gauge("host/iowait", stat.iowait_percent);
+ }
+
+ if let Some(meminfo) = &host.meminfo {
+ rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+ rrd_update_gauge("host/memused", meminfo.memused as f64);
+ rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+ rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+ }
+
+ if let Some(netdev) = &host.net {
+ use pbs_config::network::is_physical_nic;
+ let mut netin = 0;
+ let mut netout = 0;
+ for item in netdev {
+ if !is_physical_nic(&item.device) {
+ continue;
+ }
+ netin += item.receive;
+ netout += item.send;
+ }
+ rrd_update_derive("host/netin", netin as f64);
+ rrd_update_derive("host/netout", netout as f64);
+ }
+
+ if let Some(loadavg) = &host.load {
+ rrd_update_gauge("host/loadavg", loadavg.0);
+ }
+
+ rrd_update_disk_stat(hostdisk, "host");
+
+ for stat in datastores {
+ let rrd_prefix = format!("datastore/{}", stat.name);
+ rrd_update_disk_stat(stat, &rrd_prefix);
+ }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+ if let Some(status) = &disk.usage {
+ let rrd_key = format!("{}/total", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.total as f64);
+ let rrd_key = format!("{}/used", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.used as f64);
+ let rrd_key = format!("{}/available", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.available as f64);
+ }
+
+ if let Some(stat) = &disk.dev {
+ let rrd_key = format!("{}/read_ios", rrd_prefix);
+ rrd_update_derive(&rrd_key, stat.read_ios as f64);
+ let rrd_key = format!("{}/read_bytes", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
+
+ let rrd_key = format!("{}/write_ios", rrd_prefix);
+ rrd_update_derive(&rrd_key, stat.write_ios as f64);
+ let rrd_key = format!("{}/write_bytes", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
+
+ let rrd_key = format!("{}/io_ticks", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
+ }
+}
+
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
+ let usage = match proxmox_sys::fs::fs_info(path) {
+ Ok(status) => Some(status),
+ Err(err) => {
+ eprintln!("read fs info on {path:?} failed - {err}");
+ None
+ }
+ };
+
+ let dev = match disk_manager.find_mounted_device(path) {
+ Ok(None) => None,
+ Ok(Some((fs_type, device, source))) => {
+ let mut device_stat = None;
+ match (fs_type.as_str(), source) {
+ ("zfs", Some(source)) => match source.into_string() {
+ Ok(dataset) => match zfs_dataset_stats(&dataset) {
+ Ok(stat) => device_stat = Some(stat),
+ Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
+ },
+ Err(source) => {
+ eprintln!("zfs_pool_stats({source:?}) failed - invalid characters")
+ }
+ },
+ _ => {
+ if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
+ match disk.read_stat() {
+ Ok(stat) => device_stat = stat,
+ Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
+ }
+ }
+ }
+ }
+ device_stat
+ }
+ Err(err) => {
+ eprintln!("find_mounted_device failed - {err}");
+ None
+ }
+ };
+
+ DiskStat {
+ name: name.to_string(),
+ usage,
+ dev,
+ }
+}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 7f845e5b..ff92a821 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -33,6 +33,8 @@ pub use report::*;
pub mod auth;
+pub mod metric_collection;
+
pub(crate) mod pull;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-10-15 8:46 UTC|newest]
Thread overview: 15+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-10-15 8:46 [pbs-devel] [PATCH proxmox-backup v2 00/13] add metric endpoint Lukas Wagner
2024-10-15 8:46 ` Lukas Wagner [this message]
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 02/13] metric collection: add doc comments for public functions Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 03/13] metric collection: move rrd_cache to new metric_collection module Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 04/13] metric_collection: split out push metric part Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 05/13] metric collection: rrd: move rrd update function to rrd module Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 06/13] metric collection: rrd: restrict function visibility Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 07/13] metric collection: rrd: remove rrd prefix from some function names Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 08/13] metric collection: drop std::path prefix where not needed Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 09/13] metric collection: move impl block for DiskStats to metric_server module Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 10/13] pbs-api-types: add types for the new metrics endpoint Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 11/13] metric collection: initialize metric cache on startup Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 12/13] metric collection: put metrics in a cache Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 13/13] api: add /status/metrics API Lukas Wagner
2024-10-15 13:02 ` [pbs-devel] applied-series: [PATCH proxmox-backup v2 00/13] add metric endpoint Wolfgang Bumiller
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20241015084636.57106-2-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox