* [pbs-devel] [PATCH proxmox-backup v6] proxmox-backup-proxy: send metrics to configured metrics server
@ 2022-02-03 9:45 Dominik Csapak
0 siblings, 0 replies; only message in thread
From: Dominik Csapak @ 2022-02-03 9:45 UTC (permalink / raw)
To: pbs-devel
and keep the data as similar as possible to pve (tags/fields)
datastores get their own 'object' type and reside in the "blockstat"
measurement
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
replacement for the patch 5/8 of my v5 of 'add metrics server capability'
changes:
* adapted to MetrcsData changes by Wolfgang
src/bin/proxmox-backup-proxy.rs | 124 +++++++++++++++++++++++++++++++-
1 file changed, 121 insertions(+), 3 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 302fece9..687d4d72 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -23,11 +23,13 @@ use proxmox_sys::linux::{
};
use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
use proxmox_lang::try_block;
+use proxmox_metrics::MetricsData;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
use proxmox_sys::{task_log, task_warn};
use proxmox_sys::logrotate::LogRotate;
+use pbs_config::metrics::get_metric_server_connections;
use pbs_datastore::DataStore;
use proxmox_rest_server::{
@@ -959,16 +961,112 @@ async fn run_stat_generator() {
}
};
- let rrd_future = tokio::task::spawn_blocking(move || {
- rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
- rrd_sync_journal();
+ let rrd_future = tokio::task::spawn_blocking({
+ let stats = Arc::clone(&stats);
+ move || {
+ rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
+ rrd_sync_journal();
+ }
});
+ let metrics_future = send_data_to_metric_servers(stats);
+
+ let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+ if let Err(err) = rrd_res {
+ log::error!("rrd update panicked: {}", err);
+ }
+ if let Err(err) = metrics_res {
+ log::error!("error during metrics sending: {}", err);
+ }
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
}
+}
+
+async fn send_data_to_metric_servers(
+ stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
+) -> Result<(), Error> {
+ let (config, _digest) = pbs_config::metrics::config()?;
+ let (channels, names) = get_metric_server_connections(config)?;
+
+ if channels.is_empty() {
+ return Ok(());
+ }
+
+ let ctime = proxmox_time::epoch_i64();
+ let nodename = proxmox_sys::nodename();
+
+ let mut values = Vec::new();
+ let mut cpuvalue = match &stats.0.proc {
+ Some(stat) => serde_json::to_value(stat)?,
+ None => json!({}),
+ };
+
+ if let Some(loadavg) = &stats.0.load {
+ cpuvalue["avg1"] = Value::from(loadavg.0);
+ cpuvalue["avg5"] = Value::from(loadavg.1);
+ cpuvalue["avg15"] = Value::from(loadavg.2);
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("cpustat", ctime, cpuvalue)?
+ .tag("object", "host")
+ .tag("host", nodename),
+ ));
+
+ if let Some(stat) = &stats.0.meminfo {
+ values.push(Arc::new(
+ MetricsData::new("memory", ctime, stat)?
+ .tag("object", "host")
+ .tag("host", nodename)
+ ));
+ }
+
+ if let Some(netdev) = &stats.0.net {
+ for item in netdev {
+ values.push(Arc::new(
+ MetricsData::new("nics", ctime, item)?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("instance", item.device.clone())
+ ));
+ }
+ }
+
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, stats.1.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename)
+ ));
+
+ for datastore in stats.2.iter() {
+ values.push(Arc::new(
+ MetricsData::new("blockstat", ctime, datastore.to_value())?
+ .tag("object", "host")
+ .tag("host", nodename)
+ .tag("datastore", datastore.name.clone())
+ ));
+ }
+
+ let results = proxmox_metrics::send_data_to_channels(&values, &channels).await;
+ for (res, name) in results.into_iter().zip(names.iter()) {
+ if let Err(err) = res {
+ log::error!("error sending into channel of {}: {}", name, err);
+ }
+ }
+
+ futures::future::join_all(channels.into_iter().zip(names.into_iter()).map(
+ |(channel, name)| async move {
+ if let Err(err) = channel.join().await {
+ log::error!("error sending to metric server {}: {}", name, err);
+ }
+ },
+ ))
+ .await;
+
+ Ok(())
}
struct HostStats {
@@ -984,6 +1082,26 @@ struct DiskStat {
dev: Option<BlockDevStat>,
}
+impl DiskStat {
+ fn to_value(&self) -> Value {
+ let mut value = json!({});
+ if let Some(usage) = &self.usage {
+ value["total"] = Value::from(usage.total);
+ value["used"] = Value::from(usage.used);
+ value["avail"] = Value::from(usage.available);
+ }
+
+ if let Some(dev) = &self.dev {
+ value["read_ios"] = Value::from(dev.read_ios);
+ value["read_bytes"] = Value::from(dev.read_sectors * 512);
+ value["write_ios"] = Value::from(dev.write_ios);
+ value["write_bytes"] = Value::from(dev.write_sectors * 512);
+ value["io_ticks"] = Value::from(dev.io_ticks / 1000);
+ }
+ value
+ }
+}
+
fn collect_host_stats_sync() -> HostStats {
use proxmox_sys::linux::procfs::{
read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
--
2.30.2
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2022-02-03 9:46 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-02-03 9:45 [pbs-devel] [PATCH proxmox-backup v6] proxmox-backup-proxy: send metrics to configured metrics server Dominik Csapak
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal