From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <l.wagner@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id 39FBB9554C
 for <pbs-devel@lists.proxmox.com>; Fri, 12 Apr 2024 12:07:13 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id ECD7D79CA
 for <pbs-devel@lists.proxmox.com>; Fri, 12 Apr 2024 12:06:42 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [94.136.29.106])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS
 for <pbs-devel@lists.proxmox.com>; Fri, 12 Apr 2024 12:06:39 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id E472945158
 for <pbs-devel@lists.proxmox.com>; Fri, 12 Apr 2024 12:06:38 +0200 (CEST)
From: Lukas Wagner <l.wagner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Fri, 12 Apr 2024 12:06:01 +0200
Message-Id: <20240412100631.94218-4-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20240412100631.94218-1-l.wagner@proxmox.com>
References: <20240412100631.94218-1-l.wagner@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL -0.006 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See
 http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more
 information. [proxmox-backup-api.rs, notifications.rs,
 proxmox-backup-proxy.rs]
Subject: [pbs-devel] [PATCH proxmox-backup 03/33] notifications: allow
 sending notifications via proxmox_notify
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Fri, 12 Apr 2024 10:07:13 -0000

  - Set the context in proxmox_notify
  - Add helper function which queues notifications to a spool
    directory
  - Set up a worker task, running in the privileged process, which
    periodically checks the spool directory for queued notifications

The queuing is needed because on PBS we send most if not all
notifications from the proxy-process running as the `backup` user.
However, to have access to the protected passwords/tokens for various
notification endpoints, we need to read the notification config as
root.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/bin/proxmox-backup-api.rs   | 11 ++++
 src/bin/proxmox-backup-proxy.rs |  1 +
 src/server/notifications.rs     | 96 ++++++++++++++++++++++++++++++++-
 3 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index e46557a0..d5ce9f3b 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -56,6 +56,7 @@ async fn run() -> Result<(), Error> {
     proxmox_backup::server::create_state_dir()?;
     proxmox_backup::server::create_active_operations_dir()?;
     proxmox_backup::server::jobstate::create_jobstate_dir()?;
+    proxmox_backup::server::notifications::create_spool_dir()?;
     proxmox_backup::tape::create_tape_status_dir()?;
     proxmox_backup::tape::create_drive_state_dir()?;
     proxmox_backup::tape::create_changer_state_dir()?;
@@ -72,6 +73,7 @@ async fn run() -> Result<(), Error> {
     let _ = csrf_secret(); // load with lazy_static
 
     proxmox_backup::auth_helpers::setup_auth_context(true);
+    proxmox_backup::server::notifications::init()?;
 
     let backup_user = pbs_config::backup_user()?;
     let mut command_sock = proxmox_rest_server::CommandSocket::new(
@@ -153,6 +155,8 @@ async fn run() -> Result<(), Error> {
         std::thread::sleep(std::time::Duration::from_secs(3));
     });
 
+    start_notification_worker();
+
     server.await?;
     log::info!("server shutting down, waiting for active workers to complete");
     proxmox_rest_server::last_worker_future().await?;
@@ -161,3 +165,10 @@ async fn run() -> Result<(), Error> {
 
     Ok(())
 }
+
+fn start_notification_worker() {
+    let abort_future = proxmox_rest_server::shutdown_future();
+    let future = Box::pin(proxmox_backup::server::notifications::notification_worker());
+    let task = futures::future::select(future, abort_future);
+    tokio::spawn(task);
+}
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index f79ec2f5..15444685 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -198,6 +198,7 @@ async fn run() -> Result<(), Error> {
     }
 
     proxmox_backup::auth_helpers::setup_auth_context(false);
+    proxmox_backup::server::notifications::init()?;
 
     let rrd_cache = initialize_rrd_cache()?;
     rrd_cache.apply_journal()?;
diff --git a/src/server/notifications.rs b/src/server/notifications.rs
index 43b55656..8dde2eea 100644
--- a/src/server/notifications.rs
+++ b/src/server/notifications.rs
@@ -1,20 +1,29 @@
 use anyhow::Error;
-use serde_json::json;
+use const_format::concatcp;
+use serde_json::{json, Value};
+use std::collections::HashMap;
+use std::path::Path;
+use std::time::{Duration, Instant};
 
 use handlebars::{
     Context, Handlebars, Helper, HelperResult, Output, RenderContext, RenderError, TemplateError,
 };
+use nix::unistd::Uid;
 
 use proxmox_human_byte::HumanByte;
 use proxmox_lang::try_block;
+use proxmox_notify::context::pbs::PBS_CONTEXT;
 use proxmox_schema::ApiType;
 use proxmox_sys::email::sendmail;
+use proxmox_sys::fs::{create_path, CreateOptions};
 
 use pbs_api_types::{
     APTUpdateInfo, DataStoreConfig, DatastoreNotify, GarbageCollectionStatus, Notify,
     SyncJobConfig, TapeBackupJobSetup, User, Userid, VerificationJobConfig,
 };
+use proxmox_notify::{Notification, Severity};
 
+const SPOOL_DIR: &str = concatcp!(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR, "/notifications");
 const GC_OK_TEMPLATE: &str = r###"
 
 Datastore:            {{datastore}}
@@ -283,6 +292,91 @@ lazy_static::lazy_static! {
     };
 }
 
+/// Initialize the notification system by setting context in proxmox_notify
+pub fn init() -> Result<(), Error> {
+    proxmox_notify::context::set_context(&PBS_CONTEXT);
+    Ok(())
+}
+
+/// Create the directory which will be used to temporarily store notifications
+/// which were sent from an unprivileged process.
+pub fn create_spool_dir() -> Result<(), Error> {
+    let backup_user = pbs_config::backup_user()?;
+    let opts = CreateOptions::new()
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
+
+    create_path(SPOOL_DIR, None, Some(opts))?;
+    Ok(())
+}
+
+async fn send_queued_notifications() -> Result<(), Error> {
+    let mut read_dir = tokio::fs::read_dir(SPOOL_DIR).await?;
+
+    while let Some(entry) = read_dir.next_entry().await? {
+        let path = entry.path();
+
+        if let Some(ext) = path.extension() {
+            if ext == "json" {
+                let p = path.clone();
+
+                let res = tokio::task::spawn_blocking(move || {
+                    let config = pbs_config::notifications::config()?;
+                    let bytes = std::fs::read(p)?;
+                    let notification: Notification = serde_json::from_slice(&bytes)?;
+                    proxmox_notify::api::common::send(&config, &notification)?;
+
+                    Ok::<(), Error>(())
+                })
+                .await?;
+
+                if let Err(err) = res {
+                    log::error!("failed to send notification: {err}");
+                }
+
+                // Currently, there is no retry-mechanism in case of failure...
+                // For retries, we'd have to keep track of which targets succeeded/failed
+                // to send, so we do not retry notifying a target which succeeded before.
+                tokio::fs::remove_file(path).await?;
+            }
+        }
+    }
+
+    Ok::<(), Error>(())
+}
+
+/// Worker task to periodically send any queued notifications.
+pub async fn notification_worker() {
+    loop {
+        let delay_target = Instant::now() + Duration::from_secs(5);
+
+        if let Err(err) = send_queued_notifications().await {
+            log::error!("notification worker task error: {err}");
+        }
+
+        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+    }
+}
+
+fn send_notification(notification: Notification) -> Result<(), Error> {
+    if nix::unistd::ROOT == Uid::current() {
+        let config = pbs_config::notifications::config()?;
+        proxmox_notify::api::common::send(&config, &notification)?;
+    } else {
+        let ser = serde_json::to_vec(&notification)?;
+        let path = Path::new(SPOOL_DIR).join(format!("{id}.json", id = notification.id()));
+
+        let backup_user = pbs_config::backup_user()?;
+        let opts = CreateOptions::new()
+            .owner(backup_user.uid)
+            .group(backup_user.gid);
+        proxmox_sys::fs::replace_file(path, &ser, opts, true)?;
+        log::info!("queued notification (id={id})", id = notification.id())
+    }
+
+    Ok(())
+}
+
 /// Summary of a successful Tape Job
 #[derive(Default)]
 pub struct TapeBackupJobSummary {
-- 
2.39.2