From: Lukas Wagner <l.wagner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 01/13] proxy: server: move rrd stat/metric server to separate module
Date: Fri, 11 Oct 2024 12:51:25 +0200
Message-ID: <20241011105137.131530-2-l.wagner@proxmox.com>
In-Reply-To: <20241011105137.131530-1-l.wagner@proxmox.com>
With the upcoming pull-based metric system and metric caching, this
code should move into a separate module.

No functional changes intended.
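
The new server::metric_collection module keeps the moved code as-is:
its public surface shrinks to init(), which sets up the RRD cache, and
start_collection_task(), which spawns the periodic collection task;
get_metric_server_connections() is no longer public.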
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/bin/proxmox-backup-proxy.rs     | 419 +--------------------------
 src/server/metric_collection/mod.rs | 426 ++++++++++++++++++++++++++++
 src/server/mod.rs                   |   2 +
 3 files changed, 433 insertions(+), 414 deletions(-)
create mode 100644 src/server/metric_collection/mod.rs
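
As a quick orientation before the diff: a minimal sketch of how the
proxy drives the new module after this patch (both calls appear
verbatim in the hunks below):

    use proxmox_backup::server::metric_collection;

    // during proxy startup, in run():
    metric_collection::init()?;                 // init RRD cache, apply journal
    // once the task scheduler is running:
    metric_collection::start_collection_task(); // spawn the 10-second stat loop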
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 041f3aff..859f5b0f 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -17,10 +17,8 @@ use serde_json::{json, Value};
use proxmox_lang::try_block;
use proxmox_log::init_logger;
-use proxmox_metrics::MetricsData;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
-use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
-use proxmox_sys::linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat};
+use proxmox_sys::fs::CreateOptions;
use proxmox_sys::logrotate::LogRotate;
use pbs_datastore::DataStore;
@@ -30,15 +28,11 @@ use proxmox_rest_server::{
RestEnvironment, RestServer, WorkerTask,
};
-use proxmox_backup::rrd_cache::{
- initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge,
-};
use proxmox_backup::{
server::{
auth::check_pbs_auth,
jobstate::{self, Job},
},
- tools::disks::BlockDevStat,
traffic_control_cache::{SharedRateLimit, TRAFFIC_CONTROL_CACHE},
};
@@ -51,11 +45,8 @@ use pbs_api_types::{
};
use proxmox_backup::auth_helpers::*;
-use proxmox_backup::server;
-use proxmox_backup::tools::{
- disks::{zfs_dataset_stats, DiskManage},
- PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
-};
+use proxmox_backup::server::{self, metric_collection};
+use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
use proxmox_backup::api2::pull::do_sync_job;
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
@@ -186,9 +177,7 @@ async fn run() -> Result<(), Error> {
proxmox_backup::auth_helpers::setup_auth_context(false);
proxmox_backup::server::notifications::init()?;
-
- let rrd_cache = initialize_rrd_cache()?;
- rrd_cache.apply_journal()?;
+ metric_collection::init()?;
let mut indexpath = PathBuf::from(pbs_buildcfg::JS_DIR);
indexpath.push("index.hbs");
@@ -356,7 +345,7 @@ async fn run() -> Result<(), Error> {
});
start_task_scheduler();
- start_stat_generator();
+ metric_collection::start_collection_task();
start_traffic_control_updater();
server.await?;
@@ -389,14 +378,6 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
acceptor.build()
}
-fn start_stat_generator() {
- tokio::spawn(async {
- let abort_future = pin!(proxmox_daemon::shutdown_future());
- let future = pin!(run_stat_generator());
- futures::future::select(future, abort_future).await;
- });
-}
-
fn start_task_scheduler() {
tokio::spawn(async {
let abort_future = pin!(proxmox_daemon::shutdown_future());
@@ -867,349 +848,6 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
}
}
-async fn run_stat_generator() {
- loop {
- let delay_target = Instant::now() + Duration::from_secs(10);
-
- let stats_future = tokio::task::spawn_blocking(|| {
- let hoststats = collect_host_stats_sync();
- let (hostdisk, datastores) = collect_disk_stats_sync();
- Arc::new((hoststats, hostdisk, datastores))
- });
- let stats = match stats_future.await {
- Ok(res) => res,
- Err(err) => {
- log::error!("collecting host stats panicked: {err}");
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
- continue;
- }
- };
-
- let rrd_future = tokio::task::spawn_blocking({
- let stats = Arc::clone(&stats);
- move || {
- rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
- rrd_sync_journal();
- }
- });
-
- let metrics_future = send_data_to_metric_servers(stats);
-
- let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
- if let Err(err) = rrd_res {
- log::error!("rrd update panicked: {err}");
- }
- if let Err(err) = metrics_res {
- log::error!("error during metrics sending: {err}");
- }
-
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
- }
-}
-
-async fn send_data_to_metric_servers(
- stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
-) -> Result<(), Error> {
- let (config, _digest) = pbs_config::metrics::config()?;
- let channel_list = get_metric_server_connections(config)?;
-
- if channel_list.is_empty() {
- return Ok(());
- }
-
- let ctime = proxmox_time::epoch_i64();
- let nodename = proxmox_sys::nodename();
-
- let mut values = Vec::new();
-
- let mut cpuvalue = match &stats.0.proc {
- Some(stat) => serde_json::to_value(stat)?,
- None => json!({}),
- };
-
- if let Some(loadavg) = &stats.0.load {
- cpuvalue["avg1"] = Value::from(loadavg.0);
- cpuvalue["avg5"] = Value::from(loadavg.1);
- cpuvalue["avg15"] = Value::from(loadavg.2);
- }
-
- values.push(Arc::new(
- MetricsData::new("cpustat", ctime, cpuvalue)?
- .tag("object", "host")
- .tag("host", nodename),
- ));
-
- if let Some(stat) = &stats.0.meminfo {
- values.push(Arc::new(
- MetricsData::new("memory", ctime, stat)?
- .tag("object", "host")
- .tag("host", nodename),
- ));
- }
-
- if let Some(netdev) = &stats.0.net {
- for item in netdev {
- values.push(Arc::new(
- MetricsData::new("nics", ctime, item)?
- .tag("object", "host")
- .tag("host", nodename)
- .tag("instance", item.device.clone()),
- ));
- }
- }
-
- values.push(Arc::new(
- MetricsData::new("blockstat", ctime, stats.1.to_value())?
- .tag("object", "host")
- .tag("host", nodename),
- ));
-
- for datastore in stats.2.iter() {
- values.push(Arc::new(
- MetricsData::new("blockstat", ctime, datastore.to_value())?
- .tag("object", "host")
- .tag("host", nodename)
- .tag("datastore", datastore.name.clone()),
- ));
- }
-
- // we must have a concrete functions, because the inferred lifetime from a
- // closure is not general enough for the tokio::spawn call we are in here...
- fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
- &item.0
- }
-
- let results =
- proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
- for (res, name) in results
- .into_iter()
- .zip(channel_list.iter().map(|(_, name)| name))
- {
- if let Err(err) = res {
- log::error!("error sending into channel of {name}: {err}");
- }
- }
-
- futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
- if let Err(err) = channel.join().await {
- log::error!("error sending to metric server {name}: {err}");
- }
- }))
- .await;
-
- Ok(())
-}
-
-/// Get the metric server connections from a config
-pub fn get_metric_server_connections(
- metric_config: proxmox_section_config::SectionConfigData,
-) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
- let mut res = Vec::new();
-
- for config in
- metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
- {
- if !config.enable {
- continue;
- }
- let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
- res.push((future, config.name));
- }
-
- for config in
- metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
- {
- if !config.enable {
- continue;
- }
- let future = proxmox_metrics::influxdb_http(
- &config.url,
- config.organization.as_deref().unwrap_or("proxmox"),
- config.bucket.as_deref().unwrap_or("proxmox"),
- config.token.as_deref(),
- config.verify_tls.unwrap_or(true),
- config.max_body_size.unwrap_or(25_000_000),
- )?;
- res.push((future, config.name));
- }
- Ok(res)
-}
-
-struct HostStats {
- proc: Option<ProcFsStat>,
- meminfo: Option<ProcFsMemInfo>,
- net: Option<Vec<ProcFsNetDev>>,
- load: Option<Loadavg>,
-}
-
-struct DiskStat {
- name: String,
- usage: Option<FileSystemInformation>,
- dev: Option<BlockDevStat>,
-}
-
-impl DiskStat {
- fn to_value(&self) -> Value {
- let mut value = json!({});
- if let Some(usage) = &self.usage {
- value["total"] = Value::from(usage.total);
- value["used"] = Value::from(usage.used);
- value["avail"] = Value::from(usage.available);
- }
-
- if let Some(dev) = &self.dev {
- value["read_ios"] = Value::from(dev.read_ios);
- value["read_bytes"] = Value::from(dev.read_sectors * 512);
- value["write_ios"] = Value::from(dev.write_ios);
- value["write_bytes"] = Value::from(dev.write_sectors * 512);
- value["io_ticks"] = Value::from(dev.io_ticks / 1000);
- }
- value
- }
-}
-
-fn collect_host_stats_sync() -> HostStats {
- use proxmox_sys::linux::procfs::{
- read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
- };
-
- let proc = match read_proc_stat() {
- Ok(stat) => Some(stat),
- Err(err) => {
- eprintln!("read_proc_stat failed - {err}");
- None
- }
- };
-
- let meminfo = match read_meminfo() {
- Ok(stat) => Some(stat),
- Err(err) => {
- eprintln!("read_meminfo failed - {err}");
- None
- }
- };
-
- let net = match read_proc_net_dev() {
- Ok(netdev) => Some(netdev),
- Err(err) => {
- eprintln!("read_prox_net_dev failed - {err}");
- None
- }
- };
-
- let load = match read_loadavg() {
- Ok(loadavg) => Some(loadavg),
- Err(err) => {
- eprintln!("read_loadavg failed - {err}");
- None
- }
- };
-
- HostStats {
- proc,
- meminfo,
- net,
- load,
- }
-}
-
-fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
- let disk_manager = DiskManage::new();
-
- let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
-
- let mut datastores = Vec::new();
- match pbs_config::datastore::config() {
- Ok((config, _)) => {
- let datastore_list: Vec<DataStoreConfig> = config
- .convert_to_typed_array("datastore")
- .unwrap_or_default();
-
- for config in datastore_list {
- if config
- .get_maintenance_mode()
- .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
- {
- continue;
- }
- let path = std::path::Path::new(&config.path);
- datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
- }
- }
- Err(err) => {
- eprintln!("read datastore config failed - {err}");
- }
- }
-
- (root, datastores)
-}
-
-fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
- if let Some(stat) = &host.proc {
- rrd_update_gauge("host/cpu", stat.cpu);
- rrd_update_gauge("host/iowait", stat.iowait_percent);
- }
-
- if let Some(meminfo) = &host.meminfo {
- rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
- rrd_update_gauge("host/memused", meminfo.memused as f64);
- rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
- rrd_update_gauge("host/swapused", meminfo.swapused as f64);
- }
-
- if let Some(netdev) = &host.net {
- use pbs_config::network::is_physical_nic;
- let mut netin = 0;
- let mut netout = 0;
- for item in netdev {
- if !is_physical_nic(&item.device) {
- continue;
- }
- netin += item.receive;
- netout += item.send;
- }
- rrd_update_derive("host/netin", netin as f64);
- rrd_update_derive("host/netout", netout as f64);
- }
-
- if let Some(loadavg) = &host.load {
- rrd_update_gauge("host/loadavg", loadavg.0);
- }
-
- rrd_update_disk_stat(hostdisk, "host");
-
- for stat in datastores {
- let rrd_prefix = format!("datastore/{}", stat.name);
- rrd_update_disk_stat(stat, &rrd_prefix);
- }
-}
-
-fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
- if let Some(status) = &disk.usage {
- let rrd_key = format!("{}/total", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.total as f64);
- let rrd_key = format!("{}/used", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.used as f64);
- let rrd_key = format!("{}/available", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.available as f64);
- }
-
- if let Some(stat) = &disk.dev {
- let rrd_key = format!("{}/read_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.read_ios as f64);
- let rrd_key = format!("{}/read_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
-
- let rrd_key = format!("{}/write_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.write_ios as f64);
- let rrd_key = format!("{}/write_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
-
- let rrd_key = format!("{}/io_ticks", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
- }
-}
-
fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
let event: CalendarEvent = match event_str.parse() {
Ok(event) => event,
@@ -1240,53 +878,6 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
next <= now
}
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
- let usage = match proxmox_sys::fs::fs_info(path) {
- Ok(status) => Some(status),
- Err(err) => {
- eprintln!("read fs info on {path:?} failed - {err}");
- None
- }
- };
-
- let dev = match disk_manager.find_mounted_device(path) {
- Ok(None) => None,
- Ok(Some((fs_type, device, source))) => {
- let mut device_stat = None;
- match (fs_type.as_str(), source) {
- ("zfs", Some(source)) => match source.into_string() {
- Ok(dataset) => match zfs_dataset_stats(&dataset) {
- Ok(stat) => device_stat = Some(stat),
- Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
- },
- Err(source) => {
- eprintln!("zfs_pool_stats({source:?}) failed - invalid characters")
- }
- },
- _ => {
- if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
- match disk.read_stat() {
- Ok(stat) => device_stat = stat,
- Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
- }
- }
- }
- }
- device_stat
- }
- Err(err) => {
- eprintln!("find_mounted_device failed - {err}");
- None
- }
- };
-
- DiskStat {
- name: name.to_string(),
- usage,
- dev,
- }
-}
-
// Rate Limiter lookup
async fn run_traffic_control_updater() {
loop {
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
new file mode 100644
index 00000000..e220f51a
--- /dev/null
+++ b/src/server/metric_collection/mod.rs
@@ -0,0 +1,426 @@
+use std::{
+ path::Path,
+ pin::pin,
+ sync::Arc,
+ time::{Duration, Instant},
+};
+
+use anyhow::Error;
+use pbs_api_types::{DataStoreConfig, Operation};
+use serde_json::{json, Value};
+use tokio::join;
+
+use proxmox_metrics::MetricsData;
+use proxmox_sys::{
+ fs::FileSystemInformation,
+ linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
+};
+
+use crate::{
+ rrd_cache::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge},
+ tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage},
+};
+
+pub fn init() -> Result<(), Error> {
+ let rrd_cache = initialize_rrd_cache()?;
+ rrd_cache.apply_journal()?;
+ Ok(())
+}
+
+pub fn start_collection_task() {
+ tokio::spawn(async {
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ let future = pin!(run_stat_generator());
+ futures::future::select(future, abort_future).await;
+ });
+}
+
+async fn run_stat_generator() {
+ loop {
+ let delay_target = Instant::now() + Duration::from_secs(10);
+
+ let stats_future = tokio::task::spawn_blocking(|| {
+ let hoststats = collect_host_stats_sync();
+ let (hostdisk, datastores) = collect_disk_stats_sync();
+ Arc::new((hoststats, hostdisk, datastores))
+ });
+ let stats = match stats_future.await {
+ Ok(res) => res,
+ Err(err) => {
+ log::error!("collecting host stats panicked: {err}");
+ tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+ continue;
+ }
+ };
+
+ let rrd_future = tokio::task::spawn_blocking({
+ let stats = Arc::clone(&stats);
+ move || {
+ rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
+ rrd_sync_journal();
+ }
+ });
+
+ let metrics_future = send_data_to_metric_servers(stats);
+
+ let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+ if let Err(err) = rrd_res {
+ log::error!("rrd update panicked: {err}");
+ }
+ if let Err(err) = metrics_res {
+ log::error!("error during metrics sending: {err}");
+ }
+
+ tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+ }
+}
+
+async fn send_data_to_metric_servers(
+ stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
+) -> Result<(), Error> {
+ let (config, _digest) = pbs_config::metrics::config()?;
+ let channel_list = get_metric_server_connections(config)?;
+
+ if channel_list.is_empty() {
+ return Ok(());
+ }
+
+ let ctime = proxmox_time::epoch_i64();
+ let nodename = proxmox_sys::nodename();
+
+ let mut values = Vec::new();
+
+ let mut cpuvalue = match &stats.0.proc {
+ Some(stat) => serde_json::to_value(stat)?,
+ None => json!({}),
+ };
+
+ if let Some(loadavg) = &stats.0.load {
+ cpuvalue["avg1"] = Value::from(loadavg.0);
+ cpuvalue["avg5"] = Value::from(loadavg.1);
+ cpuvalue["avg15"] = Value::from(loadavg.2);
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("cpustat", ctime, cpuvalue)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ if let Some(stat) = &stats.0.meminfo {
+ values.push(Arc::new(
+ MetricsData::new("memory", ctime, stat)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+ }
+
+ if let Some(netdev) = &stats.0.net {
+ for item in netdev {
+ values.push(Arc::new(
+ MetricsData::new("nics", ctime, item)?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("instance", item.device.clone()),
+ ));
+ }
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, stats.1.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ for datastore in stats.2.iter() {
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, datastore.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("datastore", datastore.name.clone()),
+ ));
+ }
+
+ // we must have a concrete functions, because the inferred lifetime from a
+ // closure is not general enough for the tokio::spawn call we are in here...
+ fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
+ &item.0
+ }
+
+ let results =
+ proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
+ for (res, name) in results
+ .into_iter()
+ .zip(channel_list.iter().map(|(_, name)| name))
+ {
+ if let Err(err) = res {
+ log::error!("error sending into channel of {name}: {err}");
+ }
+ }
+
+ futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
+ if let Err(err) = channel.join().await {
+ log::error!("error sending to metric server {name}: {err}");
+ }
+ }))
+ .await;
+
+ Ok(())
+}
+
+/// Get the metric server connections from a config
+fn get_metric_server_connections(
+ metric_config: proxmox_section_config::SectionConfigData,
+) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
+ let mut res = Vec::new();
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+ {
+ if !config.enable {
+ continue;
+ }
+ let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
+ res.push((future, config.name));
+ }
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+ {
+ if !config.enable {
+ continue;
+ }
+ let future = proxmox_metrics::influxdb_http(
+ &config.url,
+ config.organization.as_deref().unwrap_or("proxmox"),
+ config.bucket.as_deref().unwrap_or("proxmox"),
+ config.token.as_deref(),
+ config.verify_tls.unwrap_or(true),
+ config.max_body_size.unwrap_or(25_000_000),
+ )?;
+ res.push((future, config.name));
+ }
+ Ok(res)
+}
+
+struct HostStats {
+ proc: Option<ProcFsStat>,
+ meminfo: Option<ProcFsMemInfo>,
+ net: Option<Vec<ProcFsNetDev>>,
+ load: Option<Loadavg>,
+}
+
+struct DiskStat {
+ name: String,
+ usage: Option<FileSystemInformation>,
+ dev: Option<BlockDevStat>,
+}
+
+impl DiskStat {
+ fn to_value(&self) -> Value {
+ let mut value = json!({});
+ if let Some(usage) = &self.usage {
+ value["total"] = Value::from(usage.total);
+ value["used"] = Value::from(usage.used);
+ value["avail"] = Value::from(usage.available);
+ }
+
+ if let Some(dev) = &self.dev {
+ value["read_ios"] = Value::from(dev.read_ios);
+ value["read_bytes"] = Value::from(dev.read_sectors * 512);
+ value["write_ios"] = Value::from(dev.write_ios);
+ value["write_bytes"] = Value::from(dev.write_sectors * 512);
+ value["io_ticks"] = Value::from(dev.io_ticks / 1000);
+ }
+ value
+ }
+}
+
+fn collect_host_stats_sync() -> HostStats {
+ use proxmox_sys::linux::procfs::{
+ read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
+ };
+
+ let proc = match read_proc_stat() {
+ Ok(stat) => Some(stat),
+ Err(err) => {
+ eprintln!("read_proc_stat failed - {err}");
+ None
+ }
+ };
+
+ let meminfo = match read_meminfo() {
+ Ok(stat) => Some(stat),
+ Err(err) => {
+ eprintln!("read_meminfo failed - {err}");
+ None
+ }
+ };
+
+ let net = match read_proc_net_dev() {
+ Ok(netdev) => Some(netdev),
+ Err(err) => {
+ eprintln!("read_prox_net_dev failed - {err}");
+ None
+ }
+ };
+
+ let load = match read_loadavg() {
+ Ok(loadavg) => Some(loadavg),
+ Err(err) => {
+ eprintln!("read_loadavg failed - {err}");
+ None
+ }
+ };
+
+ HostStats {
+ proc,
+ meminfo,
+ net,
+ load,
+ }
+}
+
+fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
+ let disk_manager = DiskManage::new();
+
+ let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+
+ let mut datastores = Vec::new();
+ match pbs_config::datastore::config() {
+ Ok((config, _)) => {
+ let datastore_list: Vec<DataStoreConfig> = config
+ .convert_to_typed_array("datastore")
+ .unwrap_or_default();
+
+ for config in datastore_list {
+ if config
+ .get_maintenance_mode()
+ .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
+ {
+ continue;
+ }
+ let path = std::path::Path::new(&config.path);
+ datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
+ }
+ }
+ Err(err) => {
+ eprintln!("read datastore config failed - {err}");
+ }
+ }
+
+ (root, datastores)
+}
+
+fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
+ if let Some(stat) = &host.proc {
+ rrd_update_gauge("host/cpu", stat.cpu);
+ rrd_update_gauge("host/iowait", stat.iowait_percent);
+ }
+
+ if let Some(meminfo) = &host.meminfo {
+ rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+ rrd_update_gauge("host/memused", meminfo.memused as f64);
+ rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+ rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+ }
+
+ if let Some(netdev) = &host.net {
+ use pbs_config::network::is_physical_nic;
+ let mut netin = 0;
+ let mut netout = 0;
+ for item in netdev {
+ if !is_physical_nic(&item.device) {
+ continue;
+ }
+ netin += item.receive;
+ netout += item.send;
+ }
+ rrd_update_derive("host/netin", netin as f64);
+ rrd_update_derive("host/netout", netout as f64);
+ }
+
+ if let Some(loadavg) = &host.load {
+ rrd_update_gauge("host/loadavg", loadavg.0);
+ }
+
+ rrd_update_disk_stat(hostdisk, "host");
+
+ for stat in datastores {
+ let rrd_prefix = format!("datastore/{}", stat.name);
+ rrd_update_disk_stat(stat, &rrd_prefix);
+ }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+ if let Some(status) = &disk.usage {
+ let rrd_key = format!("{}/total", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.total as f64);
+ let rrd_key = format!("{}/used", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.used as f64);
+ let rrd_key = format!("{}/available", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.available as f64);
+ }
+
+ if let Some(stat) = &disk.dev {
+ let rrd_key = format!("{}/read_ios", rrd_prefix);
+ rrd_update_derive(&rrd_key, stat.read_ios as f64);
+ let rrd_key = format!("{}/read_bytes", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
+
+ let rrd_key = format!("{}/write_ios", rrd_prefix);
+ rrd_update_derive(&rrd_key, stat.write_ios as f64);
+ let rrd_key = format!("{}/write_bytes", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
+
+ let rrd_key = format!("{}/io_ticks", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
+ }
+}
+
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
+ let usage = match proxmox_sys::fs::fs_info(path) {
+ Ok(status) => Some(status),
+ Err(err) => {
+ eprintln!("read fs info on {path:?} failed - {err}");
+ None
+ }
+ };
+
+ let dev = match disk_manager.find_mounted_device(path) {
+ Ok(None) => None,
+ Ok(Some((fs_type, device, source))) => {
+ let mut device_stat = None;
+ match (fs_type.as_str(), source) {
+ ("zfs", Some(source)) => match source.into_string() {
+ Ok(dataset) => match zfs_dataset_stats(&dataset) {
+ Ok(stat) => device_stat = Some(stat),
+ Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
+ },
+ Err(source) => {
+ eprintln!("zfs_pool_stats({source:?}) failed - invalid characters")
+ }
+ },
+ _ => {
+ if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
+ match disk.read_stat() {
+ Ok(stat) => device_stat = stat,
+ Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
+ }
+ }
+ }
+ }
+ device_stat
+ }
+ Err(err) => {
+ eprintln!("find_mounted_device failed - {err}");
+ None
+ }
+ };
+
+ DiskStat {
+ name: name.to_string(),
+ usage,
+ dev,
+ }
+}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 7f845e5b..ff92a821 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -33,6 +33,8 @@ pub use report::*;
pub mod auth;
+pub mod metric_collection;
+
pub(crate) mod pull;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
--
2.39.5
Thread overview: 16+ messages
2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
2024-10-11 10:51 ` Lukas Wagner [this message]
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 02/13] metric collection: add doc comments for public functions Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 03/13] metric collection: move rrd_cache to new metric_collection module Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 04/13] metric_collection: split out push metric part Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 05/13] metric collection: rrd: move rrd update function to rrd module Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 06/13] metric collection: rrd: restrict function visibility Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 07/13] metric collection: rrd: remove rrd prefix from some function names Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 08/13] metric collection: drop std::path prefix where not needed Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 09/13] metric collection: move impl block for DiskStats to metric_server module Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 10/13] pbs-api-types: add types for the new metrics endpoint Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 11/13] metric collection: initialize metric cache on startup Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 12/13] metric collection: put metrics in a cache Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 13/13] api: add /status/metrics API Lukas Wagner
2024-10-14 10:02 ` Wolfgang Bumiller
2024-10-15 7:27 ` Lukas Wagner