From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 874DD9B1B3 for ; Wed, 24 May 2023 15:58:06 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 5B8F41F66D for ; Wed, 24 May 2023 15:57:36 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 24 May 2023 15:57:29 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 8653546E2A for ; Wed, 24 May 2023 15:57:29 +0200 (CEST) From: Lukas Wagner To: pve-devel@lists.proxmox.com Date: Wed, 24 May 2023 15:56:15 +0200 Message-Id: <20230524135649.934881-9-l.wagner@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20230524135649.934881-1-l.wagner@proxmox.com> References: <20230524135649.934881-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.168 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pve-devel] [PATCH v2 proxmox 08/42] notify: add notification channels X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list 
List-Id: Proxmox VE development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 24 May 2023 13:58:06 -0000 A notification channel is basically a 'group' of endpoints. When notifying, a notification is now sent to a 'channel', and forwarded to all included endpoints. To illustrate why the channel concept is useful, consider a backup job. The plan is to provide a new option there where the user can choose a notification channel that should be used for any notifications. The channel decouples the job configuration from any endpoints present in the system. I expected this to be nicer than: - notifying via *all* endpoints. If this is not desired, the user would be forced to configure notification filtering (to be introduced in a later patch). The filtering approach is a bit cumbersome, since it requires the filter to be adapted for each and every new backup job. - adding the endpoints directly to the job configuration. This would mean that new/removed endpoints have to be added/removed from *all* affected backup job configurations. 
Signed-off-by: Lukas Wagner --- proxmox-notify/src/channel.rs | 53 ++++++++++++++ proxmox-notify/src/config.rs | 9 +++ proxmox-notify/src/lib.rs | 133 ++++++++++++++++++++++++++++++---- 3 files changed, 179 insertions(+), 16 deletions(-) create mode 100644 proxmox-notify/src/channel.rs diff --git a/proxmox-notify/src/channel.rs b/proxmox-notify/src/channel.rs new file mode 100644 index 00000000..dc9edf98 --- /dev/null +++ b/proxmox-notify/src/channel.rs @@ -0,0 +1,53 @@ +use crate::schema::COMMENT_SCHEMA; +use proxmox_schema::{api, Updater}; +use serde::{Deserialize, Serialize}; + +pub(crate) const CHANNEL_TYPENAME: &str = "channel"; + +#[api( + properties: { + "endpoint": { + optional: true, + type: Array, + items: { + description: "Name of the included endpoint(s)", + type: String, + }, + }, + comment: { + optional: true, + schema: COMMENT_SCHEMA, + }, + }, +)] +#[derive(Debug, Serialize, Deserialize, Updater)] +#[serde(rename_all = "kebab-case")] +/// Config for notification channels +pub struct ChannelConfig { + /// Name of the channel + #[updater(skip)] + pub name: String, + /// Endpoints for this channel + #[serde(skip_serializing_if = "Option::is_none")] + pub endpoint: Option>, + /// Comment + #[serde(skip_serializing_if = "Option::is_none")] + pub comment: Option, +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum DeleteableChannelProperty { + Endpoint, + Comment, +} + +impl ChannelConfig { + pub fn should_notify_via_endpoint(&self, endpoint: &str) -> bool { + if let Some(endpoints) = &self.endpoint { + endpoints.iter().any(|e| *e == endpoint) + } else { + false + } + } +} diff --git a/proxmox-notify/src/config.rs b/proxmox-notify/src/config.rs index 362ca0fc..3064065b 100644 --- a/proxmox-notify/src/config.rs +++ b/proxmox-notify/src/config.rs @@ -2,6 +2,7 @@ use lazy_static::lazy_static; use proxmox_schema::{ApiType, ObjectSchema}; use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin}; 
+use crate::channel::{ChannelConfig, CHANNEL_TYPENAME}; use crate::schema::BACKEND_NAME_SCHEMA; use crate::Error; @@ -13,6 +14,14 @@ lazy_static! { fn config_init() -> SectionConfig { let mut config = SectionConfig::new(&BACKEND_NAME_SCHEMA); + const CHANNEL_SCHEMA: &ObjectSchema = ChannelConfig::API_SCHEMA.unwrap_object_schema(); + + config.register_plugin(SectionConfigPlugin::new( + CHANNEL_TYPENAME.to_string(), + Some(String::from("name")), + CHANNEL_SCHEMA, + )); + config } diff --git a/proxmox-notify/src/lib.rs b/proxmox-notify/src/lib.rs index f2d0e16c..3b881e26 100644 --- a/proxmox-notify/src/lib.rs +++ b/proxmox-notify/src/lib.rs @@ -1,5 +1,6 @@ use std::fmt::Display; +use channel::{ChannelConfig, CHANNEL_TYPENAME}; use proxmox_schema::api; use proxmox_section_config::SectionConfigData; use serde::{Deserialize, Serialize}; @@ -9,6 +10,7 @@ use serde_json::Value; use std::error::Error as StdError; pub mod api; +pub mod channel; mod config; pub mod endpoints; pub mod schema; @@ -19,6 +21,7 @@ pub enum Error { ConfigDeserialization(Box), NotifyFailed(String, Box), EndpointDoesNotExist(String), + ChannelDoesNotExist(String), } impl Display for Error { @@ -36,6 +39,9 @@ impl Display for Error { Error::EndpointDoesNotExist(endpoint) => { write!(f, "endpoint '{endpoint}' does not exist") } + Error::ChannelDoesNotExist(channel) => { + write!(f, "channel '{channel}' does not exist") + } } } } @@ -47,6 +53,7 @@ impl StdError for Error { Error::ConfigDeserialization(err) => Some(&**err), Error::NotifyFailed(_, err) => Some(&**err), Error::EndpointDoesNotExist(_) => None, + Error::ChannelDoesNotExist(_) => None, } } } @@ -142,6 +149,7 @@ impl Config { #[derive(Default)] pub struct Bus { endpoints: Vec>, + channels: Vec, } #[allow(unused_macros)] @@ -204,7 +212,15 @@ impl Bus { pub fn from_config(config: &Config) -> Result { let mut endpoints = Vec::new(); - Ok(Bus { endpoints }) + let channels = config + .config + .convert_to_typed_array(CHANNEL_TYPENAME) + 
.map_err(|err| Error::ConfigDeserialization(err.into()))?; + + Ok(Bus { + endpoints, + channels, + }) } #[cfg(test)] @@ -212,20 +228,43 @@ impl Bus { self.endpoints.push(endpoint); } - /// Send a notification to all registered endpoints - pub fn send(&self, notification: &Notification) -> Result<(), Error> { - log::info!( - "sending notification with title '{title}'", + #[cfg(test)] + pub fn add_channel(&mut self, channel: ChannelConfig) { + self.channels.push(channel); + } + + pub fn send(&self, channel: &str, notification: &Notification) -> Result<(), Error> { + log::debug!( + "sending notification with title `{title}`", title = notification.title ); + // TODO: Maybe fallback to some default channel (e.g. send to all endpoints) in case + // the channel does not exist? Just to ensure that notifications are *never* swallowed... + let channel = self + .channels + .iter() + .find(|c| c.name == channel) + .ok_or(Error::ChannelDoesNotExist(channel.into()))?; + for endpoint in &self.endpoints { - endpoint.send(notification).unwrap_or_else(|e| { + if !channel.should_notify_via_endpoint(endpoint.name()) { + log::debug!( + "channel '{channel}' does not notify via endpoint '{endpoint}', skipping", + channel = channel.name, + endpoint = endpoint.name() + ); + continue; + } + + if let Err(e) = endpoint.send(notification) { log::error!( "could not notfiy via endpoint `{name}`: {e}", name = endpoint.name() - ) - }) + ); + } else { + log::info!("notified via endpoint `{name}`", name = endpoint.name()); + } } Ok(()) @@ -257,6 +296,7 @@ mod tests { #[derive(Default, Clone)] struct MockEndpoint { + name: &'static str, messages: Rc>>, } @@ -268,11 +308,18 @@ mod tests { } fn name(&self) -> &str { - "mock-endpoint" + self.name } } impl MockEndpoint { + fn new(name: &'static str, filter: Option) -> Self { + Self { + name, + ..Default::default() + } + } + fn messages(&self) -> Vec { self.messages.borrow().clone() } @@ -283,18 +330,72 @@ mod tests { let mock = MockEndpoint::default(); 
let mut bus = Bus::default(); - bus.add_endpoint(Box::new(mock.clone())); + bus.add_channel(ChannelConfig { + name: "channel".to_string(), + endpoint: Some(vec!["".into()]), + comment: None, + }); + + bus.send( + "channel", + &Notification { + title: "Title".into(), + body: "Body".into(), + severity: Severity::Info, + properties: Default::default(), + }, + )?; - bus.send(&Notification { - title: "Title".into(), - body: "Body".into(), - severity: Severity::Info, - properties: Default::default(), - })?; let messages = mock.messages(); assert_eq!(messages.len(), 1); Ok(()) } + + #[test] + fn test_channels() -> Result<(), Error> { + let endpoint1 = MockEndpoint::new("mock1", None); + let endpoint2 = MockEndpoint::new("mock2", None); + + let mut bus = Bus::default(); + + bus.add_channel(ChannelConfig { + name: "channel1".to_string(), + endpoint: Some(vec!["mock1".into()]), + comment: None, + }); + + bus.add_channel(ChannelConfig { + name: "channel2".to_string(), + endpoint: Some(vec!["mock2".into()]), + comment: None, + }); + + bus.add_endpoint(Box::new(endpoint1.clone())); + bus.add_endpoint(Box::new(endpoint2.clone())); + + let send_to_channel = |channel| { + bus.send( + channel, + &Notification { + title: "Title".into(), + body: "Body".into(), + severity: Severity::Info, + properties: Default::default(), + }, + ) + .unwrap(); + }; + + send_to_channel("channel1"); + assert_eq!(endpoint1.messages().len(), 1); + assert_eq!(endpoint2.messages().len(), 0); + + send_to_channel("channel2"); + assert_eq!(endpoint1.messages().len(), 1); + assert_eq!(endpoint2.messages().len(), 1); + + Ok(()) + } } -- 2.30.2