From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 248331FF137 for ; Tue, 03 Mar 2026 16:23:52 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 3B07E1BA02; Tue, 3 Mar 2026 16:24:18 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup v4 21/22] datastore: add thresholds notification callback on datastore lookup Date: Tue, 3 Mar 2026 16:23:16 +0100 Message-ID: <20260303152317.934256-35-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260303152317.934256-1-c.ebner@proxmox.com> References: <20260303152317.934256-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1772551388930 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.051 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: VAGK2JVCDFZWAUWW3OR22VF5XPHCAVDA X-Message-ID-Hash: VAGK2JVCDFZWAUWW3OR22VF5XPHCAVDA X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Pass in the callback to be executed when a datastore threshold value exceeds its set limit. 
This callback is then callable by the S3 client request counters implementation, currently the only component defining thresholds. When a threshold is exceeded, the notification is generated by the callback and queued for sending. Signed-off-by: Christian Ebner --- pbs-datastore/src/datastore.rs | 35 ++++++++++++++++++++++++---- pbs-datastore/src/snapshot_reader.rs | 1 + src/tools/mod.rs | 9 ++++++- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs index 957876376..5b8586b55 100644 --- a/pbs-datastore/src/datastore.rs +++ b/pbs-datastore/src/datastore.rs @@ -202,14 +202,25 @@ impl DataStoreImpl { } } +pub type ThresholdsExceededCallback = fn(&str, &str, u64, u64) -> Result<(), Error>; + pub struct DataStoreLookup<'a> { name: &'a str, operation: Operation, + thresholds_exceeded_callback: Arc>, } impl<'a> DataStoreLookup<'a> { - pub fn with(name: &'a str, operation: Operation) -> Self { - Self { name, operation } + pub fn with( + name: &'a str, + operation: Operation, + thresholds_exceeded_callback: Arc>, + ) -> Self { + Self { + name, + operation, + thresholds_exceeded_callback, + } } } @@ -556,7 +567,12 @@ impl DataStore { )?) 
}; - let datastore = DataStore::with_store_and_config(chunk_store, config, gen_num)?; + let datastore = DataStore::with_store_and_config( + chunk_store, + config, + gen_num, + Arc::clone(&lookup.thresholds_exceeded_callback), + )?; let datastore = Arc::new(datastore); datastore_cache.insert(lookup.name.to_string(), datastore.clone()); @@ -591,7 +607,7 @@ impl DataStore { { // the datastore drop handler does the checking if tasks are running and clears the // cache entry, so we just have to trigger it here - let lookup = DataStoreLookup::with(name, Operation::Lookup); + let lookup = DataStoreLookup::with(name, Operation::Lookup, Arc::new(None)); let _ = DataStore::lookup_datastore(lookup); } @@ -645,6 +661,7 @@ impl DataStore { Arc::new(chunk_store), config, None, + Arc::new(None), )?); if let Some(operation) = operation { @@ -658,6 +675,7 @@ impl DataStore { chunk_store: Arc, config: DataStoreConfig, generation: Option, + thresholds_exceeded_callback: Arc>, ) -> Result { let mut gc_status_path = chunk_store.base_path(); gc_status_path.push(".gc-status"); @@ -710,6 +728,15 @@ impl DataStore { let mut request_counters = Self::request_counters(&config, &backend_config)?; Self::update_notification_thresholds(&mut request_counters, &config)?; + SharedRequestCounters::update_thresholds_exceeded_callback(Box::new( + move |counter_id, threshold, value| { + thresholds_exceeded_callback.map(|cb| { + if let Err(err) = cb(&config.name, counter_id, threshold, value) { + log::error!("failed to send notification: {err:?}"); + } + }); + }, + )); (Some(cache), Some(Arc::new(request_counters))) } else { diff --git a/pbs-datastore/src/snapshot_reader.rs b/pbs-datastore/src/snapshot_reader.rs index d522a02d7..2c8f19c17 100644 --- a/pbs-datastore/src/snapshot_reader.rs +++ b/pbs-datastore/src/snapshot_reader.rs @@ -166,6 +166,7 @@ impl bool> Iterator for SnapshotChunkIterator<'_, F> { let lookup = DataStoreLookup::with( self.snapshot_reader.datastore_name(), Operation::Read, + 
Arc::new(None), ); let datastore = DataStore::lookup_datastore(lookup)?; let order = diff --git a/src/tools/mod.rs b/src/tools/mod.rs index a82aaf136..2dfbac885 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Error}; use std::collections::HashSet; +use std::sync::Arc; use pbs_api_types::{ Authid, BackupContent, CryptMode, Operation, SnapshotListItem, SnapshotVerifyState, @@ -192,5 +193,11 @@ pub(crate) fn backup_info_to_snapshot_list_item( /// Lookup the datastore by name with given operation. #[inline(always)] pub fn lookup_with<'a>(name: &'a str, operation: Operation) -> DataStoreLookup<'a> { - DataStoreLookup::with(name, operation) + DataStoreLookup::with( + name, + operation, + Arc::new(Some( + crate::server::notifications::send_datastore_threshold_exceeded, + )), + ) } -- 2.47.3