From: Lukas Wagner <l.wagner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v2 04/13] metric_collection: split out push metric part
Date: Tue, 15 Oct 2024 10:46:27 +0200 [thread overview]
Message-ID: <20241015084636.57106-5-l.wagner@proxmox.com> (raw)
In-Reply-To: <20241015084636.57106-1-l.wagner@proxmox.com>
No functional changes intended.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
src/server/metric_collection/metric_server.rs | 136 ++++++++++++++++++
src/server/metric_collection/mod.rs | 132 +----------------
2 files changed, 138 insertions(+), 130 deletions(-)
create mode 100644 src/server/metric_collection/metric_server.rs
diff --git a/src/server/metric_collection/metric_server.rs b/src/server/metric_collection/metric_server.rs
new file mode 100644
index 00000000..cc9f736a
--- /dev/null
+++ b/src/server/metric_collection/metric_server.rs
@@ -0,0 +1,136 @@
+use std::sync::Arc;
+
+use anyhow::Error;
+use serde_json::{json, Value};
+
+use proxmox_metrics::MetricsData;
+
+use super::{DiskStat, HostStats};
+
+pub async fn send_data_to_metric_servers(
+ stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
+) -> Result<(), Error> {
+ let (config, _digest) = pbs_config::metrics::config()?;
+ let channel_list = get_metric_server_connections(config)?;
+
+ if channel_list.is_empty() {
+ return Ok(());
+ }
+
+ let ctime = proxmox_time::epoch_i64();
+ let nodename = proxmox_sys::nodename();
+
+ let mut values = Vec::new();
+
+ let mut cpuvalue = match &stats.0.proc {
+ Some(stat) => serde_json::to_value(stat)?,
+ None => json!({}),
+ };
+
+ if let Some(loadavg) = &stats.0.load {
+ cpuvalue["avg1"] = Value::from(loadavg.0);
+ cpuvalue["avg5"] = Value::from(loadavg.1);
+ cpuvalue["avg15"] = Value::from(loadavg.2);
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("cpustat", ctime, cpuvalue)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ if let Some(stat) = &stats.0.meminfo {
+ values.push(Arc::new(
+ MetricsData::new("memory", ctime, stat)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+ }
+
+ if let Some(netdev) = &stats.0.net {
+ for item in netdev {
+ values.push(Arc::new(
+ MetricsData::new("nics", ctime, item)?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("instance", item.device.clone()),
+ ));
+ }
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, stats.1.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ for datastore in stats.2.iter() {
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, datastore.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("datastore", datastore.name.clone()),
+ ));
+ }
+
+ // we must have a concrete functions, because the inferred lifetime from a
+ // closure is not general enough for the tokio::spawn call we are in here...
+ fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
+ &item.0
+ }
+
+ let results =
+ proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
+ for (res, name) in results
+ .into_iter()
+ .zip(channel_list.iter().map(|(_, name)| name))
+ {
+ if let Err(err) = res {
+ log::error!("error sending into channel of {name}: {err}");
+ }
+ }
+
+ futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
+ if let Err(err) = channel.join().await {
+ log::error!("error sending to metric server {name}: {err}");
+ }
+ }))
+ .await;
+
+ Ok(())
+}
+
+/// Get the metric server connections from a config
+fn get_metric_server_connections(
+ metric_config: proxmox_section_config::SectionConfigData,
+) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
+ let mut res = Vec::new();
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+ {
+ if !config.enable {
+ continue;
+ }
+ let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
+ res.push((future, config.name));
+ }
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+ {
+ if !config.enable {
+ continue;
+ }
+ let future = proxmox_metrics::influxdb_http(
+ &config.url,
+ config.organization.as_deref().unwrap_or("proxmox"),
+ config.bucket.as_deref().unwrap_or("proxmox"),
+ config.token.as_deref(),
+ config.verify_tls.unwrap_or(true),
+ config.max_body_size.unwrap_or(25_000_000),
+ )?;
+ res.push((future, config.name));
+ }
+ Ok(res)
+}
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 5102227b..5a516564 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -10,7 +10,6 @@ use pbs_api_types::{DataStoreConfig, Operation};
use serde_json::{json, Value};
use tokio::join;
-use proxmox_metrics::MetricsData;
use proxmox_sys::{
fs::FileSystemInformation,
linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
@@ -20,6 +19,7 @@ use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
use rrd::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge};
+mod metric_server;
pub mod rrd;
/// Initialize the metric collection subsystem.
@@ -70,7 +70,7 @@ async fn run_stat_generator() {
}
});
- let metrics_future = send_data_to_metric_servers(stats);
+ let metrics_future = metric_server::send_data_to_metric_servers(stats);
let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
if let Err(err) = rrd_res {
@@ -84,134 +84,6 @@ async fn run_stat_generator() {
}
}
-async fn send_data_to_metric_servers(
- stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
-) -> Result<(), Error> {
- let (config, _digest) = pbs_config::metrics::config()?;
- let channel_list = get_metric_server_connections(config)?;
-
- if channel_list.is_empty() {
- return Ok(());
- }
-
- let ctime = proxmox_time::epoch_i64();
- let nodename = proxmox_sys::nodename();
-
- let mut values = Vec::new();
-
- let mut cpuvalue = match &stats.0.proc {
- Some(stat) => serde_json::to_value(stat)?,
- None => json!({}),
- };
-
- if let Some(loadavg) = &stats.0.load {
- cpuvalue["avg1"] = Value::from(loadavg.0);
- cpuvalue["avg5"] = Value::from(loadavg.1);
- cpuvalue["avg15"] = Value::from(loadavg.2);
- }
-
- values.push(Arc::new(
- MetricsData::new("cpustat", ctime, cpuvalue)?
- .tag("object", "host")
- .tag("host", nodename),
- ));
-
- if let Some(stat) = &stats.0.meminfo {
- values.push(Arc::new(
- MetricsData::new("memory", ctime, stat)?
- .tag("object", "host")
- .tag("host", nodename),
- ));
- }
-
- if let Some(netdev) = &stats.0.net {
- for item in netdev {
- values.push(Arc::new(
- MetricsData::new("nics", ctime, item)?
- .tag("object", "host")
- .tag("host", nodename)
- .tag("instance", item.device.clone()),
- ));
- }
- }
-
- values.push(Arc::new(
- MetricsData::new("blockstat", ctime, stats.1.to_value())?
- .tag("object", "host")
- .tag("host", nodename),
- ));
-
- for datastore in stats.2.iter() {
- values.push(Arc::new(
- MetricsData::new("blockstat", ctime, datastore.to_value())?
- .tag("object", "host")
- .tag("host", nodename)
- .tag("datastore", datastore.name.clone()),
- ));
- }
-
- // we must have a concrete functions, because the inferred lifetime from a
- // closure is not general enough for the tokio::spawn call we are in here...
- fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
- &item.0
- }
-
- let results =
- proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
- for (res, name) in results
- .into_iter()
- .zip(channel_list.iter().map(|(_, name)| name))
- {
- if let Err(err) = res {
- log::error!("error sending into channel of {name}: {err}");
- }
- }
-
- futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
- if let Err(err) = channel.join().await {
- log::error!("error sending to metric server {name}: {err}");
- }
- }))
- .await;
-
- Ok(())
-}
-
-/// Get the metric server connections from a config
-fn get_metric_server_connections(
- metric_config: proxmox_section_config::SectionConfigData,
-) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
- let mut res = Vec::new();
-
- for config in
- metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
- {
- if !config.enable {
- continue;
- }
- let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
- res.push((future, config.name));
- }
-
- for config in
- metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
- {
- if !config.enable {
- continue;
- }
- let future = proxmox_metrics::influxdb_http(
- &config.url,
- config.organization.as_deref().unwrap_or("proxmox"),
- config.bucket.as_deref().unwrap_or("proxmox"),
- config.token.as_deref(),
- config.verify_tls.unwrap_or(true),
- config.max_body_size.unwrap_or(25_000_000),
- )?;
- res.push((future, config.name));
- }
- Ok(res)
-}
-
struct HostStats {
proc: Option<ProcFsStat>,
meminfo: Option<ProcFsMemInfo>,
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-10-15 8:46 UTC|newest]
Thread overview: 15+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-10-15 8:46 [pbs-devel] [PATCH proxmox-backup v2 00/13] add metric endpoint Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 01/13] proxy: server: move rrd stat/metric server to separate module Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 02/13] metric collection: add doc comments for public functions Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 03/13] metric collection: move rrd_cache to new metric_collection module Lukas Wagner
2024-10-15 8:46 ` Lukas Wagner [this message]
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 05/13] metric collection: rrd: move rrd update function to rrd module Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 06/13] metric collection: rrd: restrict function visibility Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 07/13] metric collection: rrd: remove rrd prefix from some function names Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 08/13] metric collection: drop std::path prefix where not needed Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 09/13] metric collection: move impl block for DiskStats to metric_server module Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 10/13] pbs-api-types: add types for the new metrics endpoint Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 11/13] metric collection: initialize metric cache on startup Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 12/13] metric collection: put metrics in a cache Lukas Wagner
2024-10-15 8:46 ` [pbs-devel] [PATCH proxmox-backup v2 13/13] api: add /status/metrics API Lukas Wagner
2024-10-15 13:02 ` [pbs-devel] applied-series: [PATCH proxmox-backup v2 00/13] add metric endpoint Wolfgang Bumiller
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20241015084636.57106-5-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox