From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id E300F1FF17B for ; Tue, 15 Oct 2024 10:46:47 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C5E501186A; Tue, 15 Oct 2024 10:47:18 +0200 (CEST) From: Lukas Wagner To: pbs-devel@lists.proxmox.com Date: Tue, 15 Oct 2024 10:46:27 +0200 Message-Id: <20241015084636.57106-5-l.wagner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20241015084636.57106-1-l.wagner@proxmox.com> References: <20241015084636.57106-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.009 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup v2 04/13] metric_collection: split out push metric part X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" No functional changes intended. Signed-off-by: Lukas Wagner --- src/server/metric_collection/metric_server.rs | 136 ++++++++++++++++++ src/server/metric_collection/mod.rs | 132 +---------------- 2 files changed, 138 insertions(+), 130 deletions(-) create mode 100644 src/server/metric_collection/metric_server.rs diff --git a/src/server/metric_collection/metric_server.rs b/src/server/metric_collection/metric_server.rs new file mode 100644 index 00000000..cc9f736a --- /dev/null +++ b/src/server/metric_collection/metric_server.rs @@ -0,0 +1,136 @@ +use std::sync::Arc; + +use anyhow::Error; +use serde_json::{json, Value}; + +use proxmox_metrics::MetricsData; + +use super::{DiskStat, HostStats}; + +pub async fn send_data_to_metric_servers( + stats: Arc<(HostStats, DiskStat, Vec)>, +) -> Result<(), Error> { + let (config, _digest) = pbs_config::metrics::config()?; + let channel_list = get_metric_server_connections(config)?; + + if channel_list.is_empty() { + return Ok(()); + } + + let ctime = proxmox_time::epoch_i64(); + let nodename = proxmox_sys::nodename(); + + let mut values = Vec::new(); + + let mut cpuvalue = match &stats.0.proc { + Some(stat) => serde_json::to_value(stat)?, + None => json!({}), + }; + + if let Some(loadavg) = &stats.0.load { + cpuvalue["avg1"] = Value::from(loadavg.0); + cpuvalue["avg5"] = Value::from(loadavg.1); + cpuvalue["avg15"] = Value::from(loadavg.2); + } + + values.push(Arc::new( + MetricsData::new("cpustat", ctime, cpuvalue)? + .tag("object", "host") + .tag("host", nodename), + )); + + if let Some(stat) = &stats.0.meminfo { + values.push(Arc::new( + MetricsData::new("memory", ctime, stat)? + .tag("object", "host") + .tag("host", nodename), + )); + } + + if let Some(netdev) = &stats.0.net { + for item in netdev { + values.push(Arc::new( + MetricsData::new("nics", ctime, item)? + .tag("object", "host") + .tag("host", nodename) + .tag("instance", item.device.clone()), + )); + } + } + + values.push(Arc::new( + MetricsData::new("blockstat", ctime, stats.1.to_value())? + .tag("object", "host") + .tag("host", nodename), + )); + + for datastore in stats.2.iter() { + values.push(Arc::new( + MetricsData::new("blockstat", ctime, datastore.to_value())? + .tag("object", "host") + .tag("host", nodename) + .tag("datastore", datastore.name.clone()), + )); + } + + // we must have a concrete functions, because the inferred lifetime from a + // closure is not general enough for the tokio::spawn call we are in here... + fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics { + &item.0 + } + + let results = + proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await; + for (res, name) in results + .into_iter() + .zip(channel_list.iter().map(|(_, name)| name)) + { + if let Err(err) = res { + log::error!("error sending into channel of {name}: {err}"); + } + } + + futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move { + if let Err(err) = channel.join().await { + log::error!("error sending to metric server {name}: {err}"); + } + })) + .await; + + Ok(()) +} + +/// Get the metric server connections from a config +fn get_metric_server_connections( + metric_config: proxmox_section_config::SectionConfigData, +) -> Result, Error> { + let mut res = Vec::new(); + + for config in + metric_config.convert_to_typed_array::("influxdb-udp")? + { + if !config.enable { + continue; + } + let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu); + res.push((future, config.name)); + } + + for config in + metric_config.convert_to_typed_array::("influxdb-http")? + { + if !config.enable { + continue; + } + let future = proxmox_metrics::influxdb_http( + &config.url, + config.organization.as_deref().unwrap_or("proxmox"), + config.bucket.as_deref().unwrap_or("proxmox"), + config.token.as_deref(), + config.verify_tls.unwrap_or(true), + config.max_body_size.unwrap_or(25_000_000), + )?; + res.push((future, config.name)); + } + Ok(res) +} diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs index 5102227b..5a516564 100644 --- a/src/server/metric_collection/mod.rs +++ b/src/server/metric_collection/mod.rs @@ -10,7 +10,6 @@ use pbs_api_types::{DataStoreConfig, Operation}; use serde_json::{json, Value}; use tokio::join; -use proxmox_metrics::MetricsData; use proxmox_sys::{ fs::FileSystemInformation, linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat}, @@ -20,6 +19,7 @@ use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage}; use rrd::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge}; +mod metric_server; pub mod rrd; /// Initialize the metric collection subsystem. @@ -70,7 +70,7 @@ async fn run_stat_generator() { } }); - let metrics_future = send_data_to_metric_servers(stats); + let metrics_future = metric_server::send_data_to_metric_servers(stats); let (rrd_res, metrics_res) = join!(rrd_future, metrics_future); if let Err(err) = rrd_res { @@ -84,134 +84,6 @@ async fn run_stat_generator() { } } -async fn send_data_to_metric_servers( - stats: Arc<(HostStats, DiskStat, Vec)>, -) -> Result<(), Error> { - let (config, _digest) = pbs_config::metrics::config()?; - let channel_list = get_metric_server_connections(config)?; - - if channel_list.is_empty() { - return Ok(()); - } - - let ctime = proxmox_time::epoch_i64(); - let nodename = proxmox_sys::nodename(); - - let mut values = Vec::new(); - - let mut cpuvalue = match &stats.0.proc { - Some(stat) => serde_json::to_value(stat)?, - None => json!({}), - }; - - if let Some(loadavg) = &stats.0.load { - cpuvalue["avg1"] = Value::from(loadavg.0); - cpuvalue["avg5"] = Value::from(loadavg.1); - cpuvalue["avg15"] = Value::from(loadavg.2); - } - - values.push(Arc::new( - MetricsData::new("cpustat", ctime, cpuvalue)? - .tag("object", "host") - .tag("host", nodename), - )); - - if let Some(stat) = &stats.0.meminfo { - values.push(Arc::new( - MetricsData::new("memory", ctime, stat)? - .tag("object", "host") - .tag("host", nodename), - )); - } - - if let Some(netdev) = &stats.0.net { - for item in netdev { - values.push(Arc::new( - MetricsData::new("nics", ctime, item)? - .tag("object", "host") - .tag("host", nodename) - .tag("instance", item.device.clone()), - )); - } - } - - values.push(Arc::new( - MetricsData::new("blockstat", ctime, stats.1.to_value())? - .tag("object", "host") - .tag("host", nodename), - )); - - for datastore in stats.2.iter() { - values.push(Arc::new( - MetricsData::new("blockstat", ctime, datastore.to_value())? - .tag("object", "host") - .tag("host", nodename) - .tag("datastore", datastore.name.clone()), - )); - } - - // we must have a concrete functions, because the inferred lifetime from a - // closure is not general enough for the tokio::spawn call we are in here... - fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics { - &item.0 - } - - let results = - proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await; - for (res, name) in results - .into_iter() - .zip(channel_list.iter().map(|(_, name)| name)) - { - if let Err(err) = res { - log::error!("error sending into channel of {name}: {err}"); - } - } - - futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move { - if let Err(err) = channel.join().await { - log::error!("error sending to metric server {name}: {err}"); - } - })) - .await; - - Ok(()) -} - -/// Get the metric server connections from a config -fn get_metric_server_connections( - metric_config: proxmox_section_config::SectionConfigData, -) -> Result, Error> { - let mut res = Vec::new(); - - for config in - metric_config.convert_to_typed_array::("influxdb-udp")? - { - if !config.enable { - continue; - } - let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu); - res.push((future, config.name)); - } - - for config in - metric_config.convert_to_typed_array::("influxdb-http")? - { - if !config.enable { - continue; - } - let future = proxmox_metrics::influxdb_http( - &config.url, - config.organization.as_deref().unwrap_or("proxmox"), - config.bucket.as_deref().unwrap_or("proxmox"), - config.token.as_deref(), - config.verify_tls.unwrap_or(true), - config.max_body_size.unwrap_or(25_000_000), - )?; - res.push((future, config.name)); - } - Ok(res) -} - struct HostStats { proc: Option, meminfo: Option, -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel