From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 8229565012 for ; Mon, 31 Jan 2022 15:45:18 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 7051A25CDE for ; Mon, 31 Jan 2022 15:44:48 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id F03C025CD0 for ; Mon, 31 Jan 2022 15:44:45 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id B894143CAB; Mon, 31 Jan 2022 15:44:45 +0100 (CET) Message-ID: <2ab2ff46-c46c-bf3f-63fd-1981a91d0cde@proxmox.com> Date: Mon, 31 Jan 2022 15:44:42 +0100 MIME-Version: 1.0 User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:97.0) Gecko/20100101 Thunderbird/97.0 Content-Language: en-US To: Proxmox Backup Server development discussion , Hannes Laimer References: <20220124134458.44600-1-h.laimer@proxmox.com> From: Dominik Csapak In-Reply-To: <20220124134458.44600-1-h.laimer@proxmox.com> Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: 7bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.164 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment NICE_REPLY_A -0.001 Looks like a legit reply (A) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF 
record T_SCC_BODY_TEXT_LINE -0.01 - Subject: Re: [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 31 Jan 2022 14:45:18 -0000 Looks mostly fine (one comment inline), but did you intentionally ignore Thomas's comment about PID reuse and about saving/checking the process start time? On 1/24/22 14:44, Hannes Laimer wrote: > Saves the currently active read/write operation counts in a file. The > file is updated whenever a reference returned by lookup_datastore is > dropped and whenever a reference is returned by lookup_datastore. The > files are locked before every access, there is one file per datastore. > > Signed-off-by: Hannes Laimer > --- > FIXUP: fix logic, previous version of this patch missed updates for > PIDs that were not present in non empty list > > pbs-api-types/src/maintenance.rs | 1 + > pbs-datastore/Cargo.toml | 1 + > pbs-datastore/src/datastore.rs | 100 ++++++++++++++++++++--------- > pbs-datastore/src/lib.rs | 4 ++ > pbs-datastore/src/task_tracking.rs | 72 +++++++++++++++++++++ > src/bin/proxmox-backup-api.rs | 1 + > src/server/mod.rs | 16 ++++- > 7 files changed, 163 insertions(+), 32 deletions(-) > create mode 100644 pbs-datastore/src/task_tracking.rs > > diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs > index 4d3ccb3b..1b5753d2 100644 > --- a/pbs-api-types/src/maintenance.rs > +++ b/pbs-api-types/src/maintenance.rs > @@ -25,6 +25,7 @@ pub enum MaintenanceType { > Offline(String), > } > > +#[derive(Copy ,Clone)] > /// Operation requirments, used when checking for maintenance mode.
> pub enum Operation { > Read, > diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml > index d8cefa00..0c703d44 100644 > --- a/pbs-datastore/Cargo.toml > +++ b/pbs-datastore/Cargo.toml > @@ -35,5 +35,6 @@ proxmox-uuid = "1" > proxmox-sys = "0.2" > > pbs-api-types = { path = "../pbs-api-types" } > +pbs-buildcfg = { path = "../pbs-buildcfg" } > pbs-tools = { path = "../pbs-tools" } > pbs-config = { path = "../pbs-config" } > diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs > index c65d4574..774dea80 100644 > --- a/pbs-datastore/src/datastore.rs > +++ b/pbs-datastore/src/datastore.rs > @@ -9,11 +9,12 @@ use std::time::Duration; > use anyhow::{bail, format_err, Error}; > use lazy_static::lazy_static; > > -use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions}; > +use proxmox_sys::fs::{replace_file, file_read_optional_string, > + CreateOptions, lock_dir_noblock, DirLockGuard, > +}; > use proxmox_sys::process_locker::ProcessLockSharedGuard; > use proxmox_sys::WorkerTaskContext; > use proxmox_sys::{task_log, task_warn}; > -use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard}; > > use pbs_api_types::{ > UPID, DataStoreConfig, Authid, MaintenanceType, Operation, GarbageCollectionStatus, HumanByte > @@ -31,9 +32,10 @@ use crate::manifest::{ > ArchiveType, BackupManifest, > archive_type, > }; > +use crate::task_tracking::update_active_operations; > > lazy_static! { > - static ref DATASTORE_MAP: Mutex>> = Mutex::new(HashMap::new()); > + static ref DATASTORE_MAP: Mutex>> = Mutex::new(HashMap::new()); > } > > /// checks if auth_id is owner, or, if owner is a token, if > @@ -54,13 +56,42 @@ pub fn check_backup_owner( > /// > /// A Datastore can store severals backups, and provides the > /// management interface for backup. 
> -pub struct DataStore { > +pub struct DataStoreImpl { > chunk_store: Arc, > gc_mutex: Mutex<()>, > last_gc_status: Mutex, > verify_new: bool, > } > > +pub struct DataStore { > + inner: Arc, > + operation: Option, > +} > + > +impl Clone for DataStore { > + fn clone(&self) -> Self { > + if let Some(operation) = self.operation.clone() { > + if let Err(e) = update_active_operations(self.name(), operation, -1) { > + eprintln!("could not update active operations - {}", e); > + } > + } > + DataStore { > + inner: self.inner.clone(), > + operation: self.operation.clone(), > + } > + } > +} > + > +impl Drop for DataStore { > + fn drop(&mut self) { > + if let Some(operation) = self.operation.clone() { > + if let Err(e) = update_active_operations(self.name(), operation, -1) { > + eprintln!("could not update active operations - {}", e); > + } > + } > + } > +} > + > impl DataStore { > pub fn lookup_datastore( > name: &str, > @@ -75,6 +106,7 @@ impl DataStore { > | (Some(MaintenanceType::Offline(message)), Some(_)) => { > bail!("Datastore '{}' is in maintenance mode: {}", name, message); > }, > + (_, Some(operation)) => update_active_operations(name, operation, 1)?, > _ => {} > } > > @@ -85,7 +117,10 @@ impl DataStore { > if datastore.chunk_store.base() == path && > datastore.verify_new == config.verify_new.unwrap_or(false) > { > - return Ok(datastore.clone()); > + return Ok(Arc::new(Self { > + inner: datastore.clone(), > + operation, > + })) > } > } > > @@ -94,7 +129,10 @@ impl DataStore { > let datastore = Arc::new(datastore); > map.insert(name.to_string(), datastore.clone()); > > - Ok(datastore) > + Ok(Arc::new(Self { > + inner: datastore, > + operation, > + })) > } > > /// removes all datastores that are not configured anymore > @@ -109,7 +147,7 @@ impl DataStore { > Ok(()) > } > > - fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result { > + fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result { > let chunk_store 
= ChunkStore::open(store_name, path)?; > > let mut gc_status_path = chunk_store.base_path(); > @@ -127,7 +165,7 @@ impl DataStore { > GarbageCollectionStatus::default() > }; > > - Ok(Self { > + Ok(DataStoreImpl { > chunk_store: Arc::new(chunk_store), > gc_mutex: Mutex::new(()), > last_gc_status: Mutex::new(gc_status), > @@ -141,19 +179,19 @@ impl DataStore { > impl Iterator, usize, bool)>, > Error > > { > - self.chunk_store.get_chunk_iterator() > + self.inner.chunk_store.get_chunk_iterator() > } > > pub fn create_fixed_writer>(&self, filename: P, size: usize, chunk_size: usize) -> Result { > > - let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?; > + let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?; > > Ok(index) > } > > pub fn open_fixed_reader>(&self, filename: P) -> Result { > > - let full_path = self.chunk_store.relative_path(filename.as_ref()); > + let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); > > let index = FixedIndexReader::open(&full_path)?; > > @@ -165,14 +203,14 @@ impl DataStore { > ) -> Result { > > let index = DynamicIndexWriter::create( > - self.chunk_store.clone(), filename.as_ref())?; > + self.inner.chunk_store.clone(), filename.as_ref())?; > > Ok(index) > } > > pub fn open_dynamic_reader>(&self, filename: P) -> Result { > > - let full_path = self.chunk_store.relative_path(filename.as_ref()); > + let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); > > let index = DynamicIndexReader::open(&full_path)?; > > @@ -222,11 +260,11 @@ impl DataStore { > } > > pub fn name(&self) -> &str { > - self.chunk_store.name() > + self.inner.chunk_store.name() > } > > pub fn base_path(&self) -> PathBuf { > - self.chunk_store.base_path() > + self.inner.chunk_store.base_path() > } > > /// Cleanup a backup directory > @@ -529,7 +567,7 @@ impl DataStore { > worker.check_abort()?; > worker.fail_on_shutdown()?; > let 
digest = index.index_digest(pos).unwrap(); > - if !self.chunk_store.cond_touch_chunk(digest, false)? { > + if !self.inner.chunk_store.cond_touch_chunk(digest, false)? { > task_warn!( > worker, > "warning: unable to access non-existent chunk {}, required by {:?}", > @@ -545,7 +583,7 @@ impl DataStore { > let mut bad_path = PathBuf::new(); > bad_path.push(self.chunk_path(digest).0); > bad_path.set_extension(bad_ext); > - self.chunk_store.cond_touch_path(&bad_path, false)?; > + self.inner.chunk_store.cond_touch_path(&bad_path, false)?; > } > } > } > @@ -625,24 +663,24 @@ impl DataStore { > } > > pub fn last_gc_status(&self) -> GarbageCollectionStatus { > - self.last_gc_status.lock().unwrap().clone() > + self.inner.last_gc_status.lock().unwrap().clone() > } > > pub fn garbage_collection_running(&self) -> bool { > - !matches!(self.gc_mutex.try_lock(), Ok(_)) > + !matches!(self.inner.gc_mutex.try_lock(), Ok(_)) > } > > pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> { > > - if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() { > + if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() { > > // avoids that we run GC if an old daemon process has still a > // running backup writer, which is not save as we have no "oldest > // writer" information and thus no safe atime cutoff > - let _exclusive_lock = self.chunk_store.try_exclusive_lock()?; > + let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?; > > let phase1_start_time = proxmox_time::epoch_i64(); > - let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time); > + let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time); > > let mut gc_status = GarbageCollectionStatus::default(); > gc_status.upid = Some(upid.to_string()); > @@ -652,7 +690,7 @@ impl DataStore { > self.mark_used_chunks(&mut gc_status, worker)?; > > task_log!(worker, "Start GC phase2 (sweep unused chunks)"); > - 
self.chunk_store.sweep_unused_chunks( > + self.inner.chunk_store.sweep_unused_chunks( > oldest_writer, > phase1_start_time, > &mut gc_status, > @@ -729,7 +767,7 @@ impl DataStore { > let _ = replace_file(path, serialized.as_bytes(), options, false); > } > > - *self.last_gc_status.lock().unwrap() = gc_status; > + *self.inner.last_gc_status.lock().unwrap() = gc_status; > > } else { > bail!("Start GC failed - (already running/locked)"); > @@ -739,15 +777,15 @@ impl DataStore { > } > > pub fn try_shared_chunk_store_lock(&self) -> Result { > - self.chunk_store.try_shared_lock() > + self.inner.chunk_store.try_shared_lock() > } > > pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) { > - self.chunk_store.chunk_path(digest) > + self.inner.chunk_store.chunk_path(digest) > } > > pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result { > - self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist) > + self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist) > } > > pub fn insert_chunk( > @@ -755,7 +793,7 @@ impl DataStore { > chunk: &DataBlob, > digest: &[u8; 32], > ) -> Result<(bool, u64), Error> { > - self.chunk_store.insert_chunk(chunk, digest) > + self.inner.chunk_store.insert_chunk(chunk, digest) > } > > pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result { > @@ -771,13 +809,13 @@ impl DataStore { > > > pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result { > - let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest); > + let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest); > std::fs::metadata(chunk_path).map_err(Error::from) > } > > pub fn load_chunk(&self, digest: &[u8; 32]) -> Result { > > - let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest); > + let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest); > > proxmox_lang::try_block!({ > let mut file = std::fs::File::open(&chunk_path)?; > @@ -891,7 +929,7 @@ impl DataStore { > } 
> > pub fn verify_new(&self) -> bool { > - self.verify_new > + self.inner.verify_new > } > > /// returns a list of chunks sorted by their inode number on disk > diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs > index d50a64a5..131040c6 100644 > --- a/pbs-datastore/src/lib.rs > +++ b/pbs-datastore/src/lib.rs > @@ -145,6 +145,9 @@ > // Note: .pcat1 => Proxmox Catalog Format version 1 > pub const CATALOG_NAME: &str = "catalog.pcat1.didx"; > > +/// Directory path where active operations counters are saved. > +pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations"); > + > #[macro_export] > macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 { > () => { > @@ -179,6 +182,7 @@ pub mod paperkey; > pub mod prune; > pub mod read_chunk; > pub mod store_progress; > +pub mod task_tracking; > > pub mod dynamic_index; > pub mod fixed_index; > diff --git a/pbs-datastore/src/task_tracking.rs b/pbs-datastore/src/task_tracking.rs > new file mode 100644 > index 00000000..608a3096 > --- /dev/null > +++ b/pbs-datastore/src/task_tracking.rs > @@ -0,0 +1,72 @@ > +use std::path::PathBuf; > +use anyhow::Error; > +use libc::pid_t; > + > +use proxmox_sys::fs::{CreateOptions, file_read_optional_string, open_file_locked, replace_file}; > +use proxmox_sys::linux::procfs; > +use pbs_api_types::Operation; > +use serde::{Deserialize, Serialize}; > + > +#[derive(Deserialize, Serialize, Clone)] > +struct TaskOperations { > + pid: u32, > + reading_operations: i64, > + writing_operations: i64, > +} > + > +pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> { > + let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name)); > + let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name)); > + > + let user = pbs_config::backup_user()?; > + let options = CreateOptions::new() > + .group(user.gid) > + .owner(user.uid) > + 
.perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); > + > + let timeout = std::time::Duration::new(10, 0); > + open_file_locked(&lock_path, timeout, true, options.clone())?; > + > + let pid = std::process::id(); > + let mut updated = false; > + > + let mut updated_tasks = match file_read_optional_string(&path)? { > + Some(data) => { > + let mut active_tasks: Vec = serde_json::from_str::>(&data)? > + .iter() > + .filter(|task| > + procfs::check_process_running(task.pid as pid_t).is_some()) > + .cloned() > + .collect(); > + > + active_tasks.iter_mut() > + .for_each(|task| { > + if task.pid == pid { > + updated = true; > + match operation { > + Operation::Read => task.reading_operations += count, > + Operation::Write => task.writing_operations += count, > + } > + } > + }); > + active_tasks Would it not be possible to do the update in the filter function too, or maybe by using 'filter_map'? Alternatively, you could use a simple for loop over 'into_iter()' that pushes only the matching results into a Vec. In any case, I think we can avoid the double iteration here. > + } > + None => Vec::new(), > + }; > + > + if !updated { > + updated_tasks.push(match operation { > + Operation::Read => TaskOperations { > + pid, > + reading_operations: 1, > + writing_operations: 0, > + }, > + Operation::Write => TaskOperations { > + pid, > + reading_operations: 0, > + writing_operations: 1, > + } > + }) > + } > + replace_file(&path, serde_json::to_string(&updated_tasks)?.as_bytes(), options, false) > +} > \ No newline at end of file > diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs > index e6fc5f23..445994dc 100644 > --- a/src/bin/proxmox-backup-api.rs > +++ b/src/bin/proxmox-backup-api.rs > @@ -73,6 +73,7 @@ async fn run() -> Result<(), Error> { > > proxmox_backup::server::create_run_dir()?; > proxmox_backup::server::create_state_dir()?; > + proxmox_backup::server::create_active_operations_dir()?; >
proxmox_backup::server::jobstate::create_jobstate_dir()?; > proxmox_backup::tape::create_tape_status_dir()?; > proxmox_backup::tape::create_drive_state_dir()?; > diff --git a/src/server/mod.rs b/src/server/mod.rs > index 2b54cdf0..5a9884e9 100644 > --- a/src/server/mod.rs > +++ b/src/server/mod.rs > @@ -4,7 +4,7 @@ > //! services. We want async IO, so this is built on top of > //! tokio/hyper. > > -use anyhow::Error; > +use anyhow::{format_err, Error}; > use serde_json::Value; > > use proxmox_sys::fs::{create_path, CreateOptions}; > @@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> { > create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?; > Ok(()) > } > + > +/// Create active operations dir with correct permission. > +pub fn create_active_operations_dir() -> Result<(), Error> { > + let backup_user = pbs_config::backup_user()?; > + let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750); > + let options = CreateOptions::new() > + .perm(mode) > + .owner(backup_user.uid) > + .group(backup_user.gid); > + > + create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options)) > + .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?; > + Ok(()) > +}