From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 6096C60996 for ; Wed, 2 Feb 2022 16:50:20 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 571DF1E5AE for ; Wed, 2 Feb 2022 16:49:50 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 593061E54C for ; Wed, 2 Feb 2022 16:49:45 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 2FAE846C9A for ; Wed, 2 Feb 2022 16:49:45 +0100 (CET) From: Hannes Laimer To: pbs-devel@lists.proxmox.com Date: Wed, 2 Feb 2022 15:49:24 +0000 Message-Id: <20220202154927.30572-4-h.laimer@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20220202154927.30572-1-h.laimer@proxmox.com> References: <20220202154927.30572-1-h.laimer@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.045 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [PATCH proxmox-backup v6 3/6] pbs-datastore: add active operations tracking X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup 
Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 02 Feb 2022 15:50:20 -0000 Saves the currently active read/write operation counts in a file. The file is updated whenever lookup_datastore returns a reference and whenever such a reference is dropped. There is one file per datastore, and each file is locked before every access. Signed-off-by: Hannes Laimer --- pbs-api-types/src/maintenance.rs | 1 + pbs-datastore/Cargo.toml | 1 + pbs-datastore/src/datastore.rs | 114 +++++++++++++++++++---------- pbs-datastore/src/lib.rs | 4 + pbs-datastore/src/task_tracking.rs | 81 ++++++++++++++++++++ src/api2/tape/backup.rs | 4 +- src/api2/tape/restore.rs | 4 +- src/bin/proxmox-backup-api.rs | 1 + src/server/mod.rs | 16 +++- 9 files changed, 184 insertions(+), 42 deletions(-) create mode 100644 pbs-datastore/src/task_tracking.rs diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs index 4d3ccb3b..1b5753d2 100644 --- a/pbs-api-types/src/maintenance.rs +++ b/pbs-api-types/src/maintenance.rs @@ -25,6 +25,7 @@ pub enum MaintenanceType { Offline(String), } +#[derive(Copy ,Clone)] /// Operation requirments, used when checking for maintenance mode. 
pub enum Operation { Read, diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml index d8cefa00..0c703d44 100644 --- a/pbs-datastore/Cargo.toml +++ b/pbs-datastore/Cargo.toml @@ -35,5 +35,6 @@ proxmox-uuid = "1" proxmox-sys = "0.2" pbs-api-types = { path = "../pbs-api-types" } +pbs-buildcfg = { path = "../pbs-buildcfg" } pbs-tools = { path = "../pbs-tools" } pbs-config = { path = "../pbs-config" } diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs index c65d4574..c643bc55 100644 --- a/pbs-datastore/src/datastore.rs +++ b/pbs-datastore/src/datastore.rs @@ -9,11 +9,12 @@ use std::time::Duration; use anyhow::{bail, format_err, Error}; use lazy_static::lazy_static; -use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions}; +use proxmox_sys::fs::{replace_file, file_read_optional_string, + CreateOptions, lock_dir_noblock, DirLockGuard, +}; use proxmox_sys::process_locker::ProcessLockSharedGuard; use proxmox_sys::WorkerTaskContext; use proxmox_sys::{task_log, task_warn}; -use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard}; use pbs_api_types::{ UPID, DataStoreConfig, Authid, MaintenanceType, Operation, GarbageCollectionStatus, HumanByte @@ -31,9 +32,10 @@ use crate::manifest::{ ArchiveType, BackupManifest, archive_type, }; +use crate::task_tracking::update_active_operations; lazy_static! { - static ref DATASTORE_MAP: Mutex>> = Mutex::new(HashMap::new()); + static ref DATASTORE_MAP: Mutex>> = Mutex::new(HashMap::new()); } /// checks if auth_id is owner, or, if owner is a token, if @@ -54,13 +56,42 @@ pub fn check_backup_owner( /// /// A Datastore can store severals backups, and provides the /// management interface for backup. 
-pub struct DataStore { +pub struct DataStoreImpl { chunk_store: Arc, gc_mutex: Mutex<()>, last_gc_status: Mutex, verify_new: bool, } +pub struct DataStore { + inner: Arc, + operation: Option, +} + +impl Clone for DataStore { + fn clone(&self) -> Self { + if let Some(operation) = self.operation.clone() { + if let Err(e) = update_active_operations(self.name(), operation, -1) { + eprintln!("could not update active operations - {}", e); + } + } + DataStore { + inner: self.inner.clone(), + operation: self.operation.clone(), + } + } +} + +impl Drop for DataStore { + fn drop(&mut self) { + if let Some(operation) = self.operation.clone() { + if let Err(e) = update_active_operations(self.name(), operation, -1) { + eprintln!("could not update active operations - {}", e); + } + } + } +} + impl DataStore { pub fn lookup_datastore( name: &str, @@ -70,12 +101,15 @@ impl DataStore { let config: DataStoreConfig = config.lookup("datastore", name)?; let path = PathBuf::from(&config.path); - match (&config.maintenance_type, operation) { - (Some(MaintenanceType::ReadOnly(message)), Some(Operation::Write)) - | (Some(MaintenanceType::Offline(message)), Some(_)) => { - bail!("Datastore '{}' is in maintenance mode: {}", name, message); - }, - _ => {} + if let Some(MaintenanceType::Offline(message)) = &config.maintenance_type { + bail!("datastore '{}' is in offline maintenance mode: {}", name, message); + } else if let Some(MaintenanceType::ReadOnly(message)) = &config.maintenance_type { + if let Some(Operation::Write) = operation { + bail!("datastore '{}' is in read-only maintenance mode: {}", name, message); + } + } + if let Some(op) = operation { + update_active_operations(name, op, 1)?; } let mut map = DATASTORE_MAP.lock().unwrap(); @@ -85,7 +119,10 @@ impl DataStore { if datastore.chunk_store.base() == path && datastore.verify_new == config.verify_new.unwrap_or(false) { - return Ok(datastore.clone()); + return Ok(Arc::new(Self { + inner: datastore.clone(), + operation, + })) } } @@ 
-94,7 +131,10 @@ impl DataStore { let datastore = Arc::new(datastore); map.insert(name.to_string(), datastore.clone()); - Ok(datastore) + Ok(Arc::new(Self { + inner: datastore, + operation, + })) } /// removes all datastores that are not configured anymore @@ -109,7 +149,7 @@ impl DataStore { Ok(()) } - fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result { + fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result { let chunk_store = ChunkStore::open(store_name, path)?; let mut gc_status_path = chunk_store.base_path(); @@ -127,7 +167,7 @@ impl DataStore { GarbageCollectionStatus::default() }; - Ok(Self { + Ok(DataStoreImpl { chunk_store: Arc::new(chunk_store), gc_mutex: Mutex::new(()), last_gc_status: Mutex::new(gc_status), @@ -141,19 +181,19 @@ impl DataStore { impl Iterator, usize, bool)>, Error > { - self.chunk_store.get_chunk_iterator() + self.inner.chunk_store.get_chunk_iterator() } pub fn create_fixed_writer>(&self, filename: P, size: usize, chunk_size: usize) -> Result { - let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?; + let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?; Ok(index) } pub fn open_fixed_reader>(&self, filename: P) -> Result { - let full_path = self.chunk_store.relative_path(filename.as_ref()); + let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); let index = FixedIndexReader::open(&full_path)?; @@ -165,14 +205,14 @@ impl DataStore { ) -> Result { let index = DynamicIndexWriter::create( - self.chunk_store.clone(), filename.as_ref())?; + self.inner.chunk_store.clone(), filename.as_ref())?; Ok(index) } pub fn open_dynamic_reader>(&self, filename: P) -> Result { - let full_path = self.chunk_store.relative_path(filename.as_ref()); + let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); let index = DynamicIndexReader::open(&full_path)?; @@ 
-222,11 +262,11 @@ impl DataStore { } pub fn name(&self) -> &str { - self.chunk_store.name() + self.inner.chunk_store.name() } pub fn base_path(&self) -> PathBuf { - self.chunk_store.base_path() + self.inner.chunk_store.base_path() } /// Cleanup a backup directory @@ -529,7 +569,7 @@ impl DataStore { worker.check_abort()?; worker.fail_on_shutdown()?; let digest = index.index_digest(pos).unwrap(); - if !self.chunk_store.cond_touch_chunk(digest, false)? { + if !self.inner.chunk_store.cond_touch_chunk(digest, false)? { task_warn!( worker, "warning: unable to access non-existent chunk {}, required by {:?}", @@ -545,7 +585,7 @@ impl DataStore { let mut bad_path = PathBuf::new(); bad_path.push(self.chunk_path(digest).0); bad_path.set_extension(bad_ext); - self.chunk_store.cond_touch_path(&bad_path, false)?; + self.inner.chunk_store.cond_touch_path(&bad_path, false)?; } } } @@ -625,24 +665,24 @@ impl DataStore { } pub fn last_gc_status(&self) -> GarbageCollectionStatus { - self.last_gc_status.lock().unwrap().clone() + self.inner.last_gc_status.lock().unwrap().clone() } pub fn garbage_collection_running(&self) -> bool { - !matches!(self.gc_mutex.try_lock(), Ok(_)) + !matches!(self.inner.gc_mutex.try_lock(), Ok(_)) } pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> { - if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() { + if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() { // avoids that we run GC if an old daemon process has still a // running backup writer, which is not save as we have no "oldest // writer" information and thus no safe atime cutoff - let _exclusive_lock = self.chunk_store.try_exclusive_lock()?; + let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?; let phase1_start_time = proxmox_time::epoch_i64(); - let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time); + let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time); let mut 
gc_status = GarbageCollectionStatus::default(); gc_status.upid = Some(upid.to_string()); @@ -652,7 +692,7 @@ impl DataStore { self.mark_used_chunks(&mut gc_status, worker)?; task_log!(worker, "Start GC phase2 (sweep unused chunks)"); - self.chunk_store.sweep_unused_chunks( + self.inner.chunk_store.sweep_unused_chunks( oldest_writer, phase1_start_time, &mut gc_status, @@ -729,7 +769,7 @@ impl DataStore { let _ = replace_file(path, serialized.as_bytes(), options, false); } - *self.last_gc_status.lock().unwrap() = gc_status; + *self.inner.last_gc_status.lock().unwrap() = gc_status; } else { bail!("Start GC failed - (already running/locked)"); @@ -739,15 +779,15 @@ impl DataStore { } pub fn try_shared_chunk_store_lock(&self) -> Result { - self.chunk_store.try_shared_lock() + self.inner.chunk_store.try_shared_lock() } pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) { - self.chunk_store.chunk_path(digest) + self.inner.chunk_store.chunk_path(digest) } pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result { - self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist) + self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist) } pub fn insert_chunk( @@ -755,7 +795,7 @@ impl DataStore { chunk: &DataBlob, digest: &[u8; 32], ) -> Result<(bool, u64), Error> { - self.chunk_store.insert_chunk(chunk, digest) + self.inner.chunk_store.insert_chunk(chunk, digest) } pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result { @@ -771,13 +811,13 @@ impl DataStore { pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result { - let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest); + let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest); std::fs::metadata(chunk_path).map_err(Error::from) } pub fn load_chunk(&self, digest: &[u8; 32]) -> Result { - let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest); + let (chunk_path, digest_str) = 
self.inner.chunk_store.chunk_path(digest); proxmox_lang::try_block!({ let mut file = std::fs::File::open(&chunk_path)?; @@ -891,7 +931,7 @@ impl DataStore { } pub fn verify_new(&self) -> bool { - self.verify_new + self.inner.verify_new } /// returns a list of chunks sorted by their inode number on disk diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs index d50a64a5..131040c6 100644 --- a/pbs-datastore/src/lib.rs +++ b/pbs-datastore/src/lib.rs @@ -145,6 +145,9 @@ // Note: .pcat1 => Proxmox Catalog Format version 1 pub const CATALOG_NAME: &str = "catalog.pcat1.didx"; +/// Directory path where active operations counters are saved. +pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations"); + #[macro_export] macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 { () => { @@ -179,6 +182,7 @@ pub mod paperkey; pub mod prune; pub mod read_chunk; pub mod store_progress; +pub mod task_tracking; pub mod dynamic_index; pub mod fixed_index; diff --git a/pbs-datastore/src/task_tracking.rs b/pbs-datastore/src/task_tracking.rs new file mode 100644 index 00000000..a02d9a17 --- /dev/null +++ b/pbs-datastore/src/task_tracking.rs @@ -0,0 +1,81 @@ +use anyhow::Error; +use libc::pid_t; +use nix::unistd::Pid; +use std::path::PathBuf; + +use pbs_api_types::Operation; +use proxmox_sys::fs::{file_read_optional_string, open_file_locked, replace_file, CreateOptions}; +use proxmox_sys::linux::procfs; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Clone)] +struct TaskOperations { + pid: u32, + starttime: u64, + reading_operations: i64, + writing_operations: i64, +} + +pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> { + let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name)); + let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name)); + + let user = pbs_config::backup_user()?; + let options = 
CreateOptions::new() + .group(user.gid) + .owner(user.uid) + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); + + let timeout = std::time::Duration::new(10, 0); + open_file_locked(&lock_path, timeout, true, options.clone())?; + + let pid = std::process::id(); + let starttime = procfs::PidStat::read_from_pid(Pid::from_raw(pid as pid_t))?.starttime; + let mut updated = false; + + let mut updated_tasks: Vec = match file_read_optional_string(&path)? { + Some(data) => serde_json::from_str::>(&data)? + .iter_mut() + .filter_map( + |task| match procfs::check_process_running(task.pid as pid_t) { + Some(stat) if pid == task.pid && stat.starttime != task.starttime => None, + Some(_) => { + if pid == task.pid { + updated = true; + match operation { + Operation::Read => task.reading_operations += count, + Operation::Write => task.writing_operations += count, + }; + } + Some(task.clone()) + } + _ => None, + }, + ) + .collect(), + None => Vec::new(), + }; + + if !updated { + updated_tasks.push(match operation { + Operation::Read => TaskOperations { + pid, + starttime, + reading_operations: 1, + writing_operations: 0, + }, + Operation::Write => TaskOperations { + pid, + starttime, + reading_operations: 0, + writing_operations: 1, + }, + }) + } + replace_file( + &path, + serde_json::to_string(&updated_tasks)?.as_bytes(), + options, + false, + ) +} diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs index 89273627..28b24168 100644 --- a/src/api2/tape/backup.rs +++ b/src/api2/tape/backup.rs @@ -168,7 +168,7 @@ pub fn do_tape_backup_job( let worker_type = job.jobtype().to_string(); - let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Write))?; + let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?; let (config, _digest) = pbs_config::media_pool::config()?; let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?; @@ -349,7 +349,7 @@ pub fn backup( &setup.drive, )?; - let datastore = 
DataStore::lookup_datastore(&setup.store, Some(Operation::Write))?; + let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?; let (config, _digest) = pbs_config::media_pool::config()?; let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?; diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs index 8ff28763..8fadbeca 100644 --- a/src/api2/tape/restore.rs +++ b/src/api2/tape/restore.rs @@ -93,10 +93,10 @@ impl TryFrom for DataStoreMap { if let Some(index) = store.find('=') { let mut target = store.split_off(index); target.remove(0); // remove '=' - let datastore = DataStore::lookup_datastore(&target, Some(Operation::Read))?; + let datastore = DataStore::lookup_datastore(&target, Some(Operation::Write))?; map.insert(store, datastore); } else if default.is_none() { - default = Some(DataStore::lookup_datastore(&store, Some(Operation::Read))?); + default = Some(DataStore::lookup_datastore(&store, Some(Operation::Write))?); } else { bail!("multiple default stores given"); } diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs index ee037a3b..68aa4863 100644 --- a/src/bin/proxmox-backup-api.rs +++ b/src/bin/proxmox-backup-api.rs @@ -75,6 +75,7 @@ async fn run() -> Result<(), Error> { proxmox_backup::server::create_run_dir()?; proxmox_backup::server::create_state_dir()?; + proxmox_backup::server::create_active_operations_dir()?; proxmox_backup::server::jobstate::create_jobstate_dir()?; proxmox_backup::tape::create_tape_status_dir()?; proxmox_backup::tape::create_drive_state_dir()?; diff --git a/src/server/mod.rs b/src/server/mod.rs index 2b54cdf0..5a9884e9 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -4,7 +4,7 @@ //! services. We want async IO, so this is built on top of //! tokio/hyper. 
-use anyhow::Error; +use anyhow::{format_err, Error}; use serde_json::Value; use proxmox_sys::fs::{create_path, CreateOptions}; @@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> { create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?; Ok(()) } + +/// Create active operations dir with correct permission. +pub fn create_active_operations_dir() -> Result<(), Error> { + let backup_user = pbs_config::backup_user()?; + let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750); + let options = CreateOptions::new() + .perm(mode) + .owner(backup_user.uid) + .group(backup_user.gid); + + create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options)) + .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?; + Ok(()) +} -- 2.30.2