public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* Re: [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking
@ 2022-01-24 13:44 Hannes Laimer
  2022-01-31 14:44 ` Dominik Csapak
  0 siblings, 1 reply; 3+ messages in thread
From: Hannes Laimer @ 2022-01-24 13:44 UTC (permalink / raw)
  To: pbs-devel

Saves the currently active read/write operation counts in a file. The
file is updated whenever lookup_datastore returns a reference, and
whenever such a reference is cloned or dropped. The files are locked
before every access; there is one file per datastore.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
FIXUP: fix logic; the previous version of this patch missed updates for
PIDs that were not present in a non-empty list

 pbs-api-types/src/maintenance.rs   |   1 +
 pbs-datastore/Cargo.toml           |   1 +
 pbs-datastore/src/datastore.rs     | 100 ++++++++++++++++++++---------
 pbs-datastore/src/lib.rs           |   4 ++
 pbs-datastore/src/task_tracking.rs |  72 +++++++++++++++++++++
 src/bin/proxmox-backup-api.rs      |   1 +
 src/server/mod.rs                  |  16 ++++-
 7 files changed, 163 insertions(+), 32 deletions(-)
 create mode 100644 pbs-datastore/src/task_tracking.rs

diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs
index 4d3ccb3b..1b5753d2 100644
--- a/pbs-api-types/src/maintenance.rs
+++ b/pbs-api-types/src/maintenance.rs
@@ -25,6 +25,7 @@ pub enum MaintenanceType {
     Offline(String),
 }
 
+#[derive(Copy ,Clone)]
 /// Operation requirments, used when checking for maintenance mode.
 pub enum Operation {
     Read,
diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index d8cefa00..0c703d44 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -35,5 +35,6 @@ proxmox-uuid = "1"
 proxmox-sys = "0.2"
 
 pbs-api-types = { path = "../pbs-api-types" }
+pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-tools = { path = "../pbs-tools" }
 pbs-config = { path = "../pbs-config" }
diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index c65d4574..774dea80 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -9,11 +9,12 @@ use std::time::Duration;
 use anyhow::{bail, format_err, Error};
 use lazy_static::lazy_static;
 
-use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions};
+use proxmox_sys::fs::{replace_file, file_read_optional_string,
+    CreateOptions, lock_dir_noblock, DirLockGuard,
+};
 use proxmox_sys::process_locker::ProcessLockSharedGuard;
 use proxmox_sys::WorkerTaskContext;
 use proxmox_sys::{task_log, task_warn};
-use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard};
 
 use pbs_api_types::{
     UPID, DataStoreConfig, Authid, MaintenanceType, Operation, GarbageCollectionStatus, HumanByte
@@ -31,9 +32,10 @@ use crate::manifest::{
     ArchiveType, BackupManifest,
     archive_type,
 };
+use crate::task_tracking::update_active_operations;
 
 lazy_static! {
-    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
+    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
 }
 
 /// checks if auth_id is owner, or, if owner is a token, if
@@ -54,13 +56,42 @@ pub fn check_backup_owner(
 ///
 /// A Datastore can store severals backups, and provides the
 /// management interface for backup.
-pub struct DataStore {
+pub struct DataStoreImpl {
     chunk_store: Arc<ChunkStore>,
     gc_mutex: Mutex<()>,
     last_gc_status: Mutex<GarbageCollectionStatus>,
     verify_new: bool,
 }
 
+/// Datastore handle; while alive it counts as one active `operation` (if set).
+pub struct DataStore {
+    inner: Arc<DataStoreImpl>,
+    operation: Option<Operation>,
+}
+
+impl Clone for DataStore {
+    /// A clone is an additional user: increment here, its own `drop` decrements.
+    fn clone(&self) -> Self {
+        if let Some(operation) = self.operation {
+            if let Err(e) = update_active_operations(self.name(), operation, 1) {
+                eprintln!("could not update active operations - {}", e);
+            }
+        }
+        DataStore { inner: self.inner.clone(), operation: self.operation }
+    }
+}
+
+impl Drop for DataStore {
+    /// Releases the active operation accounted to this handle, if any.
+    fn drop(&mut self) {
+        if let Some(operation) = self.operation {
+            if let Err(e) = update_active_operations(self.name(), operation, -1) {
+                eprintln!("could not update active operations - {}", e);
+            }
+        }
+    }
+}
+
 impl DataStore {
     pub fn lookup_datastore(
         name: &str,
@@ -75,6 +106,7 @@ impl DataStore {
             | (Some(MaintenanceType::Offline(message)), Some(_)) => {
                 bail!("Datastore '{}' is in maintenance mode: {}", name, message);
             },
+            (_, Some(operation)) => update_active_operations(name, operation, 1)?,
             _ => {}
         }
 
@@ -85,7 +117,10 @@ impl DataStore {
             if datastore.chunk_store.base() == path &&
                 datastore.verify_new == config.verify_new.unwrap_or(false)
             {
-                return Ok(datastore.clone());
+                return Ok(Arc::new(Self {
+                    inner: datastore.clone(),
+                    operation,
+                }))
             }
         }
 
@@ -94,7 +129,10 @@ impl DataStore {
         let datastore = Arc::new(datastore);
         map.insert(name.to_string(), datastore.clone());
 
-        Ok(datastore)
+        Ok(Arc::new(Self {
+            inner: datastore,
+            operation,
+        }))
     }
 
     /// removes all datastores that are not configured anymore
@@ -109,7 +147,7 @@ impl DataStore {
         Ok(())
     }
 
-    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<Self, Error> {
+    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<DataStoreImpl, Error> {
         let chunk_store = ChunkStore::open(store_name, path)?;
 
         let mut gc_status_path = chunk_store.base_path();
@@ -127,7 +165,7 @@ impl DataStore {
             GarbageCollectionStatus::default()
         };
 
-        Ok(Self {
+        Ok(DataStoreImpl {
             chunk_store: Arc::new(chunk_store),
             gc_mutex: Mutex::new(()),
             last_gc_status: Mutex::new(gc_status),
@@ -141,19 +179,19 @@ impl DataStore {
         impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
         Error
     > {
-        self.chunk_store.get_chunk_iterator()
+        self.inner.chunk_store.get_chunk_iterator()
     }
 
     pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
 
-        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
+        let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
 
         Ok(index)
     }
 
     pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
 
-        let full_path =  self.chunk_store.relative_path(filename.as_ref());
+        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
 
         let index = FixedIndexReader::open(&full_path)?;
 
@@ -165,14 +203,14 @@ impl DataStore {
     ) -> Result<DynamicIndexWriter, Error> {
 
         let index = DynamicIndexWriter::create(
-            self.chunk_store.clone(), filename.as_ref())?;
+            self.inner.chunk_store.clone(), filename.as_ref())?;
 
         Ok(index)
     }
 
     pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
 
-        let full_path =  self.chunk_store.relative_path(filename.as_ref());
+        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
 
         let index = DynamicIndexReader::open(&full_path)?;
 
@@ -222,11 +260,11 @@ impl DataStore {
     }
 
     pub fn name(&self) -> &str {
-        self.chunk_store.name()
+        self.inner.chunk_store.name()
     }
 
     pub fn base_path(&self) -> PathBuf {
-        self.chunk_store.base_path()
+        self.inner.chunk_store.base_path()
     }
 
     /// Cleanup a backup directory
@@ -529,7 +567,7 @@ impl DataStore {
             worker.check_abort()?;
             worker.fail_on_shutdown()?;
             let digest = index.index_digest(pos).unwrap();
-            if !self.chunk_store.cond_touch_chunk(digest, false)? {
+            if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
                 task_warn!(
                     worker,
                     "warning: unable to access non-existent chunk {}, required by {:?}",
@@ -545,7 +583,7 @@ impl DataStore {
                     let mut bad_path = PathBuf::new();
                     bad_path.push(self.chunk_path(digest).0);
                     bad_path.set_extension(bad_ext);
-                    self.chunk_store.cond_touch_path(&bad_path, false)?;
+                    self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
                 }
             }
         }
@@ -625,24 +663,24 @@ impl DataStore {
     }
 
     pub fn last_gc_status(&self) -> GarbageCollectionStatus {
-        self.last_gc_status.lock().unwrap().clone()
+        self.inner.last_gc_status.lock().unwrap().clone()
     }
 
     pub fn garbage_collection_running(&self) -> bool {
-        !matches!(self.gc_mutex.try_lock(), Ok(_))
+        !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
     }
 
     pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
 
-        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
+        if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
 
             // avoids that we run GC if an old daemon process has still a
             // running backup writer, which is not save as we have no "oldest
             // writer" information and thus no safe atime cutoff
-            let _exclusive_lock =  self.chunk_store.try_exclusive_lock()?;
+            let _exclusive_lock =  self.inner.chunk_store.try_exclusive_lock()?;
 
             let phase1_start_time = proxmox_time::epoch_i64();
-            let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
+            let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
 
             let mut gc_status = GarbageCollectionStatus::default();
             gc_status.upid = Some(upid.to_string());
@@ -652,7 +690,7 @@ impl DataStore {
             self.mark_used_chunks(&mut gc_status, worker)?;
 
             task_log!(worker, "Start GC phase2 (sweep unused chunks)");
-            self.chunk_store.sweep_unused_chunks(
+            self.inner.chunk_store.sweep_unused_chunks(
                 oldest_writer,
                 phase1_start_time,
                 &mut gc_status,
@@ -729,7 +767,7 @@ impl DataStore {
                 let _ = replace_file(path, serialized.as_bytes(), options, false);
             }
 
-            *self.last_gc_status.lock().unwrap() = gc_status;
+            *self.inner.last_gc_status.lock().unwrap() = gc_status;
 
         } else {
             bail!("Start GC failed - (already running/locked)");
@@ -739,15 +777,15 @@ impl DataStore {
     }
 
     pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
-        self.chunk_store.try_shared_lock()
+        self.inner.chunk_store.try_shared_lock()
     }
 
     pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
-        self.chunk_store.chunk_path(digest)
+        self.inner.chunk_store.chunk_path(digest)
     }
 
     pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
-        self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
+        self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
     }
 
     pub fn insert_chunk(
@@ -755,7 +793,7 @@ impl DataStore {
         chunk: &DataBlob,
         digest: &[u8; 32],
     ) -> Result<(bool, u64), Error> {
-        self.chunk_store.insert_chunk(chunk, digest)
+        self.inner.chunk_store.insert_chunk(chunk, digest)
     }
 
     pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
@@ -771,13 +809,13 @@ impl DataStore {
 
 
     pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
-        let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
+        let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
         std::fs::metadata(chunk_path).map_err(Error::from)
     }
 
     pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
 
-        let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
+        let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
 
         proxmox_lang::try_block!({
             let mut file = std::fs::File::open(&chunk_path)?;
@@ -891,7 +929,7 @@ impl DataStore {
     }
 
     pub fn verify_new(&self) -> bool {
-        self.verify_new
+        self.inner.verify_new
     }
 
     /// returns a list of chunks sorted by their inode number on disk
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index d50a64a5..131040c6 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -145,6 +145,9 @@
 // Note: .pcat1 => Proxmox Catalog Format version 1
 pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
 
+/// Directory path where active operations counters are saved.
+pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
+
 #[macro_export]
 macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
     () => {
@@ -179,6 +182,7 @@ pub mod paperkey;
 pub mod prune;
 pub mod read_chunk;
 pub mod store_progress;
+pub mod task_tracking;
 
 pub mod dynamic_index;
 pub mod fixed_index;
diff --git a/pbs-datastore/src/task_tracking.rs b/pbs-datastore/src/task_tracking.rs
new file mode 100644
index 00000000..608a3096
--- /dev/null
+++ b/pbs-datastore/src/task_tracking.rs
@@ -0,0 +1,72 @@
+//! Tracking of active read/write operations on datastores.
+//!
+//! Counters of all processes are kept in a JSON file below `ACTIVE_OPERATIONS_DIR`,
+//! one file (plus a `.lock` file) per datastore.
+
+use std::path::PathBuf;
+use anyhow::Error;
+use libc::pid_t;
+
+use proxmox_sys::fs::{CreateOptions, file_read_optional_string, open_file_locked, replace_file};
+use proxmox_sys::linux::procfs;
+use pbs_api_types::Operation;
+use serde::{Deserialize, Serialize};
+
+/// Active operation counters of a single process, as stored in the tracking file.
+#[derive(Deserialize, Serialize, Clone)]
+struct TaskOperations {
+    /// PID of the process the counters belong to.
+    pid: u32,
+    /// Number of currently active read operations.
+    reading_operations: i64,
+    /// Number of currently active write operations.
+    writing_operations: i64,
+}
+
+/// Adds `count` (may be negative) to the `operation` counter of the current
+/// process in the tracking file of datastore `name`.
+///
+/// The update is done while holding an exclusive lock on `<name>.lock` (waiting up
+/// to 10 seconds). Entries of processes that are no longer running are dropped.
+pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
+    let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
+    let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name));
+
+    let user = pbs_config::backup_user()?;
+    let options = CreateOptions::new()
+        .group(user.gid)
+        .owner(user.uid)
+        .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+    let timeout = std::time::Duration::new(10, 0);
+    // bind the file: dropping it right away would release the lock before the update
+    let _lock = open_file_locked(&lock_path, timeout, true, options.clone())?;
+
+    let pid = std::process::id();
+
+    let mut tasks: Vec<TaskOperations> = match file_read_optional_string(&path)? {
+        Some(data) => serde_json::from_str(&data)?,
+        None => Vec::new(),
+    };
+    tasks.retain(|task| procfs::check_process_running(task.pid as pid_t).is_some());
+
+    let mut updated = false;
+    for task in tasks.iter_mut().filter(|task| task.pid == pid) {
+        updated = true;
+        match operation {
+            Operation::Read => task.reading_operations += count,
+            Operation::Write => task.writing_operations += count,
+        }
+    }
+
+    if !updated {
+        // no entry for this process yet (or it got lost): start at `count`, not at 1
+        let (reading_operations, writing_operations) = match operation {
+            Operation::Read => (count, 0),
+            Operation::Write => (0, count),
+        };
+        tasks.push(TaskOperations { pid, reading_operations, writing_operations });
+    }
+
+    replace_file(&path, serde_json::to_string(&tasks)?.as_bytes(), options, false)
+}
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index e6fc5f23..445994dc 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -73,6 +73,7 @@ async fn run() -> Result<(), Error> {
 
     proxmox_backup::server::create_run_dir()?;
     proxmox_backup::server::create_state_dir()?;
+    proxmox_backup::server::create_active_operations_dir()?;
     proxmox_backup::server::jobstate::create_jobstate_dir()?;
     proxmox_backup::tape::create_tape_status_dir()?;
     proxmox_backup::tape::create_drive_state_dir()?;
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 2b54cdf0..5a9884e9 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -4,7 +4,7 @@
 //! services. We want async IO, so this is built on top of
 //! tokio/hyper.
 
-use anyhow::Error;
+use anyhow::{format_err, Error};
 use serde_json::Value;
 
 use proxmox_sys::fs::{create_path, CreateOptions};
@@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
     create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
     Ok(())
 }
+
+/// Create the active operations directory, owned by the backup user with mode 0750.
+pub fn create_active_operations_dir() -> Result<(), Error> {
+    let backup_user = pbs_config::backup_user()?;
+    let options = CreateOptions::new()
+        .perm(nix::sys::stat::Mode::from_bits_truncate(0o0750))
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
+
+    match create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options)) {
+        Ok(_) => Ok(()),
+        Err(err) => Err(format_err!("unable to create active operations dir - {}", err)),
+    }
+}
-- 
2.30.2





^ permalink raw reply	[flat|nested] 3+ messages in thread
* [pbs-devel] [PATCH proxmox-backup v5 0/8] closes #3071: maintenance mode for datastore
@ 2022-01-24 12:31 Hannes Laimer
  2022-01-24 12:31 ` [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking Hannes Laimer
  0 siblings, 1 reply; 3+ messages in thread
From: Hannes Laimer @ 2022-01-24 12:31 UTC (permalink / raw)
  To: pbs-devel

v5:
 - use simple struct and serde instead of manual parsing for file
 - move tracking related stuff into new file (task_tracking.rs)

v4:
 - clones are now also tracked
 - use a separate lock file instead of locking the data file itself
 - track the pid of the process that started an operation
 - updating maintenance mode is now always possible
 - add get_active_operations endpoint for datastore
 - ui: show count of conflicting tasks (or checkmark if no conflicting
     operations are active)

 v3, based on Dominik Csapak <d.csapak@proxmox.com>'s feedback:
 - added Operation enum (r/w), as suggested
 - added active operation tracking
 - combine type and message into one field

v2:
 - check for maintenance now directly in lookup_datastore
 - parameter for checking is now the last acceptable maintenance type,
   description in commit msg of 2nd patch
 - ui cleanup


Dominik Csapak (1):
  api: tape: fix datastore lookup operations

Hannes Laimer (6):
  api-types: add maintenance type
  datastore: add check for maintenance in lookup
  pbs-datastore: add active operations tracking
  api: make maintenance_type updatable
  api: add get_active_operations endpoint
  ui: add option to change the maintenance type

Thomas Lamprecht (1):
  datastore: avoid tuple-match, use plain if

 pbs-api-types/src/datastore.rs       |   8 +-
 pbs-api-types/src/lib.rs             |   3 +
 pbs-api-types/src/maintenance.rs     |  83 +++++++++++++++++++
 pbs-datastore/Cargo.toml             |   1 +
 pbs-datastore/src/datastore.rs       | 119 +++++++++++++++++++--------
 pbs-datastore/src/lib.rs             |   4 +
 pbs-datastore/src/snapshot_reader.rs |   6 +-
 pbs-datastore/src/task_tracking.rs   |  92 +++++++++++++++++++++
 src/api2/admin/datastore.rs          |  82 ++++++++++++------
 src/api2/backup/mod.rs               |   4 +-
 src/api2/config/datastore.rs         |   5 ++
 src/api2/reader/mod.rs               |   6 +-
 src/api2/status.rs                   |   4 +-
 src/api2/tape/backup.rs              |   6 +-
 src/api2/tape/restore.rs             |   6 +-
 src/bin/proxmox-backup-api.rs        |   1 +
 src/bin/proxmox-backup-proxy.rs      |   6 +-
 src/server/mod.rs                    |  16 +++-
 src/server/prune_job.rs              |   4 +-
 src/server/pull.rs                   |   4 +-
 src/server/verify_job.rs             |   4 +-
 www/Makefile                         |   1 +
 www/Utils.js                         |  23 ++++++
 www/datastore/OptionView.js          |  30 +++++++
 www/window/MaintenanceOptions.js     |  72 ++++++++++++++++
 25 files changed, 505 insertions(+), 85 deletions(-)
 create mode 100644 pbs-api-types/src/maintenance.rs
 create mode 100644 pbs-datastore/src/task_tracking.rs
 create mode 100644 www/window/MaintenanceOptions.js

-- 
2.30.2





^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2022-01-31 14:45 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-01-24 13:44 [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking Hannes Laimer
2022-01-31 14:44 ` Dominik Csapak
  -- strict thread matches above, loose matches on Subject: below --
2022-01-24 12:31 [pbs-devel] [PATCH proxmox-backup v5 0/8] closes #3071: maintenance mode for datastore Hannes Laimer
2022-01-24 12:31 ` [pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking Hannes Laimer

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal