From: Lukas Wagner <l.wagner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 04/13] metric_collection: split out push metric part
Date: Fri, 11 Oct 2024 12:51:28 +0200 [thread overview]
Message-ID: <20241011105137.131530-5-l.wagner@proxmox.com> (raw)
In-Reply-To: <20241011105137.131530-1-l.wagner@proxmox.com>
No functional changes intended.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
src/server/metric_collection/metric_server.rs | 136 ++++++++++++++++++
src/server/metric_collection/mod.rs | 132 +----------------
2 files changed, 138 insertions(+), 130 deletions(-)
create mode 100644 src/server/metric_collection/metric_server.rs
diff --git a/src/server/metric_collection/metric_server.rs b/src/server/metric_collection/metric_server.rs
new file mode 100644
index 00000000..cc9f736a
--- /dev/null
+++ b/src/server/metric_collection/metric_server.rs
@@ -0,0 +1,136 @@
+use std::sync::Arc;
+
+use anyhow::Error;
+use serde_json::{json, Value};
+
+use proxmox_metrics::MetricsData;
+
+use super::{DiskStat, HostStats};
+
+pub async fn send_data_to_metric_servers(
+ stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
+) -> Result<(), Error> {
+ let (config, _digest) = pbs_config::metrics::config()?;
+ let channel_list = get_metric_server_connections(config)?;
+
+ if channel_list.is_empty() {
+ return Ok(());
+ }
+
+ let ctime = proxmox_time::epoch_i64();
+ let nodename = proxmox_sys::nodename();
+
+ let mut values = Vec::new();
+
+ let mut cpuvalue = match &stats.0.proc {
+ Some(stat) => serde_json::to_value(stat)?,
+ None => json!({}),
+ };
+
+ if let Some(loadavg) = &stats.0.load {
+ cpuvalue["avg1"] = Value::from(loadavg.0);
+ cpuvalue["avg5"] = Value::from(loadavg.1);
+ cpuvalue["avg15"] = Value::from(loadavg.2);
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("cpustat", ctime, cpuvalue)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ if let Some(stat) = &stats.0.meminfo {
+ values.push(Arc::new(
+ MetricsData::new("memory", ctime, stat)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+ }
+
+ if let Some(netdev) = &stats.0.net {
+ for item in netdev {
+ values.push(Arc::new(
+ MetricsData::new("nics", ctime, item)?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("instance", item.device.clone()),
+ ));
+ }
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, stats.1.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ for datastore in stats.2.iter() {
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, datastore.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("datastore", datastore.name.clone()),
+ ));
+ }
+
+ // we must have a concrete functions, because the inferred lifetime from a
+ // closure is not general enough for the tokio::spawn call we are in here...
+ fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
+ &item.0
+ }
+
+ let results =
+ proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
+ for (res, name) in results
+ .into_iter()
+ .zip(channel_list.iter().map(|(_, name)| name))
+ {
+ if let Err(err) = res {
+ log::error!("error sending into channel of {name}: {err}");
+ }
+ }
+
+ futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
+ if let Err(err) = channel.join().await {
+ log::error!("error sending to metric server {name}: {err}");
+ }
+ }))
+ .await;
+
+ Ok(())
+}
+
+/// Get the metric server connections from a config
+fn get_metric_server_connections(
+ metric_config: proxmox_section_config::SectionConfigData,
+) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
+ let mut res = Vec::new();
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+ {
+ if !config.enable {
+ continue;
+ }
+ let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
+ res.push((future, config.name));
+ }
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+ {
+ if !config.enable {
+ continue;
+ }
+ let future = proxmox_metrics::influxdb_http(
+ &config.url,
+ config.organization.as_deref().unwrap_or("proxmox"),
+ config.bucket.as_deref().unwrap_or("proxmox"),
+ config.token.as_deref(),
+ config.verify_tls.unwrap_or(true),
+ config.max_body_size.unwrap_or(25_000_000),
+ )?;
+ res.push((future, config.name));
+ }
+ Ok(res)
+}
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 5102227b..5a516564 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -10,7 +10,6 @@ use pbs_api_types::{DataStoreConfig, Operation};
use serde_json::{json, Value};
use tokio::join;
-use proxmox_metrics::MetricsData;
use proxmox_sys::{
fs::FileSystemInformation,
linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
@@ -20,6 +19,7 @@ use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
use rrd::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge};
+mod metric_server;
pub mod rrd;
/// Initialize the metric collection subsystem.
@@ -70,7 +70,7 @@ async fn run_stat_generator() {
}
});
- let metrics_future = send_data_to_metric_servers(stats);
+ let metrics_future = metric_server::send_data_to_metric_servers(stats);
let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
if let Err(err) = rrd_res {
@@ -84,134 +84,6 @@ async fn run_stat_generator() {
}
}
-async fn send_data_to_metric_servers(
- stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
-) -> Result<(), Error> {
- let (config, _digest) = pbs_config::metrics::config()?;
- let channel_list = get_metric_server_connections(config)?;
-
- if channel_list.is_empty() {
- return Ok(());
- }
-
- let ctime = proxmox_time::epoch_i64();
- let nodename = proxmox_sys::nodename();
-
- let mut values = Vec::new();
-
- let mut cpuvalue = match &stats.0.proc {
- Some(stat) => serde_json::to_value(stat)?,
- None => json!({}),
- };
-
- if let Some(loadavg) = &stats.0.load {
- cpuvalue["avg1"] = Value::from(loadavg.0);
- cpuvalue["avg5"] = Value::from(loadavg.1);
- cpuvalue["avg15"] = Value::from(loadavg.2);
- }
-
- values.push(Arc::new(
- MetricsData::new("cpustat", ctime, cpuvalue)?
- .tag("object", "host")
- .tag("host", nodename),
- ));
-
- if let Some(stat) = &stats.0.meminfo {
- values.push(Arc::new(
- MetricsData::new("memory", ctime, stat)?
- .tag("object", "host")
- .tag("host", nodename),
- ));
- }
-
- if let Some(netdev) = &stats.0.net {
- for item in netdev {
- values.push(Arc::new(
- MetricsData::new("nics", ctime, item)?
- .tag("object", "host")
- .tag("host", nodename)
- .tag("instance", item.device.clone()),
- ));
- }
- }
-
- values.push(Arc::new(
- MetricsData::new("blockstat", ctime, stats.1.to_value())?
- .tag("object", "host")
- .tag("host", nodename),
- ));
-
- for datastore in stats.2.iter() {
- values.push(Arc::new(
- MetricsData::new("blockstat", ctime, datastore.to_value())?
- .tag("object", "host")
- .tag("host", nodename)
- .tag("datastore", datastore.name.clone()),
- ));
- }
-
- // we must have a concrete functions, because the inferred lifetime from a
- // closure is not general enough for the tokio::spawn call we are in here...
- fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
- &item.0
- }
-
- let results =
- proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
- for (res, name) in results
- .into_iter()
- .zip(channel_list.iter().map(|(_, name)| name))
- {
- if let Err(err) = res {
- log::error!("error sending into channel of {name}: {err}");
- }
- }
-
- futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
- if let Err(err) = channel.join().await {
- log::error!("error sending to metric server {name}: {err}");
- }
- }))
- .await;
-
- Ok(())
-}
-
-/// Get the metric server connections from a config
-fn get_metric_server_connections(
- metric_config: proxmox_section_config::SectionConfigData,
-) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
- let mut res = Vec::new();
-
- for config in
- metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
- {
- if !config.enable {
- continue;
- }
- let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
- res.push((future, config.name));
- }
-
- for config in
- metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
- {
- if !config.enable {
- continue;
- }
- let future = proxmox_metrics::influxdb_http(
- &config.url,
- config.organization.as_deref().unwrap_or("proxmox"),
- config.bucket.as_deref().unwrap_or("proxmox"),
- config.token.as_deref(),
- config.verify_tls.unwrap_or(true),
- config.max_body_size.unwrap_or(25_000_000),
- )?;
- res.push((future, config.name));
- }
- Ok(res)
-}
-
struct HostStats {
proc: Option<ProcFsStat>,
meminfo: Option<ProcFsMemInfo>,
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-10-11 10:51 UTC|newest]
Thread overview: 16+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 01/13] proxy: server: move rrd stat/metric server to separate module Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 02/13] metric collection: add doc comments for public functions Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 03/13] metric collection: move rrd_cache to new metric_collection module Lukas Wagner
2024-10-11 10:51 ` Lukas Wagner [this message]
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 05/13] metric collection: rrd: move rrd update function to rrd module Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 06/13] metric collection: rrd: restrict function visibility Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 07/13] metric collection: rrd: remove rrd prefix from some function names Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 08/13] metric collection: drop std::path prefix where not needed Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 09/13] metric collection: move impl block for DiskStats to metric_server module Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 10/13] pbs-api-types: add types for the new metrics endpoint Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 11/13] metric collection: initialize metric cache on startup Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 12/13] metric collection: put metrics in a cache Lukas Wagner
2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 13/13] api: add /status/metrics API Lukas Wagner
2024-10-14 10:02 ` Wolfgang Bumiller
2024-10-15 7:27 ` Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20241011105137.131530-5-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal